diff --git a/agent-node/agent_node/node.py b/agent-node/agent_node/node.py index ce289d4..12823c6 100644 --- a/agent-node/agent_node/node.py +++ b/agent-node/agent_node/node.py @@ -425,7 +425,8 @@ for root, dirs, filenames in os.walk(watch_path): for filename in filenames: abs_path = os.path.join(root, filename) - r_path = os.path.relpath(abs_path, watch_path) + # r_path must be relative to base_dir so the server correctly joins it to the mirror root + r_path = os.path.relpath(abs_path, base_dir) try: with open(abs_path, "rb") as f: h = hashlib.sha256(f.read()).hexdigest() @@ -435,7 +436,8 @@ for d in dirs: abs_path = os.path.join(root, d) - r_path = os.path.relpath(abs_path, watch_path) + # r_path must be relative to base_dir so the server correctly joins it to the mirror root + r_path = os.path.relpath(abs_path, base_dir) files.append(agent_pb2.FileInfo(path=r_path, size=0, hash="", is_dir=True)) except Exception as e: print(f" [❌] Manifest generation failed for {rel_path}: {e}") @@ -612,7 +614,8 @@ def _on_finish(self, tid, res, trace): """Final Completion Callback: Routes task results back to server.""" print(f"[*] Completion: {tid}", flush=True) - status = agent_pb2.TaskResponse.SUCCESS if res['status'] == 1 else agent_pb2.TaskResponse.ERROR + # 0 is SUCCESS, 1 is ERROR in Protobuf + status = res.get('status', agent_pb2.TaskResponse.ERROR) tr = agent_pb2.TaskResponse( task_id=tid, status=status, diff --git a/agent-node/agent_node/skills/manager.py b/agent-node/agent_node/skills/manager.py index 31ec258..b232229 100644 --- a/agent-node/agent_node/skills/manager.py +++ b/agent-node/agent_node/skills/manager.py @@ -18,6 +18,7 @@ def _discover_skills(self, sync_mgr): """Scans the skills/ directory for logic.py and loads skill implementations.""" # Find project root (assumes /app/agent_node/skills/manager.py) + # We need to go up from agent_node/skills/manager.py to /app/ project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) skills_dir = os.path.join(project_root, "skills") diff --git a/agent-node/skills b/agent-node/skills new file mode 120000 index 0000000..dc1752f --- /dev/null +++ b/agent-node/skills @@ -0,0 +1 @@ +/app/skills \ No newline at end of file diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index 154ceb3..04a8029 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -28,6 +28,7 @@ import secrets import logging from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends +from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session @@ -384,73 +385,66 @@ """ Generate and return the agent_config.yaml content an admin downloads and places alongside the node client software before deployment. - - The YAML contains: - - node_id, hub_url, invite_token (pre-signed) - - skill enable/disable toggles matching the admin's skill_config - - Example usage: - curl -o agent_config.yaml $HUB/nodes/admin/{node_id}/config.yaml?admin_id=... - # Deploy the node software with this file present. """ - import os _require_admin(admin_id, db) node = _get_node_or_404(node_id, db) - - hub_url = os.getenv("HUB_PUBLIC_URL", "https://ai.jerxie.com") - hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:50051") - secret_key = os.getenv("SECRET_KEY", "dev-secret-key-1337") - skill_cfg = node.skill_config or {} - if isinstance(skill_cfg, str): - import json - try: - skill_cfg = json.loads(skill_cfg) - except Exception: - skill_cfg = {} - - lines = [ - "# Cortex Hub — Agent Node Configuration", - f"# Generated for node '{node_id}' — keep this file secret.", - "", - f"node_id: \"{node_id}\"", - f"node_description: \"{node.display_name}\"", - "", - "# Hub connection", - f"hub_url: \"{hub_url}\"", - f"grpc_endpoint: \"{hub_grpc}\"", - "", - "# Authentication — do NOT share these secrets", - f"invite_token: \"{node.invite_token}\"", - f"auth_token: \"{node.invite_token}\"", - "", - "# HMAC signing key — must match the hub's SECRET_KEY exactly", - f"secret_key: \"{secret_key}\"", - "", - "# Skill configuration (mirrors admin settings; node respects these at startup)", - "skills:", - ] - for skill, cfg in skill_cfg.items(): - if not isinstance(cfg, dict): - continue - enabled = cfg.get("enabled", True) - lines.append(f" {skill}:") - lines.append(f" enabled: {str(enabled).lower()}") - for k, v in cfg.items(): - if k != "enabled" and v is not None: - lines.append(f" {k}: {v}") - - lines += [ - "", - "# Workspace sync root — override if needed", - "sync_root: \"/tmp/cortex-workspace\"", - "", - "# TLS — set to false only in dev", - "tls: true", - ] - - config_yaml = "\n".join(lines) + config_yaml = _generate_node_config_yaml(node) return schemas.NodeConfigYamlResponse(node_id=node_id, config_yaml=config_yaml) + @router.get("/admin/{node_id}/download", summary="[Admin] Download Agent Node Bundle (ZIP)") + def admin_download_bundle( + node_id: str, + admin_id: str, + db: Session = Depends(get_db) + ): + """ + Bundles the entire Agent Node source code along with a pre-configured + agent_config.yaml into a single ZIP file for the user to download. + """ + import io + import zipfile + + _require_admin(admin_id, db) + node = _get_node_or_404(node_id, db) + config_yaml = _generate_node_config_yaml(node) + + # Create ZIP in memory + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: + # 1. Add Agent Node source files from /app/agent-node + source_dir = "/app/agent-node" + if os.path.exists(source_dir): + for root, dirs, files in os.walk(source_dir): + # Exclude unwanted directories + dirs[:] = [d for d in dirs if not d.startswith("sync-node-") and d != "__pycache__" and d != ".git"] + + for file in files: + if file == ".env" or file == "agent_config.yaml": continue + + file_path = os.path.join(root, file) + rel_path = os.path.relpath(file_path, source_dir) + zip_file.write(file_path, rel_path) + + # 2. Add skills from /app/skills + skills_dir = "/app/skills" + if os.path.exists(skills_dir): + for root, dirs, files in os.walk(skills_dir): + dirs[:] = [d for d in dirs if d != "__pycache__"] + for file in files: + file_path = os.path.join(root, file) + rel_path = os.path.join("skills", os.path.relpath(file_path, skills_dir)) + zip_file.write(file_path, rel_path) + + # 3. Add the generated config YAML as 'agent_config.yaml' + zip_file.writestr("agent_config.yaml", config_yaml) + + zip_buffer.seek(0) + return StreamingResponse( + zip_buffer, + media_type="application/x-zip-compressed", + headers={"Content-Disposition": f"attachment; filename=cortex-node-{node_id}.zip"} + ) + # ================================================================== # M4: Invite Token Validation (called internally by gRPC server) @@ -565,6 +559,7 @@ try: task_req = agent_pb2.TaskRequest( task_id=task_id, + task_type="shell", payload_json=cmd, signature=sign_payload(cmd), timeout_ms=0, @@ -816,6 +811,60 @@ # Helpers # =========================================================================== +def _generate_node_config_yaml(node: models.AgentNode) -> str: + """Helper to generate the agent_config.yaml content.""" + hub_url = os.getenv("HUB_PUBLIC_URL", "https://ai.jerxie.com") + hub_grpc = os.getenv("HUB_GRPC_ENDPOINT", "ai.jerxie.com:50051") + secret_key = os.getenv("SECRET_KEY", "dev-secret-key-1337") + + skill_cfg = node.skill_config or {} + if isinstance(skill_cfg, str): + try: + skill_cfg = json.loads(skill_cfg) + except Exception: + skill_cfg = {} + + lines = [ + "# Cortex Hub — Agent Node Configuration", + f"# Generated for node '{node.node_id}' — keep this file secret.", + "", + f"node_id: \"{node.node_id}\"", + f"node_description: \"{node.display_name}\"", + "", + "# Hub connection", + f"hub_url: \"{hub_url}\"", + f"grpc_endpoint: \"{hub_grpc}\"", + "", + "# Authentication — do NOT share these secrets", + f"invite_token: \"{node.invite_token}\"", + f"auth_token: \"{node.invite_token}\"", + "", + "# HMAC signing key — must match the hub's SECRET_KEY exactly", + f"secret_key: \"{secret_key}\"", + "", + "# Skill configuration (mirrors admin settings; node respects these at startup)", + "skills:", + ] + for skill, cfg in skill_cfg.items(): + if not isinstance(cfg, dict): + continue + enabled = cfg.get("enabled", True) + lines.append(f" {skill}:") + lines.append(f" enabled: {str(enabled).lower()}") + for k, v in cfg.items(): + if k != "enabled" and v is not None: + lines.append(f" {k}: {v}") + + lines += [ + "", + "# Workspace sync root — override if needed", + "sync_root: \"/tmp/cortex-workspace\"", + "", + "# TLS — set to false only in dev", + "tls: true", + ] + return "\n".join(lines) + def _get_node_or_404(node_id: str, db: Session) -> models.AgentNode: node = db.query(models.AgentNode).filter(models.AgentNode.node_id == node_id).first() if not node: diff --git a/ai-hub/app/app.py b/ai-hub/app/app.py index 6f1ee5a..f7b020a 100644 --- a/ai-hub/app/app.py +++ b/ai-hub/app/app.py @@ -1,9 +1,10 @@ # app/app.py +import asyncio +import logging +import litellm from fastapi import FastAPI from contextlib import asynccontextmanager from typing import List -import litellm -import logging logger = logging.getLogger(__name__) # Import centralized settings and other components @@ -58,6 +59,9 @@ app.state.orchestrator = orchestrator app.state.services.with_service("orchestrator", orchestrator) logger.info("[M6] Agent Orchestrator gRPC server started on port 50051.") + + # Launch periodic Ghost Mirror cleanup + asyncio.create_task(_periodic_mirror_cleanup(orchestrator)) except Exception as e: logger.error(f"[M6] Failed to start gRPC server: {e}") @@ -81,6 +85,27 @@ if hasattr(app.state, 'vector_store'): app.state.vector_store.save_index() +async def _periodic_mirror_cleanup(orchestrator): + """Periodically purges orphaned ghost mirror directories from the server.""" + await asyncio.sleep(10) # Initial delay to let DB settle + while True: + try: + from app.db.session import get_db_session + from app.db import models + with get_db_session() as db: + # Fetch all unique sync_workspace_ids currently in DB + sessions = db.query(models.Session).filter(models.Session.sync_workspace_id != None).all() + active_ids = [s.sync_workspace_id for s in sessions] + + if hasattr(orchestrator, 'mirror'): + orchestrator.mirror.purge_orphaned(active_ids) + else: + logger.warning("[📁🧹] Orchestrator missing mirror manager during cleanup pass.") + except Exception as e: + logger.error(f"[📁🧹] Ghost Mirror periodic cleanup fail: {e}") + + await asyncio.sleep(3600) # Run every hour + def create_app() -> FastAPI: """ Factory function to create and configure the FastAPI application. diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index d32a569..c452cf5 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -88,6 +88,8 @@ if actual_hash != expected_hash: print(f"[⚠️] Hash Mismatch for {path}: expected {expected_hash}, got {actual_hash}") + + def reconcile(self, session_id: str, remote_manifest: agent_pb2.DirectoryManifest) -> List[str]: """Compares remote manifest with local mirror and returns list of paths missing/changed.""" workspace = self.get_workspace_path(session_id) @@ -96,10 +98,15 @@ expected_paths = {f.path for f in remote_manifest.files} # 1. Purge extraneous local files and directories (handles Deletions) + manifest_root = remote_manifest.root_path or "." for root, dirs, files in os.walk(workspace, topdown=False): for name in files: abs_path = os.path.join(root, name) rel_path = os.path.relpath(abs_path, workspace) + + is_within_root = (manifest_root == "." or rel_path.startswith(manifest_root + os.sep) or rel_path == manifest_root) + if not is_within_root: continue + if rel_path in [".cortexignore", ".gitignore"]: continue if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): try: @@ -110,6 +117,10 @@ for name in dirs: abs_path = os.path.join(root, name) rel_path = os.path.relpath(abs_path, workspace) + + is_within_root = (manifest_root == "." or rel_path.startswith(manifest_root + os.sep) or rel_path == manifest_root) + if not is_within_root: continue + if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): try: if not os.listdir(abs_path): @@ -130,3 +141,25 @@ if local_hash != remote_file.hash: needs_update.append(remote_file.path) return needs_update + + def purge_orphaned(self, active_session_ids: List[str]): + """ + Deletes mirror directories that are not in the provided active list. + Ignores special directories like '__fs_explorer__' or internal bookmarks. + """ + if not os.path.exists(self.storage_root): + return + + print(f" [📁🧹] Running Mirror Cleanup. Active Sessions: {len(active_session_ids)}") + active_set = set(active_session_ids) + # Always preserve special system folders + active_set.add("__fs_explorer__") + active_set.add(".cortexignore") + + for entry in os.scandir(self.storage_root): + if entry.is_dir() and entry.name not in active_set: + try: + shutil.rmtree(entry.path) + print(f" [📁🗑️] Purged orphaned ghost mirror: {entry.name}") + except Exception as e: + print(f" [📁⚠️] Failed to purge orphaned mirror {entry.name}: {e}") diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 803ed18..e79a748 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -473,7 +473,7 @@ # 12-Factor Signing Logic sig = sign_payload(cmd) req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( - task_id=tid, payload_json=cmd, signature=sig, session_id=session_id, + task_id=tid, task_type="shell", payload_json=cmd, signature=sig, session_id=session_id, timeout_ms=timeout * 1000)) logger.info(f"[📤] Dispatching shell {tid} to {node_id}") diff --git a/ai-hub/app/core/orchestration/architect.py b/ai-hub/app/core/orchestration/architect.py index 767ef75..3ae31c4 100644 --- a/ai-hub/app/core/orchestration/architect.py +++ b/ai-hub/app/core/orchestration/architect.py @@ -1,5 +1,6 @@ import logging import queue +import time from typing import List, Dict, Any, Optional from app.db import models from .memory import ContextManager @@ -55,10 +56,12 @@ profile = get_profile(feature_name) self.stream = StreamProcessor(profile=profile) turn = 0 + session_start_time = time.time() try: while turn < profile.autonomous_limit: turn += 1 + turn_start_time = time.time() self.stream.reset_turn() # A. Cancellation / Memory check @@ -110,11 +113,17 @@ # Tool delta accumulation self._accumulate_tool_calls(delta, tool_calls_map) + # End Stream & Flush buffers + async for event in self.stream.end_stream(turn): + if event["type"] == "content": accumulated_content += event["content"] + if event["type"] == "reasoning": accumulated_reasoning += event["content"] + yield event + # E. Branch: Tools or Exit? # Heartbeat Fallback: If no content was sent but tools are being called, force a bridge sentence if tool_calls_map and not self.stream.header_sent and not profile.silent_stream: fallback_text = f"Strategy: Executing orchestrated tasks in progress..." - for event in self.stream.process_chunk(fallback_text, turn): + async for event in self.stream.process_chunk(fallback_text, turn): if event["type"] == "content": accumulated_content += event["content"] yield event @@ -147,6 +156,13 @@ yield {"type": "status", "content": "Watchdog: tasks remain. continuing..."} messages.append({"role": "user", "content": "WATCHDOG: .ai_todo.md has open items. Please continue until all are marked [COMPLETED]."}) continue + + # Turn duration report (Natural Exit) + turn_duration = time.time() - turn_start_time + total_duration = time.time() - session_start_time + duration_marker = f"\n\n> **⏱️ Turn {turn} Duration:** {turn_duration:.1f}s | **Total:** {total_duration:.1f}s\n" + yield {"type": "reasoning", "content": duration_marker} + yield {"type": "status", "content": f"Turn {turn} finished in **{turn_duration:.1f}s**. (Session Total: **{total_duration:.1f}s**)"} return # Natural exit # F. Execute Tools @@ -162,12 +178,19 @@ messages.append(self._format_assistant_msg(accumulated_content, accumulated_reasoning, processed_tc)) # Run parallel execution - executor = ToolExecutor(tool_service, user_id, db, sync_workspace_id) + executor = ToolExecutor(tool_service, user_id, db, sync_workspace_id, session_id) async for event in executor.run_tools(processed_tc, safety, mesh_bridge): if "role" in event: # It's a tool result for history messages.append(event) else: yield event + + # Turn duration report (End of Loop) + turn_duration = time.time() - turn_start_time + total_duration = time.time() - session_start_time + duration_marker = f"\n\n> **⏱️ Turn {turn} Duration:** {turn_duration:.1f}s | **Total:** {total_duration:.1f}s\n" + yield {"type": "reasoning", "content": duration_marker} + yield {"type": "status", "content": f"Turn {turn} finished in **{turn_duration:.1f}s**. (Session Total: **{total_duration:.1f}s**)"} finally: if registry and user_id: @@ -216,6 +239,12 @@ if tcd.function.arguments: t_map[idx].function.arguments += tcd.function.arguments def _format_assistant_msg(self, content, reasoning, tool_calls): + if content: + import re + # Strip system-injected turn headers from history to avoid LLM hallucination + content = re.sub(r"(?i)---\n### 🛰️ \*\*\[Turn \d+\] .*?\*\*\n", "", content) + content = content.strip() + clean_tc = [] for tc in tool_calls: clean_tc.append({ diff --git a/ai-hub/app/core/orchestration/body.py b/ai-hub/app/core/orchestration/body.py index a94cb77..d8400ff 100644 --- a/ai-hub/app/core/orchestration/body.py +++ b/ai-hub/app/core/orchestration/body.py @@ -7,11 +7,12 @@ class ToolExecutor: """Handles parallel tool dispatching and event drainage.""" - def __init__(self, tool_service: Any, user_id: str, db: Any, sync_workspace_id: str): + def __init__(self, tool_service: Any, user_id: str, db: Any, sync_workspace_id: str, session_db_id: int): self.tool_service = tool_service self.user_id = user_id self.db = db self.sync_workspace_id = sync_workspace_id + self.session_db_id = session_db_id self.event_queue = asyncio.Queue() async def _subagent_event_handler(self, event): @@ -37,6 +38,7 @@ db=self.db, user_id=self.user_id, session_id=self.sync_workspace_id, + session_db_id=self.session_db_id, on_event=self._subagent_event_handler ) ) @@ -110,7 +112,7 @@ def _truncate_result(self, result: Any) -> str: s = json.dumps(result) if isinstance(result, dict) else str(result) - limit = 8000 + limit = 32000 # Increased for better RAG/context if len(s) > limit: - return s[:limit] + f"\n...[truncated {len(s)-limit} chars]" + return s[:limit] + f"\n...[SYSTEM: Output Truncated at {limit} chars for safety. Use specific filters or file explorers if more detail is needed.]" return s diff --git a/ai-hub/app/core/orchestration/memory.py b/ai-hub/app/core/orchestration/memory.py index c2e1e68..97d49df 100644 --- a/ai-hub/app/core/orchestration/memory.py +++ b/ai-hub/app/core/orchestration/memory.py @@ -1,5 +1,7 @@ from typing import List, Dict, Any, Optional, Callable import logging +import re +import json from sqlalchemy.orm import Session from app.db import models @@ -32,6 +34,19 @@ # Augmented mesh info mesh_info = mesh_context + attached_count = mesh_info.count("- Node ID:") + + # Factual infrastructure summary (Neutral context) + inventory = f"### 🗺️ Infrastructure Inventory ({attached_count} nodes)\n" + if attached_count == 0: + inventory += "- No nodes are currently attached to this session.\n" + else: + inventory += mesh_info + if "Status: offline" in mesh_info.lower(): + inventory += "\nNote: Certain attached nodes are currently offline and unreachable.\n" + + mesh_info = inventory + if sync_workspace_id: mesh_info += f"\nActive Ghost Mirror Session ID: {sync_workspace_id}\n" mesh_info += f"Instruction: When using `mesh_file_explorer` or `mesh_sync_control`, you MUST use this exactly as the `session_id` (or leave it blank/use 'current' which I will resolve for you).\n" @@ -104,13 +119,20 @@ def _default_history_formatter(self, history: List[models.Message]) -> str: lines = [] + # Pattern to catch our injected headers + header_pattern = r"(?i)---\n###\s*.*\[Turn\s*\d+\]\s*Master-Architect\s*Analysis\n" + for msg in history: role = "Human" if msg.sender == "user" else "Assistant" content = msg.content or "" # If assistant message is empty, try to summarize its action - if role == "Assistant" and not content: - content = "[Action: Calling tools or internal reasoning...]" + if role == "Assistant": + if not content: + content = "[Action: Calling tools or internal reasoning...]" + else: + # Strip system headers from history to prevent AI imitation + content = re.sub(header_pattern, "", content) - lines.append(f"{role}: {content}") + lines.append(f"{role}: {content.strip()}") return "\n".join(lines) diff --git a/ai-hub/app/core/orchestration/profiles.py b/ai-hub/app/core/orchestration/profiles.py index 338b4c8..92ca1dd 100644 --- a/ai-hub/app/core/orchestration/profiles.py +++ b/ai-hub/app/core/orchestration/profiles.py @@ -18,9 +18,17 @@ - **Direct Terminal Answer**: If you possess the information to answer a user's question directly without tools (e.g., questions about your identity or known capabilities), provide the answer and **TERMINATE** the orchestration loop by omitting any further tool calls. - **NO SILENT ACTIONS**: You are **FORBIDDEN** from calling a tool without first providing at least one sentence of **plain text** analysis/strategy. -## ✍️ Interaction Format (MANDATORY): -1. **BRIDGE ANALYSIS (Plain Text)**: At the start of EVERY turn, write 1-2 sentences of auditable analysis. -2. **ACT**: Call the single atomic tool required for your plan. +## ✍️ Interaction Format (MANDATORY PROTOCOL): +1. **TITLE (MANDATORY)**: Your turn **MUST** begin with exactly one line: `Title: Your Specific Objective`. + - **CRITICAL**: This line must appear **BEFORE** any `` tags and before any other text. + - **WHY**: This is required for UI synchronization. +2. **BRIDGE ANALYSIS**: Provide 1-2 sentences of auditable analysis. +3. **ACT**: Call the single atomic tool required for your plan. + +## 🏁 Final Result Format: +- When the task is complete, provide a concise summary of the findings/actions. +- **MANDATORY CODE BLOCKS**: Any terminal output, directory listing, or file content MUST be wrapped in markdown code blocks (e.g. ```text ... ``` or ```bash ... ```). +- Use `### 🛰️ Final Summary` as the header for your terminal response. ## 📂 Infrastructure & Ghost Mirror: - **Node Sync Path**: All synced files are at `/tmp/cortex-sync/{{session_id}}/` on agent nodes. diff --git a/ai-hub/app/core/orchestration/stream.py b/ai-hub/app/core/orchestration/stream.py index e81dc34..8e831f0 100644 --- a/ai-hub/app/core/orchestration/stream.py +++ b/ai-hub/app/core/orchestration/stream.py @@ -7,18 +7,18 @@ def __init__(self, profile: Any): self._in_thinking_tag = False self.tag_buffer = "" + self.prefix_buffer = "" # Accumulates start of content to catch split headers self.header_sent = False self.profile = profile def reset_turn(self): self.header_sent = False - # Note: _in_thinking_tag and tag_buffer might need to persist if the turn ends abruptly - # but usually LLM closes tags before tool calls. + self.prefix_buffer = "" async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]: """Processes a raw content chunk, yields UI events (content, reasoning).""" self.tag_buffer += content_chunk - turn_header = f"---\n### 🛰️ **[Turn {turn}] Master-Architect Analysis**\n" + turn_header = f"---\n### 🛰️ **[Turn {turn}] Thinking...**\n" while self.tag_buffer: if not self._in_thinking_tag: @@ -28,9 +28,9 @@ if start_tag_idx != -1: strategy_part = self.tag_buffer[:start_tag_idx] if strategy_part: - modified_strategy = self._apply_turn_header(strategy_part, turn_header) - if modified_strategy: - yield {"type": "content", "content": modified_strategy} + # Flush prefix buffer if any before tag + async for event in self._flush_prefix(strategy_part, turn_header): + yield event self._in_thinking_tag = True self.tag_buffer = self.tag_buffer[start_tag_idx + len(""):] @@ -40,18 +40,32 @@ if potential_idx != -1 and "".startswith(self.tag_buffer[potential_idx:].lower()): strategy_part = self.tag_buffer[:potential_idx] if strategy_part: - modified_strategy = self._apply_turn_header(strategy_part, turn_header) - if modified_strategy: - yield {"type": "content", "content": modified_strategy} + async for event in self._flush_prefix(strategy_part, turn_header): + yield event self.tag_buffer = self.tag_buffer[potential_idx:] break else: - strategy_part = self._apply_turn_header(self.tag_buffer, turn_header) - if strategy_part: - yield {"type": "content", "content": strategy_part} - self.tag_buffer = "" + # Not in thinking tag, accumulate in prefix buffer if header not sent + if not self.header_sent: + self.prefix_buffer += self.tag_buffer + self.tag_buffer = "" + # If buffer is large enough, or we have a reason to flush + if len(self.prefix_buffer) > 200: + async for event in self._flush_prefix("", turn_header): + yield event + else: + strategy_part = self._apply_turn_header(self.tag_buffer, turn_header) + if strategy_part: + yield {"type": "content", "content": strategy_part} + self.tag_buffer = "" else: + # Inside thinking tag - reasoning should not be prefix-buffered + # But if we were buffering prefix, flush it now + if not self.header_sent and self.prefix_buffer: + async for event in self._flush_prefix("", turn_header): + yield event + lower_buf = self.tag_buffer.lower() end_tag_idx = lower_buf.find("") @@ -75,18 +89,76 @@ yield {"type": "reasoning", "content": self.tag_buffer} self.tag_buffer = "" + async def _flush_prefix(self, extra_text: str, header: str) -> AsyncGenerator[Dict[str, Any], None]: + full_text = self.prefix_buffer + extra_text + self.prefix_buffer = "" # Clear it + processed = self._apply_turn_header(full_text, header) + if processed: + yield {"type": "content", "content": processed} + def _apply_turn_header(self, text: str, header: str) -> Optional[str]: + # List of patterns to strip (hallucinated headers from various LLM generations) + strip_patterns = [ + r"(?i)^.*\[Turn\s*\d+\].*$", + r"(?i)Turn\s*\d+:\s*architecting\s*next\s*step\.*", + r"(?i)🏗️\s*BRIDGE\s*ANALYSIS:?", + r"(?i)Analysis: ", + r"(?i)---" + ] + if self.profile.silent_stream: # Aggressively strip any hallucinated headers defined in the profile for pattern in self.profile.strip_headers: - text = re.sub(pattern, "", text, flags=re.IGNORECASE) + text = re.sub(pattern, "", text, flags=re.IGNORECASE | re.MULTILINE) return text - if not text: - return text + # Even in non-silent mode, strip the patterns from the LLM's raw text to avoid duplication + for pattern in strip_patterns: + text = re.sub(pattern, "", text, flags=re.IGNORECASE | re.MULTILINE) + + # Dynamic Title Extraction + if not self.header_sent: + # 1. Primary: Look for "Title: [Title Text]" + title_match = re.search(r"(?i)Title:\s*(.*?)\n", text) + if title_match: + custom_title = title_match.group(1).strip() + header = re.sub(r"Thinking\.\.\.", custom_title, header) + text = text[:title_match.start()] + text[title_match.end():] + else: + # 2. Secondary: If AI uses a Markdown header like "### 🚀 My Title" + md_header_match = re.search(r"(?i)^###\s*.*?\s*(.*?)\n", text, re.MULTILINE) + if md_header_match: + custom_title = md_header_match.group(1).strip() + header = re.sub(r"Thinking\.\.\.", custom_title, header) + text = text[:md_header_match.start()] + text[md_header_match.end():] + + if not text.strip(): + # If after stripping the text is empty, don't send anything yet (unless it's the very first chunk) + if not self.header_sent: + return None + return "" if not self.header_sent: self.header_sent = True - return header + text + # Prepend the system's authoritative header + return header + text.lstrip() return text + async def end_stream(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]: + """Flushes any remaining buffered text at the very end of the stream.""" + turn_header = f"---\n### 🛰️ **[Turn {turn}] Thinking...**\n" + + # 1. Flush prefix buffer if we haven't sent the header yet + if not self.header_sent and self.prefix_buffer: + async for event in self._flush_prefix("", turn_header): + yield event + + # 2. Flush any leftover tag buffer (rarely happens if LLM cuts off) + if self.tag_buffer: + if self._in_thinking_tag: + yield {"type": "reasoning", "content": self.tag_buffer} + else: + processed = self._apply_turn_header(self.tag_buffer, turn_header) + if processed: + yield {"type": "content", "content": processed} + self.tag_buffer = "" diff --git a/ai-hub/app/core/services/sub_agent.py b/ai-hub/app/core/services/sub_agent.py index bb41e1e..b5be12a 100644 --- a/ai-hub/app/core/services/sub_agent.py +++ b/ai-hub/app/core/services/sub_agent.py @@ -97,34 +97,48 @@ self.end_time = time.time() - # Phase 2: Completion & Intelligence Report - summary = None - if self.llm and self.result: - summary = await self._generate_summary() - + # --- UI Intelligence Report --- + # We still emit the summary/status to the UI for human readability if self.on_event: - # Emit to UI + summary = None + if self.llm and self.result: + summary = await self._generate_summary() + if summary: rep = f"🧠 {summary}" else: rep = "✅ Step Finished." if not self.error else f"❌ Step Failed: {self.error}" await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": rep}) - # --- Final Intelligence Handoff --- - # We ALWAYS return a string report for the Main AI to ensure it can "read" the outcome. - report_lines = [] - if summary: - report_lines.append(f"REPORT: {summary}") - else: - # Fallback if AI summary is unavailable - raw_snippet = str(self.result)[:1000] if self.result else "No output returned." - report_lines.append(f"REPORT: (Auto-Summary Unavailable) Raw outcome: {raw_snippet}") - - report_lines.append(f"STATUS: {self.status}") - if self.error: - report_lines.append(f"ERROR: {self.error}") - - return "\n".join(report_lines) + # --- High-Fidelity Handoff with Smart Summary --- + if isinstance(self.result, dict): + # 1. Generate summary for human UI and as a 'hint' for the Master AI + summary = None + if self.llm: + # We skip summary for simple directory listings/cat if they are clean + # but ALWAYS generate for errors or large blobs + is_data_tool = any(x in self.name for x in ["ls", "cat", "explorer", "find"]) + if not is_data_tool or self.error or self.status != "COMPLETED": + summary = await self._generate_summary() + + # 2. Enrich the result object + self.result["sub_agent_status"] = self.status + if summary: + self.result["sub_agent_summary"] = summary + if self.error: + self.result["sub_agent_error"] = self.error + + return self.result + + # Fallback for non-dictionary results + final_summary = await self._generate_summary() if self.llm else "Completed" + return { + "success": not self.error, + "result": self.result, + "summary": final_summary, + "status": self.status, + "error": self.error + } async def _generate_summary(self) -> Optional[str]: """Uses LLM to summarize logs, file system changes, or errors into a single report sentence.""" diff --git a/ai-hub/app/core/services/tool.py b/ai-hub/app/core/services/tool.py index 9943d37..a2672d0 100644 --- a/ai-hub/app/core/services/tool.py +++ b/ai-hub/app/core/services/tool.py @@ -63,7 +63,7 @@ return tools - async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, session_id: str = None, on_event = None) -> Any: + async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, session_id: str = None, session_db_id: int = None, on_event = None) -> Any: """ Executes a registered skill. """ @@ -77,23 +77,46 @@ if db: db_skill = db.query(models.Skill).filter(models.Skill.name == tool_name).first() if db_skill and db_skill.is_system: - return await self._execute_system_skill(db_skill, arguments, user_id=user_id, db=db, session_id=session_id, on_event=on_event) + return await self._execute_system_skill(db_skill, arguments, user_id=user_id, db=db, session_id=session_id, session_db_id=session_db_id, on_event=on_event) logger.error(f"Tool '{tool_name}' not found or handled yet.") return {"success": False, "error": "Tool not found"} - async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any], user_id: str = None, db: Session = None, session_id: str = None, on_event = None) -> Any: + async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any], user_id: str = None, db: Session = None, session_id: str = None, session_db_id: int = None, on_event = None) -> Any: """Routes system skill execution to a stateful SubAgent.""" from app.core.services.sub_agent import SubAgent from app.core.providers.factory import get_llm_provider + # --- Programmatic Access Control (M3/M6) --- + # If targeting a mesh node, we MUST ensure it's actually attached to this session in the DB. + # This prevents AI from 'guessing' node IDs and executing on unauthorized infrastructure. + node_id = args.get("node_id") + node_ids = args.get("node_ids") + + if db and session_db_id: + session = db.query(models.Session).filter(models.Session.id == session_db_id).first() + if session: + attached = session.attached_node_ids or [] + + # Check single node target + if node_id and node_id not in attached: + logger.warning(f"[Security] AI attempted to access unattached node '{node_id}' in session {session_db_id}") + return {"success": False, "error": f"Node '{node_id}' is NOT attached to this session. Access denied."} + + # Check swarm target + if node_ids: + illegal = [nid for nid in node_ids if nid not in attached] + if illegal: + logger.warning(f"[Security] AI attempted to access unattached nodes {illegal} in swarm call") + return {"success": False, "error": f"Nodes {illegal} are NOT attached to this session. Access denied."} + + # --- Standard Preparation --- llm_provider = None orchestrator = getattr(self._services, "orchestrator", None) if not orchestrator: return {"success": False, "error": "Orchestrator not available"} assistant = orchestrator.assistant - node_id = args.get("node_id") # M3: Resolve session_id from either arguments OR the passed session_id context # (AI might use placeholders like 'current' which we resolve here) diff --git a/skills/mesh-terminal-control/logic.py b/skills/mesh-terminal-control/logic.py index d21e6c6..41236ed 100644 --- a/skills/mesh-terminal-control/logic.py +++ b/skills/mesh-terminal-control/logic.py @@ -135,7 +135,7 @@ pure_stdout = pure_stdout.split(marker)[0] sess["result"]["stdout"] = pure_stdout - sess["result"]["status"] = 1 if exit_code == 0 else 2 + sess["result"]["status"] = 0 if exit_code == 0 else 1 # Close the file handle (leaves file on disk) sess["buffer_file"].close() diff --git a/ui/client-app/src/components/ChatWindow.css b/ui/client-app/src/components/ChatWindow.css index 4956b4f..cf04b0c 100644 --- a/ui/client-app/src/components/ChatWindow.css +++ b/ui/client-app/src/components/ChatWindow.css @@ -18,6 +18,9 @@ border-radius: 1.25rem !important; font-family: 'Inter', sans-serif; animation: slideInUp 0.3s ease-out; + overflow-wrap: anywhere; + word-break: break-word; + white-space: pre-wrap; } .user-message-container { @@ -25,6 +28,9 @@ border-radius: 1.25rem !important; font-family: 'Inter', sans-serif; animation: slideInUp 0.3s ease-out; + overflow-wrap: anywhere; + word-break: break-word; + white-space: pre-wrap; } @keyframes slideInUp { @@ -71,6 +77,8 @@ color: #e2e8f0; padding: 1.25rem; border-radius: 0.75rem; + max-width: 100%; + overflow-x: auto; } .streaming-dots::after { diff --git a/ui/client-app/src/components/ChatWindow.js b/ui/client-app/src/components/ChatWindow.js index a6da748..77f7988 100644 --- a/ui/client-app/src/components/ChatWindow.js +++ b/ui/client-app/src/components/ChatWindow.js @@ -22,14 +22,8 @@ } }, [message.audioBlob]); - // Removed auto-expand behavior so AI 'think'/'tool' traces default to collapsed - // unless explicitly requested by the user, improving readability of the main answer. - // Auto-expand the thought trace while reasoning is streaming so the user sees progress - useEffect(() => { - if (message.reasoning && !message.thoughtDone && (message.status === "Thinking" || message.status?.includes("Calling tool"))) { - setIsReasoningExpanded(true); - } - }, [message.reasoning, message.status, message.thoughtDone]); + // Removed auto-expand behavior to keep UI clean during long orchestration tasks. + // The user can manually expand the trace if they wish to see inner-turn details. // Handle exclusive playback: stop if someone else starts playing useEffect(() => { @@ -109,16 +103,7 @@ return (
- {message.status && ( -
-
-
- - {message.status} - -
-
- )} + {/* Status indicator moved to top/bottom for better visibility */} {(message.reasoning || (message.status === "Thinking")) && (