diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py index 127477e..9595739 100644 --- a/ai-hub/app/api/routes/sessions.py +++ b/ai-hub/app/api/routes/sessions.py @@ -5,7 +5,7 @@ from app.api import schemas from typing import AsyncGenerator, List, Optional from app.db import models -from app.core.pipelines.validator import Validator +from app.core.orchestration.validator import Validator import os import shutil @@ -93,6 +93,12 @@ Streams AI response using Server-Sent Events (SSE). Yields tokens, reasoning, and tool executions in real-time. """ + # Reset cancellation flag on fresh request + session = db.query(models.Session).filter(models.Session.id == session_id).first() + if session: + session.is_cancelled = False + db.commit() + from fastapi.responses import StreamingResponse import json @@ -508,4 +514,18 @@ sync_config=session.sync_config or {}, ) + + @router.post("/{session_id}/cancel", summary="Cancel Running AI Task") + def cancel_session_task( + session_id: int, + db: Session = Depends(get_db) + ): + session = db.query(models.Session).filter(models.Session.id == session_id).first() + if not session: + raise HTTPException(status_code=404, detail="Session not found.") + + session.is_cancelled = True + db.commit() + return {"message": "Cancellation request sent (Watchdog will interrupt on next turn)."} + return router \ No newline at end of file diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 3e270e6..803ed18 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -3,6 +3,7 @@ import os import hashlib import logging +import shutil from app.core.grpc.utils.crypto import sign_payload, sign_browser_action from app.protos import agent_pb2 @@ -174,7 +175,8 @@ "START": agent_pb2.SyncControl.START_WATCHING, "STOP": agent_pb2.SyncControl.STOP_WATCHING, "LOCK": agent_pb2.SyncControl.LOCK, - "UNLOCK": agent_pb2.SyncControl.UNLOCK + 
"UNLOCK": agent_pb2.SyncControl.UNLOCK, + "RESYNC": agent_pb2.SyncControl.RESYNC } proto_action = action_map.get(action, agent_pb2.SyncControl.START_WATCHING) @@ -195,8 +197,28 @@ # Modular FS Explorer / Mesh Navigation # ================================================================== - def ls(self, node_id: str, path: str = ".", timeout=10, session_id="__fs_explorer__"): + def ls(self, node_id: str, path: str = ".", timeout=10, session_id="__fs_explorer__", force_remote: bool = False): """Requests a directory listing from a node (waits for response).""" + # Phase 1: Local Mirror Fast-Path + if session_id != "__fs_explorer__" and self.mirror and not force_remote: + workspace = self.mirror.get_workspace_path(session_id) + abs_path = os.path.normpath(os.path.join(workspace, path.lstrip("/"))) + if os.path.exists(abs_path) and os.path.isdir(abs_path): + files = [] + try: + for entry in os.scandir(abs_path): + rel = os.path.relpath(entry.path, workspace) + files.append({ + "path": rel, + "name": entry.name, + "is_dir": entry.is_dir(), + "size": entry.stat().st_size if entry.is_file() else 0, + "is_synced": True + }) + return {"files": files, "path": path} + except Exception as e: + logger.error(f"[📁📂] Local ls error for {session_id}/{path}: {e}") + node = self.registry.get_node(node_id) if not node: return {"error": "Offline"} @@ -216,7 +238,6 @@ self.journal.pop(tid) # Proactive Mirroring: start fetching content so dots turn green - # (Only for user sessions, not for node management explorer) if res and "files" in res and session_id != "__fs_explorer__": self._proactive_explorer_sync(node_id, res["files"], session_id) @@ -232,8 +253,21 @@ if not f.get("is_synced") and f.get("size", 0) < 1024 * 512: # Skip large files threading.Thread(target=self.cat, args=(node_id, f["path"], 15, session_id), daemon=True).start() - def cat(self, node_id: str, path: str, timeout=15, session_id="__fs_explorer__"): + def cat(self, node_id: str, path: str, timeout=15, 
session_id="__fs_explorer__", force_remote: bool = False): """Requests file content from a node (waits for result).""" + # Phase 1: Local Mirror Fast-Path + if session_id != "__fs_explorer__" and self.mirror and not force_remote: + workspace = self.mirror.get_workspace_path(session_id) + abs_path = os.path.normpath(os.path.join(workspace, path.lstrip("/"))) + if os.path.exists(abs_path) and os.path.isfile(abs_path): + try: + # Try reading as text + with open(abs_path, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + return {"content": content, "path": path} + except Exception as e: + logger.error(f"[📁📄] Local cat error for {session_id}/{path}: {e}") + node = self.registry.get_node(node_id) if not node: return {"error": "Offline"} @@ -263,6 +297,38 @@ node = self.registry.get_node(node_id) if not node: return {"error": "Offline"} + # Phase 1: Sync local mirror ON HUB instantly (Zero Latency) + if self.mirror and session_id != "__fs_explorer__": + workspace_mirror = self.mirror.get_workspace_path(session_id) + dest = os.path.normpath(os.path.join(workspace_mirror, path.lstrip("/"))) + try: + if is_dir: + os.makedirs(dest, exist_ok=True) + else: + os.makedirs(os.path.dirname(dest), exist_ok=True) + with open(dest, "wb") as f: + f.write(content) + + # Fire and forget synchronization to the edge node + tid = f"fs-write-{int(time.time()*1000)}" + node.queue.put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=tid, + control=agent_pb2.SyncControl( + action=agent_pb2.SyncControl.WRITE, + path=path, + content=content, + is_dir=is_dir + ) + ) + )) + return {"success": True, "message": "Synchronized to local mirror and dispatched to node"} + except Exception as e: + logger.error(f"[📁✏️] Local mirror write error: {e}") + return {"error": str(e)} + + # Legacy/Explorer path: await node confirmation tid = f"fs-write-{int(time.time()*1000)}" event = self.journal.register(tid, node_id) @@ -282,27 +348,80 @@ 
if event.wait(timeout): res = self.journal.get_result(tid) self.journal.pop(tid) - - # M6: Update mirror locally on hub so ls sees it as synced (Only for real sessions) - if self.mirror and res.get("success") and session_id != "__fs_explorer__": - workspace_mirror = self.mirror.get_workspace_path(session_id) - dest = os.path.join(workspace_mirror, path) - if is_dir: - os.makedirs(dest, exist_ok=True) - else: - os.makedirs(os.path.dirname(dest), exist_ok=True) - with open(dest, "wb") as f: - f.write(content) - return res self.journal.pop(tid) return {"error": "Timeout"} + def inspect_drift(self, node_id: str, path: str, session_id: str): + """Returns a unified diff between Hub local mirror and Node's actual file.""" + if not self.mirror: return {"error": "Mirror not available"} + + # 1. Get Local Content + workspace = self.mirror.get_workspace_path(session_id) + local_abs = os.path.normpath(os.path.join(workspace, path.lstrip("/"))) + local_content = "" + if os.path.exists(local_abs) and os.path.isfile(local_abs): + try: + with open(local_abs, 'r', encoding='utf-8', errors='ignore') as f: + local_content = f.read() + except: pass + + # 2. Get Remote Content (Force Bypass Fast-Path) + print(f" [📁🔍] Inspecting Drift: Fetching remote content for {path} on {node_id}") + remote_res = self.cat(node_id, path, session_id=session_id, force_remote=True) + if "error" in remote_res: + return {"error": f"Failed to fetch remote content: {remote_res['error']}"} + + remote_content = remote_res.get("content", "") + + # 3. 
Create Diff + import difflib + diff = difflib.unified_diff( + local_content.splitlines(keepends=True), + remote_content.splitlines(keepends=True), + fromfile=f"hub://{session_id}/{path}", + tofile=f"node://{node_id}/{path}" + ) + + diff_text = "".join(diff) + return { + "path": path, + "has_drift": local_content != remote_content, + "diff": diff_text, + "local_size": len(local_content), + "remote_size": len(remote_content) + } + def rm(self, node_id: str, path: str, timeout=10, session_id="__fs_explorer__"): """Deletes a file or directory on a node (waits for status).""" node = self.registry.get_node(node_id) if not node: return {"error": "Offline"} + # Phase 1: Sync local mirror ON HUB instantly + if self.mirror and session_id != "__fs_explorer__": + workspace_mirror = self.mirror.get_workspace_path(session_id) + dest = os.path.normpath(os.path.join(workspace_mirror, path.lstrip("/"))) + try: + if os.path.isdir(dest): + shutil.rmtree(dest) + elif os.path.exists(dest): + os.remove(dest) + + # Fire and forget to edge node + tid = f"fs-rm-{int(time.time()*1000)}" + node.queue.put(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=tid, + control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=path) + ) + )) + return {"success": True, "message": "Removed from local mirror and dispatched delete to node"} + except Exception as e: + logger.error(f"[📁🗑️] Local mirror rm error: {e}") + return {"error": str(e)} + + # Legacy/Explorer path: await node confirmation tid = f"fs-rm-{int(time.time()*1000)}" event = self.journal.register(tid, node_id) @@ -317,29 +436,22 @@ if event.wait(timeout): res = self.journal.get_result(tid) self.journal.pop(tid) - - # M6: remove from mirror if successful (Only for real sessions) - if self.mirror and res.get("success") and session_id != "__fs_explorer__": - import shutil - dest = os.path.join(self.mirror.get_workspace_path(session_id), path) - if os.path.isdir(dest): 
shutil.rmtree(dest) - elif os.path.exists(dest): os.remove(dest) - return res self.journal.pop(tid) return {"error": "Timeout"} def dispatch_swarm(self, node_ids, cmd, timeout=30, session_id=None, no_abort=False): """Dispatches a command to multiple nodes in parallel and waits for all results.""" - from concurrent.futures import ThreadPoolExecutor + from concurrent.futures import ThreadPoolExecutor, as_completed results = {} - with ThreadPoolExecutor(max_workers=len(node_ids)) as executor: + with ThreadPoolExecutor(max_workers=max(1, len(node_ids))) as executor: future_to_node = { executor.submit(self.dispatch_single, nid, cmd, timeout, session_id, no_abort): nid for nid in node_ids } - for future in future_to_node: + # Use as_completed to avoid blocking on a slow node when others are finished + for future in as_completed(future_to_node): node_id = future_to_node[future] try: results[node_id] = future.result() @@ -350,10 +462,12 @@ def dispatch_single(self, node_id, cmd, timeout=30, session_id=None, no_abort=False): """Dispatches a shell command to a specific node.""" + import uuid node = self.registry.get_node(node_id) if not node: return {"error": f"Node {node_id} Offline"} - tid = f"task-{int(time.time()*1000)}" + # Use UUID to prevent timestamp collisions in high-speed swarm dispatch + tid = f"task-{uuid.uuid4().hex[:12]}" event = self.journal.register(tid, node_id) # 12-Factor Signing Logic diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index a999b3b..d4f889a 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -387,6 +387,13 @@ self.registry.emit(node_id, "sync_status", {"message": fs.status.message, "code": fs.status.code}) if fs.status.code == agent_pb2.SyncStatus.RECONCILE_REQUIRED: + # Phase 3: Notify active orchestrators of drift + self.registry.emit(node_id, "mesh_observation", { + "type": "drift_warning", + "session_id": fs.session_id, + 
"paths": list(fs.status.reconcile_paths), + "message": f"Critical Drift: Node {node_id} has hash mismatches in {fs.session_id}." + }) for path in fs.status.reconcile_paths: self.assistant.push_file(node_id, fs.session_id, path) diff --git a/ai-hub/app/core/orchestration/__init__.py b/ai-hub/app/core/orchestration/__init__.py new file mode 100644 index 0000000..7d97670 --- /dev/null +++ b/ai-hub/app/core/orchestration/__init__.py @@ -0,0 +1,7 @@ +from .architect import Architect +from .memory import ContextManager +from .stream import StreamProcessor +from .body import ToolExecutor +from .guards import SafetyGuard + +__all__ = ["Architect", "ContextManager", "StreamProcessor", "ToolExecutor", "SafetyGuard"] diff --git a/ai-hub/app/core/orchestration/architect.py b/ai-hub/app/core/orchestration/architect.py new file mode 100644 index 0000000..3ffc52b --- /dev/null +++ b/ai-hub/app/core/orchestration/architect.py @@ -0,0 +1,185 @@ +import logging +import queue +from typing import List, Dict, Any, Optional +from app.db import models +from .memory import ContextManager +from .stream import StreamProcessor +from .body import ToolExecutor +from .guards import SafetyGuard + +class Architect: + """ + The Master-Architect Orchestrator. + Decomposed successor to RagPipeline. + """ + + def __init__(self, context_manager: Optional[ContextManager] = None): + self.memory = context_manager or ContextManager() + self.stream = StreamProcessor() + + async def run( + self, + question: str, + context_chunks: List[Dict[str, Any]], + history: List[models.Message], + llm_provider, + prompt_service = None, + tool_service = None, + tools: List[Dict[str, Any]] = None, + mesh_context: str = "", + db = None, + user_id: Optional[str] = None, + sync_workspace_id: Optional[str] = None, + session_id: Optional[int] = None, + feature_name: str = "chat", + prompt_slug: str = "rag-pipeline" + ): + # 1. 
Initialize Context & Messages + messages = self.memory.prepare_initial_messages( + question, context_chunks, history, feature_name, mesh_context, sync_workspace_id, + db=db, user_id=user_id, prompt_service=prompt_service, prompt_slug=prompt_slug + ) + + # 2. Setup Mesh Observation + mesh_bridge = queue.Queue() + registry = self._get_registry(tool_service) + if registry and user_id: + registry.subscribe_user(user_id, mesh_bridge) + + # 3. Setup Guards + safety = SafetyGuard(db, session_id) + + # 4. Main Autonomous Loop + try: + turn = 0 + while turn < 500: + turn += 1 + self.stream.reset_turn() + + # A. Cancellation / Memory check + if safety.check_cancellation(): + yield {"type": "reasoning", "content": "\n> **🛑 User Interruption:** Terminating loop.\n"} + return + messages = self.memory.compress_history(messages) + + # B. Turn Start Heartbeat + self._update_turn_marker(messages, turn) + yield {"type": "status", "content": f"Turn {turn}: architecting next step..."} + + # C. LLM Call + prediction = await self._call_llm(llm_provider, messages, tools) + if not prediction: + yield {"type": "error", "content": "LLM Provider failed to generate a response."} + return + + # D. Process Stream + accumulated_content = "" + accumulated_reasoning = "" + tool_calls_map = {} + + async for chunk in prediction: + if not chunk.choices: continue + delta = chunk.choices[0].delta + + # Native reasoning (O-series) + r = getattr(delta, "reasoning_content", None) or delta.get("reasoning_content") + if r: + accumulated_reasoning += r + yield {"type": "reasoning", "content": r} + + # Content & Thinking Tags + c = getattr(delta, "content", None) or delta.get("content") + if c: + async for event in self.stream.process_chunk(c, turn): + if event["type"] == "content": accumulated_content += event["content"] + if event["type"] == "reasoning": accumulated_reasoning += event["content"] + yield event + + # Tool delta accumulation + self._accumulate_tool_calls(delta, tool_calls_map) + + # E. 
Branch: Tools or Exit? + if not tool_calls_map: + # Watchdog Check + if safety.should_activate_watchdog(self._get_assistant(tool_service), sync_workspace_id): + yield {"type": "status", "content": "Watchdog: tasks remain. continuing..."} + messages.append({"role": "user", "content": "WATCHDOG: .ai_todo.md has open items. Please continue until all are marked [COMPLETED]."}) + continue + return # Natural exit + + # F. Execute Tools + processed_tc = list(tool_calls_map.values()) + if safety.detect_loop(processed_tc): + yield {"type": "reasoning", "content": "\n> **🚨 Loop Guard:** Repetitive plan detected. Retrying with warnings.\n"} + messages.append({"role": "user", "content": "LOOP GUARD: You are stuck. Change strategy."}) + continue + + yield {"type": "status", "content": f"Architect analysis complete. Dispatching {len(processed_tc)} tools..."} + + # Append assistant message to history + messages.append(self._format_assistant_msg(accumulated_content, accumulated_reasoning, processed_tc)) + + # Run parallel execution + executor = ToolExecutor(tool_service, user_id, db, sync_workspace_id) + async for event in executor.run_tools(processed_tc, safety, mesh_bridge): + if "role" in event: # It's a tool result for history + messages.append(event) + else: + yield event + + finally: + if registry and user_id: + registry.unsubscribe_user(user_id, mesh_bridge) + + # --- Internal Helpers --- + + def _get_registry(self, tool_service): + if tool_service and hasattr(tool_service, "_services"): + orchestrator = getattr(tool_service._services, "orchestrator", None) + return getattr(orchestrator, "registry", None) + return None + + def _get_assistant(self, tool_service): + if tool_service and hasattr(tool_service, "_services"): + orchestrator = getattr(tool_service._services, "orchestrator", None) + return getattr(orchestrator, "assistant", None) + return None + + def _update_turn_marker(self, messages, turn): + if messages[0]["role"] == "system": + base = 
messages[0]["content"].split("[System:")[0].strip() + messages[0]["content"] = base + f"\n\n[System: Current Turn: {turn}]" + + async def _call_llm(self, llm_provider, messages, tools): + kwargs = {"stream": True} + if tools: + kwargs["tools"] = tools + kwargs["tool_choice"] = "auto" + try: + return await llm_provider.acompletion(messages=messages, **kwargs) + except Exception as e: + logging.error(f"[Architect] LLM Exception: {e}") + return None + + def _accumulate_tool_calls(self, delta, t_map): + tc_deltas = getattr(delta, "tool_calls", None) or delta.get("tool_calls") + if not tc_deltas: return + for tcd in tc_deltas: + idx = tcd.index + if idx not in t_map: + t_map[idx] = tcd + else: + if getattr(tcd, "id", None): t_map[idx].id = tcd.id + if tcd.function.name: t_map[idx].function.name = tcd.function.name + if tcd.function.arguments: t_map[idx].function.arguments += tcd.function.arguments + + def _format_assistant_msg(self, content, reasoning, tool_calls): + clean_tc = [] + for tc in tool_calls: + clean_tc.append({ + "id": tc.id, "type": "function", + "function": {"name": tc.function.name, "arguments": tc.function.arguments} + }) + msg = {"role": "assistant", "content": content or None, "tool_calls": clean_tc} + if reasoning: msg["reasoning_content"] = reasoning + return msg diff --git a/ai-hub/app/core/orchestration/body.py b/ai-hub/app/core/orchestration/body.py new file mode 100644 index 0000000..a94cb77 --- /dev/null +++ b/ai-hub/app/core/orchestration/body.py @@ -0,0 +1,116 @@ +import asyncio +import logging +import json +import sys +from typing import List, Dict, Any, AsyncGenerator + +class ToolExecutor: + """Handles parallel tool dispatching and event drainage.""" + + def __init__(self, tool_service: Any, user_id: str, db: Any, sync_workspace_id: str): + self.tool_service = tool_service + self.user_id = user_id + self.db = db + self.sync_workspace_id = sync_workspace_id + self.event_queue = asyncio.Queue() + + async def _subagent_event_handler(self, 
event): + await self.event_queue.put(event) + + async def run_tools(self, tool_calls: List[Any], safety_guard: Any, mesh_bridge: Any) -> AsyncGenerator[Dict[str, Any], None]: + """Dispatches and monitors tools until all are complete or cancelled.""" + tool_tasks = [] + for tc in tool_calls: + func_name = tc.function.name + func_args = self._parse_args(tc) + + # Surface tool call for UI + for ev in self._yield_tool_details(func_name, func_args): + yield ev + yield {"type": "tool_start", "name": func_name, "args": func_args} + + # Create async task + task = asyncio.create_task( + self.tool_service.call_tool( + func_name, + func_args, + db=self.db, + user_id=self.user_id, + session_id=self.sync_workspace_id, + on_event=self._subagent_event_handler + ) + ) + tool_tasks.append((tc, task)) + + # --- Wait & Monitor loop --- + while True: + all_done = all(item[1].done() for item in tool_tasks) + + # Drain UI events (Thoughts, Mesh Observations) + while not self.event_queue.empty(): + ev = await self.event_queue.get() + if ev["type"] == "subagent_thought": + yield { + "type": "reasoning", + "content": f"\n\n> **🧠 Sub-Agent [{ev.get('node_id', 'Swarm')}]:** {ev.get('content')}\n\n" + } + + if mesh_bridge: + while not mesh_bridge.empty(): + try: + ev = mesh_bridge.get_nowait() + if ev["event"] == "mesh_observation": + yield { + "type": "reasoning", + "content": f"\n\n> **📡 Mesh Observation:** {ev.get('data', {}).get('message', 'Unspecified drift observed.')}\n\n" + } + except: break + + # Cancellation check + if safety_guard.check_cancellation(): + yield {"type": "status", "content": "Cancellation requested. 
Interrupting..."} + return # Should handle graceful task cancellation in future + + if all_done: + break + await asyncio.sleep(0.1) + + # Yield results for AI history + for tc, task in tool_tasks: + func_name = tc.function.name + try: + result = await task + except Exception as e: + result = {"success": False, "error": f"Tool crashed: {str(e)}"} + + yield {"type": "tool_result", "name": func_name, "result": result} + yield { + "role": "tool", + "tool_call_id": tc.id, + "name": func_name, + "content": self._truncate_result(result) + } + + def _parse_args(self, tc) -> Dict[str, Any]: + try: + args = json.loads(tc.function.arguments) + except: + args = {} + # Parallel PTY Optimization preserved + if tc.function.name == "mesh_terminal_control" and "session_id" not in args: + args["session_id"] = f"subagent-{tc.id[:8]}" + return args + + def _yield_tool_details(self, name, args): + lines = [f"🔧 **Tool Call: `{name}`**"] + if args.get("command"): lines.append(f"- Command: `{args['command']}`") + if args.get("node_id"): lines.append(f"- Node: `{args['node_id']}`") + if args.get("node_ids"): lines.append(f"- Nodes: `{', '.join(args['node_ids'])}`") + yield {"type": "reasoning", "content": "\n" + "\n".join(lines) + "\n"} + + def _truncate_result(self, result: Any) -> str: + s = json.dumps(result) if isinstance(result, dict) else str(result) + limit = 8000 + if len(s) > limit: + return s[:limit] + f"\n...[truncated {len(s)-limit} chars]" + return s diff --git a/ai-hub/app/core/orchestration/guards.py b/ai-hub/app/core/orchestration/guards.py new file mode 100644 index 0000000..7602d67 --- /dev/null +++ b/ai-hub/app/core/orchestration/guards.py @@ -0,0 +1,51 @@ +import os +import logging +from typing import List, Tuple, Dict, Any, Optional +from sqlalchemy.orm import Session +from app.db import models + +class SafetyGuard: + """Handles loop detection, cancellation checks, and task watchdogs.""" + + def __init__(self, db: Optional[Session] = None, session_id: Optional[int] = 
None): + self.db = db + self.session_id = session_id + self.action_history: List[Tuple] = [] + + def check_cancellation(self) -> bool: + """Returns True if user has requested cancellation.""" + if not self.db or not self.session_id: + return False + try: + # Fresh query to bypass stale cache + session = self.db.query(models.Session).filter(models.Session.id == self.session_id).first() + if session: + self.db.refresh(session) + return session.is_cancelled + except Exception as e: + logging.warning(f"[SafetyGuard] Cancellation check failed: {e}") + return False + + def detect_loop(self, tool_calls: List[Any]) -> bool: + """Returns True if the exact same tool set repeats 3 times sequentially.""" + current_sig = tuple(sorted([(tc.function.name, tc.function.arguments) for tc in tool_calls])) + self.action_history.append(current_sig) + if len(self.action_history) > 10: + self.action_history.pop(0) + + return self.action_history.count(current_sig) >= 3 + + def should_activate_watchdog(self, assistant: Any, sync_workspace_id: str) -> bool: + """Checks if .ai_todo.md has unchecked items.""" + if not assistant or not assistant.mirror or not sync_workspace_id: + return False + try: + workspace = assistant.mirror.get_workspace_path(sync_workspace_id) + todo_path = os.path.join(workspace, ".ai_todo.md") + if os.path.exists(todo_path): + with open(todo_path, "r") as f: + content = f.read() + return "[ ]" in content + except Exception as e: + logging.warning(f"[SafetyGuard] Watchdog error: {e}") + return False diff --git a/ai-hub/app/core/orchestration/memory.py b/ai-hub/app/core/orchestration/memory.py new file mode 100644 index 0000000..624a103 --- /dev/null +++ b/ai-hub/app/core/orchestration/memory.py @@ -0,0 +1,137 @@ +from typing import List, Dict, Any, Optional, Callable +import logging +from sqlalchemy.orm import Session +from app.db import models + +PROMPT_TEMPLATE = """You are the Cortex AI Assistant, the **Master-Architect** of a decentralized agent mesh. 
+ +## 🏗️ Orchestration Strategy (The Master-Worker Pattern): +- **Master Control**: YOU are the brain. You define every project step based on intelligence reports from your field agents. +- **Atomic Operations**: Assign ONLY atomic, self-contained tasks. +- **Intelligence Reports**: Sub-agents will return a `REPORT` summarizing their findings. Use this distilled intelligence as your primary source of truth for the NEXT step. +- **Visible Reasoning**: You MUST provide textual analysis/strategy at the start of EVERY turn. + +## 🚀 Execution Mandate: +- **Perpetual Pursuit**: DO NOT stop until the user's objective is achieved. +- **No Idle Turns**: If a sub-goal is reached, immediately pivot to the next atomic task. +- **NO SILENT ACTIONS**: You are **FORBIDDEN** from calling a tool without first providing at least one sentence of **plain text** analysis/strategy. + +## ✍️ Interaction Format (MANDATORY): +1. **BRIDGE ANALYSIS (Plain Text)**: At the start of EVERY turn, write 1-2 sentences of auditable analysis. + - *Example*: "The previous report confirms the node is offline. I will check test-node-2 next." + - **Crucial**: This content MUST be outside of `` tags so it is visible to the user as your "Master Strategy". Use `` only for raw technical brainstorming. +2. **ACT**: Call the single atomic tool required for your plan. + +## 📂 Infrastructure & Ghost Mirror: +- **Node Sync Path**: All synced files are at `/tmp/cortex-sync/{{session_id}}/` on agent nodes. +- **Hub Mirror**: Use `mesh_file_explorer` with `session_id` to read/list files from the central mirror (~1ms speed). +- **Source of Truth**: Maintain `.ai_todo.md` in the workspace. Mark items `[COMPLETED]` only after verification. + +Infrastructure Context (Mesh): +{mesh_context} + +User Question: {question} + +Answer:""" + +VOICE_PROMPT_TEMPLATE = """You are a conversational voice assistant. +Keep your responses short, natural, and helpful. 
+Avoid using technical jargon or listing technical infrastructure details unless specifically asked. +Focus on being a friendly companion. + +Conversation History: +{chat_history} + +User Question: {question} + +Answer:""" + +class ContextManager: + """Handles prompt assembly and conversation history management.""" + + def __init__(self, history_formatter: Optional[Callable] = None, context_postprocessor: Optional[Callable] = None): + self.history_formatter = history_formatter or self._default_history_formatter + self.context_postprocessor = context_postprocessor or self._default_context_postprocessor + + def prepare_initial_messages(self, question: str, context_chunks: List[Any], history: List[models.Message], + feature_name: str, mesh_context: str, sync_workspace_id: Optional[str], + db: Optional[Session] = None, user_id: Optional[str] = None, + prompt_service: Any = None, prompt_slug: str = "rag-pipeline") -> List[Dict[str, str]]: + + history_text = self.history_formatter(history) + context_text = self.context_postprocessor(context_chunks) + + template = PROMPT_TEMPLATE if feature_name != "voice" else VOICE_PROMPT_TEMPLATE + + # dynamic prompt override + if prompt_service and db and user_id: + db_prompt = prompt_service.get_prompt_by_slug(db, prompt_slug, user_id) + if db_prompt: + template = db_prompt.content + + # Augmented mesh info + mesh_info = mesh_context + if sync_workspace_id: + mesh_info += f"\nActive Ghost Mirror Session ID: {sync_workspace_id}\n" + mesh_info += f"Instruction: When using `mesh_file_explorer` or `mesh_sync_control`, you MUST use this exactly as the `session_id` (or leave it blank/use 'current' which I will resolve for you).\n" + + system_prompt = template.format( + question=question, + context=context_text, + chat_history=history_text, + mesh_context=mesh_info + ) + + return [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": question} + ] + + def compress_history(self, messages: List[Dict[str, Any]]) -> 
List[Dict[str, Any]]: + """Summarizes middle turns to preserve context window.""" + if len(messages) < 60: + return messages + + logging.info(f"[ContextManager] Memory overflow ({len(messages)}). Compressing middle history...") + system_msg = messages[0] + pivot_idx = len(messages) - 20 + while pivot_idx > 1 and messages[pivot_idx].get("role") != "user": + pivot_idx -= 1 + + if pivot_idx <= 1: pivot_idx = 10 + + middle_to_compress = messages[1:pivot_idx] + recent_msg = messages[pivot_idx:] + + summary_lines = [] + for m in middle_to_compress: + role = m.get("role", "unknown") + content = m.get("content") or "" + if role == "user": + summary_lines.append(f"User: {content[:100]}") + elif role == "assistant" and not m.get("tool_calls"): + summary_lines.append(f"AI: {content[:100]}") + elif role == "tool": + summary_lines.append(f"Result: {content[:100]}") + + summary_text = "\n".join(summary_lines) + + history_msg = { + "role": "user", + "content": f"[SYSTEM: Historical Context Summary]\n\n{summary_text}\n\n[End Summary.]" + } + ack_msg = { + "role": "assistant", + "content": "Understood. Proceeding with current state." + } + + return [system_msg, history_msg, ack_msg] + recent_msg + + def _default_context_postprocessor(self, contexts: List[str]) -> str: + return "\n\n".join(contexts) or "No context provided." 
+
+    def _default_history_formatter(self, history: List[models.Message]) -> str:
+        return "\n".join(
+            f"{'Human' if msg.sender == 'user' else 'Assistant'}: {msg.content}"
+            for msg in history
+        )
diff --git a/ai-hub/app/core/orchestration/stream.py b/ai-hub/app/core/orchestration/stream.py
new file mode 100644
index 0000000..4cfb1d6
--- /dev/null
+++ b/ai-hub/app/core/orchestration/stream.py
@@ -0,0 +1,87 @@
+import re
+from typing import AsyncGenerator, Dict, Any, Optional
+
+class StreamProcessor:
+    """Handles logical processing of LLM streams: thinking tags and content routing."""
+
+    def __init__(self):
+        self._in_thinking_tag = False
+        self.tag_buffer = ""
+        self.header_sent = False
+
+    def reset_turn(self):
+        self.header_sent = False
+        # Note: _in_thinking_tag and tag_buffer might need to persist if the turn ends abruptly
+        # but usually LLM closes tags before tool calls.
+
+    async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
+        """Processes a raw content chunk, yields UI events (content, reasoning)."""
+        self.tag_buffer += content_chunk
+        turn_header = f"---\n### 🛰️ **[Turn {turn}] Master-Architect Analysis**\n"
+
+        while self.tag_buffer:
+            if not self._in_thinking_tag:
+                lower_buf = self.tag_buffer.lower()
+                start_tag_idx = lower_buf.find("<think>")
+
+                if start_tag_idx != -1:
+                    strategy_part = self.tag_buffer[:start_tag_idx]
+                    if strategy_part:
+                        modified_strategy = self._apply_turn_header(strategy_part, turn_header)
+                        if modified_strategy:
+                            yield {"type": "content", "content": modified_strategy}
+
+                    self._in_thinking_tag = True
+                    self.tag_buffer = self.tag_buffer[start_tag_idx + len("<think>"):]
+                    continue
+
+                potential_idx = self.tag_buffer.find("<")
+                if potential_idx != -1 and "<think>".startswith(self.tag_buffer[potential_idx:].lower()):
+                    strategy_part = self.tag_buffer[:potential_idx]
+                    if strategy_part:
+                        modified_strategy = self._apply_turn_header(strategy_part, turn_header)
+                        if modified_strategy:
+                            yield {"type": "content", "content": modified_strategy}
+                    self.tag_buffer = self.tag_buffer[potential_idx:]
+                    break
+                else:
+                    strategy_part = self._apply_turn_header(self.tag_buffer, turn_header)
+                    if strategy_part:
+                        yield {"type": "content", "content": strategy_part}
+                    self.tag_buffer = ""
+
+            else:
+                lower_buf = self.tag_buffer.lower()
+                end_tag_idx = lower_buf.find("</think>")
+
+                if end_tag_idx != -1:
+                    reasoning_part = self.tag_buffer[:end_tag_idx]
+                    if reasoning_part:
+                        yield {"type": "reasoning", "content": reasoning_part}
+
+                    self._in_thinking_tag = False
+                    self.tag_buffer = self.tag_buffer[end_tag_idx + len("</think>"):]
+                    continue
+
+                potential_idx = self.tag_buffer.find("<")
+                if potential_idx != -1 and "</think>".startswith(self.tag_buffer[potential_idx:].lower()):
+                    reasoning_part = self.tag_buffer[:potential_idx]
+                    if reasoning_part:
+                        yield {"type": "reasoning", "content": reasoning_part}
+                    self.tag_buffer = self.tag_buffer[potential_idx:]
+                    break
+                else:
+                    yield {"type": "reasoning", "content": self.tag_buffer}
+                    self.tag_buffer = ""
+
+    def _apply_turn_header(self, text: str, header: str) -> Optional[str]:
+        if not text or self.header_sent:
+            return text
+
+        # Idempotency check
+        has_existing = re.search(r"Turn\s+\d+|Master-Architect\s+Analysis|###\s+🛰️", text, re.IGNORECASE)
+        if not has_existing:
+            text = "\n\n" + header + text
+
+        self.header_sent = True
+        return text
diff --git a/ai-hub/app/core/orchestration/validator.py b/ai-hub/app/core/orchestration/validator.py
new file mode 100644
index 0000000..e78d085
--- /dev/null
+++ b/ai-hub/app/core/orchestration/validator.py
@@ -0,0 +1,44 @@
+import tiktoken
+import json
+from typing import Dict, Any
+
+class TokenLimitExceededError(Exception):
+    """Custom exception raised when the input payload exceeds the token limit."""
+
+    def __init__(self, message: str, token_count: int, token_limit: int):
+        super().__init__(message)
+        self.token_count = token_count
+        self.token_limit = token_limit
+
+
+class Validator:
+    def __init__(self,
token_limit: int = 100000, encoding_name: str = "cl100k_base"): + """ + Initializes the Validator with a token limit and encoding. + + Args: + token_limit (int): The maximum number of tokens allowed. + encoding_name (str): The name of the tokenizer encoding to use. + """ + self.token_limit = token_limit + self.encoding = tiktoken.get_encoding(encoding_name=encoding_name) + + def precheck_tokensize(self, input_payload: Dict[str, Any]) -> None: + """ + Checks if the input payload's token count exceeds the configured limit. + + Args: + input_payload (Dict[str, Any]): The payload to be checked. + + Raises: + TokenLimitExceededError: If the payload's token count is too high. + """ + payload_string: str = json.dumps(input_payload) + token_count: int = len(self.encoding.encode(payload_string)) + + if token_count > self.token_limit: + raise TokenLimitExceededError( + f"Input payload token count ({token_count}) exceeds the limit of {self.token_limit} tokens.", + token_count, + self.token_limit, + ) \ No newline at end of file diff --git a/ai-hub/app/core/pipelines/__init__.py b/ai-hub/app/core/pipelines/__init__.py deleted file mode 100644 index 3fbb1fd..0000000 --- a/ai-hub/app/core/pipelines/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# This file can be left empty. diff --git a/ai-hub/app/core/pipelines/rag_pipeline.py b/ai-hub/app/core/pipelines/rag_pipeline.py deleted file mode 100644 index d8dcb73..0000000 --- a/ai-hub/app/core/pipelines/rag_pipeline.py +++ /dev/null @@ -1,342 +0,0 @@ -import logging -from typing import List, Dict, Any, Optional, Callable -from sqlalchemy.orm import Session - -from app.db import models - -# Define a default prompt template outside the class or as a class constant -# This is inferred from the usage in the provided diff. -PROMPT_TEMPLATE = """You are the Cortex AI Assistant, a powerful orchestrator of decentralized agent nodes. - -## Architecture Highlights: -- You operate within a secure, gRPC-based mesh of Agent Nodes. 
-- You can execute shell commands, browse the web, and manage files on these nodes.
-- You use 'skills' to interact with the physical world.
-
-{mesh_context}
-
-## Thinking and Planning:
-If the user's request is complex, multi-step, or requires infrastructure analysis, you MUST explicitly think and plan before acting or answering.
-Use a `<think>` tag to outline your logic, goals, and steps. This will be extracted and shown to the user as an internal thought trace in a separate UI panel.
-DO NOT put your plans, scratchpad thoughts, or reasoning in the final text. Keep all of that strictly inside the `<think>` tags.
-The text outside the `<think>` tag should ONLY contain the final result or report.
-
-## Task:
-Generate a natural and context-aware answer using the provided knowledge, conversation history, and available tools.
-
-Relevant excerpts from the knowledge base:
-{context}
-
-Conversation History:
-{chat_history}
-
-User Question: {question}
-
-Answer:"""
-
-VOICE_PROMPT_TEMPLATE = """You are a conversational voice assistant.
-Keep your responses short, natural, and helpful.
-Avoid using technical jargon or listing technical infrastructure details unless specifically asked.
-Focus on being a friendly companion.
-
-Conversation History:
-{chat_history}
-
-User Question: {question}
-
-Answer:"""
-
-class RagPipeline:
-    """
-    A flexible and extensible RAG pipeline updated to remove DSPy dependency.
- """ - - def __init__( - self, - context_postprocessor: Optional[Callable[[List[str]], str]] = None, - history_formatter: Optional[Callable[[List[models.Message]], str]] = None, - response_postprocessor: Optional[Callable[[str], str]] = None, - ): - self.context_postprocessor = context_postprocessor or self._default_context_postprocessor - self.history_formatter = history_formatter or self._default_history_formatter - self.response_postprocessor = response_postprocessor - # Internal state for manual tag extraction - self._in_thinking_tag = False - - async def forward( - self, - question: str, - context_chunks: List[Dict[str, Any]], - history: List[models.Message], - llm_provider = None, - prompt_service = None, - tool_service = None, - tools: List[Dict[str, Any]] = None, - mesh_context: str = "", - db: Optional[Session] = None, - user_id: Optional[str] = None, - feature_name: str = "chat", - prompt_slug: str = "rag-pipeline" - ): - logging.debug(f"[RagPipeline.forward] Received question: '{question}'") - - if not llm_provider: - raise ValueError("LLM Provider is required.") - - history_text = self.history_formatter(history) - context_text = self.context_postprocessor(context_chunks) - - template = PROMPT_TEMPLATE - if feature_name == "voice": - template = VOICE_PROMPT_TEMPLATE - - if prompt_service and db and user_id: - db_prompt = prompt_service.get_prompt_by_slug(db, prompt_slug, user_id) - if db_prompt: - template = db_prompt.content - - system_prompt = template.format( - question=question, - context=context_text, - chat_history=history_text, - mesh_context=mesh_context - ) - - # 1. Prepare initial messages - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": question} - ] - - import asyncio - import time - - # 2. 
Agentic Tool Loop (Max 8 turns to give multi-step tasks enough headroom) - for turn in range(8): - request_kwargs = {"stream": True} - if tools: - request_kwargs["tools"] = tools - request_kwargs["tool_choice"] = "auto" - - model = getattr(llm_provider, "model_name", "unknown") - msg_lens = [] - for m in messages: - content = "" - if hasattr(m, "content") and m.content is not None: - content = m.content - elif isinstance(m, dict): - content = m.get("content") or "" - msg_lens.append(len(content)) - - total_chars = sum(msg_lens) - - logging.info(f"[RagPipeline] Turn {turn+1} starting (STREAMING). Model: {model}, Messages: {len(messages)}, Total Chars: {total_chars}") - - # LiteLLM streaming call - prediction = await llm_provider.acompletion(messages=messages, **request_kwargs) - - accumulated_content = "" - accumulated_reasoning = "" - tool_calls_map = {} # index -> tc object - - async for chunk in prediction: - if not chunk.choices: continue - delta = chunk.choices[0].delta - - # A. Handle Reasoning (Thinking) - # Native reasoning content (from DeepSeek or OpenAI O-series) - reasoning = getattr(delta, "reasoning_content", None) or delta.get("reasoning_content") - if reasoning: - accumulated_reasoning += reasoning - yield {"type": "reasoning", "content": reasoning} - - # B. 
Handle Content & Manual Thinking Tags
-                content = getattr(delta, "content", None) or delta.get("content")
-                if content:
-                    # Detect <think> and </think> tags in the stream
-                    if "<think>" in content:
-                        self._in_thinking_tag = True
-                        parts = content.split("<think>", 1)
-                        # Yield any text before the tag as normal content
-                        if parts[0]:
-                            accumulated_content += parts[0]
-                            yield {"type": "content", "content": parts[0]}
-                        # The rest starts the thinking block
-                        content = parts[1]
-
-                    if "</think>" in content:
-                        parts = content.split("</think>", 1)
-                        # The text before </think> belongs to reasoning
-                        if parts[0]:
-                            accumulated_reasoning += parts[0]
-                            yield {"type": "reasoning", "content": parts[0]}
-                        self._in_thinking_tag = False
-                        # The text after is normal content again
-                        content = parts[1]
-
-                    if self._in_thinking_tag:
-                        accumulated_reasoning += content
-                        yield {"type": "reasoning", "content": content}
-                    else:
-                        if content:
-                            accumulated_content += content
-                            yield {"type": "content", "content": content}
-
-                # C. Handle Tool Calls
-                tool_calls = getattr(delta, "tool_calls", None) or delta.get("tool_calls")
-                if tool_calls:
-                    for tc_delta in tool_calls:
-                        idx = tc_delta.index
-                        if idx not in tool_calls_map:
-                            tool_calls_map[idx] = tc_delta
-                        else:
-                            # Accumulate arguments
-                            if tc_delta.function.arguments:
-                                tool_calls_map[idx].function.arguments += tc_delta.function.arguments
-
-            # Process completed turn
-            if not tool_calls_map:
-                # If no tools, this is the final answer for this forward pass.
-                return
-
-            # 3. Parallel dispatch logic for tools
-            processed_tool_calls = list(tool_calls_map.values())
-
-            # Reconstruct the tool call list and message object for the next turn
-            assistant_msg = {
-                "role": "assistant",
-                "content": accumulated_content or None,
-                "tool_calls": processed_tool_calls
-            }
-            if accumulated_reasoning:
-                assistant_msg["reasoning_content"] = accumulated_reasoning
-
-            messages.append(assistant_msg)
-
-            # A.
Dispatch all tool calls simultaneously - event_queue = asyncio.Queue() - async def subagent_event_handler(event): - await event_queue.put(event) - - tool_tasks = [] - for tc in processed_tool_calls: - func_name = tc.function.name - func_args = {} - try: - import json - func_args = json.loads(tc.function.arguments) - except: pass - - # --- M7 Parallel PTY Optimization --- - # If the tool is terminal control and no session is provided, - # use a unique session ID per SUBAGENT task to avoid PTY SERIALIZATION. - if func_name == "mesh_terminal_control" and "session_id" not in func_args: - func_args["session_id"] = f"subagent-{tc.id[:8]}" - - yield {"type": "status", "content": f"AI decided to use tool: {func_name}"} - logging.info(f"[🔧] Agent calling tool (PARALLEL): {func_name} with {func_args}") - - # Surface the tool call details as a reasoning event for full transparency - tool_detail_lines = [f"🔧 **Tool Call: `{func_name}`**"] - if func_args.get("command"): - tool_detail_lines.append(f"- Command: `{func_args['command']}`") - if func_args.get("node_id"): - tool_detail_lines.append(f"- Node: `{func_args['node_id']}`") - if func_args.get("node_ids"): - tool_detail_lines.append(f"- Nodes: `{', '.join(func_args['node_ids'])}`") - if func_args.get("task_map"): - tool_detail_lines.append(f"- Watching tasks: `{func_args['task_map']}`") - yield {"type": "reasoning", "content": "\n" + "\n".join(tool_detail_lines) + "\n"} - - if tool_service: - # Notify UI about tool execution start - yield {"type": "tool_start", "name": func_name, "args": func_args} - - # Create an async task for each tool call, passing the event handler - tool_tasks.append(asyncio.create_task( - tool_service.call_tool(func_name, func_args, db=db, user_id=user_id, on_event=subagent_event_handler) - )) - else: - # Treat as failure immediately if no service - tool_tasks.append(asyncio.sleep(0, result={"success": False, "error": "Tool service not available"})) - - # B. 
HEARTBEAT WAIT: Wait for all sub-agent tasks to fulfill in parallel - wait_start = time.time() - if tool_tasks: - while True: - all_done = all(t.done() for t in tool_tasks) - - # Drain sub-agent thought events - while not event_queue.empty(): - ev = event_queue.get_nowait() - if ev["type"] == "subagent_thought": - yield { - "type": "reasoning", - "content": f"\n\n> **🧠 Sub-Agent [{ev.get('node_id', 'Swarm')}]:** {ev.get('content')}\n\n" - } - - if all_done: - # Drain one final time after tasks complete to catch last-batch thoughts - while not event_queue.empty(): - ev = event_queue.get_nowait() - if ev["type"] == "subagent_thought": - yield { - "type": "reasoning", - "content": f"\n\n> **🧠 Sub-Agent [{ev.get('node_id', 'Swarm')}]:** {ev.get('content')}\n\n" - } - break - - elapsed = int(time.time() - wait_start) - yield {"type": "status", "content": f"Waiting for nodes result... ({elapsed}s)"} - await asyncio.sleep(0.5) - - # C. Collect results and populate history turn - for i, task in enumerate(tool_tasks): - tc = processed_tool_calls[i] - func_name = tc.function.name - result = await task - - # Stream the result back so UI can see "behind the scenes" - yield {"type": "tool_result", "name": func_name, "result": result} - - # Serialize result, but TRUNCATE to keep context manageable. - # Large iperf3/shell outputs can cause LLMs to return empty responses. 
- MAX_TOOL_RESULT_CHARS = 8000 - result_str = json.dumps(result) if isinstance(result, dict) else str(result) - if len(result_str) > MAX_TOOL_RESULT_CHARS: - result_str = result_str[:MAX_TOOL_RESULT_CHARS] + f"\n...[truncated {len(result_str) - MAX_TOOL_RESULT_CHARS} chars]" - - messages.append({ - "role": "tool", - "tool_call_id": tc.id, - "name": func_name, - "content": result_str - }) - - # --- Loop finished without return --- - yield {"type": "error", "content": "Agent loop reached maximum turns (5) without a final response."} - - - def _build_prompt(self, context, history, question): - return f"""Generate a natural and context-aware answer to the user's question using the provided knowledge and conversation history. - -Relevant excerpts from the knowledge base: -{context} - -Conversation History: -{history} - -User Question: {question} - -Answer:""" - - # Default context processor: concatenate chunks - def _default_context_postprocessor(self, contexts: List[str]) -> str: - return "\n\n".join(contexts) or "No context provided." 
- - # Default history formatter: simple speaker prefix - def _default_history_formatter(self, history: List[models.Message]) -> str: - return "\n".join( - f"{'Human' if msg.sender == 'user' else 'Assistant'}: {msg.content}" - for msg in history - ) \ No newline at end of file diff --git a/ai-hub/app/core/pipelines/utils.py b/ai-hub/app/core/pipelines/utils.py deleted file mode 100644 index 8cb86bb..0000000 --- a/ai-hub/app/core/pipelines/utils.py +++ /dev/null @@ -1,11 +0,0 @@ -import json -import os -from datetime import datetime - -def log_status(msg: str) -> None: - log_dir = "ai_logs" - os.makedirs(log_dir, exist_ok=True) - timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - filename = os.path.join(log_dir, f"status_{timestamp}.txt") - with open(filename, "w", encoding="utf-8") as f: - f.write(msg) \ No newline at end of file diff --git a/ai-hub/app/core/pipelines/validator.py b/ai-hub/app/core/pipelines/validator.py deleted file mode 100644 index e78d085..0000000 --- a/ai-hub/app/core/pipelines/validator.py +++ /dev/null @@ -1,44 +0,0 @@ -import tiktoken -import json -from typing import Dict, Any - -class TokenLimitExceededError(Exception): - """Custom exception raised when the input payload exceeds the token limit.""" - - def __init__(self, message: str, token_count: int, token_limit: int): - super().__init__(message) - self.token_count = token_count - self.token_limit = token_limit - - -class Validator: - def __init__(self, token_limit: int = 100000, encoding_name: str = "cl100k_base"): - """ - Initializes the Validator with a token limit and encoding. - - Args: - token_limit (int): The maximum number of tokens allowed. - encoding_name (str): The name of the tokenizer encoding to use. - """ - self.token_limit = token_limit - self.encoding = tiktoken.get_encoding(encoding_name=encoding_name) - - def precheck_tokensize(self, input_payload: Dict[str, Any]) -> None: - """ - Checks if the input payload's token count exceeds the configured limit. 
- - Args: - input_payload (Dict[str, Any]): The payload to be checked. - - Raises: - TokenLimitExceededError: If the payload's token count is too high. - """ - payload_string: str = json.dumps(input_payload) - token_count: int = len(self.encoding.encode(payload_string)) - - if token_count > self.token_limit: - raise TokenLimitExceededError( - f"Input payload token count ({token_count}) exceeds the limit of {self.token_limit} tokens.", - token_count, - self.token_limit, - ) \ No newline at end of file diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index 36d9c4d..3606e70 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -292,10 +292,13 @@ """ with self._lock: node = self._nodes.get(node_id) - user_id = node.user_id if node else "" + user_id = node.user_id if node else data.get("user_id", "") if isinstance(data, dict) else "" node_qs = list(self._node_listeners.get(node_id, [])) user_qs = list(self._user_listeners.get(user_id, [])) if user_id else [] + if user_id and not user_qs and event_type in ["node_online", "node_offline"]: + logger.debug(f"[Registry] emit({event_type}) for node {node_id}: No user listeners found for user {user_id}") + event = { "event": event_type, "label": EVENT_TYPES.get(event_type, event_type), diff --git a/ai-hub/app/core/services/rag.py b/ai-hub/app/core/services/rag.py index f37cd5b..34797be 100644 --- a/ai-hub/app/core/services/rag.py +++ b/ai-hub/app/core/services/rag.py @@ -5,7 +5,7 @@ from app.core.retrievers.faiss_db_retriever import FaissDBRetriever from app.core.retrievers.base_retriever import Retriever from app.core.providers.factory import get_llm_provider -from app.core.pipelines.rag_pipeline import RagPipeline +from app.core.orchestration import Architect class RAGService: """ @@ -86,7 +86,7 @@ if self.faiss_retriever: context_chunks.extend(self.faiss_retriever.retrieve_context(query=prompt, db=db)) - rag_pipeline = 
RagPipeline() + architect = Architect() tools = [] if self.tool_service: @@ -159,8 +159,8 @@ full_answer = "" full_reasoning = "" - # Stream from pipeline - async for event in rag_pipeline.forward( + # Stream from specialized Architect + async for event in architect.run( question=prompt, history=session.messages, context_chunks = context_chunks, @@ -171,6 +171,8 @@ mesh_context = mesh_context, db = db, user_id = user_id or session.user_id, + sync_workspace_id = session.sync_workspace_id, + session_id = session_id, feature_name = session.feature_name, prompt_slug = "rag-pipeline" ): diff --git a/ai-hub/app/core/services/sub_agent.py b/ai-hub/app/core/services/sub_agent.py index f6999d1..399deb5 100644 --- a/ai-hub/app/core/services/sub_agent.py +++ b/ai-hub/app/core/services/sub_agent.py @@ -2,6 +2,7 @@ import time import logging import json +from typing import Optional, List, Dict, Any from app.protos import agent_pb2 logger = logging.getLogger(__name__) @@ -33,21 +34,53 @@ self.start_time = time.time() self.status = "RUNNING" - # If AI-monitoring is disabled or not enough context, fallback to standard execution - if not self.llm or not self.assistant or not self.subagent_system_prompt: - return await self._run_standard() - - return await self._run_ai_powered() + # Every Sub-Agent task is now strictly atomic. + # The AI 'Observe-Think-Act' loop happens in the Main RagPipeline. 
+ return await self._run_standard() async def _run_standard(self): - """Legacy blocking execution with simple retry logic.""" + """Atomic execution with monitoring and simple retry logic.""" + node_id = self.args.get("node_id") or "swarm" + + # Improved task description logic + cmd = self.args.get("cmd") or self.args.get("command") + action = self.args.get("action") + path = self.args.get("path") + + if cmd: + item = cmd + elif action and path: + item = f"{action} {path}" + elif action: + item = str(action) + else: + item = self.name # Fallback to skill name if no command/action found + + # Phase 1: Dispatch + if self.on_event: + # Descriptive task name logic to reduce "Starting atomic step: ifconfig" noise + desc = f"Performing {self.name}" + if "command" in self.args or "cmd" in self.args: + desc = f"Running: {self.args.get('command') or self.args.get('cmd')}" + elif "path" in self.args: + desc = f"Inspecting: {self.args['path']}" + elif "action" in self.args: + desc = f"Executing {self.args['action']}" + + await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": f"✅ {desc}"}) + for attempt in range(self.retries + 1): try: + # Dispatch task via assistant (which handles gRPC signing/sending) self.result = await asyncio.to_thread(self.task_fn, **self.args) + + # If it's a long-running task (Wait-Pending), monitor it with heartbeats + if isinstance(self.result, dict) and self.result.get("status") == "TIMEOUT_PENDING": + self.result = await self._monitor_atomic_task(self.result, node_id) + if isinstance(self.result, dict) and self.result.get("error"): err_msg = str(self.result.get("error")).lower() - is_busy = "busy" in err_msg - if is_busy or any(x in err_msg for x in ["timeout", "offline", "disconnected", "capacity", "rejected"]): + if any(x in err_msg for x in ["timeout", "offline", "disconnected", "capacity", "rejected"]): if attempt < self.retries: backoff = (attempt + 1) * 3 self.status = f"RETRYING ({attempt+1}/{self.retries})" @@ 
-56,247 +89,95 @@ self.status = "COMPLETED" break except Exception as e: - logger.error(f"SubAgent {self.name} execution error: {e}") + logger.error(f"SubAgent {self.name} error: {e}") self.error = str(e) if attempt < self.retries: self.status = f"ERROR_RETRYING ({attempt+1}/{self.retries})" await asyncio.sleep(2) else: self.status = "FAILED" + self.end_time = time.time() - return self.result + + # Phase 2: Completion & Intelligence Report + summary = None + if self.llm and self.result: + summary = await self._generate_summary() + + if self.on_event: + # Emit to UI + if summary: + rep = f"🧠 {summary}" + else: + rep = "✅ Step Finished." if not self.error else f"❌ Step Failed: {self.error}" + await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": rep}) - async def _run_ai_powered(self): - """AI-powered 'Observe-Think-Act' loop for per-node task management.""" - logger.info(f"[🤖 SubAgent] Starting AI-powered monitoring for {self.name}") - - # 1. Initiate task with no_abort=True and respect requested timeout for init - requested_timeout = int(self.args.get("timeout", 5)) - init_timeout = min(5, requested_timeout) if requested_timeout > 0 else 5 - init_args = {**self.args, "no_abort": True, "timeout": init_timeout} - node_id = init_args.get("node_id") or "swarm" - + # --- Final Intelligence Handoff --- + # We ALWAYS return a string report for the Main AI to ensure it can "read" the outcome. + report_lines = [] + if summary: + report_lines.append(f"REPORT: {summary}") + else: + # Fallback if AI summary is unavailable + raw_snippet = str(self.result)[:1000] if self.result else "No output returned." 
+ report_lines.append(f"REPORT: (Auto-Summary Unavailable) Raw outcome: {raw_snippet}") + + report_lines.append(f"STATUS: {self.status}") + if self.error: + report_lines.append(f"ERROR: {self.error}") + + return "\n".join(report_lines) + + async def _generate_summary(self) -> Optional[str]: + """Uses LLM to summarize logs, file system changes, or errors into a single report sentence.""" try: - # Emit a 'start' thought immediately for every dispatch - start_msg = f"🚀 Dispatching: `{self.args.get('command', '?')}` on `{node_id}` (init_timeout={init_timeout}s)" - logger.info(f" [🤖 SubAgent] {start_msg}") - if self.on_event: - await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": start_msg}) - - res = await asyncio.to_thread(self.task_fn, **init_args) - - # Swarm handling (might return map of node_id -> result) - task_map = {} - if "task_id" in res: - task_map = {node_id: res["task_id"]} - elif isinstance(res, dict) and not any(k in res for k in ["stdout", "error"]): - # Looks like a swarm map - task_map = {nid: r.get("task_id") for nid, r in res.items() if r.get("status") == "TIMEOUT_PENDING"} - - if not task_map: - # Task completed immediately — emit a completion thought - status_icon = "✅" if not (isinstance(res, dict) and res.get("error")) else "❌" - stdout_preview = "" - if isinstance(res, dict): - raw = res.get("stdout") or res.get("error") or "" - stdout_preview = raw.strip()[-300:] if len(raw.strip()) > 300 else raw.strip() - done_msg = f"{status_icon} Quick-complete on `{node_id}`. Output preview: `{stdout_preview}`" - if self.on_event: - await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": done_msg}) - logger.info(f"[🤖 SubAgent] Task finished immediately or failed.") - self.status = "COMPLETED" - self.result = res - self.end_time = time.time() - return res - - # 2. Intelligence Loop - max_loops = 50 - for loop in range(max_loops): - # A. 
FAST-PATH HEURISTIC: Check for prompts before sleeping/AI analysis - from app.core.grpc.services.assistant import TaskAssistant - peek = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=0, no_abort=True) - - heuristic_action = self._check_heuristics(peek) - if heuristic_action == "FINISH": - fast_path_reason = "Prompt detected - Finishing task." - logger.info(f" [⚡ Fast-Path] {fast_path_reason} {self.name}") - - # Emit to UI - for nid, tid in task_map.items(): - self.assistant.registry.emit(nid, "subagent_thought", fast_path_reason) - if tid: - self.assistant.journal.add_thought(tid, fast_path_reason) - - self.result = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=2) - self.status = "COMPLETED" - break - - # B. AI Analysis Loop - # Analyze with AI - analysis = await self._analyze_progress(peek) - action = analysis.get("action", "WAIT") - reason = analysis.get("reason", "") - - # C. Smart Wait: AI determines how long to wait before next tick - # Default to 5s if not specified, range 1s to 60s - wait_time = analysis.get("next_wait", 5) - try: - wait_time = max(1, min(60, int(wait_time))) - except: - wait_time = 5 - - logger.info(f" [🔍 AI] Loop {loop}: Action={action} | Wait={wait_time}s | {reason}") - - # Emit thinking process and record in journal - for nid, tid in task_map.items(): - msg = f"{reason} (Next check in {wait_time}s)" - self.assistant.registry.emit(nid, "subagent_thought", msg) - if self.on_event: - await self.on_event({"type": "subagent_thought", "node_id": nid, "content": msg}) - if tid: - self.assistant.journal.add_thought(tid, reason) - - if action == "FINISH" or all(r.get("status") not in ["RUNNING", "TIMEOUT_PENDING"] for r in peek.values()): - # One last blocking wait to gather final result if needed - self.result = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=10) - self.status = "COMPLETED" - # Emit completion summary - for nid in task_map: - res_preview = "" - if 
isinstance(self.result, dict): - node_res = self.result.get(nid, self.result) - raw = (node_res.get("stdout") or node_res.get("error") or "") if isinstance(node_res, dict) else str(node_res) - res_preview = raw.strip()[-400:] if len(raw.strip()) > 400 else raw.strip() - elapsed = int(time.time() - self.start_time) - done_msg = f"✅ Task complete on `{nid}` in {elapsed}s. Output:\n```\n{res_preview}\n```" - if self.on_event: - await self.on_event({"type": "subagent_thought", "node_id": nid, "content": done_msg}) - break - - if action == "EXECUTE": - cmd = analysis.get("command") - target_nid = analysis.get("node_id") or analysis.get("node_ids") - if cmd and target_nid: - exec_reason = f"Branching Execution: Running '{cmd}' on {target_nid}" - logger.info(f" [🚀 Branch] {exec_reason}") - - # Emit branch thinking - for nid in (target_nid if isinstance(target_nid, list) else [target_nid]): - self.assistant.registry.emit(nid, "subagent_thought", exec_reason) - if self.on_event: - await self.on_event({"type": "subagent_thought", "node_id": nid, "content": exec_reason}) - - # Dispatch new tasks - if isinstance(target_nid, list): - new_res = await asyncio.to_thread(self.assistant.dispatch_swarm, target_nid, cmd, no_abort=True) - for nid, r in new_res.items(): - if r.get("task_id"): - task_map[nid] = r["task_id"] - else: - new_res = await asyncio.to_thread(self.assistant.dispatch_single, target_nid, cmd, no_abort=True) - if new_res.get("task_id") and not cmd.startswith("!RAW:"): - task_map[target_nid] = new_res["task_id"] - - # Continue monitoring all tasks (old + new) - continue - - if action == "ABORT": - # Kill tasks - for nid, tid in task_map.items(): - await asyncio.to_thread(self.assistant.registry.get_node(nid).queue.put, - agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=tid))) - self.status = "ABORTED" - self.result = {"error": "AI aborted task", "reason": analysis.get("reason")} - break - - # Dynamic sleep based on AI recommendation OR Edge 
Signal - await self._edge_aware_sleep(task_map, wait_time) - - self.status = "COMPLETED" - self.end_time = time.time() - return self.result - - except Exception as e: - logger.exception("[🤖 SubAgent] AI Intelligence Loop Crashed") - return await self._run_standard() - - async def _edge_aware_sleep(self, task_map, timeout): - """Wait for timeout OR until any node in task_map signals a prompt.""" - # Find all prompt events for our tasks - events = [] - with self.assistant.journal.lock: - for tid in task_map.values(): - if tid in self.assistant.journal.tasks: - events.append(self.assistant.journal.tasks[tid]["prompt_event"]) - - if not events: - await asyncio.sleep(timeout) - return - - def waiter(): - # Wait for ANY of the events to be set, or timeout - start = time.time() - while time.time() - start < timeout: - for ev in events: - if ev.is_set(): - return True - time.sleep(0.1) - return False - - # Run the multi-event wait in a thread to keep Hub event loop free - await asyncio.to_thread(waiter) - - def _check_heuristics(self, peek_results: dict) -> str: - """Detects common shell/REPL prompts in stdout to trigger early finish.""" - import re - # Patterns for bash, zsh, python, node, and generic prompts - # We look at the last ~100 characters of stdout for speed - PROMPT_PATTERNS = [ - r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$ - r">>>\s*$", # python - r"\.\.\.\s*$", # python multi-line - r">\s*$", # node/js - ] - - all_ready = True - for nid, res in peek_results.items(): - # If the task is already finished by the node, it's ready - status = res.get("status") - if status not in ["RUNNING", "TIMEOUT_PENDING"]: - continue - - stdout = res.get("stdout", "") - tail = stdout[-100:] if len(stdout) > 100 else stdout - - is_at_prompt = any(re.search(p, tail) for p in PROMPT_PATTERNS) - if not is_at_prompt: - all_ready = False - break - - return "FINISH" if all_ready and peek_results else "WAIT" - - async def _analyze_progress(self, peek_results): - """Calls 
LLM to analyze the live stream and decide next move.""" - try: + # Prepare a very compact prompt for the sub-agent reporter + # We provide the command and the raw result + raw_res = str(self.result)[:3000] # Slightly more context prompt = ( - f"SYSTEM PROMPT: {self.subagent_system_prompt}\n\n" - f"CURRENT TERMINAL STATE (SWARM):\n" - f"{json.dumps(peek_results, indent=2)}\n\n" - "INSTRUCTIONS: Analyze the status. Respond ONLY with a JSON object:\n" - "{\n" - " \"action\": \"WAIT\" | \"FINISH\" | \"ABORT\" | \"EXECUTE\",\n" - " \"reason\": \"...\",\n" - " \"next_wait\": ,\n" - " \"command\": \"\",\n" - " \"node_id\": \"\"\n" - "}\n" - "Tip: Use 'EXECUTE' for branching agency (e.g. if node 1 is ready, run a new command on node 2). " - "For long tasks, set next_wait to 10-20. For quick ticks, use 3-5." + f"You are a Sub-Agent reporter on node '{self.args.get('node_id')}'.\n" + f"Task Performed: {self.name} {self.args}\n" + f"Raw Outcome: {raw_res}\n" + f"Status: {self.status}\n" + f"Errors: {self.error if self.error else 'None'}\n\n" + "Task: Summarize the result for your Master-Architect in ONE CONCISE SENTENCE. " + "Include key findings (e.g., 'Found 3 files', 'Command failed with exit 1', 'Subnet is 10.0.0.0/24'). " + "Do not use conversational filler. Be the 'eyes and ears' on the ground." 
) - response = await self.llm.acompletion(prompt=prompt, response_format={"type": "json_object"}) - return json.loads(response.choices[0].message.content) - except: - return {"action": "WAIT", "reason": "AI analysis unavailable, continuing default wait.", "next_wait": 5} + + response = await self.llm.acompletion(prompt=prompt, max_tokens=100, temperature=0) + summary = response.choices[0].message.content.strip() + summary = summary.strip('"').strip("'") + if ":" in summary[:15]: + summary = summary.split(":", 1)[1].strip() + return summary + except Exception as e: + logger.warning(f"Failed to generate sub-agent summary: {e}") + return None + + async def _monitor_atomic_task(self, partial_res: dict, node_id: str): + """Monitors a single atomic task until finished without making decisions.""" + task_id = partial_res.get("task_id") + if not task_id: return partial_res + + task_map = {node_id: task_id} + max_checks = 120 # 10 minutes max for an atomic task + for _ in range(max_checks): + # Non-blocking peek + res = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=0, no_abort=True) + node_res = res.get(node_id, {}) + + if node_res.get("status") not in ["RUNNING", "TIMEOUT_PENDING"]: + # Finished! 
Collect final and return + return await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=2) + + # Simple heartbeat for UI + if self.on_event: + await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": "*(Still executing...)*"}) + + await asyncio.sleep(5) + + return {"error": "SubAgent timed out waiting for atomic task."} def get_elapsed(self) -> int: if not self.start_time: diff --git a/ai-hub/app/core/services/tool.py b/ai-hub/app/core/services/tool.py index e106d6e..ce7c119 100644 --- a/ai-hub/app/core/services/tool.py +++ b/ai-hub/app/core/services/tool.py @@ -57,7 +57,7 @@ return tools - async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, on_event = None) -> Any: + async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, session_id: str = None, on_event = None) -> Any: """ Executes a registered skill. """ @@ -71,12 +71,12 @@ if db: db_skill = db.query(models.Skill).filter(models.Skill.name == tool_name).first() if db_skill and db_skill.is_system: - return await self._execute_system_skill(db_skill, arguments, user_id=user_id, db=db, on_event=on_event) + return await self._execute_system_skill(db_skill, arguments, user_id=user_id, db=db, session_id=session_id, on_event=on_event) logger.error(f"Tool '{tool_name}' not found or handled yet.") return {"success": False, "error": "Tool not found"} - async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any], user_id: str = None, db: Session = None, on_event = None) -> Any: + async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any], user_id: str = None, db: Session = None, session_id: str = None, on_event = None) -> Any: """Routes system skill execution to a stateful SubAgent.""" from app.core.services.sub_agent import SubAgent from app.core.providers.factory import get_llm_provider @@ -87,12 +87,25 @@ assistant = 
orchestrator.assistant node_id = args.get("node_id") - # node_id requirement is now handled per-skill below to support swarm/plural fields + + # M3: Resolve session_id from either arguments OR the passed session_id context + # (AI might use placeholders like 'current' which we resolve here) + session_id_arg = args.get("session_id") + if not session_id_arg or session_id_arg == "current": + resolved_sid = session_id or "__fs_explorer__" + else: + resolved_sid = session_id_arg + + logger.info(f"[ToolService] Executing {skill.name} on {node_id or 'swarm'} (Resolved Session: {resolved_sid})") # --- AI Sub-Agent Setup --- llm_provider = None - subagent_prompt = skill.config.get("subagent_system_prompt") + subagent_prompt_base = skill.config.get("subagent_system_prompt", "") + # Inject Mesh Context for Atomic Monitoring + mesh_instruction = f"\n\n## Atomic Step Environment:\n- Node Sync Path: `/tmp/cortex-sync/{resolved_sid}/`.\n- Mission: Execute the requested command and monitor for completion. Do not make autonomous decisions; return the final output to the Master Architect." 
+ subagent_prompt = subagent_prompt_base + mesh_instruction + if db and user_id and subagent_prompt: user = db.query(models.User).filter(models.User.id == user_id).first() if user: @@ -120,10 +133,10 @@ if node_ids and isinstance(node_ids, list): task_fn = assistant.dispatch_swarm - task_args = {"node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": session_id, "no_abort": no_abort} + task_args = {"node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort} elif node_id: task_fn = assistant.dispatch_single - task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": session_id, "no_abort": no_abort} + task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort} else: return {"success": False, "error": "node_id or node_ids is required"} @@ -153,33 +166,58 @@ browser_action = agent_pb2.BrowserAction( action=action_type, url=args.get("url", ""), - session_id=session_id or "" + session_id=resolved_sid or "" ) task_fn = assistant.dispatch_browser - task_args = {"node_id": node_id, "action": browser_action, "session_id": session_id} + task_args = {"node_id": node_id, "action": browser_action, "session_id": resolved_sid} elif skill.name == "mesh_file_explorer": # ... existing logic ... 
action = args.get("action") path = args.get("path") - explorer_sid = session_id or "__fs_explorer__" - if action == "list": task_fn = assistant.ls - task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid} + task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid} elif action == "read": task_fn = assistant.cat - task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid} + task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid} elif action == "write": content = args.get("content", "").encode('utf-8') task_fn = assistant.write - task_args = {"node_id": node_id, "path": path, "content": content, "session_id": explorer_sid} + task_args = {"node_id": node_id, "path": path, "content": content, "session_id": resolved_sid} elif action == "delete": task_fn = assistant.rm - task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid} + task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid} else: return {"success": False, "error": f"Unsupported action: {action}"} + elif skill.name == "mesh_sync_control": + action_str = args.get("action", "start").upper() + # Normalize mapping user string to assistant enum + action_map = { + "START": "START", + "STOP": "STOP", + "LOCK": "LOCK", + "UNLOCK": "UNLOCK", + "RESYNC": "RESYNC" + } + internal_action = action_map.get(action_str, "START") + task_fn = assistant.control_sync + task_args = { + "node_id": node_id, + "session_id": resolved_sid, + "action": internal_action, + "path": args.get("path", ".") + } + + elif skill.name == "mesh_inspect_drift": + task_fn = assistant.inspect_drift + task_args = { + "node_id": node_id, + "path": args.get("path"), + "session_id": resolved_sid + } + if task_fn: # Create and run the SubAgent (potentially AI-powered) sub_agent = SubAgent( @@ -194,11 +232,14 @@ ) res = await sub_agent.run() - # Standardize output for AI - if isinstance(res, dict) and "error" in res: + # Standardize output for AI: + # If 
it's a string (our new Intelligence Report), pass it through directly. + # If it's a dict, only wrap as failure if a non-None error exists. + if isinstance(res, dict) and res.get("error"): return {"success": False, "error": res["error"], "sub_agent_status": sub_agent.status} - return {"success": True, "output": res, "sub_agent_status": sub_agent.status} + logger.info(f"[ToolService] System skill '{skill.name}' completed (Status: {sub_agent.status}).") + return res except Exception as e: logger.exception(f"System skill execution failed: {e}") diff --git a/ai-hub/app/core/skills/definitions.py b/ai-hub/app/core/skills/definitions.py index 8da71f8..35220c6 100644 --- a/ai-hub/app/core/skills/definitions.py +++ b/ai-hub/app/core/skills/definitions.py @@ -131,7 +131,14 @@ { "name": "mesh_file_explorer", "description": "List, read, and manipulate files within the decentralized mesh synchronization system.", - "system_prompt": "You are a file management assistant. You can browse and synchronize files across different agent nodes.", + "system_prompt": ( + "You are a file management assistant. Use this tool for high-performance file operations:\n" + "1. **`list`**: Explore directories. If a 'session_id' is provided, it uses the zero-latency Hub mirror.\n" + "2. **`read`**: Fetch file content. Uses local Hub mirror fast-path if available.\n" + "3. **`write`**: Synchronously update Hub mirror and background push to node.\n" + "4. **`delete`**: Remove from Hub and dispatch remote delete.\n" + "Always include 'session_id' for improved performance unless you need to bypass the ghost mirror." 
+ ), "skill_type": "local", "is_enabled": True, "features": ["chat", "workflow", "swarm_control"], @@ -152,5 +159,56 @@ }, "is_system": True + }, + { + "name": "mesh_sync_control", + "description": "Manage replication, synchronization, and locks across nodes in the decentralized ghost mirror filesystem.", + "system_prompt": ( + "Use this tool to manage the synchronization state of files across the swarm.\n" + "1. **`start`**: Instruct a node to begin watching and syncing a local directory.\n" + "2. **`lock`**: Disable user-side file watcher on a node. Use this BEFORE starting multi-file refactors to prevent race conditions.\n" + "3. **`unlock`**: Restore user-side sync after an AI refactor is complete.\n" + "4. **`resync`**: Force a node to perform a full hash-based reconciliation against the master mirror on the Hub." + ), + "skill_type": "local", + "is_enabled": True, + "features": ["chat", "workflow", "swarm_control"], + "config": { + "actions": ["start", "stop", "lock", "unlock", "resync"], + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["start", "stop", "lock", "unlock", "resync"], "description": "Control action."}, + "node_id": {"type": "string", "description": "Target node ID."}, + "session_id": {"type": "string", "description": "The workspace session ID to manage."}, + "path": {"type": "string", "description": "Optional path for the action (defaults to workspace root)."} + }, + "required": ["action", "node_id", "session_id"] + } + }, + "is_system": True + }, + { + "name": "mesh_inspect_drift", + "description": "Perform a deep comparison between the Hub's local record and a node's physical file state.", + "system_prompt": ( + "Use this tool when you suspect the Hub mirror is out of sync with an edge node.\n" + "It will return a unified diff showing exactly what changed on the remote node vs your local Hub copy." 
+ ), + "skill_type": "local", + "is_enabled": True, + "features": ["chat", "workflow", "swarm_control"], + "config": { + "parameters": { + "type": "object", + "properties": { + "node_id": {"type": "string", "description": "Target node ID."}, + "path": {"type": "string", "description": "Relative path to the file to inspect."}, + "session_id": {"type": "string", "description": "The workspace session ID."} + }, + "required": ["node_id", "path", "session_id"] + } + }, + "is_system": True } ] diff --git a/ai-hub/app/db/migrate.py b/ai-hub/app/db/migrate.py index 07cef65..c14be17 100644 --- a/ai-hub/app/db/migrate.py +++ b/ai-hub/app/db/migrate.py @@ -51,6 +51,7 @@ ("attached_node_ids", "TEXT"), ("node_sync_status", "TEXT"), ("sync_config", "TEXT"), + ("is_cancelled", "INTEGER DEFAULT 0"), ] for col_name, col_type in session_required_columns: if col_name not in session_columns: diff --git a/ai-hub/app/db/models.py b/ai-hub/app/db/models.py index b780b58..cadd9e7 100644 --- a/ai-hub/app/db/models.py +++ b/ai-hub/app/db/models.py @@ -95,6 +95,8 @@ created_at = Column(DateTime, default=datetime.utcnow, nullable=False) # Flag to indicate if the session has been archived or soft-deleted. is_archived = Column(Boolean, default=False, nullable=False) + # Flag to indicate if the current AI execution should be cancelled. + is_cancelled = Column(Boolean, default=False, nullable=False) # --- Agent Node Integration (M3) --- # Stable workspace ID used as Ghost Mirror session_id across all attached nodes. 
diff --git a/docker-compose.test-nodes.yml b/docker-compose.test-nodes.yml index 722f2fa..5dfdd65 100644 --- a/docker-compose.test-nodes.yml +++ b/docker-compose.test-nodes.yml @@ -15,6 +15,9 @@ - AGENT_TLS_ENABLED=false - DEBUG_GRPC=true restart: unless-stopped + cap_add: + - NET_ADMIN + privileged: true test-node-2: build: @@ -29,3 +32,6 @@ - AGENT_TLS_ENABLED=false - DEBUG_GRPC=true restart: unless-stopped + cap_add: + - NET_ADMIN + privileged: true diff --git a/ui/client-app/src/components/ChatArea.js b/ui/client-app/src/components/ChatArea.js index 90ebb82..343a5ee 100644 --- a/ui/client-app/src/components/ChatArea.js +++ b/ui/client-app/src/components/ChatArea.js @@ -5,6 +5,7 @@ const ChatArea = ({ chatHistory, onSendMessage, + onCancel, isProcessing, featureName = "default", workspaceId = null, @@ -108,16 +109,27 @@ : "Type a message..." } > - +
+ + {isProcessing && ( + + )} +
diff --git a/ui/client-app/src/components/ChatWindow.js b/ui/client-app/src/components/ChatWindow.js index 0f4859c..a6da748 100644 --- a/ui/client-app/src/components/ChatWindow.js +++ b/ui/client-app/src/components/ChatWindow.js @@ -24,9 +24,12 @@ // Removed auto-expand behavior so AI 'think'/'tool' traces default to collapsed // unless explicitly requested by the user, improving readability of the main answer. + // Auto-expand the thought trace while reasoning is streaming so the user sees progress useEffect(() => { - // Left empty purposely - }, [message.reasoning]); + if (message.reasoning && !message.thoughtDone && (message.status === "Thinking" || message.status?.includes("Calling tool"))) { + setIsReasoningExpanded(true); + } + }, [message.reasoning, message.status, message.thoughtDone]); // Handle exclusive playback: stop if someone else starts playing useEffect(() => { diff --git a/ui/client-app/src/components/MultiNodeConsole.js b/ui/client-app/src/components/MultiNodeConsole.js index e052b7a..f073a40 100644 --- a/ui/client-app/src/components/MultiNodeConsole.js +++ b/ui/client-app/src/components/MultiNodeConsole.js @@ -107,10 +107,11 @@ ) : ( thoughtHistory.map((t, i) => (
-
+
{new Date(t.time * 1000).toLocaleTimeString([], { hour12: false, hour: '2-digit', minute: '2-digit', second: '2-digit' })}
-
+
+ {t.type === 'mesh_observation' && '⚠️ '} {t.thought}
@@ -189,14 +190,15 @@ return; } - // Handle Sub-Agent Thoughts - if (msg.event === 'subagent_thought') { + // Handle Sub-Agent Thoughts & Mesh Observations + if (msg.event === 'subagent_thought' || msg.event === 'mesh_observation') { setNodeHistory(prev => { - const history = prev[msg.node_id] || []; + const nodeHistory = prev[msg.node_id] || []; + const content = msg.event === 'mesh_observation' ? msg.data.message : msg.data; // Avoid exact duplicates back-to-back - if (history.length > 0 && history[history.length - 1].thought === msg.data) return prev; - const newEntry = { time: Date.now() / 1000, thought: msg.data }; - return { ...prev, [msg.node_id]: [...history, newEntry] }; + if (nodeHistory.length > 0 && nodeHistory[nodeHistory.length - 1].thought === content) return prev; + const newEntry = { time: Date.now() / 1000, thought: content, type: msg.event }; + return { ...prev, [msg.node_id]: [...nodeHistory, newEntry] }; }); return; } diff --git a/ui/client-app/src/hooks/useSwarmControl.js b/ui/client-app/src/hooks/useSwarmControl.js index a1bc392..8ea32c2 100644 --- a/ui/client-app/src/hooks/useSwarmControl.js +++ b/ui/client-app/src/hooks/useSwarmControl.js @@ -1,6 +1,6 @@ import { useState, useEffect, useRef, useCallback } from "react"; import { getSessionId } from "../services/websocket"; -import { getSessionTokenStatus, getSessionMessages, chatWithAI, getUserConfig, getSession } from "../services/apiService"; +import { getSessionTokenStatus, getSessionMessages, chatWithAI, getUserConfig, getSession, cancelSession } from "../services/apiService"; const useSwarmControl = ({ pageContainerRef, onNewSessionCreated }) => { const [chatHistory, setChatHistory] = useState([]); @@ -156,14 +156,16 @@ if (event.type === "reasoning") { if (!reasoningStartTime) reasoningStartTime = Date.now(); lastMsg.reasoning += event.content; - lastMsg.status = "Thinking"; // Switch to Thinking if reasoning tokens arrive + // Only update status to planning if we are in the brain's 
strategy phase. + if (!lastMsg.status || (lastMsg.status === "Generating" || lastMsg.status === "Analyzing & Planning")) { + lastMsg.status = "Analyzing & Planning"; + } } else if (event.type === "content") { if (reasoningStartTime && !lastMsg.thoughtDone) { reasoningDuration = Math.round((Date.now() - reasoningStartTime) / 1000); - lastMsg.status = `Thought for ${reasoningDuration}s`; + lastMsg.status = reasoningDuration > 0 ? `Thought for ${reasoningDuration}s` : null; lastMsg.thoughtDone = true; - } else if (!reasoningStartTime && lastMsg.status === "Generating") { - // Direct answer case: hide status once tokens start arriving + } else if (!reasoningStartTime && (lastMsg.status === "Generating" || lastMsg.status === "Analyzing & Planning")) { lastMsg.status = null; } lastMsg.text += event.content; @@ -190,6 +192,17 @@ }, [isConfigured, localActiveLLM, fetchTokenUsage]); + const handleCancelChat = useCallback(async () => { + if (!sessionIdRef.current) return; + try { + await cancelSession(sessionIdRef.current); + // We don't set isProcessing false here immediately if we want to wait for the stream + // to actually close, but the user wants immediate feedback. + // RagPipeline already checks session.is_cancelled at the start of each turn. 
+ } catch (err) { + console.warn("Failed to cancel session", err); + } + }, []); const handleSwitchSession = useCallback(async (targetSessionId) => { localStorage.setItem("sessionId_swarm_control", targetSessionId); @@ -218,6 +231,7 @@ isConfigured, missingConfigs, handleSendChat, + handleCancelChat, setShowErrorModal, handleSwitchSession, sessionId, diff --git a/ui/client-app/src/pages/SwarmControlPage.js b/ui/client-app/src/pages/SwarmControlPage.js index bd62e37..89ad3a7 100644 --- a/ui/client-app/src/pages/SwarmControlPage.js +++ b/ui/client-app/src/pages/SwarmControlPage.js @@ -38,6 +38,7 @@ showErrorModal, tokenUsage, handleSendChat, + handleCancelChat, setShowErrorModal, handleSwitchSession, sessionId, @@ -468,6 +469,7 @@ { + const userId = getUserId(); + const response = await fetch(`${API_BASE_URL}/sessions/${sessionId}/cancel`, { + method: "POST", + headers: { "X-User-ID": userId }, + }); + if (!response.ok) { + throw new Error(`Failed to cancel session. Status: ${response.status}`); + } + return await response.json(); +}; // --- Unchanged Functions ---