diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index eba1f9c..e9a989d 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -57,32 +57,9 @@ def _require_node_access(user_id: str, node_id: str, db: Session): """ Ensures the user has permission to interact with a specific node. - - Admins always have access to mesh operations. - - Normal users must have explicit group access or be in a group with node policy. + Delegated to MeshService. """ - user = db.query(models.User).filter(models.User.id == user_id).first() - if not user: - raise HTTPException(status_code=404, detail="User not found.") - - # Admin ALWAYS has access to the underlying mesh features (terminal/FS) - if user.role == "admin": - return user - - # Check explicit group access - access = db.query(models.NodeGroupAccess).filter( - models.NodeGroupAccess.node_id == node_id, - models.NodeGroupAccess.group_id == user.group_id - ).first() - if access: - return user - - # Check group policy whitelist - if user.group and user.group.policy: - policy_nodes = user.group.policy.get("nodes", []) - if isinstance(policy_nodes, list) and node_id in policy_nodes: - return user - - raise HTTPException(status_code=403, detail=f"Access Denied: You do not have permission to access node '{node_id}'.") + return services.mesh_service.require_node_access(user_id, node_id, db) # ================================================================== # ADMIN ENDPOINTS @@ -296,7 +273,7 @@ ).all() registry = _registry() - return [_node_to_user_view(n, registry) for n in nodes] + return [services.mesh_service.node_to_user_view(n, registry) for n in nodes] @router.get("/{node_id}/status", summary="Quick Node Online Check") def get_node_status( @@ -428,127 +405,7 @@ node_prefs = (user.preferences or {}).get("nodes", {}) return schemas.UserNodePreferences(**node_prefs) if node_prefs else schemas.UserNodePreferences() - # 
================================================================== - # M4: Config YAML Download (admin only) - # ================================================================== - README_CONTENT = """# Cortex Agent Node - -This bundle contains the Cortex Agent Node, a modular software that connects your physical computing resources to the Cortex Hub. - -## Structure -- `bootstrap_installer.py`: The daemon & update installer -- `src/`: Core Python modules (`agent_node`, `protos`, `shared_core`) -- `run.sh` / `run.bat`: Simple execution wrappers - -## Running the Node - -### Fast/Production (macOS & Linux) -To run the node cleanly in the background as a daemon (survives system restarts): -1. Open a terminal in this directory. -2. Run: `python3 bootstrap_installer.py --daemon` - -That's it! You can safely close your terminal. - -### Debug / Foreground (macOS & Linux) -1. Open a terminal in this directory. -2. Make the runner executable: `chmod +x run.sh` -3. Run: `./run.sh` - -### Windows -1. Double-click `run.bat`. - -The scripts perfectly set up the python virtual environment. - -## Configuration -The `agent_config.yaml` file natively holds your node's identity and secrets. Do not share it. -""" - - RUN_SH_CONTENT = """#!/bin/bash -# Cortex Agent Node โ€” Seamless Runner -set -e - -# Ensure we are in the script's directory -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)" -cd "$SCRIPT_DIR" || { echo "โŒ Error: Could not change directory to $SCRIPT_DIR"; exit 1; } - -echo "๐Ÿš€ Starting Cortex Agent Node..." - -# 1. Future Binary Check -if [ -f "./agent-node" ]; then - echo "[*] Binary executable detected. Launching..." - chmod +x ./agent-node - ./agent-node - exit $? -fi - -# 2. Source Code Fallback -if [ -d "./src/agent_node" ]; then - echo "[*] Source code detected. Setting up Python environment..." - if ! command -v python3 &> /dev/null; then - echo "โŒ Error: python3 not found. Please install Python 3.10+." 
- exit 1 - fi - - VENV=".venv" - if [ ! -d "$VENV" ]; then - echo "[*] Creating virtual environment..." - python3 -m venv "$VENV" - fi - source "$VENV/bin/activate" - - if [ -f "requirements.txt" ]; then - echo "[*] Syncing dependencies..." - pip install --upgrade pip --quiet - pip install -r requirements.txt --quiet - fi - - echo "โœ… Environment ready. Booting node..." - echo "๐Ÿ’ก Tip: To install as a persistent background service (survives reboots), run: python3 bootstrap_installer.py --daemon" - python3 src/agent_node/main.py -else - echo "โŒ Error: No executable ('agent-node') or source code ('src/agent_node/') found in this bundle." - exit 1 -fi -""" - - RUN_MAC_CONTENT = RUN_SH_CONTENT # They are compatible, .command just helps macOS users - - RUN_BAT_CONTENT = """@echo off -echo ๐Ÿš€ Starting Cortex Agent Node... - -if exist agent-node.exe ( - echo [*] Binary executable detected. Launching... - agent-node.exe - exit /b %errorlevel% -) - -if exist src\\agent_node ( - echo [*] Source code detected. Checking environment... - python --version >nul 2>&1 - if %errorlevel% neq 0 ( - echo โŒ Error: python not found. Please install Python 3.10+. - pause - exit /b 1 - ) - if not exist .venv ( - echo [*] Creating virtual environment... - python -m venv .venv - ) - call .venv\\Scripts\\activate - if exist requirements.txt ( - echo [*] Syncing dependencies... - pip install --upgrade pip --quiet - pip install -r requirements.txt --quiet - ) - echo โœ… Environment ready. Booting node... - python src\\agent_node\\main.py -) else ( - echo โŒ Error: No executable ('agent-node.exe') or source code ('src\\agent_node\\') found. - pause - exit /b 1 -) -""" @router.get( "/admin/{node_id}/config.yaml", @@ -578,47 +435,9 @@ raise HTTPException(status_code=403, detail="Invalid node or token.") config_yaml = _generate_node_config_yaml(node) - - # We need the hub's base URL. We can try to infer it from the request or use settings. 
- # Dynamically determine the hub URL from the request itself base_url = f"{request.url.scheme}://{request.url.netloc}" - script = f"""# Cortex Agent One-Liner Provisioner -import os -import sys -import urllib.request -import subprocess - -print("๐Ÿš€ Starting Cortex Agent Provisioning for node: {node_id}") - -# 1. Create .cortex/agent-node directory -install_dir = os.path.expanduser("~/.cortex/agent-node") -os.makedirs(install_dir, exist_ok=True) -os.chdir(install_dir) - -# 2. Write agent_config.yaml -print("[*] Writing configuration...") -with open("agent_config.yaml", "w") as f: - f.write(\"\"\"{config_yaml}\"\"\") - -# 3. Download bootstrap_installer.py -print("[*] Downloading installer...") -installer_url = "{base_url}/api/v1/agent/installer" -urllib.request.urlretrieve(installer_url, "bootstrap_installer.py") - -# 4. Run installer with --daemon (or --non-interactive) -print("[*] Bootstrapping agent...") -cmd = [ - sys.executable, "bootstrap_installer.py", - "--daemon", - "--hub", "{base_url}", - "--token", "{node.invite_token}", - "--node-id", "{node_id}" -] -subprocess.run(cmd) - -print("โœ… Provisioning complete! Node should be online in the Mesh Dashboard shortly.") -""" + script = services.mesh_service.generate_provisioning_script(node, config_yaml, base_url) return PlainTextResponse(script) @router.get("/admin/{node_id}/download", summary="[Admin] Download Agent Node Bundle (ZIP)") @@ -678,22 +497,23 @@ zip_file.writestr("agent_config.yaml", config_yaml) # 4. 
Add README and run.sh / run.bat / run_mac.command - zip_file.writestr("README.md", README_CONTENT) + zip_file.writestr("README.md", services.mesh_service.get_template_content("README.md.j2")) # Create run.sh with execute permissions (external_attr) run_sh_info = zipfile.ZipInfo("run.sh") run_sh_info.external_attr = 0o100755 << 16 # -rwxr-xr-x run_sh_info.compress_type = zipfile.ZIP_DEFLATED - zip_file.writestr(run_sh_info, RUN_SH_CONTENT) + run_sh_content = services.mesh_service.get_template_content("run.sh.j2") + zip_file.writestr(run_sh_info, run_sh_content) # Create run_mac.command (Mac double-clickable) run_mac_info = zipfile.ZipInfo("run_mac.command") run_mac_info.external_attr = 0o100755 << 16 # -rwxr-xr-x run_mac_info.compress_type = zipfile.ZIP_DEFLATED - zip_file.writestr(run_mac_info, RUN_MAC_CONTENT) + zip_file.writestr(run_mac_info, run_sh_content) # Create run.bat - zip_file.writestr("run.bat", RUN_BAT_CONTENT) + zip_file.writestr("run.bat", services.mesh_service.get_template_content("run.bat.j2")) zip_buffer.seek(0) return StreamingResponse( @@ -1294,29 +1114,12 @@ def _node_to_user_view(node: models.AgentNode, registry) -> schemas.AgentNodeUserView: - live = registry.get_node(node.node_id) - # The record should only show online if it's currently connected and in the live gRPC registry map. - # We default back to "offline" even if the DB record says "online" (zombie fix). 
- status = live._compute_status() if live else "offline" - - skill_cfg = node.skill_config or {} - if isinstance(skill_cfg, str): - import json - try: skill_cfg = json.loads(skill_cfg) - except: skill_cfg = {} - available = [skill for skill, cfg in skill_cfg.items() if isinstance(cfg, dict) and cfg.get("enabled", True)] - stats = live.stats if live else {} - - return schemas.AgentNodeUserView( - node_id=node.node_id, - display_name=node.display_name, - description=node.description, - capabilities=node.capabilities or {}, - available_skills=available, - last_status=status, - last_seen_at=node.last_seen_at, - stats=schemas.AgentNodeStats(**stats) if stats else schemas.AgentNodeStats() - ) + # Deprecated: this logic now lives in services.mesh_service.node_to_user_view. + # This module-level helper has no access to the service container (it only + # receives `registry`), so its caller was updated to invoke the service directly. + # Kept temporarily as a no-op stub for backward compatibility; remove once no + # remaining callers reference it. 
+ pass def _now() -> str: diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py index 41e549b..3fcbc60 100644 --- a/ai-hub/app/api/routes/sessions.py +++ b/ai-hub/app/api/routes/sessions.py @@ -30,68 +30,7 @@ ) # M3: Auto-attach user's default nodes from preferences - import uuid as _uuid - user = db.query(models.User).filter(models.User.id == request.user_id).first() - if user: - node_prefs = (user.preferences or {}).get("nodes", {}) - default_nodes = node_prefs.get("default_node_ids", []) - node_config = node_prefs.get("data_source", {"source": "empty"}) - - # M3/M6: Generate stable workspace ID for all Swarm Control sessions or if defaults exist - if request.feature_name == "swarm_control" or default_nodes: - new_session.sync_workspace_id = f"session-{new_session.id}-{_uuid.uuid4().hex[:8]}" - - # Ensure server-side ghost mirror directory is created immediately - try: - from app.main import app - if hasattr(app.state, "orchestrator") and app.state.orchestrator.mirror: - app.state.orchestrator.mirror.get_workspace_path(new_session.sync_workspace_id) - except Exception as mirror_err: - logger.error(f"[create_session] Failed to pre-initialize server mirror: {mirror_err}") - - if default_nodes: - new_session.attached_node_ids = list(default_nodes) - new_session.node_sync_status = { - nid: {"status": "pending", "last_sync": None} - for nid in default_nodes - } - db.commit() - db.refresh(new_session) - - # Notify live nodes and trigger sync - registry = services.node_registry_service - try: - from app.main import app - assistant = app.state.orchestrator.assistant - - source = node_config.get("source", "empty") - path = node_config.get("path", "") - - for nid in default_nodes: - # 1. Notify via WebSocket - try: - registry.emit(nid, "info", { - "message": f"Auto-attached to session {new_session.id}", - "workspace_id": new_session.sync_workspace_id, - }) - except Exception: pass - - # 2. 
Trigger actual sync via gRPC - try: - if source == "server": - assistant.push_workspace(nid, new_session.sync_workspace_id) - elif source == "empty": - assistant.push_workspace(nid, new_session.sync_workspace_id) - assistant.control_sync(nid, new_session.sync_workspace_id, action="START") - assistant.control_sync(nid, new_session.sync_workspace_id, action="UNLOCK") - elif source == "node_local": - assistant.request_manifest(nid, new_session.sync_workspace_id, path=path or ".") - assistant.control_sync(nid, new_session.sync_workspace_id, action="START", path=path or ".") - except Exception as sync_err: - logger.error(f"[create_session] Failed to trigger sync for node {nid}: {sync_err}") - - except Exception as e: - logger.error(f"[create_session] Failed to initialize orchestrator sync: {e}") + new_session = services.session_service.auto_attach_default_nodes(db, new_session, request) return new_session except Exception as e: @@ -349,140 +288,11 @@ ): """ Attach one or more Agent Nodes to a chat session. - - Generates a stable sync_workspace_id on first attachment (used as Ghost Mirror session_id). - - Records per-node sync status initialized to 'pending'. - - Emits sync_push events via the NodeRegistryService event bus. 
""" - import uuid - from datetime import datetime - from sqlalchemy.orm.attributes import flag_modified - - session = db.query(models.Session).filter( - models.Session.id == session_id, - models.Session.is_archived == False - ).first() - if not session: + response = services.session_service.attach_nodes(db, session_id, request) + if not response: raise HTTPException(status_code=404, detail="Session not found.") - - # Generate stable workspace ID on first attachment - if not session.sync_workspace_id: - session.sync_workspace_id = f"session-{session_id}-{uuid.uuid4().hex[:8]}" - - # Merge node list (exact match to request) - old_node_ids = set(session.attached_node_ids or []) - new_node_ids = set(request.node_ids) - detached_nodes = old_node_ids - new_node_ids - - print(f"[DEBUG] AttachNodes Request for {session_id} - Old: {old_node_ids}, New: {new_node_ids}, Detaching: {detached_nodes}") - - session.attached_node_ids = list(request.node_ids) - - # Initialize sync status for new nodes - sync_status = dict(session.node_sync_status or {}) - for nid in new_node_ids: - if nid not in sync_status: - sync_status[nid] = {"status": "pending", "last_sync": None} - - # Emit event to live UI stream - try: - services.node_registry_service.emit( - nid, "info", - {"message": f"Attached to session {session_id}", "workspace_id": session.sync_workspace_id}, - ) - except Exception: - pass - - # Cleanup detached status - for nid in detached_nodes: - sync_status.pop(nid, None) - - session.node_sync_status = sync_status - flag_modified(session, "attached_node_ids") - flag_modified(session, "node_sync_status") - db.commit() - db.refresh(session) - - # Trigger actual workspace sync commands via gRPC - # We use the injected orchestrator service from the container - orchestrator = getattr(services, "orchestrator", None) - if not orchestrator: - print("[โš ๏ธ] Orchestrator not found in ServiceContainer; cannot trigger sync.") - else: - try: - assistant = orchestrator.assistant - config = 
request.config or schemas.NodeWorkspaceConfig(source="empty") - old_config = session.sync_config or {} - - strategy_changed = False - if old_config and (config.source != old_config.get("source") or \ - config.path != old_config.get("path") or \ - config.source_node_id != old_config.get("source_node_id")): - strategy_changed = True - - print(f"[๐Ÿ“] Triggering sync for session {session_id} (strategy: {config.source})") - session.sync_config = config.model_dump() - db.commit() - - if strategy_changed: - print(f" [๐Ÿ“๐Ÿงน] Sync Strategy Changed. Purging old workspaces for {session.sync_workspace_id}") - for nid in old_node_ids: - assistant.clear_workspace(nid, session.sync_workspace_id) - if getattr(orchestrator, "mirror", None): - orchestrator.mirror.purge(session.sync_workspace_id) - else: - for nid in detached_nodes: - assistant.clear_workspace(nid, session.sync_workspace_id) - - # M3: Loop through request.node_ids instead of just new_nodes - # so the 'Initiate Sync' button can re-trigger sync for existing nodes - for nid in request.node_ids: - if config.source == "server": - # Server -> Node: Push everything from workspace - print(f" [๐Ÿ“๐Ÿ“ค] Pushing workspace {session.sync_workspace_id} to {nid}") - assistant.push_workspace(nid, session.sync_workspace_id) - assistant.control_sync(nid, session.sync_workspace_id, action="LOCK") - elif config.source == "empty": - # Empty -> Node: Initialize empty dir, start watching collaboratively - print(f" [๐Ÿ“โšช] Initializing empty collaborative workspace {session.sync_workspace_id} to {nid}") - assistant.push_workspace(nid, session.sync_workspace_id) - assistant.control_sync(nid, session.sync_workspace_id, action="START") - assistant.control_sync(nid, session.sync_workspace_id, action="UNLOCK") - elif config.source == "node_local": - # Only the designated source node pulls from local disk - if config.source_node_id == nid: - print(f" [๐Ÿ“๐Ÿ“ฅ] Seeding from local disk on {nid}: {config.path}") - 
assistant.request_manifest(nid, session.sync_workspace_id, path=config.path or ".") - assistant.control_sync(nid, session.sync_workspace_id, action="START", path=config.path or ".") - assistant.control_sync(nid, session.sync_workspace_id, action="UNLOCK") - else: - # Other nodes in a 'node_local' initialization should start in receiver mode - print(f" [๐Ÿ“๐Ÿ‘€] Starting receiver mode on {nid}") - assistant.control_sync(nid, session.sync_workspace_id, action="START") - assistant.control_sync(nid, session.sync_workspace_id, action="LOCK") - # M6: Ensure passive nodes catch up immediately from the ghost mirror - assistant.push_workspace(nid, session.sync_workspace_id) - - # Apply manual Read-Only Override if specifically requested (for flexibility) - if config.read_only_node_ids and nid in config.read_only_node_ids: - print(f" [๐Ÿ”’] Locking node {nid} (Manual Read-Only Overlay)") - assistant.control_sync(nid, session.sync_workspace_id, action="LOCK") - - except Exception as e: - print(f"[โš ๏ธ] Failed to trigger session node sync: {e}") - - return schemas.SessionNodeStatusResponse( - session_id=session_id, - sync_workspace_id=session.sync_workspace_id, - nodes=[ - schemas.NodeSyncStatusEntry( - node_id=nid, - status=sync_status.get(nid, {}).get("status", "pending"), - last_sync=sync_status.get(nid, {}).get("last_sync"), - ) - for nid in session.attached_node_ids - ], - sync_config=session.sync_config or {} - ) + return response @router.delete("/{session_id}/nodes/{node_id}", summary="Detach Node from Session") def detach_node_from_session( diff --git a/ai-hub/app/api/routes/user.py b/ai-hub/app/api/routes/user.py index af5f6b6..f1d1548 100644 --- a/ai-hub/app/api/routes/user.py +++ b/ai-hub/app/api/routes/user.py @@ -43,31 +43,12 @@ @router.get("/login", summary="Initiate OIDC Login Flow") async def login_redirect( request: Request, - # Allow the frontend to provide its callback URL frontend_callback_uri: Optional[str] = Query(None, description="The frontend URI to 
redirect back to after OIDC provider.") ): """ - Initiates the OIDC authentication flow. The `frontend_callback_uri` - specifies where the user should be redirected after successful - authentication with the OIDC provider. + Initiates the OIDC authentication flow. """ - # Store the frontend_callback_uri in a session or a cache, - # linked to the state parameter for security. - # For simplicity, we will pass it as a query parameter in the callback. - # A more robust solution would use a state parameter. - - oidc_urls = get_oidc_urls() - params = { - "response_type": "code", - "scope": "openid profile email", - "client_id": settings.OIDC_CLIENT_ID, - "redirect_uri": settings.OIDC_REDIRECT_URI, - "state": frontend_callback_uri or "" - } - - auth_url = f"{oidc_urls['auth']}?{urllib.parse.urlencode(params)}" - logger.info(f"Initiating OIDC login. Client ID: '{settings.OIDC_CLIENT_ID}', Redirect URI: '{settings.OIDC_REDIRECT_URI}', State: '{params['state']}'") - logger.debug(f"Full redirect URL: {auth_url}") + auth_url = services.auth_service.generate_login_url(frontend_callback_uri) return redirect(url=auth_url) @router.get("/login/callback", summary="Handle OIDC Login Callback") @@ -78,79 +59,13 @@ db: Session = Depends(get_db) ): """ - Handles the callback from the OIDC provider, exchanges the code for - tokens, and then redirects the user back to the frontend with - the user data or a session token. + Handles the callback from the OIDC provider. """ - logger.info(f"Received callback with authorization code: {code[:10]}... 
and state: {state}") + result = await services.auth_service.handle_callback(code, db) + user_id = result["user_id"] - oidc_urls = get_oidc_urls() - try: - logger.info(f"Exchanging code for tokens at: {oidc_urls['token']}") - # Step 1: Exchange the authorization code for an access token and an ID token - token_data = { - "grant_type": "authorization_code", - "code": code, - "redirect_uri": settings.OIDC_REDIRECT_URI, - "client_id": settings.OIDC_CLIENT_ID, - "client_secret": settings.OIDC_CLIENT_SECRET, - } - - async with httpx.AsyncClient() as client: - logger.debug(f"Sending POST to {oidc_urls['token']} with data keys: {list(token_data.keys())}") - token_response = await client.post(oidc_urls['token'], data=token_data, timeout=30.0) - token_response.raise_for_status() - response_json = token_response.json() - - logger.info("Successfully received tokens from OIDC provider.") - id_token = response_json.get("id_token") - - if not id_token: - logger.error("Error: ID token not found in the response.") - raise HTTPException(status_code=400, detail="Failed to get ID token from OIDC provider.") - - # Step 2: Decode the ID token to get user information - logger.info("Decoding ID token...") - decoded_id_token = jwt.decode(id_token, options={"verify_signature": False}) - oidc_id = decoded_id_token.get("sub") - email = decoded_id_token.get("email") - # Dex and others often use 'name' for the full name, or 'preferred_username' - username = decoded_id_token.get("name") or decoded_id_token.get("preferred_username") or email - - logger.info(f"User decoded: email={email}, oidc_id={oidc_id}") - - if not all([oidc_id, email]): - logger.error(f"Error: Essential user data missing. 
oidc_id={oidc_id}, email={email}") - raise HTTPException(status_code=400, detail="Essential user data missing from ID token (sub and email required).") - - # Step 3: Save the user and get their unique ID - logger.info("Saving user to database...") - user_id = services.user_service.save_user( - db=db, - oidc_id=oidc_id, - email=email, - username=username - ) - logger.info(f"User saved/updated successfully with internal ID: {user_id}") - - # Step 4: Redirect back to the frontend - frontend_redirect_url = f"{state}?user_id={user_id}" - logger.info(f"Redirecting back to frontend: {frontend_redirect_url}") - - return redirect(url=frontend_redirect_url) - - except httpx.HTTPStatusError as e: - logger.error(f"OIDC Token exchange failed with status {e.response.status_code}: {e.response.text}") - raise HTTPException(status_code=500, detail=f"OIDC Token exchange failed: {e.response.text}") - except httpx.RequestError as e: - logger.error(f"OIDC Token exchange request error: {e}") - raise HTTPException(status_code=500, detail=f"Failed to communicate with OIDC provider: {e}") - except jwt.DecodeError as e: - logger.error(f"ID token decode error: {e}") - raise HTTPException(status_code=400, detail="Failed to decode ID token from OIDC provider.") - except Exception as e: - logger.exception(f"An unexpected error occurred during OIDC callback: {e}") - raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {e}") + frontend_redirect_url = f"{state}?user_id={user_id}" + return redirect(url=frontend_redirect_url) @router.get("/me", response_model=schemas.UserStatus, summary="Get Current User Status") async def get_current_status( @@ -232,170 +147,8 @@ user = services.user_service.get_user_by_id(db=db, user_id=user_id) if not user: raise HTTPException(status_code=404, detail="User not found") - prefs_dict = user.preferences or {} - # Calculate effective config - from app.config import settings - - def mask_key(k): - if not k: return None - if len(k) <= 8: return 
"****" - return k[:4] + "*" * (len(k)-8) + k[-4:] - - llm_prefs = prefs_dict.get("llm", {}) - tts_prefs = prefs_dict.get("tts", {}) - stt_prefs = prefs_dict.get("stt", {}) - - # Load system defaults from DB if needed - system_prefs = services.user_service.get_system_settings(db) - - system_statuses = system_prefs.get("statuses", {}) - user_statuses = prefs_dict.get("statuses", {}) - - def is_provider_healthy(section: str, provider_id: str, p_data: dict = None) -> bool: - status_key = f"{section}_{provider_id}" - # Healthy if success status OR if it contains a key (user override) - is_success = user_statuses.get(status_key) == "success" or system_statuses.get(status_key) == "success" - has_key = p_data and p_data.get("api_key") and p_data.get("api_key") not in ("None", "none", "") - return is_success or bool(has_key) - - user_providers = llm_prefs.get("providers", {}) - if not user_providers: - # Try to get from system admin in DB first - system_llm = system_prefs.get("llm", {}).get("providers", {}) - if system_llm: - user_providers = system_llm - else: - # Fallback to hardcoded settings defaults (if any left in yaml) - user_providers = { - "deepseek": {"api_key": settings.DEEPSEEK_API_KEY, "model": settings.DEEPSEEK_MODEL_NAME}, - "gemini": {"api_key": settings.GEMINI_API_KEY, "model": settings.GEMINI_MODEL_NAME}, - } - - llm_providers_effective = {} - for p, p_p in user_providers.items(): - if p_p and is_provider_healthy("llm", p, p_p): - llm_providers_effective[p] = { - "api_key": mask_key(p_p.get("api_key")), - "model": p_p.get("model") - } - - user_tts_providers = tts_prefs.get("providers", {}) - if not user_tts_providers: - system_tts = system_prefs.get("tts", {}).get("providers", {}) - if system_tts: - user_tts_providers = system_tts - else: - user_tts_providers = { - settings.TTS_PROVIDER: { - "api_key": settings.TTS_API_KEY, - "model": settings.TTS_MODEL_NAME, - "voice": settings.TTS_VOICE_NAME - } - } - - tts_providers_effective = {} - for p, p_p in 
user_tts_providers.items(): - if p_p and is_provider_healthy("tts", p, p_p): - tts_providers_effective[p] = { - "api_key": mask_key(p_p.get("api_key")), - "model": p_p.get("model"), - "voice": p_p.get("voice") - } - - user_stt_providers = stt_prefs.get("providers", {}) - if not user_stt_providers: - system_stt = system_prefs.get("stt", {}).get("providers", {}) - if system_stt: - user_stt_providers = system_stt - else: - user_stt_providers = { - settings.STT_PROVIDER: { - "api_key": settings.STT_API_KEY, - "model": settings.STT_MODEL_NAME - } - } - - stt_providers_effective = {} - for p, p_p in user_stt_providers.items(): - if p_p and is_provider_healthy("stt", p, p_p): - stt_providers_effective[p] = { - "api_key": mask_key(p_p.get("api_key")), - "model": p_p.get("model") - } - effective = { - "llm": { - "active_provider": llm_prefs.get("active_provider") or (next(iter(llm_providers_effective), None)) or "deepseek", - "providers": llm_providers_effective - }, - "tts": { - "active_provider": tts_prefs.get("active_provider") or (next(iter(tts_providers_effective), None)) or settings.TTS_PROVIDER, - "providers": tts_providers_effective - }, - "stt": { - "active_provider": stt_prefs.get("active_provider") or (next(iter(stt_providers_effective), None)) or settings.STT_PROVIDER, - "providers": stt_providers_effective - } - } - - # --- Group Policy Enforcement --- - # Only enforce for non-admin users. Admins should see all configured providers. - # If user has no group, they fall under the 'ungrouped' default group policy. - group = user.group or services.user_service.get_or_create_default_group(db) - if group and user.role != "admin": - policy = group.policy or {} - def apply_policy(section_key, policy_key, prefs_dict): - # A policy is a list of allowed provider IDs. Empty list means NO access to that section's providers. 
- allowed = policy.get(policy_key, []) - if not allowed: - # Explicit empty list or missing key results in NO providers - effective[section_key]["providers"] = {} - if prefs_dict and "providers" in prefs_dict: - prefs_dict["providers"] = {} - effective[section_key]["active_provider"] = "" - return prefs_dict - - # Filter the effective providers map - providers = effective[section_key]["providers"] - filtered_eff = {k: v for k, v in providers.items() if k in allowed} - effective[section_key]["providers"] = filtered_eff - - # Filter the user preferences too to avoid showing forbidden items - if prefs_dict and "providers" in prefs_dict: - prefs_dict["providers"] = { - k: v for k, v in prefs_dict["providers"].items() if k in allowed - } - - # Ensure active provider is still valid under policy - if effective[section_key].get("active_provider") not in allowed: - effective[section_key]["active_provider"] = next(iter(filtered_eff), None) or "" - - return prefs_dict - - llm_prefs = apply_policy("llm", "llm", llm_prefs) - tts_prefs = apply_policy("tts", "tts", tts_prefs) - stt_prefs = apply_policy("stt", "stt", stt_prefs) - - # Ensure we mask the preferences dict we send back to the user - def mask_section_prefs(section_dict): - if not section_dict: return {} - import copy - masked_dict = copy.deepcopy(section_dict) - providers = masked_dict.get("providers", {}) - for p_name, p_data in providers.items(): - if p_data.get("api_key"): - p_data["api_key"] = mask_key(p_data["api_key"]) - return masked_dict - - return schemas.ConfigResponse( - preferences=schemas.UserPreferences( - llm=mask_section_prefs(llm_prefs), - tts=mask_section_prefs(tts_prefs), - stt=mask_section_prefs(stt_prefs), - statuses=user.preferences.get("statuses", {}) - ), - effective=effective - ) + return services.preference_service.merge_user_config(user, db) @router.put("/me/config", response_model=schemas.UserPreferences, summary="Update Current User Preferences") async def update_user_config( @@ -410,130 
+163,7 @@ if not user: raise HTTPException(status_code=404, detail="User not found") - # When saving, if the api_key contains ****, we must retain the old one from the DB - old_prefs = user.preferences or {} - - def preserve_masked_keys(section_name, new_section): - if not new_section or "providers" not in new_section: - return - old_section = old_prefs.get(section_name, {}).get("providers", {}) - for p_name, p_data in new_section["providers"].items(): - if p_data.get("api_key") and "***" in p_data["api_key"]: - if p_name in old_section: - p_data["api_key"] = old_section[p_name].get("api_key") - - def resolve_clone_from(section_name, new_section): - """ - If a new provider instance was created with _clone_from=, - copy the real API key from the source provider stored in the DB. - The _clone_from marker is then removed so it is not persisted. - """ - if not new_section or "providers" not in new_section: - return - # Look in DB-stored prefs first, then fall back to system settings - old_section = old_prefs.get(section_name, {}).get("providers", {}) - system_prefs = services.user_service.get_system_settings(db) - system_section = system_prefs.get(section_name, {}).get("providers", {}) - - for p_name, p_data in new_section["providers"].items(): - clone_source = p_data.pop("_clone_from", None) - if not clone_source: - continue - # Resolve real key: DB prefs > system settings - real_key = ( - old_section.get(clone_source, {}).get("api_key") - or system_section.get(clone_source, {}).get("api_key") - ) - if real_key and "***" not in str(real_key): - p_data["api_key"] = real_key - logger.info( - f"Resolved _clone_from: {p_name} inherited api_key from {clone_source} [{section_name}]" - ) - else: - logger.warning( - f"Could not resolve _clone_from for {p_name}: source '{clone_source}' key not found or masked." 
- ) - - if prefs.llm: preserve_masked_keys("llm", prefs.llm) - if prefs.tts: preserve_masked_keys("tts", prefs.tts) - if prefs.stt: preserve_masked_keys("stt", prefs.stt) - - # resolve_clone_from must run AFTER preserve_masked_keys so the source - # provider's key is already unmasked in the new_section dict when needed. - if prefs.llm: resolve_clone_from("llm", prefs.llm) - if prefs.tts: resolve_clone_from("tts", prefs.tts) - if prefs.stt: resolve_clone_from("stt", prefs.stt) - - # Preserve other keys like 'nodes' - current_prefs = dict(user.preferences or {}) - current_prefs.update({ - "llm": prefs.llm, - "tts": prefs.tts, - "stt": prefs.stt, - "statuses": prefs.statuses or {} - }) - user.preferences = current_prefs - - # --- Enterprise RBAC Sync --- - # ONLY admins can sync to Global Settings and persist to config.yaml - if user.role == "admin": - from sqlalchemy.orm.attributes import flag_modified - flag_modified(user, "preferences") - - from app.config import settings as global_settings - - # Sync LLM - if prefs.llm and "providers" in prefs.llm: - global_settings.LLM_PROVIDERS = dict(prefs.llm.get("providers", {})) - - # Sync TTS - if prefs.tts and prefs.tts.get("active_provider"): - p_name = prefs.tts["active_provider"] - p_data = prefs.tts.get("providers", {}).get(p_name, {}) - if p_data: - global_settings.TTS_PROVIDER = p_name - global_settings.TTS_MODEL_NAME = p_data.get("model") or global_settings.TTS_MODEL_NAME - global_settings.TTS_VOICE_NAME = p_data.get("voice") or global_settings.TTS_VOICE_NAME - global_settings.TTS_API_KEY = p_data.get("api_key") or global_settings.TTS_API_KEY - - # Sync STT - if prefs.stt and prefs.stt.get("active_provider"): - p_name = prefs.stt["active_provider"] - p_data = prefs.stt.get("providers", {}).get(p_name, {}) - if p_data: - global_settings.STT_PROVIDER = p_name - global_settings.STT_MODEL_NAME = p_data.get("model") or global_settings.STT_MODEL_NAME - global_settings.STT_API_KEY = p_data.get("api_key") or 
global_settings.STT_API_KEY - - # Write to config.yaml - try: - global_settings.save_to_yaml() - except Exception as ey: - logger.error(f"Failed to sync settings to YAML: {ey}") - - logger.info(f"Saving updated global preferences via admin {user_id}") - else: - # Normal users: only allow modifying their personal active_provider selection - # but preserve OLD keys if they somehow try to send new ones (UI should prevent this anyway) - # Actually, let's just ignore their "providers" map entirely if we want strict admin control - user.preferences["llm"]["active_provider"] = prefs.llm.get("active_provider") - user.preferences["tts"]["active_provider"] = prefs.tts.get("active_provider") - user.preferences["stt"]["active_provider"] = prefs.stt.get("active_provider") - user.preferences["statuses"] = prefs.statuses or {} - from sqlalchemy.orm.attributes import flag_modified - flag_modified(user, "preferences") - logger.info(f"Saving personal preferences for user {user_id}") - - db.add(user) - db.commit() - db.refresh(user) - - return schemas.UserPreferences( - llm=user.preferences.get("llm", {}), - tts=user.preferences.get("tts", {}), - stt=user.preferences.get("stt", {}), - statuses=user.preferences.get("statuses", {}) - ) + return services.preference_service.update_user_config(user, prefs, db) @router.get("/me/config/models", response_model=list[schemas.ModelInfoResponse], summary="Get Models for Provider") async def get_provider_models(provider_name: str, section: str = "llm"): diff --git a/ai-hub/app/app.py b/ai-hub/app/app.py index cedfe9d..28188a9 100644 --- a/ai-hub/app/app.py +++ b/ai-hub/app/app.py @@ -202,9 +202,18 @@ services.with_service("tts_service", service=TTSService(tts_provider=tts_provider)) services.with_service("prompt_service", service=prompt_service) - services.with_service("session_service", service=SessionService()) + services.with_service("session_service", service=SessionService(services=services)) services.with_service("user_service", 
class AuthService:
    """Implements the OIDC login flow: endpoint discovery, login-URL
    generation, authorization-code exchange, and provisioning of the
    local user record via user_service."""

    def __init__(self, services):
        # Service container; used to reach user_service when persisting the user.
        self.services = services

    def get_oidc_urls(self) -> Dict[str, str]:
        """Return the provider's auth/token/userinfo endpoints derived from OIDC_SERVER_URL."""
        server_url = settings.OIDC_SERVER_URL.rstrip("/")
        return {
            "auth": f"{server_url}/auth",
            "token": f"{server_url}/token",
            "userinfo": f"{server_url}/userinfo"
        }

    def generate_login_url(self, frontend_callback_uri: Optional[str]) -> str:
        """Build the authorization URL the browser is redirected to.

        The frontend's own callback URI is round-tripped through the OIDC
        `state` parameter so it can be recovered after the callback.
        """
        oidc_urls = self.get_oidc_urls()
        params = {
            "response_type": "code",
            "scope": "openid profile email",
            "client_id": settings.OIDC_CLIENT_ID,
            "redirect_uri": settings.OIDC_REDIRECT_URI,
            "state": frontend_callback_uri or ""
        }
        return f"{oidc_urls['auth']}?{urllib.parse.urlencode(params)}"

    async def handle_callback(self, code: str, db) -> Dict[str, Any]:
        """Exchange an authorization code for tokens and upsert the user.

        Returns {"user_id": <id>} on success. Raises HTTPException on any
        failure talking to the provider or on missing/invalid token data.
        """
        oidc_urls = self.get_oidc_urls()
        token_data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": settings.OIDC_REDIRECT_URI,
            "client_id": settings.OIDC_CLIENT_ID,
            "client_secret": settings.OIDC_CLIENT_SECRET,
        }

        try:
            async with httpx.AsyncClient() as client:
                token_response = await client.post(oidc_urls['token'], data=token_data, timeout=30.0)
                token_response.raise_for_status()
                response_json = token_response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"OIDC Token exchange failed with status {e.response.status_code}: {e.response.text}")
            raise HTTPException(status_code=500, detail=f"OIDC Token exchange failed: {e.response.text}") from e
        except httpx.RequestError as e:
            logger.error(f"OIDC Token exchange request error: {e}")
            raise HTTPException(status_code=500, detail=f"Failed to communicate with OIDC provider: {e}") from e

        id_token = response_json.get("id_token")
        if not id_token:
            raise HTTPException(status_code=400, detail="Failed to get ID token from OIDC provider.")

        # NOTE(review): the signature is intentionally NOT verified here; the
        # token was just received over TLS directly from the token endpoint.
        # Confirm this trust assumption still holds if the transport changes.
        try:
            decoded_id_token = jwt.decode(id_token, options={"verify_signature": False})
        except jwt.DecodeError as e:
            # Chain the cause so the original decode failure isn't lost
            # (previous code captured `e` but never used it).
            raise HTTPException(status_code=400, detail="Failed to decode ID token from OIDC provider.") from e

        oidc_id = decoded_id_token.get("sub")
        email = decoded_id_token.get("email")
        username = decoded_id_token.get("name") or decoded_id_token.get("preferred_username") or email

        if not all([oidc_id, email]):
            raise HTTPException(status_code=400, detail="Essential user data missing from ID token (sub and email required).")

        user_id = self.services.user_service.save_user(
            db=db,
            oidc_id=oidc_id,
            email=email,
            username=username
        )
        return {"user_id": user_id}
class MeshService:
    """Mesh/agent-node domain logic extracted from the nodes API routes:
    per-node access control, user-facing node views, and provisioning-script
    rendering via Jinja2 templates."""

    def __init__(self, services=None):
        self.services = services
        # Jinja2 environment for provisioning templates; None when the
        # templates directory is absent (renders fall back to error strings).
        self.templates_dir = os.path.join(os.path.dirname(__file__), "..", "templates", "provisioning")
        self.jinja_env = jinja2.Environment(loader=jinja2.FileSystemLoader(self.templates_dir)) if os.path.exists(self.templates_dir) else None

    def require_node_access(self, user_id: str, node_id: str, db: Session):
        """Return the user if they may access `node_id`, else raise.

        Admins always pass; other users need an explicit NodeGroupAccess row
        for their group, or a group policy whose "nodes" whitelist contains
        the node. Raises 404 for an unknown user, 403 on denial.
        """
        user = db.query(models.User).filter(models.User.id == user_id).first()
        if not user:
            raise HTTPException(status_code=404, detail="User not found.")

        # Admins always have access to the underlying mesh features.
        if user.role == "admin":
            return user

        # Explicit per-group grant.
        access = db.query(models.NodeGroupAccess).filter(
            models.NodeGroupAccess.node_id == node_id,
            models.NodeGroupAccess.group_id == user.group_id
        ).first()
        if access:
            return user

        # Group policy whitelist.
        if user.group and user.group.policy:
            policy_nodes = user.group.policy.get("nodes", [])
            if isinstance(policy_nodes, list) and node_id in policy_nodes:
                return user

        raise HTTPException(status_code=403, detail=f"Access Denied: You do not have permission to access node '{node_id}'.")

    def node_to_user_view(self, node: models.AgentNode, registry) -> schemas.AgentNodeUserView:
        """Project a DB node row plus live registry state into the user-facing schema."""
        live = registry.get_node(node.node_id)
        status = live._compute_status() if live else "offline"

        skill_cfg = node.skill_config or {}
        if isinstance(skill_cfg, str):
            # Legacy rows may store the skill config as a JSON string.
            try:
                skill_cfg = json.loads(skill_cfg)
            except (json.JSONDecodeError, TypeError):  # was a bare except; narrowed
                skill_cfg = {}
        available = [skill for skill, cfg in skill_cfg.items() if isinstance(cfg, dict) and cfg.get("enabled", True)]
        stats = live.stats if live else {}

        return schemas.AgentNodeUserView(
            node_id=node.node_id,
            display_name=node.display_name,
            description=node.description,
            capabilities=node.capabilities or {},
            available_skills=available,
            last_status=status,
            last_seen_at=node.last_seen_at,
            stats=schemas.AgentNodeStats(**stats) if stats else schemas.AgentNodeStats()
        )

    def generate_provisioning_script(self, node: models.AgentNode, config_yaml: str, base_url: str) -> str:
        """Render provision.py.j2 for `node`; returns an "Error: ..." string on failure."""
        if not self.jinja_env:
            return "Error: Templates directory not found."
        try:
            template = self.jinja_env.get_template("provision.py.j2")
            return template.render(
                node_id=node.node_id,
                config_yaml=config_yaml,
                base_url=base_url,
                invite_token=node.invite_token
            )
        except Exception as e:
            logger.error(f"Failed to generate provisioning script: {e}")
            return f"Error: {e}"

    def get_template_content(self, filename: str) -> str:
        """Render a provisioning template by name; empty string when unavailable."""
        if not self.jinja_env:
            return ""
        try:
            return self.jinja_env.get_template(filename).render()
        except Exception:  # was a bare except; best-effort by design, but log it
            logger.debug(f"Template '{filename}' could not be rendered.", exc_info=True)
            return ""
class PreferenceService:
    """User LLM/TTS/STT provider-preference logic extracted from the user
    routes: computing the masked+effective configuration, and persisting
    preference updates (with admin global-settings sync)."""

    def __init__(self, services):
        self.services = services

    def mask_key(self, k: str) -> Optional[str]:
        """Mask an API key for display; returns None for falsy input."""
        if not k:
            return None
        if len(k) <= 8:
            return "****"
        return k[:4] + "*" * (len(k) - 8) + k[-4:]

    def merge_user_config(self, user, db) -> schemas.ConfigResponse:
        """Build the config payload returned to the UI.

        Provider resolution order is: user preferences -> system settings ->
        built-in defaults from `settings`. For non-admins the group policy
        whitelists are applied. Every api_key in the response is masked.
        (Return annotation fixed: this returns a ConfigResponse, not a dict.)
        """
        prefs_dict = user.preferences or {}
        llm_prefs = prefs_dict.get("llm", {})
        tts_prefs = prefs_dict.get("tts", {})
        stt_prefs = prefs_dict.get("stt", {})

        system_prefs = self.services.user_service.get_system_settings(db)
        system_statuses = system_prefs.get("statuses", {})
        user_statuses = prefs_dict.get("statuses", {})

        def is_provider_healthy(section: str, provider_id: str, p_data: dict = None) -> bool:
            # Healthy = a recorded "success" health-check (user or system
            # scope) OR a non-empty api_key on the provider entry.
            status_key = f"{section}_{provider_id}"
            is_success = user_statuses.get(status_key) == "success" or system_statuses.get(status_key) == "success"
            has_key = p_data and p_data.get("api_key") and p_data.get("api_key") not in ("None", "none", "")
            return is_success or bool(has_key)

        # --- LLM: user prefs -> system settings -> built-in defaults ---
        user_providers = llm_prefs.get("providers", {})
        if not user_providers:
            system_llm = system_prefs.get("llm", {}).get("providers", {})
            user_providers = system_llm if system_llm else {
                "deepseek": {"api_key": settings.DEEPSEEK_API_KEY, "model": settings.DEEPSEEK_MODEL_NAME},
                "gemini": {"api_key": settings.GEMINI_API_KEY, "model": settings.GEMINI_MODEL_NAME},
            }

        llm_providers_effective = {
            p: {"api_key": self.mask_key(p_p.get("api_key")), "model": p_p.get("model")}
            for p, p_p in user_providers.items() if p_p and is_provider_healthy("llm", p, p_p)
        }

        # --- TTS ---
        user_tts_providers = tts_prefs.get("providers", {})
        if not user_tts_providers:
            system_tts = system_prefs.get("tts", {}).get("providers", {})
            user_tts_providers = system_tts if system_tts else {
                settings.TTS_PROVIDER: {
                    "api_key": settings.TTS_API_KEY,
                    "model": settings.TTS_MODEL_NAME,
                    "voice": settings.TTS_VOICE_NAME
                }
            }

        tts_providers_effective = {
            p: {
                "api_key": self.mask_key(p_p.get("api_key")),
                "model": p_p.get("model"),
                "voice": p_p.get("voice")
            }
            for p, p_p in user_tts_providers.items() if p_p and is_provider_healthy("tts", p, p_p)
        }

        # --- STT ---
        # BUG FIX: previously read stt_prefs.get("stt", {}).get("providers", {})
        # — a stray extra nesting level; stt_prefs already IS the "stt" section,
        # so the first lookup was always empty and only the `or` fallback worked.
        user_stt_providers = stt_prefs.get("providers", {})
        if not user_stt_providers:
            system_stt = system_prefs.get("stt", {}).get("providers", {})
            user_stt_providers = system_stt if system_stt else {
                settings.STT_PROVIDER: {"api_key": settings.STT_API_KEY, "model": settings.STT_MODEL_NAME}
            }

        stt_providers_effective = {
            p: {"api_key": self.mask_key(p_p.get("api_key")), "model": p_p.get("model")}
            for p, p_p in user_stt_providers.items() if p_p and is_provider_healthy("stt", p, p_p)
        }

        effective = {
            "llm": {
                "active_provider": llm_prefs.get("active_provider") or (next(iter(llm_providers_effective), None)) or "deepseek",
                "providers": llm_providers_effective
            },
            "tts": {
                "active_provider": tts_prefs.get("active_provider") or (next(iter(tts_providers_effective), None)) or settings.TTS_PROVIDER,
                "providers": tts_providers_effective
            },
            "stt": {
                "active_provider": stt_prefs.get("active_provider") or (next(iter(stt_providers_effective), None)) or settings.STT_PROVIDER,
                "providers": stt_providers_effective
            }
        }

        # --- Group policy filtering (non-admins only) ---
        group = user.group or self.services.user_service.get_or_create_default_group(db)
        if group and user.role != "admin":
            policy = group.policy or {}

            def apply_policy(section_key, policy_key, p_dict):
                # An empty or missing whitelist means NO providers for this section.
                allowed = policy.get(policy_key, [])
                if not allowed:
                    effective[section_key]["providers"] = {}
                    if p_dict and "providers" in p_dict:
                        p_dict["providers"] = {}
                    effective[section_key]["active_provider"] = ""
                    return p_dict

                providers = effective[section_key]["providers"]
                filtered_eff = {k: v for k, v in providers.items() if k in allowed}
                effective[section_key]["providers"] = filtered_eff

                # Also filter the user's own prefs so forbidden items never show.
                if p_dict and "providers" in p_dict:
                    p_dict["providers"] = {k: v for k, v in p_dict["providers"].items() if k in allowed}

                # Keep the active provider valid under the policy.
                if effective[section_key].get("active_provider") not in allowed:
                    effective[section_key]["active_provider"] = next(iter(filtered_eff), None) or ""
                return p_dict

            llm_prefs = apply_policy("llm", "llm", llm_prefs)
            tts_prefs = apply_policy("tts", "tts", tts_prefs)
            stt_prefs = apply_policy("stt", "stt", stt_prefs)

        def mask_section_prefs(section_dict):
            # Deep-copy so the stored preferences are never mutated by masking.
            if not section_dict:
                return {}
            masked_dict = copy.deepcopy(section_dict)
            for p_data in masked_dict.get("providers", {}).values():
                if p_data.get("api_key"):
                    p_data["api_key"] = self.mask_key(p_data["api_key"])
            return masked_dict

        return schemas.ConfigResponse(
            preferences=schemas.UserPreferences(
                llm=mask_section_prefs(llm_prefs),
                tts=mask_section_prefs(tts_prefs),
                stt=mask_section_prefs(stt_prefs),
                statuses=user.preferences.get("statuses", {}) if user.preferences else {}
            ),
            effective=effective
        )

    def update_user_config(self, user, prefs: schemas.UserPreferences, db) -> schemas.UserPreferences:
        """Persist a user's preference update.

        Masked keys ("***") echoed back by the UI are swapped for the real
        keys already stored; "_clone_from" markers inherit the source
        provider's real key and are stripped before persisting. Admin saves
        additionally sync global settings and config.yaml; non-admin saves
        only change active_provider selections and statuses.
        """
        old_prefs = user.preferences or {}

        def preserve_masked_keys(section_name, new_section):
            # The UI echoes masked keys; restore the stored real key.
            if not new_section or "providers" not in new_section:
                return
            old_section = old_prefs.get(section_name, {}).get("providers", {})
            for p_name, p_data in new_section["providers"].items():
                if p_data.get("api_key") and "***" in p_data["api_key"]:
                    if p_name in old_section:
                        p_data["api_key"] = old_section[p_name].get("api_key")

        def resolve_clone_from(section_name, new_section):
            # A provider created with _clone_from=<src> inherits <src>'s real
            # api_key (DB prefs first, then system settings). The marker is
            # always popped so it is never persisted.
            if not new_section or "providers" not in new_section:
                return
            old_section = old_prefs.get(section_name, {}).get("providers", {})
            system_prefs = self.services.user_service.get_system_settings(db)
            system_section = system_prefs.get(section_name, {}).get("providers", {})

            for p_name, p_data in new_section["providers"].items():
                clone_source = p_data.pop("_clone_from", None)
                if not clone_source:
                    continue
                real_key = (
                    old_section.get(clone_source, {}).get("api_key")
                    or system_section.get(clone_source, {}).get("api_key")
                )
                if real_key and "***" not in str(real_key):
                    p_data["api_key"] = real_key
                    logger.info(f"Resolved _clone_from: {p_name} inherited api_key from {clone_source} [{section_name}]")
                else:
                    logger.warning(f"Could not resolve _clone_from for {p_name}: source '{clone_source}' key not found or masked.")

        if prefs.llm: preserve_masked_keys("llm", prefs.llm)
        if prefs.tts: preserve_masked_keys("tts", prefs.tts)
        if prefs.stt: preserve_masked_keys("stt", prefs.stt)

        # Must run AFTER preserve_masked_keys so the source provider's key is
        # already unmasked in the new section when it is copied.
        if prefs.llm: resolve_clone_from("llm", prefs.llm)
        if prefs.tts: resolve_clone_from("tts", prefs.tts)
        if prefs.stt: resolve_clone_from("stt", prefs.stt)

        # Preserve unrelated keys (e.g. "nodes") while replacing the sections.
        current_prefs = dict(user.preferences or {})
        current_prefs.update({
            "llm": prefs.llm,
            "tts": prefs.tts,
            "stt": prefs.stt,
            "statuses": prefs.statuses or {}
        })
        user.preferences = current_prefs

        if user.role == "admin":
            from sqlalchemy.orm.attributes import flag_modified
            flag_modified(user, "preferences")

            from app.config import settings as global_settings

            # Admin saves become the global defaults and are persisted to config.yaml.
            if prefs.llm and "providers" in prefs.llm:
                global_settings.LLM_PROVIDERS = dict(prefs.llm.get("providers", {}))

            if prefs.tts and prefs.tts.get("active_provider"):
                p_name = prefs.tts["active_provider"]
                p_data = prefs.tts.get("providers", {}).get(p_name, {})
                if p_data:
                    global_settings.TTS_PROVIDER = p_name
                    global_settings.TTS_MODEL_NAME = p_data.get("model") or global_settings.TTS_MODEL_NAME
                    global_settings.TTS_VOICE_NAME = p_data.get("voice") or global_settings.TTS_VOICE_NAME
                    global_settings.TTS_API_KEY = p_data.get("api_key") or global_settings.TTS_API_KEY

            if prefs.stt and prefs.stt.get("active_provider"):
                p_name = prefs.stt["active_provider"]
                p_data = prefs.stt.get("providers", {}).get(p_name, {})
                if p_data:
                    global_settings.STT_PROVIDER = p_name
                    global_settings.STT_MODEL_NAME = p_data.get("model") or global_settings.STT_MODEL_NAME
                    global_settings.STT_API_KEY = p_data.get("api_key") or global_settings.STT_API_KEY

            try:
                global_settings.save_to_yaml()
            except Exception as ey:
                logger.error(f"Failed to sync settings to YAML: {ey}")

            logger.info(f"Saving updated global preferences via admin {user.id}")
        else:
            # Non-admins may only change which provider is active (and their
            # statuses). BUG FIX: the previous code indexed
            # user.preferences["llm"]["active_provider"] unconditionally and
            # crashed (TypeError/KeyError) when a section was None or missing.
            for section_name, new_section in (("llm", prefs.llm), ("tts", prefs.tts), ("stt", prefs.stt)):
                stored = user.preferences.get(section_name)
                if isinstance(stored, dict) and new_section:
                    stored["active_provider"] = new_section.get("active_provider")
            user.preferences["statuses"] = prefs.statuses or {}
            from sqlalchemy.orm.attributes import flag_modified
            flag_modified(user, "preferences")
            logger.info(f"Saving personal preferences for user {user.id}")

        db.add(user)
        db.commit()
        db.refresh(user)

        return schemas.UserPreferences(
            llm=user.preferences.get("llm", {}),
            tts=user.preferences.get("tts", {}),
            stt=user.preferences.get("stt", {}),
            statuses=user.preferences.get("statuses", {})
        )
- """ try: new_session = models.Session( user_id=user_id, @@ -47,4 +36,172 @@ return new_session except SQLAlchemyError as e: db.rollback() - raise \ No newline at end of file + raise + + def auto_attach_default_nodes(self, db: Session, session: models.Session, request: schemas.SessionCreate): + user = db.query(models.User).filter(models.User.id == request.user_id).first() + if not user: + return session + + node_prefs = (user.preferences or {}).get("nodes", {}) + default_nodes = node_prefs.get("default_node_ids", []) + node_config = node_prefs.get("data_source", {"source": "empty"}) + + if request.feature_name == "swarm_control" or default_nodes: + session.sync_workspace_id = f"session-{session.id}-{uuid.uuid4().hex[:8]}" + + try: + if self.services and hasattr(self.services, "orchestrator") and self.services.orchestrator.mirror: + self.services.orchestrator.mirror.get_workspace_path(session.sync_workspace_id) + except Exception as mirror_err: + logger.error(f"Failed to pre-initialize server mirror: {mirror_err}") + + if default_nodes: + session.attached_node_ids = list(default_nodes) + session.node_sync_status = { + nid: {"status": "pending", "last_sync": None} + for nid in default_nodes + } + db.commit() + db.refresh(session) + + registry = getattr(self.services, "node_registry_service", None) + orchestrator = getattr(self.services, "orchestrator", None) + + try: + assistant = orchestrator.assistant if orchestrator else None + source = node_config.get("source", "empty") + path = node_config.get("path", "") + + for nid in default_nodes: + if registry: + try: + registry.emit(nid, "info", { + "message": f"Auto-attached to session {session.id}", + "workspace_id": session.sync_workspace_id, + }) + except Exception: pass + + if assistant: + try: + if source == "server": + assistant.push_workspace(nid, session.sync_workspace_id) + elif source == "empty": + assistant.push_workspace(nid, session.sync_workspace_id) + assistant.control_sync(nid, session.sync_workspace_id, 
action="START") + assistant.control_sync(nid, session.sync_workspace_id, action="UNLOCK") + elif source == "node_local": + assistant.request_manifest(nid, session.sync_workspace_id, path=path or ".") + assistant.control_sync(nid, session.sync_workspace_id, action="START", path=path or ".") + except Exception as sync_err: + logger.error(f"Failed to trigger sync for node {nid}: {sync_err}") + except Exception as e: + logger.error(f"Failed to initialize orchestrator sync: {e}") + return session + + def attach_nodes(self, db: Session, session_id: int, request: schemas.NodeAttachRequest) -> schemas.SessionNodeStatusResponse: + session = db.query(models.Session).filter( + models.Session.id == session_id, + models.Session.is_archived == False + ).first() + if not session: + return None + + if not session.sync_workspace_id: + session.sync_workspace_id = f"session-{session_id}-{uuid.uuid4().hex[:8]}" + + old_node_ids = set(session.attached_node_ids or []) + new_node_ids = set(request.node_ids) + detached_nodes = old_node_ids - new_node_ids + + session.attached_node_ids = list(request.node_ids) + + sync_status = dict(session.node_sync_status or {}) + registry = getattr(self.services, "node_registry_service", None) + + for nid in new_node_ids: + if nid not in sync_status: + sync_status[nid] = {"status": "pending", "last_sync": None} + + if registry: + try: + registry.emit( + nid, "info", + {"message": f"Attached to session {session_id}", "workspace_id": session.sync_workspace_id}, + ) + except Exception: + pass + + for nid in detached_nodes: + sync_status.pop(nid, None) + + session.node_sync_status = sync_status + flag_modified(session, "attached_node_ids") + flag_modified(session, "node_sync_status") + db.commit() + db.refresh(session) + + orchestrator = getattr(self.services, "orchestrator", None) + if not orchestrator: + logger.warning("Orchestrator not found in ServiceContainer; cannot trigger sync.") + else: + try: + assistant = orchestrator.assistant + config = 
request.config or schemas.NodeWorkspaceConfig(source="empty") + old_config = session.sync_config or {} + + strategy_changed = False + if old_config and (config.source != old_config.get("source") or \ + config.path != old_config.get("path") or \ + config.source_node_id != old_config.get("source_node_id")): + strategy_changed = True + + session.sync_config = config.model_dump() + db.commit() + + if strategy_changed: + for nid in old_node_ids: + assistant.clear_workspace(nid, session.sync_workspace_id) + if getattr(orchestrator, "mirror", None): + orchestrator.mirror.purge(session.sync_workspace_id) + else: + for nid in detached_nodes: + assistant.clear_workspace(nid, session.sync_workspace_id) + + for nid in request.node_ids: + if config.source == "server": + assistant.push_workspace(nid, session.sync_workspace_id) + assistant.control_sync(nid, session.sync_workspace_id, action="LOCK") + elif config.source == "empty": + assistant.push_workspace(nid, session.sync_workspace_id) + assistant.control_sync(nid, session.sync_workspace_id, action="START") + assistant.control_sync(nid, session.sync_workspace_id, action="UNLOCK") + elif config.source == "node_local": + if config.source_node_id == nid: + assistant.request_manifest(nid, session.sync_workspace_id, path=config.path or ".") + assistant.control_sync(nid, session.sync_workspace_id, action="START", path=config.path or ".") + assistant.control_sync(nid, session.sync_workspace_id, action="UNLOCK") + else: + assistant.control_sync(nid, session.sync_workspace_id, action="START") + assistant.control_sync(nid, session.sync_workspace_id, action="LOCK") + assistant.push_workspace(nid, session.sync_workspace_id) + + if config.read_only_node_ids and nid in config.read_only_node_ids: + assistant.control_sync(nid, session.sync_workspace_id, action="LOCK") + + except Exception as e: + logger.error(f"Failed to trigger session node sync: {e}") + + return schemas.SessionNodeStatusResponse( + session_id=session_id, + 
sync_workspace_id=session.sync_workspace_id, + nodes=[ + schemas.NodeSyncStatusEntry( + node_id=nid, + status=sync_status.get(nid, {}).get("status", "pending"), + last_sync=sync_status.get(nid, {}).get("last_sync"), + ) + for nid in session.attached_node_ids + ], + sync_config=session.sync_config or {} + ) \ No newline at end of file diff --git a/ai-hub/app/core/services/tool.py b/ai-hub/app/core/services/tool.py index fcdb80a..ecc6aa2 100644 --- a/ai-hub/app/core/services/tool.py +++ b/ai-hub/app/core/services/tool.py @@ -6,6 +6,9 @@ import time import os +from app.core.tools.registry import tool_registry +import time + logger = logging.getLogger(__name__) class ToolService: @@ -17,6 +20,7 @@ def __init__(self, services: Any = None, local_skills: List[BaseSkill] = []): self._services = services self._local_skills = {s.name: s for s in local_skills} + tool_registry.load_plugins() def get_available_tools(self, db: Session, user_id: str, feature: str = None) -> List[Dict[str, Any]]: """ @@ -166,151 +170,34 @@ task_fn = None task_args = {} + plugin = tool_registry.get_plugin(skill.name) + if not plugin: + return {"success": False, "error": f"Tool implementation '{skill.name}' not found in registry"} + + context = { + "db": db, + "user_id": user_id, + "session_id": resolved_sid, + "node_id": node_id, + "node_ids": node_ids, + "assistant": assistant, + "orchestrator": orchestrator, + "services": self._services, + "on_event": on_event + } + + task_fn, task_args = plugin.prepare_task(args, context) + if not task_fn: + return task_args # error dict returned by prepare_task + try: - if skill.name == "mesh_terminal_control": - # ... same logic ... 
- cmd = args.get("command", "") - timeout = int(args.get("timeout", 30)) - session_id = args.get("session_id") - node_ids = args.get("node_ids") - no_abort = args.get("no_abort", False) - - if node_id in ["hub", "server", "local"] or (node_ids and any(nid in ["hub", "server", "local"] for nid in node_ids)): - # Special Case: Direct Hub/Server Execution for system maintenance (e.g., cleaning up .browser_data) - task_fn = self._execute_hub_command - task_args = {"cmd": cmd, "timeout": timeout, "resolved_sid": resolved_sid} - elif node_ids and isinstance(node_ids, list): - task_fn = assistant.dispatch_swarm - task_args = {"node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort} - elif node_id: - task_fn = assistant.dispatch_single - task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort} - else: - return {"success": False, "error": "node_id or node_ids is required"} - - elif skill.name == "mesh_wait_tasks": - timeout = int(args.get("timeout", 30)) - no_abort = args.get("no_abort", False) - task_map = args.get("task_map", {}) - - if not task_map: - return {"success": False, "error": "task_map is required"} - - if len(task_map) == 1: - nid, tid = next(iter(task_map.items())) - task_fn = assistant.wait_for_task - task_args = {"node_id": nid, "task_id": tid, "timeout": timeout, "no_abort": no_abort} - else: - task_fn = assistant.wait_for_swarm - task_args = {"task_map": task_map, "timeout": timeout, "no_abort": no_abort} - - elif skill.name == "browser_automation_agent": - browser_service = getattr(self._services, "browser_service", None) - if not browser_service: - return {"success": False, "error": "Browser Service not available"} - - action = args.get("action", "navigate").lower() - if action == "navigate": - task_fn = browser_service.navigate - task_args = {"url": args.get("url"), "session_id": resolved_sid, "on_event": on_event} - elif action == "click": - 
task_fn = browser_service.click - task_args = {"selector": args.get("selector"), "session_id": resolved_sid, "x": args.get("x", 0), "y": args.get("y", 0), "on_event": on_event} - elif action == "type": - task_fn = browser_service.type - task_args = {"text": args.get("text"), "selector": args.get("selector", ""), "session_id": resolved_sid, "on_event": on_event} - elif action == "snapshot": - task_fn = browser_service.get_snapshot - task_args = {"session_id": resolved_sid, "on_event": on_event} - elif action == "screenshot": - task_fn = browser_service.screenshot - task_args = {"session_id": resolved_sid, "on_event": on_event} - elif action == "hover": - task_fn = browser_service.hover - task_args = {"selector": args.get("selector"), "session_id": resolved_sid, "on_event": on_event} - elif action == "close": - task_fn = browser_service.close - task_args = {"session_id": resolved_sid, "on_event": on_event} - elif action == "eval": - task_fn = browser_service.eval - task_args = {"script": args.get("script", ""), "session_id": resolved_sid, "on_event": on_event} - elif action == "scroll": - task_fn = browser_service.scroll - task_args = { - "delta_x": int(args.get("delta_x", 0)), - "delta_y": int(args.get("delta_y", 0)), - "selector": args.get("selector", ""), - "session_id": resolved_sid, - "on_event": on_event - } - elif action == "research": - task_fn = browser_service.parallel_fetch - task_args = { - "urls": args.get("urls", []), - "session_id": resolved_sid, - "max_concurrent": int(args.get("max_concurrent", 5)), - "on_event": on_event - } - else: - return {"success": False, "error": f"Unsupported browser action: {action}"} - - elif skill.name == "mesh_file_explorer": - # ... existing logic ... 
- action = args.get("action") - path = args.get("path") - if node_id in ["hub", "server", "local"]: - task_fn = self._execute_hub_fs - task_args = {"action": action, "path": path, "session_id": resolved_sid, "content": args.get("content")} - elif action == "list": - task_fn = assistant.ls - task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid} - elif action == "read": - task_fn = assistant.cat - task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid} - elif action == "write": - content = args.get("content", "").encode('utf-8') - task_fn = assistant.write - task_args = {"node_id": node_id, "path": path, "content": content, "session_id": resolved_sid} - elif action == "delete": - task_fn = assistant.rm - task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid} - else: - return {"success": False, "error": f"Unsupported action: {action}"} - - elif skill.name == "mesh_sync_control": - action_str = args.get("action", "start").upper() - # Normalize mapping user string to assistant enum - action_map = { - "START": "START", - "STOP": "STOP", - "LOCK": "LOCK", - "UNLOCK": "UNLOCK", - "RESYNC": "RESYNC" - } - internal_action = action_map.get(action_str, "START") - task_fn = assistant.control_sync - task_args = { - "node_id": node_id, - "session_id": resolved_sid, - "action": internal_action, - "path": args.get("path", ".") - } - - elif skill.name == "mesh_inspect_drift": - task_fn = assistant.inspect_drift - task_args = { - "node_id": node_id, - "path": args.get("path"), - "session_id": resolved_sid - } - if task_fn: # Create and run the SubAgent (potentially AI-powered) sub_agent = SubAgent( name=f"{skill.name}_{node_id or 'swarm'}", task_fn=task_fn, args=task_args, - retries=0 if skill.name in ["mesh_terminal_control", "mesh_wait_tasks"] else 2, + retries=plugin.retries, llm_provider=llm_provider, assistant=assistant, on_event=on_event @@ -356,6 +243,7 @@ os.makedirs(action_dir, exist_ok=True) # Save Metadata/Result 
for easy debugging in file explorer + import json meta = { "timestamp": timestamp, "action": action, @@ -386,92 +274,3 @@ return {"success": False, "error": "Skill execution logic not found"} - def _format_ls_result(self, res: dict, node_id: str, path: str) -> str: - """Formats raw directory listing for LLM consumption.""" - formatted = f"Directory listing for '{res.get('path', path)}' on node {node_id}:\n" - files = res.get("files") - if not files: - formatted += "(Empty directory or failed to list files)" - else: - files.sort(key=lambda x: (not x.get("is_dir"), x.get("name", "").lower())) - limit = 100 - for f in files[:limit]: - icon = "๐Ÿ“" if f.get("is_dir") else "๐Ÿ“„" - size_str = f" ({f.get('size')} bytes)" if not f.get("is_dir") else "" - formatted += f"{icon} {f.get('name')}{size_str}\n" - if len(files) > limit: - formatted += f"... and {len(files) - limit} more items." - return formatted - - def _execute_hub_command(self, cmd: str, timeout: int = 30, resolved_sid: str = None) -> dict: - """Executes a command locally on the Hub server within the workspace context.""" - import subprocess - - cwd = os.getcwd() - if resolved_sid and self._services.orchestrator: - try: - # Use absolute path of the ghost mirror as CWD - cwd = self._services.orchestrator.mirror.get_workspace_path(resolved_sid) - except: pass - - try: - logger.info(f"[HubExec] Local command: {cmd} (CWD: {cwd})") - proc = subprocess.run( - cmd, shell=True, capture_output=True, text=True, timeout=timeout, cwd=cwd - ) - return { - "status": "SUCCESS" if proc.returncode == 0 else "FAILED", - "stdout": proc.stdout, - "stderr": proc.stderr, - "exit_code": proc.returncode, - "node_id": "hub" - } - except subprocess.TimeoutExpired as e: - return {"status": "TIMEOUT", "stdout": e.stdout or "", "stderr": e.stderr or "", "error": "Command timed out on Hub"} - except Exception as e: - return {"status": "ERROR", "error": str(e)} - - def _execute_hub_fs(self, action: str, path: str, session_id: str, content: 
str = None) -> dict: - """Performs filesystem actions locally on the Hub server.""" - import shutil - orchestrator = getattr(self._services, "orchestrator", None) - if not orchestrator or not orchestrator.mirror: - return {"success": False, "error": "Ghost Mirror not available"} - - base = orchestrator.mirror.get_workspace_path(session_id) - # Ensure path is relative and doesn't escape - target = os.path.normpath(os.path.join(base, path.lstrip("/"))) - if not target.startswith(base): - return {"success": False, "error": "Path traversal attempt blocked"} - - try: - if action == "list": - if not os.path.exists(target): return {"error": "Path not found"} - files = [] - for entry in os.scandir(target): - files.append({ - "path": os.path.relpath(entry.path, base), - "name": entry.name, - "is_dir": entry.is_dir(), - "size": entry.stat().st_size if entry.is_file() else 0 - }) - return {"files": files, "path": path} - elif action == "read": - if not os.path.exists(target): return {"error": "File not found"} - with open(target, "r", encoding="utf-8", errors="ignore") as f: - return {"content": f.read(), "path": path} - elif action == "write": - os.makedirs(os.path.dirname(target), exist_ok=True) - with open(target, "w", encoding="utf-8") as f: - f.write(content or "") - return {"success": True} - elif action == "delete": - if os.path.isdir(target): - shutil.rmtree(target) - elif os.path.exists(target): - os.remove(target) - return {"success": True} - except Exception as e: - return {"error": str(e)} - return {"error": f"Unknown action: {action}"} - diff --git a/ai-hub/app/core/templates/provisioning/README.md.j2 b/ai-hub/app/core/templates/provisioning/README.md.j2 new file mode 100644 index 0000000..407f8fd --- /dev/null +++ b/ai-hub/app/core/templates/provisioning/README.md.j2 @@ -0,0 +1,30 @@ +# Cortex Agent Node + +This bundle contains the Cortex Agent Node, a modular software that connects your physical computing resources to the Cortex Hub. 
+ +## Structure +- `bootstrap_installer.py`: The daemon & update installer +- `src/`: Core Python modules (`agent_node`, `protos`, `shared_core`) +- `run.sh` / `run.bat`: Simple execution wrappers + +## Running the Node + +### Fast/Production (macOS & Linux) +To run the node cleanly in the background as a daemon (survives system restarts): +1. Open a terminal in this directory. +2. Run: `python3 bootstrap_installer.py --daemon` + +That's it! You can safely close your terminal. + +### Debug / Foreground (macOS & Linux) +1. Open a terminal in this directory. +2. Make the runner executable: `chmod +x run.sh` +3. Run: `./run.sh` + +### Windows +1. Double-click `run.bat`. + +The scripts perfectly set up the python virtual environment. + +## Configuration +The `agent_config.yaml` file natively holds your node's identity and secrets. Do not share it. diff --git a/ai-hub/app/core/templates/provisioning/provision.py.j2 b/ai-hub/app/core/templates/provisioning/provision.py.j2 new file mode 100644 index 0000000..4876fb5 --- /dev/null +++ b/ai-hub/app/core/templates/provisioning/provision.py.j2 @@ -0,0 +1,35 @@ +# Cortex Agent One-Liner Provisioner +import os +import sys +import urllib.request +import subprocess + +print("๐Ÿš€ Starting Cortex Agent Provisioning for node: {{ node_id }}") + +# 1. Create .cortex/agent-node directory +install_dir = os.path.expanduser("~/.cortex/agent-node") +os.makedirs(install_dir, exist_ok=True) +os.chdir(install_dir) + +# 2. Write agent_config.yaml +print("[*] Writing configuration...") +with open("agent_config.yaml", "w") as f: + f.write("""{{ config_yaml }}""") + +# 3. Download bootstrap_installer.py +print("[*] Downloading installer...") +installer_url = "{{ base_url }}/api/v1/agent/installer" +urllib.request.urlretrieve(installer_url, "bootstrap_installer.py") + +# 4. 
Run installer with --daemon (or --non-interactive) +print("[*] Bootstrapping agent...") +cmd = [ + sys.executable, "bootstrap_installer.py", + "--daemon", + "--hub", "{{ base_url }}", + "--token", "{{ invite_token }}", + "--node-id", "{{ node_id }}" +] +subprocess.run(cmd) + +print("โœ… Provisioning complete! Node should be online in the Mesh Dashboard shortly.") diff --git a/ai-hub/app/core/templates/provisioning/run.bat.j2 b/ai-hub/app/core/templates/provisioning/run.bat.j2 new file mode 100644 index 0000000..5435b88 --- /dev/null +++ b/ai-hub/app/core/templates/provisioning/run.bat.j2 @@ -0,0 +1,34 @@ +@echo off +echo ๐Ÿš€ Starting Cortex Agent Node... + +if exist agent-node.exe ( + echo [*] Binary executable detected. Launching... + agent-node.exe + exit /b %errorlevel% +) + +if exist src\agent_node ( + echo [*] Source code detected. Checking environment... + python --version >nul 2>&1 + if %errorlevel% neq 0 ( + echo โŒ Error: python not found. Please install Python 3.10+. + pause + exit /b 1 + ) + if not exist .venv ( + echo [*] Creating virtual environment... + python -m venv .venv + ) + call .venv\Scripts\activate + if exist requirements.txt ( + echo [*] Syncing dependencies... + pip install --upgrade pip --quiet + pip install -r requirements.txt --quiet + ) + echo โœ… Environment ready. Booting node... + python src\agent_node\main.py +) else ( + echo โŒ Error: No executable ('agent-node.exe') or source code ('src\agent_node\') found. 
+ pause + exit /b 1 +) diff --git a/ai-hub/app/core/templates/provisioning/run.sh.j2 b/ai-hub/app/core/templates/provisioning/run.sh.j2 new file mode 100644 index 0000000..ddb8f23 --- /dev/null +++ b/ai-hub/app/core/templates/provisioning/run.sh.j2 @@ -0,0 +1,46 @@ +#!/bin/bash +# Cortex Agent Node โ€” Seamless Runner +set -e + +# Ensure we are in the script's directory +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &> /dev/null && pwd)" +cd "$SCRIPT_DIR" || { echo "โŒ Error: Could not change directory to $SCRIPT_DIR"; exit 1; } + +echo "๐Ÿš€ Starting Cortex Agent Node..." + +# 1. Future Binary Check +if [ -f "./agent-node" ]; then + echo "[*] Binary executable detected. Launching..." + chmod +x ./agent-node + ./agent-node + exit $? +fi + +# 2. Source Code Fallback +if [ -d "./src/agent_node" ]; then + echo "[*] Source code detected. Setting up Python environment..." + if ! command -v python3 &> /dev/null; then + echo "โŒ Error: python3 not found. Please install Python 3.10+." + exit 1 + fi + + VENV=".venv" + if [ ! -d "$VENV" ]; then + echo "[*] Creating virtual environment..." + python3 -m venv "$VENV" + fi + source "$VENV/bin/activate" + + if [ -f "requirements.txt" ]; then + echo "[*] Syncing dependencies..." + pip install --upgrade pip --quiet + pip install -r requirements.txt --quiet + fi + + echo "โœ… Environment ready. Booting node..." + echo "๐Ÿ’ก Tip: To install as a persistent background service (survives reboots), run: python3 bootstrap_installer.py --daemon" + python3 src/agent_node/main.py +else + echo "โŒ Error: No executable ('agent-node') or source code ('src/agent_node/') found in this bundle." 
+ exit 1 +fi diff --git a/ai-hub/app/core/tools/base.py b/ai-hub/app/core/tools/base.py new file mode 100644 index 0000000..b0664e0 --- /dev/null +++ b/ai-hub/app/core/tools/base.py @@ -0,0 +1,38 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any, Tuple, Callable, Optional + +class BaseToolPlugin(ABC): + """ + Interface for dynamic tools/skills to integrate into the Hub's SubAgent pipeline. + """ + + @property + @abstractmethod + def name(self) -> str: + """The exact name of the tool, e.g., 'mesh_terminal_control'""" + pass + + @property + def retries(self) -> int: + """Default number of retries for the SubAgent handling this tool.""" + return 2 + + @abstractmethod + def prepare_task(self, args: Dict[str, Any], context: Dict[str, Any]) -> Tuple[Optional[Callable], Dict[str, Any]]: + """ + Takes tool arguments and the execution context to provide the + actual task function and arguments to be run by the SubAgent. + + Args: + args: The kwargs provided by the LLM calling the tool. + context: System context, generally containing: + 'db', 'user_id', 'session_id', 'node_id', 'node_ids', + 'assistant', 'orchestrator', 'services', 'on_event'. + + Returns: + Tuple containing: + 1. The async task function (or synchronous function if wrapped). + 2. The kwargs dictionary to be passed to the task function. + If validation fails or access is denied, return (None, {"success": False, "error": "Reason"}). 
+ """ + pass diff --git a/ai-hub/app/core/tools/definitions/__init__.py b/ai-hub/app/core/tools/definitions/__init__.py new file mode 100644 index 0000000..429a852 --- /dev/null +++ b/ai-hub/app/core/tools/definitions/__init__.py @@ -0,0 +1 @@ +# Definitions package for all standard AI skills in the Hub diff --git a/ai-hub/app/core/tools/definitions/browser_automation_agent.py b/ai-hub/app/core/tools/definitions/browser_automation_agent.py new file mode 100644 index 0000000..99630cf --- /dev/null +++ b/ai-hub/app/core/tools/definitions/browser_automation_agent.py @@ -0,0 +1,51 @@ +from typing import Dict, Any, Tuple, Callable, Optional +from app.core.tools.base import BaseToolPlugin + +class BrowserAutomationAgentTool(BaseToolPlugin): + @property + def name(self) -> str: + return "browser_automation_agent" + + def prepare_task(self, args: Dict[str, Any], context: Dict[str, Any]) -> Tuple[Optional[Callable], Dict[str, Any]]: + services = context.get("services") + resolved_sid = context.get("session_id") + on_event = context.get("on_event") + + browser_service = getattr(services, "browser_service", None) + if not browser_service: + return None, {"success": False, "error": "Browser Service not available"} + + action = args.get("action", "navigate").lower() + if action == "navigate": + return browser_service.navigate, {"url": args.get("url"), "session_id": resolved_sid, "on_event": on_event} + elif action == "click": + return browser_service.click, {"selector": args.get("selector"), "session_id": resolved_sid, "x": args.get("x", 0), "y": args.get("y", 0), "on_event": on_event} + elif action == "type": + return browser_service.type, {"text": args.get("text"), "selector": args.get("selector", ""), "session_id": resolved_sid, "on_event": on_event} + elif action == "snapshot": + return browser_service.get_snapshot, {"session_id": resolved_sid, "on_event": on_event} + elif action == "screenshot": + return browser_service.screenshot, {"session_id": resolved_sid, "on_event": 
on_event} + elif action == "hover": + return browser_service.hover, {"selector": args.get("selector"), "session_id": resolved_sid, "on_event": on_event} + elif action == "close": + return browser_service.close, {"session_id": resolved_sid, "on_event": on_event} + elif action == "eval": + return browser_service.eval, {"script": args.get("script", ""), "session_id": resolved_sid, "on_event": on_event} + elif action == "scroll": + return browser_service.scroll, { + "delta_x": int(args.get("delta_x", 0)), + "delta_y": int(args.get("delta_y", 0)), + "selector": args.get("selector", ""), + "session_id": resolved_sid, + "on_event": on_event + } + elif action == "research": + return browser_service.parallel_fetch, { + "urls": args.get("urls", []), + "session_id": resolved_sid, + "max_concurrent": int(args.get("max_concurrent", 5)), + "on_event": on_event + } + + return None, {"success": False, "error": f"Unsupported browser action: {action}"} diff --git a/ai-hub/app/core/tools/definitions/mesh_file_explorer.py b/ai-hub/app/core/tools/definitions/mesh_file_explorer.py new file mode 100644 index 0000000..e4ab518 --- /dev/null +++ b/ai-hub/app/core/tools/definitions/mesh_file_explorer.py @@ -0,0 +1,77 @@ +import os +import shutil +from typing import Dict, Any, Tuple, Callable, Optional +from app.core.tools.base import BaseToolPlugin + +class MeshFileExplorerTool(BaseToolPlugin): + @property + def name(self) -> str: + return "mesh_file_explorer" + + def prepare_task(self, args: Dict[str, Any], context: Dict[str, Any]) -> Tuple[Optional[Callable], Dict[str, Any]]: + action = args.get("action") + path = args.get("path") + content = args.get("content", "") + + node_id = context.get("node_id") + resolved_sid = context.get("session_id") + assistant = context.get("assistant") + services = context.get("services") + + if node_id in ["hub", "server", "local"]: + def _hub_fs(**kwargs): + return self._execute_hub_fs(services, kwargs.get("action"), kwargs.get("path"), 
kwargs.get("session_id"), kwargs.get("content")) + return _hub_fs, {"action": action, "path": path, "session_id": resolved_sid, "content": content} + + if action == "list": + return assistant.ls, {"node_id": node_id, "path": path, "session_id": resolved_sid} + elif action == "read": + return assistant.cat, {"node_id": node_id, "path": path, "session_id": resolved_sid} + elif action == "write": + content_bytes = content.encode('utf-8') if content else b"" + return assistant.write, {"node_id": node_id, "path": path, "content": content_bytes, "session_id": resolved_sid} + elif action == "delete": + return assistant.rm, {"node_id": node_id, "path": path, "session_id": resolved_sid} + + return None, {"success": False, "error": f"Unsupported action: {action}"} + + def _execute_hub_fs(self, services, action: str, path: str, session_id: str, content: str = None) -> dict: + orchestrator = getattr(services, "orchestrator", None) + if not orchestrator or not orchestrator.mirror: + return {"success": False, "error": "Ghost Mirror not available"} + + base = orchestrator.mirror.get_workspace_path(session_id) + target = os.path.normpath(os.path.join(base, str(path).lstrip("/"))) + if not target.startswith(base): + return {"success": False, "error": "Path traversal attempt blocked"} + + try: + if action == "list": + if not os.path.exists(target): return {"error": "Path not found"} + files = [] + for entry in os.scandir(target): + files.append({ + "path": os.path.relpath(entry.path, base), + "name": entry.name, + "is_dir": entry.is_dir(), + "size": entry.stat().st_size if entry.is_file() else 0 + }) + return {"files": files, "path": path} + elif action == "read": + if not os.path.exists(target): return {"error": "File not found"} + with open(target, "r", encoding="utf-8", errors="ignore") as f: + return {"content": f.read(), "path": path} + elif action == "write": + os.makedirs(os.path.dirname(target), exist_ok=True) + with open(target, "w", encoding="utf-8") as f: + 
f.write(content or "") + return {"success": True} + elif action == "delete": + if os.path.isdir(target): + shutil.rmtree(target) + elif os.path.exists(target): + os.remove(target) + return {"success": True} + except Exception as e: + return {"error": str(e)} + return {"error": f"Unknown action: {action}"} diff --git a/ai-hub/app/core/tools/definitions/mesh_inspect_drift.py b/ai-hub/app/core/tools/definitions/mesh_inspect_drift.py new file mode 100644 index 0000000..dbffccb --- /dev/null +++ b/ai-hub/app/core/tools/definitions/mesh_inspect_drift.py @@ -0,0 +1,21 @@ +from typing import Dict, Any, Tuple, Callable, Optional +from app.core.tools.base import BaseToolPlugin + +class MeshInspectDriftTool(BaseToolPlugin): + @property + def name(self) -> str: + return "mesh_inspect_drift" + + def prepare_task(self, args: Dict[str, Any], context: Dict[str, Any]) -> Tuple[Optional[Callable], Dict[str, Any]]: + node_id = context.get("node_id") + resolved_sid = context.get("session_id") + assistant = context.get("assistant") + + if not assistant: + return None, {"success": False, "error": "Orchestrator not available"} + + return assistant.inspect_drift, { + "node_id": node_id, + "path": args.get("path"), + "session_id": resolved_sid + } diff --git a/ai-hub/app/core/tools/definitions/mesh_sync_control.py b/ai-hub/app/core/tools/definitions/mesh_sync_control.py new file mode 100644 index 0000000..c3efee7 --- /dev/null +++ b/ai-hub/app/core/tools/definitions/mesh_sync_control.py @@ -0,0 +1,33 @@ +from typing import Dict, Any, Tuple, Callable, Optional +from app.core.tools.base import BaseToolPlugin + +class MeshSyncControlTool(BaseToolPlugin): + @property + def name(self) -> str: + return "mesh_sync_control" + + def prepare_task(self, args: Dict[str, Any], context: Dict[str, Any]) -> Tuple[Optional[Callable], Dict[str, Any]]: + action_str = args.get("action", "start").upper() + # Normalize mapping user string to assistant enum + action_map = { + "START": "START", + "STOP": "STOP", 
+ "LOCK": "LOCK", + "UNLOCK": "UNLOCK", + "RESYNC": "RESYNC" + } + internal_action = action_map.get(action_str, "START") + + node_id = context.get("node_id") + resolved_sid = context.get("session_id") + assistant = context.get("assistant") + + if not assistant: + return None, {"success": False, "error": "Orchestrator not available"} + + return assistant.control_sync, { + "node_id": node_id, + "session_id": resolved_sid, + "action": internal_action, + "path": args.get("path", ".") + } diff --git a/ai-hub/app/core/tools/definitions/mesh_terminal_control.py b/ai-hub/app/core/tools/definitions/mesh_terminal_control.py new file mode 100644 index 0000000..26e7203 --- /dev/null +++ b/ai-hub/app/core/tools/definitions/mesh_terminal_control.py @@ -0,0 +1,61 @@ +import os +from typing import Dict, Any, Tuple, Callable, Optional +from app.core.tools.base import BaseToolPlugin + +class MeshTerminalControlTool(BaseToolPlugin): + @property + def name(self) -> str: + return "mesh_terminal_control" + + @property + def retries(self) -> int: + return 0 + + def prepare_task(self, args: Dict[str, Any], context: Dict[str, Any]) -> Tuple[Optional[Callable], Dict[str, Any]]: + cmd = args.get("command", "") + timeout = int(args.get("timeout", 30)) + no_abort = args.get("no_abort", False) + + node_id = context.get("node_id") + node_ids = context.get("node_ids") + resolved_sid = context.get("session_id") + assistant = context.get("assistant") + + if node_id in ["hub", "server", "local"] or (node_ids and any(nid in ["hub", "server", "local"] for nid in node_ids)): + def _hub_command(**kwargs): + return self._execute_hub_command(context.get("services"), kwargs.get("cmd"), kwargs.get("timeout"), kwargs.get("resolved_sid")) + return _hub_command, {"cmd": cmd, "timeout": timeout, "resolved_sid": resolved_sid} + elif node_ids and isinstance(node_ids, list): + return assistant.dispatch_swarm, { + "node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort 
+ } + elif node_id: + return assistant.dispatch_single, { + "node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort + } + + return None, {"success": False, "error": "node_id or node_ids is required"} + + def _execute_hub_command(self, services, cmd: str, timeout: int = 30, resolved_sid: str = None) -> dict: + import subprocess + cwd = os.getcwd() + if resolved_sid and getattr(services, "orchestrator", None): + try: + cwd = services.orchestrator.mirror.get_workspace_path(resolved_sid) + except: pass + + try: + proc = subprocess.run( + cmd, shell=True, capture_output=True, text=True, timeout=timeout, cwd=cwd + ) + return { + "status": "SUCCESS" if proc.returncode == 0 else "FAILED", + "stdout": proc.stdout, + "stderr": proc.stderr, + "exit_code": proc.returncode, + "node_id": "hub" + } + except subprocess.TimeoutExpired as e: + return {"status": "TIMEOUT", "stdout": e.stdout or "", "stderr": e.stderr or "", "error": "Command timed out on Hub"} + except Exception as e: + return {"status": "ERROR", "error": str(e)} diff --git a/ai-hub/app/core/tools/definitions/mesh_wait_tasks.py b/ai-hub/app/core/tools/definitions/mesh_wait_tasks.py new file mode 100644 index 0000000..7160ddc --- /dev/null +++ b/ai-hub/app/core/tools/definitions/mesh_wait_tasks.py @@ -0,0 +1,29 @@ +from typing import Dict, Any, Tuple, Callable, Optional +from app.core.tools.base import BaseToolPlugin + +class MeshWaitTasksTool(BaseToolPlugin): + @property + def name(self) -> str: + return "mesh_wait_tasks" + + @property + def retries(self) -> int: + return 0 + + def prepare_task(self, args: Dict[str, Any], context: Dict[str, Any]) -> Tuple[Optional[Callable], Dict[str, Any]]: + timeout = int(args.get("timeout", 30)) + no_abort = args.get("no_abort", False) + task_map = args.get("task_map", {}) + + assistant = context.get("assistant") + if not assistant: + return None, {"success": False, "error": "Orchestrator not available"} + + if not task_map: + return 
None, {"success": False, "error": "task_map is required"} + + if len(task_map) == 1: + nid, tid = next(iter(task_map.items())) + return assistant.wait_for_task, {"node_id": nid, "task_id": tid, "timeout": timeout, "no_abort": no_abort} + else: + return assistant.wait_for_swarm, {"task_map": task_map, "timeout": timeout, "no_abort": no_abort} diff --git a/ai-hub/app/core/tools/registry.py b/ai-hub/app/core/tools/registry.py new file mode 100644 index 0000000..366c545 --- /dev/null +++ b/ai-hub/app/core/tools/registry.py @@ -0,0 +1,48 @@ +import importlib +import inspect +import pkgutil +import logging +from typing import Dict, Type + +from app.core.tools.base import BaseToolPlugin + +logger = logging.getLogger(__name__) + +class ToolRegistry: + """ + Auto-discovers and holds all SubAgent Tool definition plugins. + Allows decoupling monolithic `tool.py` into separate definitions. + """ + + def __init__(self): + self._plugins: Dict[str, BaseToolPlugin] = {} + + def load_plugins(self, package_name: str = "app.core.tools.definitions"): + """ + Dynamically imports all modules in the given package, searching for + subclasses of BaseToolPlugin and instantiating them. 
+ """ + try: + package = importlib.import_module(package_name) + except ImportError as e: + logger.warning(f"Could not import tools package '{package_name}': {e}") + return + + for _, module_name, _ in pkgutil.iter_modules(package.__path__): + full_module_name = f"{package_name}.{module_name}" + try: + module = importlib.import_module(full_module_name) + for name, obj in inspect.getmembers(module): + if inspect.isclass(obj) and issubclass(obj, BaseToolPlugin) and obj is not BaseToolPlugin: + plugin_instance = obj() + if plugin_instance.name: + self._plugins[plugin_instance.name] = plugin_instance + logger.info(f"Registered dynamic tool plugin: '{plugin_instance.name}'") + except Exception as e: + logger.error(f"Failed to load plugin module '{full_module_name}': {e}") + + def get_plugin(self, name: str) -> BaseToolPlugin: + return self._plugins.get(name) + +# Singleton global registry to load at startup +tool_registry = ToolRegistry() diff --git a/ai-hub/app/db/guide.md b/ai-hub/app/db/guide.md index e8986e2..116cbab 100644 --- a/ai-hub/app/db/guide.md +++ b/ai-hub/app/db/guide.md @@ -85,13 +85,13 @@ --- -Here is your fully **formatted and organized** documentation for the SQLAlchemy models in `app/db/models.py`, suitable for technical documentation or a README: +Here is your fully **formatted and organized** documentation for the SQLAlchemy models in `app/db/models/`, suitable for technical documentation or a README: --- # ๐Ÿ—‚๏ธ Database Models Documentation -This document describes the SQLAlchemy models defined in `app/db/models.py`. These classes represent the tables used to store application data, including **chat sessions**, **messages**, and **document metadata** for **Retrieval-Augmented Generation (RAG)**. +This document describes the SQLAlchemy models defined in `app/db/models/`. 
These classes represent the tables used to store application data, including **chat sessions**, **messages**, and **document metadata** for **Retrieval-Augmented Generation (RAG)**. All models inherit from the `Base` class, imported from the `database.py` module. diff --git a/ai-hub/app/db/models.py b/ai-hub/app/db/models.py deleted file mode 100644 index c1fc3ad..0000000 --- a/ai-hub/app/db/models.py +++ /dev/null @@ -1,440 +0,0 @@ -from datetime import datetime -from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean, JSON -from sqlalchemy.orm import relationship - -# The declarative_base class is already defined in database.py. -# We will import it from there to ensure all models use the same base. -from .database import Base - - -# --- SQLAlchemy Models --- -# These classes define the structure of the database tables and how they relate. - -class User(Base): - """ - SQLAlchemy model for the 'users' table, used for OIDC authentication. - - This table stores user information obtained during the OIDC login process. - """ - __tablename__ = 'users' - - # The user's unique ID, which will be provided by the OIDC provider. - id = Column(String, primary_key=True, index=True) - # The unique OIDC ID from the provider. - oidc_id = Column(String, unique=True, nullable=True) - # The user's email address. - email = Column(String, nullable=True) - # The user's display name. - username = Column(String, nullable=True) - # Enterprise profile info - full_name = Column(String, nullable=True) - role = Column(String, default="user", nullable=False) # 'admin' or 'user' - group_id = Column(String, ForeignKey('groups.id'), nullable=True) - avatar_url = Column(String, nullable=True) - # Timestamp for when the user account was created. - created_at = Column(DateTime, default=datetime.utcnow) - # Track platform engagement for auditing - last_login_at = Column(DateTime, default=datetime.utcnow) - # User's preferences/settings (e.g. 
LLM/TTS/STT configs) - preferences = Column(JSON, default={}, nullable=True) - - # Relationship to Group - group = relationship("Group", back_populates="users") - - # Defines a one-to-many relationship with the Session table. - # 'back_populates' creates a link back to the User model from the Session model. - sessions = relationship("Session", back_populates="user", cascade="all, delete-orphan") - - def __repr__(self): - return f"" - -class Group(Base): - """ - SQLAlchemy model for the 'groups' table. - Groups define policies for AI provider access. - """ - __tablename__ = 'groups' - - id = Column(String, primary_key=True, index=True) - name = Column(String, unique=True, nullable=False) - description = Column(String, nullable=True) - # Policy: which providers are allowed for this group - # Example: {"llm": ["openai", "gemini"], "tts": ["gcloud_tts"], "stt": ["google_gemini"]} - policy = Column(JSON, default={}, nullable=True) - created_at = Column(DateTime, default=datetime.utcnow) - - users = relationship("User", back_populates="group") - - def __repr__(self): - return f"" - -class Session(Base): - """ - SQLAlchemy model for the 'sessions' table. - - Each session represents a single conversation between a user and the AI. - It links a user to a series of messages, and optionally to Agent Nodes - for workspace synchronization and task execution. - """ - __tablename__ = 'sessions' - - # Primary key for the session. - id = Column(Integer, primary_key=True, index=True) - # The ID of the user who owns this session. - user_id = Column(String, ForeignKey('users.id'), index=True, nullable=False) - # A title for the conversation, which can be generated by the AI. - title = Column(String, index=True, nullable=True) - # The name of the LLM model used for this session (e.g., "Gemini", "DeepSeek"). 
- provider_name = Column(String, nullable=True) - # Track STT and TTS providers used in this session context - stt_provider_name = Column(String, nullable=True) - tts_provider_name = Column(String, nullable=True) - # The feature namespace this session belongs to (e.g., "swarm_control"). - feature_name = Column(String, default="default", nullable=False) - # Timestamp for when the session was created. - created_at = Column(DateTime, default=datetime.utcnow, nullable=False) - # Flag to indicate if the session has been archived or soft-deleted. - is_archived = Column(Boolean, default=False, nullable=False) - # Flag to indicate if the current AI execution should be cancelled. - is_cancelled = Column(Boolean, default=False, nullable=False) - - # --- Agent Node Integration (M3) --- - # Stable workspace ID used as Ghost Mirror session_id across all attached nodes. - # Set on first node attachment; never changes for the lifetime of the session. - sync_workspace_id = Column(String, nullable=True, index=True) - # JSON list of attached node_ids: ["node-alpha", "node-beta"] - attached_node_ids = Column(JSON, default=[], nullable=True) - # Per-node sync status snapshot: {"node-alpha": {"status": "synced", "last_sync": "..."}} - node_sync_status = Column(JSON, default={}, nullable=True) - # M6: Store the synchronization configuration for the session - sync_config = Column(JSON, default={}, nullable=True) - - messages = relationship("Message", back_populates="session", cascade="all, delete-orphan") - user = relationship("User", back_populates="sessions") - - def __repr__(self): - return f"" - - -class Message(Base): - """ - SQLAlchemy model for the 'messages' table. - - This table stores the individual chat messages within a session, - including who sent them (user or AI) and the content. - """ - __tablename__ = 'messages' - - # Primary key for the message. - id = Column(Integer, primary_key=True, index=True) - # The foreign key that links this message to its parent session. 
- # This is a critical link for reconstructing chat history. - session_id = Column(Integer, ForeignKey('sessions.id'), nullable=False) - # Identifies the sender of the message, e.g., 'user' or 'assistant'. - sender = Column(String, nullable=False) - # The actual text content of the message. - content = Column(Text, nullable=False) - # Timestamp for when the message was sent. - created_at = Column(DateTime, default=datetime.utcnow, nullable=False) - # The time taken for the model to generate the response, in seconds. - model_response_time = Column(Integer, nullable=True) - # The number of tokens in the message (both input and output). - token_count = Column(Integer, nullable=True) - # A JSON field to store unstructured metadata about the message, such as tool calls. - # This column has been renamed from 'metadata' to avoid a conflict. - message_metadata = Column(JSON, nullable=True) - # The progressive reasoning or 'thinking' step for models that support it (e.g. DeepSeek R1) - reasoning_content = Column(Text, nullable=True) - # Path to the generated audio file for this message, if any. - audio_path = Column(String, nullable=True) - - - - # Relationship back to the parent Session. - # This allows us to access the parent Session object from a Message object. - session = relationship("Session", back_populates="messages") - - def __repr__(self): - """ - Provides a helpful string representation of the object for debugging. - """ - return f"" - - -class Document(Base): - """ - SQLAlchemy model for the 'documents' table. - - This table stores the metadata and original text content of a document. - The content is the data that will be chunked, embedded, and used for RAG. - """ - __tablename__ = 'documents' - - # Primary key for the document, uniquely identifying each entry. - id = Column(Integer, primary_key=True, index=True) - # The title of the document for easy human-readable reference. 
- title = Column(String, index=True, nullable=False) - # The actual text content of the document. Using Text for potentially long strings. - text = Column(Text, nullable=False) - # The original source URL or path of the document. - source_url = Column(String, nullable=True) - # A string to identify the author of the document. - author = Column(String, nullable=True) - # The current processing status of the document (e.g., 'ready', 'processing', 'failed'). - status = Column(String, default="processing", nullable=False) - # Timestamp for when the document was added to the database. - created_at = Column(DateTime, default=datetime.utcnow, nullable=False) - # A string to identify the user who added the document, useful for multi-user apps. - user_id = Column(String, index=True, nullable=True) - - # Defines a one-to-one relationship with the VectorMetadata table. - vector_metadata = relationship( - "VectorMetadata", - back_populates="document", - cascade="all, delete-orphan", # Deletes vector metadata when the document is deleted. - uselist=False - ) - - def __repr__(self): - """ - Provides a helpful string representation of the object for debugging. - """ - return f"" - - -class VectorMetadata(Base): - """ - SQLAlchemy model for the 'vector_metadata' table. - - This table links a document to its corresponding vector representation - in the FAISS index. The primary key `id` of this table serves as the - vector ID in the FAISS store, making the `faiss_index` column redundant. - """ - __tablename__ = 'vector_metadata' - - # Primary key for the metadata entry. This will also be the FAISS index. - id = Column(Integer, primary_key=True, index=True) - # Foreign key that links this metadata entry back to its Document. - document_id = Column(Integer, ForeignKey('documents.id'), unique=True) - # Foreign key to link this vector metadata to a specific session. - # This is crucial for retrieving relevant RAG context for a given conversation. 
- session_id = Column(Integer, ForeignKey('sessions.id'), nullable=True) - # The name of the embedding model used to create the vector. - embedding_model = Column(String, nullable=False) - - # Defines a many-to-one relationship with the Document table. - document = relationship("Document", back_populates="vector_metadata") - # Defines a many-to-one relationship with the Session table. - session = relationship("Session") - - def __repr__(self): - """ - Provides a helpful string representation of the object for debugging. - """ - return f"" - - -# --- New Asset Management Models --- - -class PromptTemplate(Base): - """ - SQLAlchemy model for centralized system prompts. - """ - __tablename__ = 'prompt_templates' - - id = Column(Integer, primary_key=True, index=True) - slug = Column(String, unique=True, index=True, nullable=False) - title = Column(String, nullable=False) - content = Column(Text, nullable=False) - version = Column(Integer, default=1) - - owner_id = Column(String, ForeignKey('users.id'), nullable=False) - group_id = Column(String, ForeignKey('groups.id'), nullable=True) - is_public = Column(Boolean, default=False) - - created_at = Column(DateTime, default=datetime.utcnow) - updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) - - owner = relationship("User") - group = relationship("Group") - - def __repr__(self): - return f"" - -class Skill(Base): - """ - SQLAlchemy model for AI capabilities (Skills/Tools). 
- """ - __tablename__ = 'skills' - - id = Column(Integer, primary_key=True, index=True) - name = Column(String, unique=True, index=True, nullable=False) - description = Column(String, nullable=True) - # type: 'local', 'remote_grpc', 'mcp' - skill_type = Column(String, default="local", nullable=False) - # Stores tool definition, parameters, or endpoint config - config = Column(JSON, default={}, nullable=True) - - # Extended properties - system_prompt = Column(Text, nullable=True) # Internal Prompt for the AI - is_enabled = Column(Boolean, default=True) - features = Column(JSON, default=["chat"], nullable=True) # e.g. ["chat", "voice"] - - owner_id = Column(String, ForeignKey('users.id'), nullable=False) - is_system = Column(Boolean, default=False) - - # Store the full SKILL.md body for UI display - preview_markdown = Column(Text, nullable=True) - # Store OpenClaw-style metadata (emoji, tags, etc) - extra_metadata = Column(JSON, default={}, nullable=True) - - created_at = Column(DateTime, default=datetime.utcnow) - - owner = relationship("User") - - def __repr__(self): - return f"" - -class SkillGroupAccess(Base): - """ - Many-to-many relationship between skills and groups. - """ - __tablename__ = 'skill_group_access' - - id = Column(Integer, primary_key=True, index=True) - skill_id = Column(Integer, ForeignKey('skills.id'), nullable=False) - group_id = Column(String, ForeignKey('groups.id'), nullable=False) - - granted_by = Column(String, ForeignKey('users.id'), nullable=False) - granted_at = Column(DateTime, default=datetime.utcnow) - - skill = relationship("Skill") - group = relationship("Group") - -class MCPServer(Base): - """ - SQLAlchemy model for Model Context Protocol (MCP) server configurations. 
- """ - __tablename__ = 'mcp_servers' - - id = Column(Integer, primary_key=True, index=True) - name = Column(String, nullable=False) - url = Column(String, nullable=False) - auth_config = Column(JSON, default={}, nullable=True) - - owner_id = Column(String, ForeignKey('users.id'), nullable=False) - group_id = Column(String, ForeignKey('groups.id'), nullable=True) - - created_at = Column(DateTime, default=datetime.utcnow) - - owner = relationship("User") - group = relationship("Group") - - def __repr__(self): - return f"" - -class AssetPermission(Base): - """ - SQLAlchemy model for granular permission control on assets. - """ - __tablename__ = 'asset_permissions' - - id = Column(Integer, primary_key=True, index=True) - # resource_type: 'prompt', 'skill', 'mcp_server' - resource_type = Column(String, nullable=False, index=True) - resource_id = Column(Integer, nullable=False, index=True) - - # Grant to a specific user OR a specific group - user_id = Column(String, ForeignKey('users.id'), nullable=True) - group_id = Column(String, ForeignKey('groups.id'), nullable=True) - - # access_level: 'view', 'execute', 'admin' - access_level = Column(String, default="execute", nullable=False) - - created_at = Column(DateTime, default=datetime.utcnow) - - user = relationship("User") - group = relationship("Group") - - def __repr__(self): - return f"" - - -class AgentNode(Base): - """ - Admin-configured Agent Node. - Only admins register and configure nodes. Groups are then granted access. - Users see nodes available to their group and can attach them to sessions. - - Lifecycle: - 1. Admin creates the node record here (description, skill_config, invite_token). - 2. Admin deploys the client-side node software with the generated config YAML. - 3. Node connects โ†’ last_status flips to 'online'. - 4. Admin grants access to one or more groups (NodeGroupAccess). - 5. Users in those groups see the node in preferences / session setup. 
- """ - __tablename__ = 'agent_nodes' - - id = Column(Integer, primary_key=True, index=True) - # Stable identifier used in the node's YAML config (e.g. "dev-macbook-m3") - node_id = Column(String, unique=True, index=True, nullable=False) - # Human-readable name shown in the UI - display_name = Column(String, nullable=False) - # Rich description โ€” like a skill description; tells users what this node is for - description = Column(String, nullable=True) - # Admin user who registered this node - registered_by = Column(String, ForeignKey('users.id'), nullable=False) - # Skill enablement toggles + per-skill config - # Example: - # { - # "shell": {"enabled": true, "cwd_jail": "/home/user/projects"}, - # "shell": {"enabled": true}, - # "sync": {"enabled": true, "max_file_size_mb": 50} - # } - skill_config = Column(JSON, default={ - "shell": {"enabled": True}, - "sync": {"enabled": True}, - }, nullable=False) - # Actual capabilities reported by the node on connect (read-only, set by node) - capabilities = Column(JSON, default={}, nullable=True) - # Pre-signed invite token generated at node creation (used in downloaded config YAML) - invite_token = Column(String, unique=True, nullable=True, index=True) - # Whether this node is administratively active (can be disabled without deleting) - is_active = Column(Boolean, default=True, nullable=False) - # Live status updated by NodeRegistryService: 'online' | 'offline' | 'stale' - last_status = Column(String, default="offline", nullable=False) - # Last heartbeat timestamp - last_seen_at = Column(DateTime, nullable=True) - created_at = Column(DateTime, default=datetime.utcnow, nullable=False) - - registered_by_user = relationship("User", foreign_keys=[registered_by]) - # Groups that have been granted access to this node - group_access = relationship("NodeGroupAccess", back_populates="node", cascade="all, delete-orphan") - - def __repr__(self): - return f"" - - -class NodeGroupAccess(Base): - """ - Grants a group access to a 
specific agent node. - Admin sets this; users in the group can then see and use the node. - """ - __tablename__ = 'node_group_access' - - id = Column(Integer, primary_key=True, index=True) - node_id = Column(String, ForeignKey('agent_nodes.node_id'), nullable=False, index=True) - group_id = Column(String, ForeignKey('groups.id'), nullable=False, index=True) - # access_level: 'view' (see but not use), 'use' (can attach to session), 'admin' (can config) - access_level = Column(String, default="use", nullable=False) - granted_by = Column(String, ForeignKey('users.id'), nullable=False) - granted_at = Column(DateTime, default=datetime.utcnow, nullable=False) - - node = relationship("AgentNode", back_populates="group_access") - group = relationship("Group") - granted_by_user = relationship("User", foreign_keys=[granted_by]) - - def __repr__(self): - return f"" - - diff --git a/ai-hub/app/db/models/__init__.py b/ai-hub/app/db/models/__init__.py new file mode 100644 index 0000000..0e1d233 --- /dev/null +++ b/ai-hub/app/db/models/__init__.py @@ -0,0 +1,13 @@ +from .user import User, Group +from .session import Session, Message +from .document import Document, VectorMetadata +from .asset import PromptTemplate, Skill, SkillGroupAccess, MCPServer, AssetPermission +from .node import AgentNode, NodeGroupAccess + +__all__ = [ + "User", "Group", + "Session", "Message", + "Document", "VectorMetadata", + "PromptTemplate", "Skill", "SkillGroupAccess", "MCPServer", "AssetPermission", + "AgentNode", "NodeGroupAccess" +] diff --git a/ai-hub/app/db/models/asset.py b/ai-hub/app/db/models/asset.py new file mode 100644 index 0000000..142ed8b --- /dev/null +++ b/ai-hub/app/db/models/asset.py @@ -0,0 +1,103 @@ +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean, JSON +from sqlalchemy.orm import relationship +from ..database import Base + +class PromptTemplate(Base): + __tablename__ = 'prompt_templates' + + id = Column(Integer, 
primary_key=True, index=True) + slug = Column(String, unique=True, index=True, nullable=False) + title = Column(String, nullable=False) + content = Column(Text, nullable=False) + version = Column(Integer, default=1) + + owner_id = Column(String, ForeignKey('users.id'), nullable=False) + group_id = Column(String, ForeignKey('groups.id'), nullable=True) + is_public = Column(Boolean, default=False) + + created_at = Column(DateTime, default=datetime.utcnow) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + owner = relationship("User") + group = relationship("Group") + + def __repr__(self): + return f"" + +class Skill(Base): + __tablename__ = 'skills' + + id = Column(Integer, primary_key=True, index=True) + name = Column(String, unique=True, index=True, nullable=False) + description = Column(String, nullable=True) + skill_type = Column(String, default="local", nullable=False) + config = Column(JSON, default={}, nullable=True) + + system_prompt = Column(Text, nullable=True) + is_enabled = Column(Boolean, default=True) + features = Column(JSON, default=["chat"], nullable=True) + + owner_id = Column(String, ForeignKey('users.id'), nullable=False) + is_system = Column(Boolean, default=False) + + preview_markdown = Column(Text, nullable=True) + extra_metadata = Column(JSON, default={}, nullable=True) + + created_at = Column(DateTime, default=datetime.utcnow) + + owner = relationship("User") + + def __repr__(self): + return f"" + +class SkillGroupAccess(Base): + __tablename__ = 'skill_group_access' + + id = Column(Integer, primary_key=True, index=True) + skill_id = Column(Integer, ForeignKey('skills.id'), nullable=False) + group_id = Column(String, ForeignKey('groups.id'), nullable=False) + + granted_by = Column(String, ForeignKey('users.id'), nullable=False) + granted_at = Column(DateTime, default=datetime.utcnow) + + skill = relationship("Skill") + group = relationship("Group") + +class MCPServer(Base): + __tablename__ = 'mcp_servers' + 
+ id = Column(Integer, primary_key=True, index=True) + name = Column(String, nullable=False) + url = Column(String, nullable=False) + auth_config = Column(JSON, default={}, nullable=True) + + owner_id = Column(String, ForeignKey('users.id'), nullable=False) + group_id = Column(String, ForeignKey('groups.id'), nullable=True) + + created_at = Column(DateTime, default=datetime.utcnow) + + owner = relationship("User") + group = relationship("Group") + + def __repr__(self): + return f"" + +class AssetPermission(Base): + __tablename__ = 'asset_permissions' + + id = Column(Integer, primary_key=True, index=True) + resource_type = Column(String, nullable=False, index=True) + resource_id = Column(Integer, nullable=False, index=True) + + user_id = Column(String, ForeignKey('users.id'), nullable=True) + group_id = Column(String, ForeignKey('groups.id'), nullable=True) + + access_level = Column(String, default="execute", nullable=False) + created_at = Column(DateTime, default=datetime.utcnow) + + user = relationship("User") + group = relationship("Group") + + def __repr__(self): + return f"" diff --git a/ai-hub/app/db/models/document.py b/ai-hub/app/db/models/document.py new file mode 100644 index 0000000..7756de6 --- /dev/null +++ b/ai-hub/app/db/models/document.py @@ -0,0 +1,40 @@ +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey +from sqlalchemy.orm import relationship +from ..database import Base + +class Document(Base): + __tablename__ = 'documents' + + id = Column(Integer, primary_key=True, index=True) + title = Column(String, index=True, nullable=False) + text = Column(Text, nullable=False) + source_url = Column(String, nullable=True) + author = Column(String, nullable=True) + status = Column(String, default="processing", nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + user_id = Column(String, index=True, nullable=True) + + vector_metadata = relationship( + 
"VectorMetadata", + back_populates="document", + cascade="all, delete-orphan", + uselist=False + ) + + def __repr__(self): + return f"" + +class VectorMetadata(Base): + __tablename__ = 'vector_metadata' + + id = Column(Integer, primary_key=True, index=True) + document_id = Column(Integer, ForeignKey('documents.id'), unique=True) + session_id = Column(Integer, ForeignKey('sessions.id'), nullable=True) + embedding_model = Column(String, nullable=False) + + document = relationship("Document", back_populates="vector_metadata") + session = relationship("Session") + + def __repr__(self): + return f"" diff --git a/ai-hub/app/db/models/node.py b/ai-hub/app/db/models/node.py new file mode 100644 index 0000000..84d9fea --- /dev/null +++ b/ai-hub/app/db/models/node.py @@ -0,0 +1,46 @@ +from datetime import datetime +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean, JSON +from sqlalchemy.orm import relationship +from ..database import Base + +class AgentNode(Base): + __tablename__ = 'agent_nodes' + + id = Column(Integer, primary_key=True, index=True) + node_id = Column(String, unique=True, index=True, nullable=False) + display_name = Column(String, nullable=False) + description = Column(String, nullable=True) + registered_by = Column(String, ForeignKey('users.id'), nullable=False) + skill_config = Column(JSON, default={ + "shell": {"enabled": True}, + "sync": {"enabled": True}, + }, nullable=False) + capabilities = Column(JSON, default={}, nullable=True) + invite_token = Column(String, unique=True, nullable=True, index=True) + is_active = Column(Boolean, default=True, nullable=False) + last_status = Column(String, default="offline", nullable=False) + last_seen_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + registered_by_user = relationship("User", foreign_keys=[registered_by]) + group_access = relationship("NodeGroupAccess", back_populates="node", cascade="all, delete-orphan") + + 
def __repr__(self): + return f"" + +class NodeGroupAccess(Base): + __tablename__ = 'node_group_access' + + id = Column(Integer, primary_key=True, index=True) + node_id = Column(String, ForeignKey('agent_nodes.node_id'), nullable=False, index=True) + group_id = Column(String, ForeignKey('groups.id'), nullable=False, index=True) + access_level = Column(String, default="use", nullable=False) + granted_by = Column(String, ForeignKey('users.id'), nullable=False) + granted_at = Column(DateTime, default=datetime.utcnow, nullable=False) + + node = relationship("AgentNode", back_populates="group_access") + group = relationship("Group") + granted_by_user = relationship("User", foreign_keys=[granted_by]) + + def __repr__(self): + return f"" diff --git a/ai-hub/app/db/models/session.py b/ai-hub/app/db/models/session.py new file mode 100644 index 0000000..9e34798 --- /dev/null +++ b/ai-hub/app/db/models/session.py @@ -0,0 +1,48 @@ +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean, JSON +from sqlalchemy.orm import relationship +from ..database import Base + +class Session(Base): + __tablename__ = 'sessions' + + id = Column(Integer, primary_key=True, index=True) + user_id = Column(String, ForeignKey('users.id'), index=True, nullable=False) + title = Column(String, index=True, nullable=True) + provider_name = Column(String, nullable=True) + stt_provider_name = Column(String, nullable=True) + tts_provider_name = Column(String, nullable=True) + feature_name = Column(String, default="default", nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + is_archived = Column(Boolean, default=False, nullable=False) + is_cancelled = Column(Boolean, default=False, nullable=False) + + sync_workspace_id = Column(String, nullable=True, index=True) + attached_node_ids = Column(JSON, default=[], nullable=True) + node_sync_status = Column(JSON, default={}, nullable=True) + sync_config = Column(JSON, 
default={}, nullable=True) + + messages = relationship("Message", back_populates="session", cascade="all, delete-orphan") + user = relationship("User", back_populates="sessions") + + def __repr__(self): + return f"" + +class Message(Base): + __tablename__ = 'messages' + + id = Column(Integer, primary_key=True, index=True) + session_id = Column(Integer, ForeignKey('sessions.id'), nullable=False) + sender = Column(String, nullable=False) + content = Column(Text, nullable=False) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + model_response_time = Column(Integer, nullable=True) + token_count = Column(Integer, nullable=True) + message_metadata = Column(JSON, nullable=True) + reasoning_content = Column(Text, nullable=True) + audio_path = Column(String, nullable=True) + + session = relationship("Session", back_populates="messages") + + def __repr__(self): + return f"" diff --git a/ai-hub/app/db/models/user.py b/ai-hub/app/db/models/user.py new file mode 100644 index 0000000..a899e68 --- /dev/null +++ b/ai-hub/app/db/models/user.py @@ -0,0 +1,39 @@ +from datetime import datetime +from sqlalchemy import Column, Integer, String, JSON, DateTime, ForeignKey +from sqlalchemy.orm import relationship +from ..database import Base + +class Group(Base): + __tablename__ = 'groups' + + id = Column(String, primary_key=True, index=True) + name = Column(String, unique=True, nullable=False) + description = Column(String, nullable=True) + policy = Column(JSON, default={}, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + + users = relationship("User", back_populates="group") + + def __repr__(self): + return f"" + +class User(Base): + __tablename__ = 'users' + + id = Column(String, primary_key=True, index=True) + oidc_id = Column(String, unique=True, nullable=True) + email = Column(String, nullable=True) + username = Column(String, nullable=True) + full_name = Column(String, nullable=True) + role = Column(String, default="user", 
nullable=False) + group_id = Column(String, ForeignKey('groups.id'), nullable=True) + avatar_url = Column(String, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) + last_login_at = Column(DateTime, default=datetime.utcnow) + preferences = Column(JSON, default={}, nullable=True) + + group = relationship("Group", back_populates="users") + sessions = relationship("Session", back_populates="user", cascade="all, delete-orphan") + + def __repr__(self): + return f"" diff --git a/docs/architecture/cortex_project_todo.md b/docs/architecture/cortex_project_todo.md index 6d0b486..527125a 100644 --- a/docs/architecture/cortex_project_todo.md +++ b/docs/architecture/cortex_project_todo.md @@ -11,6 +11,12 @@ - **Adaptive AI-Driven Polling**: AI specifies `next_check_seconds` instead of static backoff. - **Edge Intelligence (Shift Left)**: Agent Node PTY reader actively scans output and fires prompt detection events to wake up sleeping SubAgents. +### 2. ๐Ÿงฉ Backend Modularity & Plugin System (Resolved) +- **Fat Routers Slimmed**: Routes like `nodes.py` and `user.py` purged of business logic. +- **Service Layer Extraction**: Extracted `MeshService`, `AuthService`, `PreferenceService`, and `SessionService`. +- **Dynamic Tool Registry**: `ToolService` decoupled into an auto-loading plugin registry. +- **Models Split**: Monolithic `models.py` broken down into domain-driven files. + ### 2. ๐ŸŽญ Swarm Choreography (Resolved) - **Branching Agency (`EXECUTE` Action)**: SubAgent natively handles cross-node orchestrations dynamically based on terminal state. diff --git a/docs/refactors/backend_modularity_plan.md b/docs/refactors/backend_modularity_plan.md new file mode 100644 index 0000000..83755a0 --- /dev/null +++ b/docs/refactors/backend_modularity_plan.md @@ -0,0 +1,90 @@ +# Backend Modularity & Extensibility Refactor Plan + +## ๐ŸŽฏ Objective +Refactor the Cortex Hub backend to improve maintainability, scalability, and developer experience. 
The current implementation suffers from bloated routing files ("Fat Routers") and mixed concerns between the API, business logic, and infrastructure layers. + +## ๐Ÿ” Current State Analysis + +### ๐Ÿšฉ Critical Hotspots (Bloated Files) +| File | Lines | Key Issues | +| :--- | :--- | :--- | +| `app/api/routes/nodes.py` | ~1,335 | Mixes CRUD, gRPC dispatch logic, WebSocket streaming, and Provisioning script generation. | +| `app/api/routes/user.py` | ~1,114 | Contains OIDC flows, complex preference masking/inheritance, and provider health verification. | +| `app/api/routes/sessions.py` | ~573 | Carries session state management that should be in a service. | +| `app/core/services/tool.py` | ~477 | Monolithic tool implementation; difficult to add new tools without touching core files. | +| `app/db/models.py` | 18KB | All database entities in a single file; slows down development and increases merge conflicts. | + +### ๐Ÿ›  Architecture Violations +- **Concerns Leakage**: Database queries and complex business logic live directly within FastAPI route handlers. +- **Scalability Barriers**: Node registry and WebSocket state are kept in-memory, preventing simple horizontal scaling without Redis/Distributed state. +- **Hard-to-Test**: Large functions with many dependencies make unit testing cumbersome. + +--- + +## ๐Ÿ— Implemented Target Architecture + +We have moved towards a **Clean Architecture / Domain-Driven** approach while maintaining [12-Factor App](https://12factor.net/) principles. + +### 1. Database & Models Split +Move from `db/models.py` to a module-based structure: +- `app/db/models/` + - `__init__.py` (Exports all models) + - `user.py` + - `node.py` + - `session.py` + - `audit.py` + +### 2. Service Layer Extraction (Domain Logic) +Extract logic from routers into dedicated, testable services: +- **AuthService**: OIDC logic, token validation, user onboarding. +- **MeshService**: Node registration, health tracking, gRPC dispatching logic. 
+- **PreferenceService**: Complex LLM/TTS/STT preference resolution and masking. +- **SessionService**: Lifecycle management of chat sessions. + +### 3. Slim Routers +Routers should only: +1. Define the endpoint and tags. +2. Handle input validation (Pydantic). +3. Call the appropriate Service. +4. Return the response. + +### 4. Template & Utility Decoupling +Move large string constants (Provisioning scripts, READMEs) to: +- `app/core/templates/provisioning/` + - `bootstrap.py.j2` + - `run.sh.j2` + +### 5. Plugin-based Tool System +Refactor `tool.py` to use a dynamic registry: +- `app/core/tools/` + - `base.py` (Interface defining a tool) + - `registry.py` (Auto-loader for tools) + - `definitions/` (Individual tool files like `file_system.py`, `browser.py`, etc.) + +--- + +## ๐Ÿ“… Execution Phases + +### Phase 1: Physical Decomposition (Infrastructure) +1. Split `app/db/models.py` into `app/db/models/*.py`. +2. Split large `schemas.py` if necessary into domain-specific schemas. +3. Move script constants from `nodes.py` to a templates directory. + +### Phase 2: Domain Extraction (The "Slimming") +1. **Nodes Refactor**: Extract `MeshService`. Move `_require_node_access` and `_node_to_user_view` into it. +2. **User Refactor**: Extract `AuthService` and `PreferenceService`. Move OIDC callback logic and preference masking to services. +3. **Session Refactor**: Extract `SessionService`. + +### Phase 3: Advanced Decoupling (Extensibility) +1. Implement the Plugin-based Tool System. +2. Standardize error handling and response wrapping. +3. Ensure all configurations strictly follow the 12-factor ENV pattern (no hardcoded defaults in code where possible). + +--- + +## โœ… Success Criteria +- [x] No routing file exceeds 400 lines. +- [x] Business logic is 100% extracted from `app/api/routes`. +- [x] New tools/skills can be added by dropping a file into a folder. +- [x] All database models are modularized. +- [x] Improved unit test coverage due to decoupled service logic. 
diff --git a/docs/refactors/dedicated_browser_service.md b/docs/refactors/dedicated_browser_service.md index 32097ae..24b4978 100644 --- a/docs/refactors/dedicated_browser_service.md +++ b/docs/refactors/dedicated_browser_service.md @@ -58,7 +58,7 @@ ### Phase 3: AI Hub Refactor 1. **Tool Routing**: - - In `ai-hub/app/core/services/tool.py`, update `browser_automation_agent` to use a `BrowserServiceClient` instead of `assistant.dispatch_browser`. + - In `ai-hub/app/core/tools/definitions/browser_automation_agent.py` (via `ToolRegistry`), update `browser_automation_agent` to use a `BrowserServiceClient` instead of `assistant.dispatch_browser`. 2. **Assistant Cleanup**: - Remove `dispatch_browser` and all browser-related result handling from `ai-hub/app/core/grpc/services/assistant.py`. 3. **GRPC Server Cleanup**: diff --git a/docs/swarm_architecture_analysis.md b/docs/swarm_architecture_analysis.md index ba75f00..5d0078b 100644 --- a/docs/swarm_architecture_analysis.md +++ b/docs/swarm_architecture_analysis.md @@ -14,7 +14,7 @@ ### Phase 1: Request & Tool Dispatch 1. **User Input**: A user types a command or request into the chat. 2. **Main LLM Loop**: `RagPipeline` calls the LLM with the current mesh context. The LLM decides to use `mesh_terminal_control`. -3. **Tool Dispatching**: `ToolService` intercepts the request. It initializes a **Sub-Agent** to manage the lifecycle of this specific terminal task. +3. **Tool Dispatching**: `ToolRegistry` and `ToolService` intercept the request. The service loads the appropriate plugin (`BaseToolPlugin` implementation) which initializes a **Sub-Agent** to manage the lifecycle of this specific terminal task. 4. **Task Registration**: The `AssistantService` generates a unique `task_id` and registers it in the `TaskJournal`. It signs the command payload using an RSA-PSS signature to ensure integrity. ### Phase 2: Hub-to-Node Transmission (gRPC)