import logging
from app.db.session import engine
from sqlalchemy import text, inspect
# Configure the root logger at import time so INFO-level migration progress is
# emitted. Per the stdlib, basicConfig() does nothing if the root logger
# already has handlers, so an application that configured logging earlier
# keeps its own setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by everything below.
logger = logging.getLogger(__name__)
# Columns that must exist on already-created tables, as (name, SQL type) pairs.
# The SQL type string is appended verbatim to "ALTER TABLE ... ADD COLUMN".
_MESSAGE_COLUMNS = (
    ("audio_path", "TEXT"),
    ("model_response_time", "INTEGER"),
    ("token_count", "INTEGER"),
    ("reasoning_content", "TEXT"),
)

_SESSION_COLUMNS = (
    ("stt_provider_name", "TEXT"),
    ("tts_provider_name", "TEXT"),
    # M3: Agent Node attachment
    ("sync_workspace_id", "TEXT"),
    ("attached_node_ids", "TEXT"),
    ("node_sync_status", "TEXT"),
    ("sync_config", "TEXT"),
    ("is_cancelled", "INTEGER DEFAULT 0"),
)

# NOTE(review): the CREATE TABLE statement below also declares `node_id`,
# `description` and `created_at`, which are not back-filled here. That matches
# the previous behaviour -- confirm whether it is intentional.
_AGENT_NODE_COLUMNS = (
    ("display_name", "TEXT"),
    ("registered_by", "TEXT"),
    ("skill_config", "TEXT"),
    ("invite_token", "TEXT"),
    ("is_active", "INTEGER"),
    ("last_status", "TEXT"),
    ("last_seen_at", "DATETIME"),
    ("capabilities", "TEXT"),
)

_SKILL_COLUMNS = (
    ("system_prompt", "TEXT"),
    ("is_enabled", "INTEGER DEFAULT 1"),
    ("features", "TEXT DEFAULT '[\"chat\"]'"),
    ("is_system", "INTEGER DEFAULT 0"),
    ("skill_type", "TEXT DEFAULT 'local'"),
    ("extra_metadata", "TEXT DEFAULT '{}'"),
    ("preview_markdown", "TEXT"),
)

# --- M6: Agent Node Tables ---
_CREATE_AGENT_NODES_SQL = """
    CREATE TABLE IF NOT EXISTS agent_nodes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        node_id TEXT UNIQUE NOT NULL,
        display_name TEXT NOT NULL,
        description TEXT,
        registered_by TEXT NOT NULL,
        skill_config TEXT NOT NULL DEFAULT '{}',
        capabilities TEXT DEFAULT '{}',
        invite_token TEXT UNIQUE,
        is_active INTEGER NOT NULL DEFAULT 1,
        last_status TEXT NOT NULL DEFAULT 'offline',
        last_seen_at DATETIME,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
"""

_CREATE_NODE_GROUP_ACCESS_SQL = """
    CREATE TABLE IF NOT EXISTS node_group_access (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        node_id TEXT NOT NULL REFERENCES agent_nodes(node_id),
        group_id TEXT NOT NULL REFERENCES groups(id),
        access_level TEXT NOT NULL DEFAULT 'use',
        granted_by TEXT NOT NULL REFERENCES users(id),
        granted_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
"""

_CREATE_SKILL_GROUP_ACCESS_SQL = """
    CREATE TABLE IF NOT EXISTS skill_group_access (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        skill_id INTEGER NOT NULL REFERENCES skills(id),
        group_id TEXT NOT NULL REFERENCES groups(id),
        granted_by TEXT NOT NULL REFERENCES users(id),
        granted_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
"""


def _add_missing_columns(conn, inspector, table, required_columns):
    """Add each (name, type) in *required_columns* that *table* does not have.

    Best-effort: a failing ALTER is logged and rolled back, then the loop
    continues with the next column. The caller must ensure *table* exists.
    """
    # A set gives O(1) membership tests; only the names are needed.
    existing = {col["name"] for col in inspector.get_columns(table)}
    for col_name, col_type in required_columns:
        if col_name in existing:
            logger.info(f"Column '{col_name}' already exists in '{table}'.")
            continue
        logger.info(f"Adding column '{col_name}' to '{table}' table...")
        try:
            # Identifiers cannot be bound parameters; every value interpolated
            # here comes from the hard-coded constants in this module.
            conn.execute(text(f"ALTER TABLE {table} ADD COLUMN {col_name} {col_type}"))
            conn.commit()
        except Exception as e:
            # Reset the connection so one failed statement cannot leave an
            # aborted transaction that breaks every later migration step.
            conn.rollback()
            logger.error(f"Failed to add column '{col_name}' to '{table}': {e}")
        else:
            logger.info(f"Successfully added '{col_name}' to '{table}'.")


def _create_table(conn, table, ddl):
    """Execute the CREATE TABLE statement *ddl* for *table*, logging the outcome."""
    logger.info(f"Creating table '{table}'...")
    try:
        conn.execute(text(ddl))
        conn.commit()
    except Exception as e:
        # Same reasoning as in _add_missing_columns: keep the connection usable.
        conn.rollback()
        logger.error(f"Failed to create '{table}': {e}")
    else:
        logger.info(f"Table '{table}' created.")


def run_migrations():
    """
    Checks for missing columns/tables and adds them if necessary.

    This is a simple idempotent migration system to handle schema updates:
    each step first inspects the live schema and only issues DDL for what is
    absent, so running it repeatedly is harmless.

    Steps, in order:
      1. If 'messages' does not exist, return immediately; the log message
         defers table creation to Base.metadata.create_all.
      2. Back-fill columns on 'messages' and 'sessions'.
      3. Create 'agent_nodes' (or back-fill its columns when it exists), then
         create 'node_group_access' and 'skill_group_access' if missing.
      4. Back-fill columns on 'skills' when that table exists.

    Individual DDL failures are logged and skipped rather than raised.
    Returns None.
    """
    logger.info("Starting database migrations...")
    with engine.connect() as conn:
        inspector = inspect(engine)

        if not inspector.has_table("messages"):
            logger.info("Table 'messages' does not exist, skipping migrations (will be handled by Base.metadata.create_all).")
            return
        _add_missing_columns(conn, inspector, "messages", _MESSAGE_COLUMNS)

        # Session table migrations. Guarded like every other table here, so
        # the columns of a table that does not exist are never reflected.
        if inspector.has_table("sessions"):
            _add_missing_columns(conn, inspector, "sessions", _SESSION_COLUMNS)
        else:
            logger.info("Table 'sessions' does not exist, skipping its column migrations.")

        # --- M6: Agent Node Tables ---
        if inspector.has_table("agent_nodes"):
            # Table exists -- ensure all columns are present
            _add_missing_columns(conn, inspector, "agent_nodes", _AGENT_NODE_COLUMNS)
        else:
            _create_table(conn, "agent_nodes", _CREATE_AGENT_NODES_SQL)

        access_tables = (
            ("node_group_access", _CREATE_NODE_GROUP_ACCESS_SQL),
            ("skill_group_access", _CREATE_SKILL_GROUP_ACCESS_SQL),
        )
        for table, ddl in access_tables:
            if not inspector.has_table(table):
                _create_table(conn, table, ddl)

        # --- Skill table migrations ---
        if inspector.has_table("skills"):
            _add_missing_columns(conn, inspector, "skills", _SKILL_COLUMNS)

    logger.info("Database migrations complete.")
# Run the migrations only when this file is executed as a script; importing
# the module does not trigger them.
if __name__ == "__main__":
    run_migrations()