import logging
import os
import uuid
from typing import Optional

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from app.api import schemas
from app.db import models
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class SessionService:
    """Create chat sessions and manage the nodes and workspace attached to them.

    Collaborators are looked up on an optional service container
    (``services``): ``orchestrator`` (with ``mirror`` and ``assistant``),
    ``tool_service`` and ``node_registry_service``.  Each of them may be
    missing, in which case the step that needs it is skipped.
    """

    def __init__(self, services=None):
        """Store the service container.

        Args:
            services: Object exposing the collaborators listed on the class;
                may be ``None``.
        """
        self.services = services

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _skill_source_dir(fs_skill: dict, data_dir: str) -> str:
        """Return ``<data_dir>/skills/<first feature>/<skill id>`` for *fs_skill*.

        Args:
            fs_skill: Skill description; only the ``"features"`` and ``"id"``
                keys are read.
            data_dir: Root data directory.
        """
        # ``or`` instead of a ``get`` default: a present-but-empty ``features``
        # list (or ``None``) must also fall back to "chat" rather than raise
        # IndexError on ``[0]``.
        features = fs_skill.get("features") or ["chat"]
        skill_id = fs_skill.get("id") or ""
        # Strip "fs-" only as a prefix; removing every occurrence would
        # corrupt ids such as "fs-dfs-tool".
        # NOTE(review): assumes "fs-" is purely an id prefix - confirm against fs_loader.
        if skill_id.startswith("fs-"):
            skill_id = skill_id[len("fs-"):]
        return os.path.join(data_dir, "skills", features[0], skill_id)

    @staticmethod
    def _sync_default_node(assistant, nid, workspace_id, source, path):
        """Send the sync commands used when a default node is auto-attached."""
        if source in ("server", "empty"):
            assistant.push_workspace(nid, workspace_id)
            if source == "empty":
                assistant.control_sync(nid, workspace_id, action="START")
                assistant.control_sync(nid, workspace_id, action="UNLOCK")
        elif source == "node_local":
            local_path = path or "."
            assistant.request_manifest(nid, workspace_id, path=local_path)
            assistant.control_sync(nid, workspace_id, action="START", path=local_path)

    @staticmethod
    def _sync_attached_node(assistant, nid, workspace_id, config):
        """Send the sync commands for one explicitly attached node.

        The commands depend on ``config.source``; nodes listed in
        ``config.read_only_node_ids`` additionally receive a final ``LOCK``.
        """
        if config.source == "server":
            assistant.push_workspace(nid, workspace_id)
            assistant.control_sync(nid, workspace_id, action="LOCK")
        elif config.source == "empty":
            assistant.push_workspace(nid, workspace_id)
            assistant.control_sync(nid, workspace_id, action="START")
            assistant.control_sync(nid, workspace_id, action="UNLOCK")
        elif config.source == "node_local":
            if config.source_node_id == nid:
                local_path = config.path or "."
                assistant.request_manifest(nid, workspace_id, path=local_path)
                assistant.control_sync(nid, workspace_id, action="START", path=local_path)
                assistant.control_sync(nid, workspace_id, action="UNLOCK")
            else:
                assistant.control_sync(nid, workspace_id, action="START")
                assistant.control_sync(nid, workspace_id, action="LOCK")
                assistant.push_workspace(nid, workspace_id)

        if config.read_only_node_ids and nid in config.read_only_node_ids:
            assistant.control_sync(nid, workspace_id, action="LOCK")

    def _mount_skills_to_workspace(self, db: Session, session: models.Session):
        """Symlink the skills usable by *session* into ``<workspace>/.skills``.

        Returns immediately when the session has no ``sync_workspace_id`` or
        when the orchestrator, its mirror, or the tool service is missing.
        Best-effort: every failure is logged and none is propagated.
        """
        if not session.sync_workspace_id:
            return
        try:
            orchestrator = getattr(self.services, "orchestrator", None)
            tool_service = getattr(self.services, "tool_service", None)
            if not orchestrator or not orchestrator.mirror or not tool_service:
                return

            workspace_path = orchestrator.mirror.get_workspace_path(session.sync_workspace_id)
            skills_dir = os.path.join(workspace_path, ".skills")
            os.makedirs(skills_dir, exist_ok=True)

            tools = tool_service.get_available_tools(
                db, user_id=session.user_id, feature=session.feature_name
            )
            valid_tool_names = {t["function"]["name"] for t in tools}

            # Kept function-local as in the original code.
            # NOTE(review): presumably avoids an import cycle - confirm.
            from app.core.skills.fs_loader import fs_loader
            from app.config import settings

            for fs_skill in fs_loader.get_all_skills():
                skill_name = fs_skill.get("name")
                if skill_name not in valid_tool_names:
                    continue
                skill_path = self._skill_source_dir(fs_skill, settings.DATA_DIR)
                link_path = os.path.join(skills_dir, skill_name)
                # ``lexists`` also sees dangling symlinks, which ``exists``
                # reports as absent and which would make ``symlink`` fail.
                if not os.path.exists(skill_path) or os.path.lexists(link_path):
                    continue
                try:
                    os.symlink(skill_path, link_path, target_is_directory=True)
                except OSError as link_err:
                    # One unlinkable skill must not stop the others.
                    logger.warning(
                        "Failed to link skill %s into workspace: %s", skill_name, link_err
                    )
        except Exception as e:
            logger.error(f"Failed to mount skills to workspace: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def create_session(
        self,
        db: Session,
        user_id: str,
        provider_name: str,
        feature_name: str = "default",
        stt_provider_name: Optional[str] = None,
        tts_provider_name: Optional[str] = None,
    ) -> models.Session:
        """Persist a new session titled "New Chat Session" and return it.

        The row is added, committed and refreshed before being returned.

        Raises:
            SQLAlchemyError: Re-raised after the transaction is rolled back.
        """
        try:
            new_session = models.Session(
                user_id=user_id,
                provider_name=provider_name,
                stt_provider_name=stt_provider_name,
                tts_provider_name=tts_provider_name,
                feature_name=feature_name,
                title="New Chat Session",
            )
            db.add(new_session)
            db.commit()
            db.refresh(new_session)
            return new_session
        except SQLAlchemyError:
            db.rollback()
            raise

    def auto_attach_default_nodes(
        self, db: Session, session: models.Session, request: schemas.SessionCreate
    ):
        """Give *session* a workspace and attach the user's default nodes.

        A workspace id is assigned when ``request.feature_name`` is
        ``"swarm_control"`` or the user's preferences list default node ids.
        Default nodes are then recorded on the session as ``pending`` and sync
        is triggered on each of them according to the preferred data source.
        Registry, mirror and sync failures are logged, never raised.

        Returns:
            The same *session* object (unchanged when the user is not found or
            no workspace is needed).
        """
        user = db.query(models.User).filter(models.User.id == request.user_id).first()
        if not user:
            return session

        # ``or`` fallbacks so explicit ``None`` values stored in the
        # preferences behave like missing keys.
        node_prefs = (user.preferences or {}).get("nodes") or {}
        default_nodes = node_prefs.get("default_node_ids") or []
        node_config = node_prefs.get("data_source") or {"source": "empty"}

        if request.feature_name != "swarm_control" and not default_nodes:
            return session

        session.sync_workspace_id = f"session-{session.id}-{uuid.uuid4().hex[:8]}"
        db.commit()
        db.refresh(session)

        orchestrator = getattr(self.services, "orchestrator", None)
        try:
            # Asking the mirror for the path pre-initializes the server-side
            # workspace; the returned path itself is not needed here.
            mirror = getattr(orchestrator, "mirror", None)
            if mirror:
                mirror.get_workspace_path(session.sync_workspace_id)
        except Exception as mirror_err:
            logger.error(f"Failed to pre-initialize server mirror: {mirror_err}")

        if default_nodes:
            session.attached_node_ids = list(default_nodes)
            session.node_sync_status = {
                nid: {"status": "pending", "last_sync": None}
                for nid in default_nodes
            }
            db.commit()
            db.refresh(session)

            registry = getattr(self.services, "node_registry_service", None)
            try:
                assistant = orchestrator.assistant if orchestrator else None
                source = node_config.get("source", "empty")
                path = node_config.get("path", "")
                for nid in default_nodes:
                    if registry:
                        try:
                            registry.emit(nid, "info", {
                                "message": f"Auto-attached to session {session.id}",
                                "workspace_id": session.sync_workspace_id,
                            })
                        except Exception as emit_err:
                            # Notification only; keep going but leave a trace.
                            logger.debug(
                                "Failed to emit attach event for node %s: %s", nid, emit_err
                            )
                    if assistant:
                        try:
                            self._sync_default_node(
                                assistant, nid, session.sync_workspace_id, source, path
                            )
                        except Exception as sync_err:
                            logger.error(f"Failed to trigger sync for node {nid}: {sync_err}")
            except Exception as e:
                logger.error(f"Failed to initialize orchestrator sync: {e}")

        self._mount_skills_to_workspace(db, session)
        return session

    def attach_nodes(
        self, db: Session, session_id: int, request: schemas.NodeAttachRequest
    ) -> Optional[schemas.SessionNodeStatusResponse]:
        """Replace the nodes attached to a session and trigger their sync.

        ``request.node_ids`` becomes the session's full attachment list: nodes
        not yet tracked get a ``pending`` status entry and nodes no longer
        listed lose theirs.  When an orchestrator is available the sync config
        is stored, stale workspaces are cleared (all previous nodes plus the
        server mirror if the strategy changed, otherwise only detached nodes)
        and sync commands are sent to every requested node.

        Returns:
            The node status response, or ``None`` when no non-archived session
            with ``session_id`` exists.
        """
        session = db.query(models.Session).filter(
            models.Session.id == session_id,
            models.Session.is_archived == False,  # noqa: E712 - builds a SQL expression
        ).first()
        if not session:
            return None

        if not session.sync_workspace_id:
            session.sync_workspace_id = f"session-{session_id}-{uuid.uuid4().hex[:8]}"

        old_node_ids = set(session.attached_node_ids or [])
        new_node_ids = set(request.node_ids)
        detached_nodes = old_node_ids - new_node_ids
        session.attached_node_ids = list(request.node_ids)

        sync_status = dict(session.node_sync_status or {})
        registry = getattr(self.services, "node_registry_service", None)
        for nid in new_node_ids:
            sync_status.setdefault(nid, {"status": "pending", "last_sync": None})
            if registry:
                try:
                    registry.emit(
                        nid, "info",
                        {"message": f"Attached to session {session_id}", "workspace_id": session.sync_workspace_id},
                    )
                except Exception as emit_err:
                    # Notification only; keep going but leave a trace.
                    logger.debug("Failed to emit attach event for node %s: %s", nid, emit_err)
        for nid in detached_nodes:
            sync_status.pop(nid, None)

        session.node_sync_status = sync_status
        flag_modified(session, "attached_node_ids")
        flag_modified(session, "node_sync_status")
        db.commit()
        db.refresh(session)

        orchestrator = getattr(self.services, "orchestrator", None)
        if not orchestrator:
            logger.warning("Orchestrator not found in ServiceContainer; cannot trigger sync.")
        else:
            try:
                assistant = orchestrator.assistant
                config = request.config or schemas.NodeWorkspaceConfig(source="empty")
                old_config = session.sync_config or {}
                # A first-time config (no previous one) never counts as a change.
                strategy_changed = bool(old_config) and (
                    config.source != old_config.get("source")
                    or config.path != old_config.get("path")
                    or config.source_node_id != old_config.get("source_node_id")
                )
                session.sync_config = config.model_dump()
                db.commit()

                workspace_id = session.sync_workspace_id
                if assistant is None:
                    logger.warning("Orchestrator has no assistant; cannot trigger sync.")
                else:
                    if strategy_changed:
                        for nid in old_node_ids:
                            assistant.clear_workspace(nid, workspace_id)
                        if getattr(orchestrator, "mirror", None):
                            orchestrator.mirror.purge(workspace_id)
                    else:
                        for nid in detached_nodes:
                            assistant.clear_workspace(nid, workspace_id)

                    for nid in request.node_ids:
                        # Isolate failures per node so one unreachable node
                        # does not prevent the remaining nodes from syncing.
                        try:
                            self._sync_attached_node(assistant, nid, workspace_id, config)
                        except Exception as sync_err:
                            logger.error(f"Failed to trigger sync for node {nid}: {sync_err}")
            except Exception as e:
                logger.error(f"Failed to trigger session node sync: {e}")

        self._mount_skills_to_workspace(db, session)
        return schemas.SessionNodeStatusResponse(
            session_id=session_id,
            sync_workspace_id=session.sync_workspace_id,
            nodes=[
                schemas.NodeSyncStatusEntry(
                    node_id=nid,
                    status=sync_status.get(nid, {}).get("status", "pending"),
                    last_sync=sync_status.get(nid, {}).get("last_sync"),
                )
                for nid in session.attached_node_ids
            ],
            sync_config=session.sync_config or {},
        )