diff --git a/agent-node/Dockerfile b/agent-node/Dockerfile index 8fee39c..492cba7 100644 --- a/agent-node/Dockerfile +++ b/agent-node/Dockerfile @@ -5,10 +5,13 @@ ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 -# Install system dependencies for psutil and playwright +# Install system dependencies for psutil, playwright, and networking tools RUN apt-get update && apt-get install -y \ build-essential \ curl \ + iperf3 \ + net-tools \ + iputils-ping \ libgudev-1.0-0 \ libnotify4 \ libnss3 \ diff --git a/agent-node/agent_node/node.py b/agent-node/agent_node/node.py index 90d08fc..ce289d4 100644 --- a/agent-node/agent_node/node.py +++ b/agent-node/agent_node/node.py @@ -33,6 +33,8 @@ """Collect hardware metadata to advertise at registration.""" import platform import subprocess + import socket + import os caps = { "shell": "v1", @@ -42,6 +44,43 @@ "os_release": platform.release(), } + # Privilege Detection + # is_root: True if UID 0 (Linux/macOS) β€” no sudo needed at all + # has_sudo: True if sudo is installed AND available passwordlessly + try: + caps["is_root"] = (os.getuid() == 0) + except AttributeError: + # Windows β€” os.getuid() doesn't exist; approximate via admin check + try: + import ctypes + caps["is_root"] = bool(ctypes.windll.shell32.IsUserAnAdmin()) + except Exception: + caps["is_root"] = False + + if caps.get("is_root"): + caps["has_sudo"] = False # Root doesn't need sudo + else: + # Check if passwordless sudo is available + try: + r = subprocess.run( + ["sudo", "-n", "true"], + capture_output=True, timeout=3 + ) + caps["has_sudo"] = (r.returncode == 0) + except Exception: + caps["has_sudo"] = False + + # Local IP Detection (best effort) + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(0) + # Doesn't even have to be reachable + s.connect(('10.254.254.254', 1)) + caps["local_ip"] = s.getsockname()[0] + s.close() + except Exception: + caps["local_ip"] = "unknown" + # GPU Detection β€” try nvidia-smi first, then check for 
Apple GPU try: result = subprocess.run( @@ -68,11 +107,15 @@ print(f"[*] Handshake with Orchestrator: {self.node_id}") caps = self._collect_capabilities() print(f"[*] Capabilities: {caps}") + + # Protobuf capabilities is map β€” all values must be strings + caps_str = {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()} + reg_req = agent_pb2.RegistrationRequest( - node_id=self.node_id, + node_id=self.node_id, auth_token=AUTH_TOKEN, - node_description=NODE_DESC, - capabilities=caps + node_description=NODE_DESC, + capabilities=caps_str ) diff --git a/agent-node/agent_node/skills/shell.py b/agent-node/agent_node/skills/shell.py index 430eb61..363f343 100644 --- a/agent-node/agent_node/skills/shell.py +++ b/agent-node/agent_node/skills/shell.py @@ -6,6 +6,7 @@ import termios import struct import fcntl +import tempfile from .base import BaseSkill from protos import agent_pb2 @@ -16,6 +17,14 @@ self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...} self.lock = threading.Lock() + # Phase 3: Prompt Patterns for Edge Intelligence + self.PROMPT_PATTERNS = [ + r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$ + r">>>\s*$", # python + r"\.\.\.\s*$", # python multi-line + r">\s*$", # node/js + ] + # --- M7: Idle Session Reaper --- # Automatically kills dormant bash processes to free up system resources. 
self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper") @@ -70,7 +79,8 @@ "fd": fd, "pid": pid, "last_activity": time.time(), - "buffer": "", + "buffer_file": None, + "tail_buffer": "", "active_task": None } @@ -84,36 +94,114 @@ decoded = data.decode("utf-8", errors="replace") - # Blocking/Sync logic + # Streaming/Sync logic (Detect completion marker) with self.lock: active_tid = sess.get("active_task") marker = sess.get("marker") - if active_tid and marker: - sess["buffer"] += decoded - if marker in decoded: + if active_tid and marker and sess.get("buffer_file"): + # Phase 2: Persistence Offloading + # Write directly to disk instead of heap memory + sess["buffer_file"].write(decoded) + sess["buffer_file"].flush() + + # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning + sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:] + + if marker in sess["tail_buffer"]: # Marker found! Extract exit code try: - parts = sess["buffer"].split(marker) - pure_stdout = parts[0] - after_marker = parts[1].strip().split() + # The tail buffer has the marker + after_marker = sess["tail_buffer"].split(marker)[1].strip().split() exit_code = int(after_marker[0]) if after_marker else 0 - + + # Formulate final stdout summary from the disk file + bf = sess["buffer_file"] + bf.seek(0, 2) + file_len = bf.tell() + + HEAD, TAIL = 10_000, 30_000 + if file_len > HEAD + TAIL: + bf.seek(0) + head_str = bf.read(HEAD) + bf.seek(file_len - TAIL) + tail_str = bf.read() + omitted = file_len - HEAD - TAIL + pure_stdout = head_str + f"\n\n[... 
{omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str + else: + bf.seek(0) + pure_stdout = bf.read() + + # Slice off the marker string and anything after it from the final result + pure_stdout = pure_stdout.split(marker)[0] + sess["result"]["stdout"] = pure_stdout sess["result"]["status"] = 1 if exit_code == 0 else 2 + + # Close the file handle (leaves file on disk) + sess["buffer_file"].close() + sess["buffer_file"] = None + sess["event"].set() - decoded = pure_stdout + decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout except Exception as e: print(f" [🐚⚠️] Marker parsing failed: {e}") sess["event"].set() - # Stream raw terminal output back + # Stream terminal output back (with stealth filtering) if on_event: - event = agent_pb2.SkillEvent( - session_id=session_id, - task_id=sess.get("active_task") or "", - terminal_out=decoded - ) - on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + stealth_out = decoded + if "__CORTEX_FIN_SH_" in decoded: + import re + # We remove any line that contains our internal marker to hide plumbing from user. + # This covers both the initial command echo and the exit code output. + stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', decoded) + + if stealth_out: + # Phase 3: Client-Side Truncation (Stream Rate Limiting) + # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC. + # The full output is still safely written to the tempfile on disk. + with self.lock: + now = time.time() + if now - sess.get("stream_window_start", 0) > 1.0: + sess["stream_window_start"] = now + sess["stream_bytes_sent"] = 0 + dropped = sess.get("stream_dropped_bytes", 0) + if dropped > 0: + drop_msg = f"\n[... 
{dropped:,} bytes truncated from live stream ...]\n" + event = agent_pb2.SkillEvent( + session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + sess["stream_dropped_bytes"] = 0 + + if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 15_000: + sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out) + else: + sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out) + event = agent_pb2.SkillEvent( + session_id=session_id, + task_id=sess.get("active_task") or "", + terminal_out=stealth_out + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + + # EDGE INTELLIGENCE: Proactively signal prompt detection + # We only check for prompts if we are actively running a task and haven't found the marker yet. + if active_tid and not sess["event"].is_set(): + import re + tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"] + for pattern in self.PROMPT_PATTERNS: + if re.search(pattern, tail): + # Send specific prompt signal + # Use last 20 chars as the 'prompt' hint + p_hint = tail[-20:].strip() + prompt_event = agent_pb2.SkillEvent( + session_id=session_id, + task_id=active_tid, + prompt=p_hint + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event)) + break except (EOFError, OSError): break @@ -195,30 +283,44 @@ # Handle Session Persistent Process sess = self._ensure_session(session_id, cwd, on_event) - # --- 0. Busy Check: Serialize access to the PTY --- + # Check for RAW mode first (bypasses busy check for interactive control) + is_raw = cmd.startswith("!RAW:") + if is_raw: + input_str = cmd[5:] + "\n" + print(f" [🐚⌨️] RAW Input Injection: {input_str.strip()}") + os.write(sess["fd"], input_str.encode("utf-8")) + return on_complete(tid, {"stdout": "INJECTED", "status": 1}, task.trace_id) + + # --- 0. 
Busy Check: Serialize access to the PTY for standard commands --- with self.lock: if sess.get("active_task"): curr_tid = sess.get("active_task") return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 2}, task.trace_id) # --- Blocking Wait Logic --- - marker = f"__CORTEX_FIN_SH_{int(time.time())}__" + # --- Blocking Wait Logic --- + marker_id = int(time.time()) + marker = f"__CORTEX_FIN_SH_{marker_id}__" event = threading.Event() - result_container = {"stdout": "", "status": 1} # 1 = Error/Fail by default + result_container = {"stdout": "", "status": 1} # 1 = Success by default (node.py convention) # Register waiter in session state with self.lock: sess["active_task"] = tid sess["marker"] = marker sess["event"] = event - sess["buffer"] = "" + # Create a persistent tempfile for stdout instead of RAM buffer + sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False) + sess["tail_buffer"] = "" sess["result"] = result_container sess["cancel_event"] = threading.Event() # Input injection: execute command then echo marker and exit code try: # 12-factor bash: ( cmd ) ; echo marker $? - full_input = f"({cmd}) ; echo \"{marker} $?\"\n" + # We use "" concatenation in the echo command to ensure the marker literal + # DOES NOT appear in the PTY input echo, preventing premature completion. 
+ full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n" os.write(sess["fd"], full_input.encode("utf-8")) # Wait for completion (triggered by reader) OR cancellation @@ -239,12 +341,38 @@ # Timeout Case print(f" [🐚⚠️] Task {tid} timed out on node.") - on_complete(tid, {"stdout": sess["buffer"], "stderr": "TIMEOUT", "status": 2}, task.trace_id) + with self.lock: + if sess.get("buffer_file"): + try: + sess["buffer_file"].seek(0, 2) + file_len = sess["buffer_file"].tell() + HEAD, TAIL = 10_000, 30_000 + if file_len > HEAD + TAIL: + sess["buffer_file"].seek(0) + head_str = sess["buffer_file"].read(HEAD) + sess["buffer_file"].seek(file_len - TAIL) + tail_str = sess["buffer_file"].read() + omitted = file_len - HEAD - TAIL + partial_out = head_str + f"\n\n[... {omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str + else: + sess["buffer_file"].seek(0) + partial_out = sess["buffer_file"].read() + except: + partial_out = "" + else: + partial_out = "" + + on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id) finally: # Cleanup session task state with self.lock: if sess.get("active_task") == tid: + if sess.get("buffer_file"): + try: + sess["buffer_file"].close() + except: pass + sess["buffer_file"] = None sess["active_task"] = None sess["marker"] = None sess["event"] = None diff --git a/ai-hub/app/core/grpc/core/journal.py b/ai-hub/app/core/grpc/core/journal.py index 566c85c..dba559b 100644 --- a/ai-hub/app/core/grpc/core/journal.py +++ b/ai-hub/app/core/grpc/core/journal.py @@ -1,64 +1,220 @@ import threading +import time + class TaskJournal: - """State machine for tracking tasks through their asynchronous lifecycle.""" + """ + State machine for tracking tasks through their asynchronous lifecycle. 
+ + Memory Pressure Strategy β€” Head + Tail buffers: + - For stream_buffer (stdout): keep first HEAD bytes (command echo + initial + context, which is the most important for understanding what was requested) + and last TAIL bytes (most recent output). The middle is replaced with a + single '[... N bytes omitted ...]' marker. + - For thought_history: same idea β€” keep first HEAD_COUNT entries (initial + reasoning context) and last TAIL_COUNT entries (most recent AI decisions). + """ + def __init__(self): self.lock = threading.Lock() - self.tasks = {} # task_id -> { "event": Event, "result": None, "node_id": str } + self.tasks = {} # task_id -> { event, result, node_id, buffers... } - def register(self, task_id, node_id=None): - """Initializes state for a new task and returns its notification event.""" - event = threading.Event() + # ---- Memory pressure limits ---- + # stream_buffer hard limit = HEAD + TAIL = 40KB + self.STREAM_HEAD_CHARS = 10_000 # ~10KB β€” preserve initial command/echo + self.STREAM_TAIL_CHARS = 30_000 # ~30KB β€” preserve most recent output + self.STREAM_MAX_CHARS = self.STREAM_HEAD_CHARS + self.STREAM_TAIL_CHARS + + # thought_history hard limit = HEAD + TAIL = 20 entries + self.THOUGHT_HEAD_COUNT = 5 # first 5 thoughts (task inception context) + self.THOUGHT_TAIL_COUNT = 15 # last 15 thoughts (most recent AI reasoning) + self.THOUGHT_MAX_COUNT = self.THOUGHT_HEAD_COUNT + self.THOUGHT_TAIL_COUNT + + threading.Thread( + target=self._cleanup_loop, daemon=True, name="JournalCleanup" + ).start() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _trim_stream(self, buf: str, chunk: str) -> str: + """ + Append chunk to buf, enforcing the head+tail memory limit. + + When the total length exceeds STREAM_MAX_CHARS, the middle section is + replaced with one human-readable '[... N bytes omitted ...]' marker. 
+ The head (STREAM_HEAD_CHARS) and tail (STREAM_TAIL_CHARS) are always kept. + """ + combined = buf + chunk + if len(combined) <= self.STREAM_MAX_CHARS: + return combined + + head = combined[:self.STREAM_HEAD_CHARS] + tail = combined[-self.STREAM_TAIL_CHARS:] + omitted = len(combined) - self.STREAM_HEAD_CHARS - self.STREAM_TAIL_CHARS + marker = f"\n\n[... {omitted:,} bytes omitted β€” memory pressure limit reached ...]\n\n" + return head + marker + tail + + def _trim_thoughts(self, thoughts: list, new_entry: dict) -> list: + """ + Append new_entry to thoughts, enforcing the head+tail thought limit. + + When total entries exceed THOUGHT_MAX_COUNT, the middle entries are + collapsed into a single sentinel entry. + """ + thoughts = list(thoughts) # shallow copy to avoid mutating in-place + thoughts.append(new_entry) + if len(thoughts) <= self.THOUGHT_MAX_COUNT: + return thoughts + + head = thoughts[:self.THOUGHT_HEAD_COUNT] + tail = thoughts[-self.THOUGHT_TAIL_COUNT:] + omitted = len(thoughts) - self.THOUGHT_HEAD_COUNT - self.THOUGHT_TAIL_COUNT + sentinel = { + "time": time.time(), + "thought": f"[... {omitted} intermediate thoughts omitted β€” memory limit reached ...]" + } + return head + [sentinel] + tail + + def _trim_final_stdout(self, stdout: str) -> str: + """Apply head+tail trim to a final stdout blob received from the node.""" + if len(stdout) <= self.STREAM_MAX_CHARS: + return stdout + head = stdout[:self.STREAM_HEAD_CHARS] + tail = stdout[-self.STREAM_TAIL_CHARS:] + omitted = len(stdout) - self.STREAM_HEAD_CHARS - self.STREAM_TAIL_CHARS + marker = f"\n\n[... 
{omitted:,} bytes omitted β€” output too large ...]\n\n" + return head + marker + tail + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def _cleanup_loop(self): + while True: + time.sleep(300) # every 5 minutes + try: + self.cleanup() + except Exception: + pass + + def register(self, task_id: str, node_id: str = None) -> threading.Event: + """Initializes state for a new task and returns its completion event.""" + event = threading.Event() + prompt_event = threading.Event() with self.lock: self.tasks[task_id] = { - "event": event, - "result": None, - "node_id": node_id, - "stream_buffer": "" # NEW: Accumulates stdout/stderr chunks live + "event": event, + "prompt_event": prompt_event, + "result": None, + "node_id": node_id, + "stream_buffer": "", # head+tail bounded raw stdout + "thought_history": [], # head+tail bounded AI reasoning log + "created_at": time.time(), + "completed_at": None, } return event - def append_stream(self, task_id, chunk): - """Appends a real-time output chunk to the task's buffer.""" + def add_thought(self, task_id: str, thought: str) -> bool: + """Adds an AI reasoning entry to the task's history (head+tail bounded).""" + with self.lock: + if task_id not in self.tasks: + return False + entry = {"time": time.time(), "thought": thought} + self.tasks[task_id]["thought_history"] = self._trim_thoughts( + self.tasks[task_id]["thought_history"], entry + ) + return True + + def append_stream(self, task_id: str, chunk: str) -> bool: + """Appends a real-time output chunk to the task's stream buffer (head+tail bounded).""" + with self.lock: + if task_id not in self.tasks: + return False + self.tasks[task_id]["stream_buffer"] = self._trim_stream( + self.tasks[task_id]["stream_buffer"], chunk + ) + return True + + def signal_prompt(self, task_id: str) -> bool: + """Signals that an agent node detected an interactive prompt.""" with self.lock: 
if task_id in self.tasks: - self.tasks[task_id]["stream_buffer"] += chunk + self.tasks[task_id]["prompt_event"].set() return True return False - def fulfill(self, task_id, result): - """Processes a result from a node and triggers the waiting thread.""" + def fulfill(self, task_id: str, result: dict) -> bool: + """ + Stores the final result from a node and wakes all waiting threads. + Also trims the final stdout blob if it exceeds the memory limit. + """ with self.lock: - if task_id in self.tasks: - # Merge stream buffer into the final result if not already present - if isinstance(result, dict) and "stdout" in result: - # If result already has stdout, we prefer the final one - # but ensure we don't lose the early chunks if they weren't cumulative - pass - - self.tasks[task_id]["result"] = result - self.tasks[task_id]["event"].set() - return True - return False + if task_id not in self.tasks: + return False + # Trim oversized final stdout before storing + if isinstance(result, dict) and result.get("stdout"): + result["stdout"] = self._trim_final_stdout(result["stdout"]) + self.tasks[task_id]["result"] = result + self.tasks[task_id]["completed_at"] = time.time() + self.tasks[task_id]["event"].set() + self.tasks[task_id]["prompt_event"].set() # wake sub-agent edge sleeps + return True - def get_result(self, task_id): - """Returns the result associated with the given task ID, including the live stream buffer.""" + def fail_node_tasks(self, node_id: str, error_msg: str = "Node disconnected") -> int: + """Fulfills all pending tasks for a disconnected node with an error.""" + with self.lock: + to_fail = [ + tid for tid, t in self.tasks.items() + if t.get("node_id") == node_id and t.get("result") is None + ] + for tid in to_fail: + self.tasks[tid]["result"] = {"error": error_msg, "status": "ERROR"} + self.tasks[tid]["completed_at"] = time.time() + self.tasks[tid]["event"].set() + self.tasks[tid]["prompt_event"].set() + return len(to_fail) + + def get_result(self, task_id: 
str): + """Returns the enriched result for a task, or a partial snapshot if still running.""" with self.lock: data = self.tasks.get(task_id) - if not data: return None - - res = data["result"] - # If no final result yet (timeout case), return the stream buffer as stdout + if data is None: + return None + + res = data["result"] + history = data.get("thought_history", []) + if res is None: + # Task still running β€” return partial stream buffer return { - "stdout": data["stream_buffer"], - "stderr": "", - "status": "TIMEOUT", - "partial": True + "stdout": data["stream_buffer"], + "stderr": "", + "status": "RUNNING", + "partial": True, + "thought_history": history, } + + # Enrich final result with reasoning history + if isinstance(res, dict): + res = dict(res) # don't mutate stored result + res["thought_history"] = history return res - def pop(self, task_id): - """Removes the task's state from the journal.""" + def pop(self, task_id: str): + """Removes the task's state from the journal (call after result is consumed).""" with self.lock: return self.tasks.pop(task_id, None) + + def cleanup(self, max_age_s: int = 3600): + """Purges stale tasks to prevent slow memory accumulation.""" + now = time.time() + with self.lock: + to_remove = [ + tid for tid, t in self.tasks.items() + if (t["completed_at"] and (now - t["completed_at"]) > 300) # finished: keep 5m + or (now - t["created_at"]) > max_age_s # pending: keep 1h + ] + for tid in to_remove: + del self.tasks[tid] diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index feab7b1..3e270e6 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -329,7 +329,26 @@ self.journal.pop(tid) return {"error": "Timeout"} - def dispatch_single(self, node_id, cmd, timeout=30, session_id=None): + def dispatch_swarm(self, node_ids, cmd, timeout=30, session_id=None, no_abort=False): + """Dispatches a command to multiple nodes in parallel 
and waits for all results.""" + from concurrent.futures import ThreadPoolExecutor + + results = {} + with ThreadPoolExecutor(max_workers=len(node_ids)) as executor: + future_to_node = { + executor.submit(self.dispatch_single, nid, cmd, timeout, session_id, no_abort): nid + for nid in node_ids + } + for future in future_to_node: + node_id = future_to_node[future] + try: + results[node_id] = future.result() + except Exception as exc: + results[node_id] = {"error": str(exc)} + + return results + + def dispatch_single(self, node_id, cmd, timeout=30, session_id=None, no_abort=False): """Dispatches a shell command to a specific node.""" node = self.registry.get_node(node_id) if not node: return {"error": f"Node {node_id} Offline"} @@ -348,13 +367,25 @@ node.queue.put(req) self.registry.emit(node_id, "task_start", {"command": cmd}, task_id=tid) + # Immediate peek if timeout is 0 + if timeout == 0: + return {"status": "RUNNING", "stdout": "", "task_id": tid} + if event.wait(timeout): res = self.journal.get_result(tid) - self.journal.pop(tid) + # pop only if fully done + if res.get("status") != "RUNNING": + self.journal.pop(tid) return res - # M6: Timeout recovery. If command exceeds AI's defined expectation, we attempt to abort it - # and return whatever was captured in the stream buffer. + # M6: Timeout recovery. + if no_abort: + logger.info(f"[⏳] Shell task {tid} TIMEOUT (no_abort=True). Leaving alive on {node_id}.") + res = self.journal.get_result(tid) or {} + res["task_id"] = tid + res["status"] = "TIMEOUT_PENDING" + return res + logger.warning(f"[⚠️] Shell task {tid} TIMEOUT after {timeout}s on {node_id}. 
Sending ABORT.") try: node.queue.put(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=tid))) @@ -363,7 +394,7 @@ # Return partial result captured in buffer before popping res = self.journal.get_result(tid) self.journal.pop(tid) - return res if res else {"error": "Timeout", "stdout": "", "stderr": "", "status": "TIMEOUT"} + return res if res else {"error": "Timeout", "stdout": "", "stderr": "", "status": "TIMEOUT", "task_id": tid} def dispatch_browser(self, node_id, action, timeout=60, session_id=None): """Dispatches a browser action to a directed session node.""" @@ -394,3 +425,59 @@ return res self.journal.pop(tid) return {"error": "Timeout"} + + def wait_for_swarm(self, task_map, timeout=30, no_abort=False): + """Waits for multiple tasks (map of node_id -> task_id) in parallel.""" + from concurrent.futures import ThreadPoolExecutor + + results = {} + with ThreadPoolExecutor(max_workers=max(1, len(task_map))) as executor: + # item = (node_id, task_id) + future_to_node = { + executor.submit(self.wait_for_task, nid, tid, timeout, no_abort): nid + for nid, tid in task_map.items() + } + for fut in future_to_node: + nid = future_to_node[fut] + try: results[nid] = fut.result() + except Exception as e: results[nid] = {"error": str(e)} + return results + + def wait_for_task(self, node_id, task_id, timeout=30, no_abort=False): + """Waits for an existing task in the journal.""" + # Check journal first + with self.journal.lock: + data = self.journal.tasks.get(task_id) + if not data: + return {"error": f"Task {task_id} not found in journal (finished or expired)", "status": "NOT_FOUND"} + event = data["event"] + + # Immediate peek if timeout is 0 or event is already set + if timeout == 0 or event.is_set(): + res = self.journal.get_result(task_id) + if res.get("status") != "RUNNING": + self.journal.pop(task_id) + return res + + logger.info(f"[⏳] Re-waiting for task {task_id} on {node_id} for {timeout}s") + if event.wait(timeout): + res = 
self.journal.get_result(task_id) + if res.get("status") != "RUNNING": + self.journal.pop(task_id) + return res + + if no_abort: + res = self.journal.get_result(task_id) or {} + res["task_id"] = task_id + res["status"] = "TIMEOUT_PENDING" + return res + + logger.warning(f"[⚠️] Wait for task {task_id} TIMEOUT again. Sending ABORT.") + node = self.registry.get_node(node_id) + if node: + try: node.queue.put(agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=task_id))) + except: pass + + res = self.journal.get_result(task_id) + self.journal.pop(task_id) + return res if res else {"error": "Timeout", "status": "TIMEOUT", "task_id": task_id} diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index 9cf6e4f..a999b3b 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -237,6 +237,8 @@ logger.error(f"[!] TaskStream Error for {node_id}: {e}") finally: if node_id != "unknown": + # Fulfill any pending tasks in journal with error immediately + self.journal.fail_node_tasks(node_id, "Edge node disconnected") self.registry.deregister(node_id, record=node) def _handle_client_message(self, msg, node_id, node): @@ -303,6 +305,10 @@ elif data_kind == 'prompt': event_data["data"] = se.prompt event_data["type"] = "prompt" + + # EDGE INTELLIGENCE: Signal SubAgent a prompt was detected + if se.task_id: + self.journal.signal_prompt(se.task_id) self.registry.emit(node_id, "skill_event", event_data) diff --git a/ai-hub/app/core/pipelines/rag_pipeline.py b/ai-hub/app/core/pipelines/rag_pipeline.py index b840698..d8dcb73 100644 --- a/ai-hub/app/core/pipelines/rag_pipeline.py +++ b/ai-hub/app/core/pipelines/rag_pipeline.py @@ -15,6 +15,12 @@ {mesh_context} +## Thinking and Planning: +If the user's request is complex, multi-step, or requires infrastructure analysis, you MUST explicitly think and plan before acting or answering. 
+Use a `` tag to outline your logic, goals, and steps. This will be extracted and shown to the user as an internal thought trace in a separate UI panel. +DO NOT put your plans, scratchpad thoughts, or reasoning in the final text. Keep all of that strictly inside the `` tags. +The text outside the `` tag should ONLY contain the final result or report. + ## Task: Generate a natural and context-aware answer using the provided knowledge, conversation history, and available tools. @@ -54,6 +60,8 @@ self.context_postprocessor = context_postprocessor or self._default_context_postprocessor self.history_formatter = history_formatter or self._default_history_formatter self.response_postprocessor = response_postprocessor + # Internal state for manual tag extraction + self._in_thinking_tag = False async def forward( self, @@ -103,8 +111,8 @@ import asyncio import time - # 2. Agentic Tool Loop (Max 5 turns to prevent infinite loops) - for turn in range(5): + # 2. Agentic Tool Loop (Max 8 turns to give multi-step tasks enough headroom) + for turn in range(8): request_kwargs = {"stream": True} if tools: request_kwargs["tools"] = tools @@ -136,17 +144,43 @@ delta = chunk.choices[0].delta # A. Handle Reasoning (Thinking) - # Some models use 'reasoning_content' (OpenAI-compatible / DeepSeek) + # Native reasoning content (from DeepSeek or OpenAI O-series) reasoning = getattr(delta, "reasoning_content", None) or delta.get("reasoning_content") if reasoning: accumulated_reasoning += reasoning yield {"type": "reasoning", "content": reasoning} - # B. Handle Content + # B. 
Handle Content & Manual Thinking Tags content = getattr(delta, "content", None) or delta.get("content") if content: - accumulated_content += content - yield {"type": "content", "content": content} + # Detect and tags in the stream + if "" in content: + self._in_thinking_tag = True + parts = content.split("", 1) + # Yield any text before the tag as normal content + if parts[0]: + accumulated_content += parts[0] + yield {"type": "content", "content": parts[0]} + # The rest starts the thinking block + content = parts[1] + + if "" in content: + parts = content.split("", 1) + # The text before belongs to reasoning + if parts[0]: + accumulated_reasoning += parts[0] + yield {"type": "reasoning", "content": parts[0]} + self._in_thinking_tag = False + # The text after is normal content again + content = parts[1] + + if self._in_thinking_tag: + accumulated_reasoning += content + yield {"type": "reasoning", "content": content} + else: + if content: + accumulated_content += content + yield {"type": "content", "content": content} # C. Handle Tool Calls tool_calls = getattr(delta, "tool_calls", None) or delta.get("tool_calls") @@ -180,6 +214,10 @@ messages.append(assistant_msg) # A. 
Dispatch all tool calls simultaneously + event_queue = asyncio.Queue() + async def subagent_event_handler(event): + await event_queue.put(event) + tool_tasks = [] for tc in processed_tool_calls: func_name = tc.function.name @@ -197,14 +235,26 @@ yield {"type": "status", "content": f"AI decided to use tool: {func_name}"} logging.info(f"[πŸ”§] Agent calling tool (PARALLEL): {func_name} with {func_args}") - + + # Surface the tool call details as a reasoning event for full transparency + tool_detail_lines = [f"πŸ”§ **Tool Call: `{func_name}`**"] + if func_args.get("command"): + tool_detail_lines.append(f"- Command: `{func_args['command']}`") + if func_args.get("node_id"): + tool_detail_lines.append(f"- Node: `{func_args['node_id']}`") + if func_args.get("node_ids"): + tool_detail_lines.append(f"- Nodes: `{', '.join(func_args['node_ids'])}`") + if func_args.get("task_map"): + tool_detail_lines.append(f"- Watching tasks: `{func_args['task_map']}`") + yield {"type": "reasoning", "content": "\n" + "\n".join(tool_detail_lines) + "\n"} + if tool_service: # Notify UI about tool execution start yield {"type": "tool_start", "name": func_name, "args": func_args} - # Create an async task for each tool call + # Create an async task for each tool call, passing the event handler tool_tasks.append(asyncio.create_task( - tool_service.call_tool(func_name, func_args, db=db, user_id=user_id) + tool_service.call_tool(func_name, func_args, db=db, user_id=user_id, on_event=subagent_event_handler) )) else: # Treat as failure immediately if no service @@ -213,26 +263,54 @@ # B. 
HEARTBEAT WAIT: Wait for all sub-agent tasks to fulfill in parallel wait_start = time.time() if tool_tasks: - while not all(t.done() for t in tool_tasks): + while True: + all_done = all(t.done() for t in tool_tasks) + + # Drain sub-agent thought events + while not event_queue.empty(): + ev = event_queue.get_nowait() + if ev["type"] == "subagent_thought": + yield { + "type": "reasoning", + "content": f"\n\n> **🧠 Sub-Agent [{ev.get('node_id', 'Swarm')}]:** {ev.get('content')}\n\n" + } + + if all_done: + # Drain one final time after tasks complete to catch last-batch thoughts + while not event_queue.empty(): + ev = event_queue.get_nowait() + if ev["type"] == "subagent_thought": + yield { + "type": "reasoning", + "content": f"\n\n> **🧠 Sub-Agent [{ev.get('node_id', 'Swarm')}]:** {ev.get('content')}\n\n" + } + break + elapsed = int(time.time() - wait_start) - # This status fulfills the requirement: "internal wait seconds (showing this wait seconds in chat)" yield {"type": "status", "content": f"Waiting for nodes result... ({elapsed}s)"} - await asyncio.sleep(1) + await asyncio.sleep(0.5) # C. Collect results and populate history turn for i, task in enumerate(tool_tasks): tc = processed_tool_calls[i] func_name = tc.function.name result = await task - + # Stream the result back so UI can see "behind the scenes" yield {"type": "tool_result", "name": func_name, "result": result} - + + # Serialize result, but TRUNCATE to keep context manageable. + # Large iperf3/shell outputs can cause LLMs to return empty responses. 
+ MAX_TOOL_RESULT_CHARS = 8000 + result_str = json.dumps(result) if isinstance(result, dict) else str(result) + if len(result_str) > MAX_TOOL_RESULT_CHARS: + result_str = result_str[:MAX_TOOL_RESULT_CHARS] + f"\n...[truncated {len(result_str) - MAX_TOOL_RESULT_CHARS} chars]" + messages.append({ "role": "tool", "tool_call_id": tc.id, "name": func_name, - "content": json.dumps(result) if isinstance(result, dict) else str(result) + "content": result_str }) # --- Loop finished without return --- diff --git a/ai-hub/app/core/services/rag.py b/ai-hub/app/core/services/rag.py index a58d6c0..f37cd5b 100644 --- a/ai-hub/app/core/services/rag.py +++ b/ai-hub/app/core/services/rag.py @@ -103,6 +103,26 @@ mesh_context += f" Description: {node.description or 'No description provided.'}\n" mesh_context += f" Status: {node.last_status}\n" + caps = node.capabilities or {} + if caps.get("local_ip"): + mesh_context += f" Local IP: {caps.get('local_ip')}\n" + if caps.get("arch"): + mesh_context += f" Architecture: {caps['arch']} ({caps.get('os', 'unknown')})\n" + if caps.get("gpu") and caps["gpu"] != "none": + mesh_context += f" GPU: {caps['gpu']}\n" + + # Privilege level β€” critical for knowing whether to use sudo + # Values are stored as strings ("true"/"false") due to protobuf map + is_root = caps.get("is_root") + has_sudo = caps.get("has_sudo") + if is_root == "true" or is_root is True: + mesh_context += f" Privilege Level: root (skip sudo β€” run all commands directly)\n" + elif has_sudo == "true" or has_sudo is True: + mesh_context += f" Privilege Level: standard user with passwordless sudo\n" + elif is_root == "false" or is_root is False: + mesh_context += f" Privilege Level: standard user (sudo NOT available β€” avoid privileged ops)\n" + # If neither field exists yet (old node version), omit to avoid confusion + shell_config = (node.skill_config or {}).get("shell", {}) if shell_config.get("enabled"): sandbox = shell_config.get("sandbox", {}) diff --git 
a/ai-hub/app/core/services/sub_agent.py b/ai-hub/app/core/services/sub_agent.py index 7445069..f6999d1 100644 --- a/ai-hub/app/core/services/sub_agent.py +++ b/ai-hub/app/core/services/sub_agent.py @@ -1,6 +1,8 @@ import asyncio import time import logging +import json +from app.protos import agent_pb2 logger = logging.getLogger(__name__) @@ -9,11 +11,17 @@ A stateful watcher for a specific task on an agent node. Handles execution, result accumulation, and state monitoring. """ - def __init__(self, name: str, task_fn, args: dict, retries: int = 1): + def __init__(self, name: str, task_fn, args: dict, retries: int = 1, + llm_provider=None, assistant=None, subagent_system_prompt: str = None, + on_event=None): self.name = name self.task_fn = task_fn self.args = args self.retries = retries + self.llm = llm_provider + self.assistant = assistant + self.subagent_system_prompt = subagent_system_prompt + self.on_event = on_event self.status = "PENDING" self.result = None self.start_time = None @@ -25,27 +33,26 @@ self.start_time = time.time() self.status = "RUNNING" + # If AI-monitoring is disabled or not enough context, fallback to standard execution + if not self.llm or not self.assistant or not self.subagent_system_prompt: + return await self._run_standard() + + return await self._run_ai_powered() + + async def _run_standard(self): + """Legacy blocking execution with simple retry logic.""" for attempt in range(self.retries + 1): try: - # Execute the blocking assistant method (which uses TaskJournal/Event) - # in a worker thread to keep the async loop free. self.result = await asyncio.to_thread(self.task_fn, **self.args) - - # Basic error detection for retries (e.g. 
Node Offline or Timeout) if isinstance(self.result, dict) and self.result.get("error"): err_msg = str(self.result.get("error")).lower() - # Only retry on potentially transient network/node issues is_busy = "busy" in err_msg if is_busy or any(x in err_msg for x in ["timeout", "offline", "disconnected", "capacity", "rejected"]): if attempt < self.retries: - backoff = (attempt + 1) * 5 if is_busy else (attempt + 1) * 3 - self.status = f"RETRYING ({attempt+1}/{self.retries}) - {'Session Busy' if is_busy else err_msg}" - logger.info(f"[SubAgent] {self.name} retrying due to: {err_msg}. Backoff={backoff}s") + backoff = (attempt + 1) * 3 + self.status = f"RETRYING ({attempt+1}/{self.retries})" await asyncio.sleep(backoff) continue - - - self.status = "COMPLETED" break except Exception as e: @@ -56,10 +63,241 @@ await asyncio.sleep(2) else: self.status = "FAILED" - self.end_time = time.time() return self.result + async def _run_ai_powered(self): + """AI-powered 'Observe-Think-Act' loop for per-node task management.""" + logger.info(f"[πŸ€– SubAgent] Starting AI-powered monitoring for {self.name}") + + # 1. 
Initiate task with no_abort=True and respect requested timeout for init + requested_timeout = int(self.args.get("timeout", 5)) + init_timeout = min(5, requested_timeout) if requested_timeout > 0 else 5 + init_args = {**self.args, "no_abort": True, "timeout": init_timeout} + node_id = init_args.get("node_id") or "swarm" + + try: + # Emit a 'start' thought immediately for every dispatch + start_msg = f"πŸš€ Dispatching: `{self.args.get('command', '?')}` on `{node_id}` (init_timeout={init_timeout}s)" + logger.info(f" [πŸ€– SubAgent] {start_msg}") + if self.on_event: + await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": start_msg}) + + res = await asyncio.to_thread(self.task_fn, **init_args) + + # Swarm handling (might return map of node_id -> result) + task_map = {} + if "task_id" in res: + task_map = {node_id: res["task_id"]} + elif isinstance(res, dict) and not any(k in res for k in ["stdout", "error"]): + # Looks like a swarm map + task_map = {nid: r.get("task_id") for nid, r in res.items() if r.get("status") == "TIMEOUT_PENDING"} + + if not task_map: + # Task completed immediately β€” emit a completion thought + status_icon = "βœ…" if not (isinstance(res, dict) and res.get("error")) else "❌" + stdout_preview = "" + if isinstance(res, dict): + raw = res.get("stdout") or res.get("error") or "" + stdout_preview = raw.strip()[-300:] if len(raw.strip()) > 300 else raw.strip() + done_msg = f"{status_icon} Quick-complete on `{node_id}`. Output preview: `{stdout_preview}`" + if self.on_event: + await self.on_event({"type": "subagent_thought", "node_id": node_id, "content": done_msg}) + logger.info(f"[πŸ€– SubAgent] Task finished immediately or failed.") + self.status = "COMPLETED" + self.result = res + self.end_time = time.time() + return res + + # 2. Intelligence Loop + max_loops = 50 + for loop in range(max_loops): + # A. 
FAST-PATH HEURISTIC: Check for prompts before sleeping/AI analysis + from app.core.grpc.services.assistant import TaskAssistant + peek = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=0, no_abort=True) + + heuristic_action = self._check_heuristics(peek) + if heuristic_action == "FINISH": + fast_path_reason = "Prompt detected - Finishing task." + logger.info(f" [⚑ Fast-Path] {fast_path_reason} {self.name}") + + # Emit to UI + for nid, tid in task_map.items(): + self.assistant.registry.emit(nid, "subagent_thought", fast_path_reason) + if tid: + self.assistant.journal.add_thought(tid, fast_path_reason) + + self.result = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=2) + self.status = "COMPLETED" + break + + # B. AI Analysis Loop + # Analyze with AI + analysis = await self._analyze_progress(peek) + action = analysis.get("action", "WAIT") + reason = analysis.get("reason", "") + + # C. Smart Wait: AI determines how long to wait before next tick + # Default to 5s if not specified, range 1s to 60s + wait_time = analysis.get("next_wait", 5) + try: + wait_time = max(1, min(60, int(wait_time))) + except: + wait_time = 5 + + logger.info(f" [πŸ” AI] Loop {loop}: Action={action} | Wait={wait_time}s | {reason}") + + # Emit thinking process and record in journal + for nid, tid in task_map.items(): + msg = f"{reason} (Next check in {wait_time}s)" + self.assistant.registry.emit(nid, "subagent_thought", msg) + if self.on_event: + await self.on_event({"type": "subagent_thought", "node_id": nid, "content": msg}) + if tid: + self.assistant.journal.add_thought(tid, reason) + + if action == "FINISH" or all(r.get("status") not in ["RUNNING", "TIMEOUT_PENDING"] for r in peek.values()): + # One last blocking wait to gather final result if needed + self.result = await asyncio.to_thread(self.assistant.wait_for_swarm, task_map, timeout=10) + self.status = "COMPLETED" + # Emit completion summary + for nid in task_map: + res_preview = "" + if 
isinstance(self.result, dict): + node_res = self.result.get(nid, self.result) + raw = (node_res.get("stdout") or node_res.get("error") or "") if isinstance(node_res, dict) else str(node_res) + res_preview = raw.strip()[-400:] if len(raw.strip()) > 400 else raw.strip() + elapsed = int(time.time() - self.start_time) + done_msg = f"βœ… Task complete on `{nid}` in {elapsed}s. Output:\n```\n{res_preview}\n```" + if self.on_event: + await self.on_event({"type": "subagent_thought", "node_id": nid, "content": done_msg}) + break + + if action == "EXECUTE": + cmd = analysis.get("command") + target_nid = analysis.get("node_id") or analysis.get("node_ids") + if cmd and target_nid: + exec_reason = f"Branching Execution: Running '{cmd}' on {target_nid}" + logger.info(f" [πŸš€ Branch] {exec_reason}") + + # Emit branch thinking + for nid in (target_nid if isinstance(target_nid, list) else [target_nid]): + self.assistant.registry.emit(nid, "subagent_thought", exec_reason) + if self.on_event: + await self.on_event({"type": "subagent_thought", "node_id": nid, "content": exec_reason}) + + # Dispatch new tasks + if isinstance(target_nid, list): + new_res = await asyncio.to_thread(self.assistant.dispatch_swarm, target_nid, cmd, no_abort=True) + for nid, r in new_res.items(): + if r.get("task_id"): + task_map[nid] = r["task_id"] + else: + new_res = await asyncio.to_thread(self.assistant.dispatch_single, target_nid, cmd, no_abort=True) + if new_res.get("task_id") and not cmd.startswith("!RAW:"): + task_map[target_nid] = new_res["task_id"] + + # Continue monitoring all tasks (old + new) + continue + + if action == "ABORT": + # Kill tasks + for nid, tid in task_map.items(): + await asyncio.to_thread(self.assistant.registry.get_node(nid).queue.put, + agent_pb2.ServerTaskMessage(task_cancel=agent_pb2.TaskCancelRequest(task_id=tid))) + self.status = "ABORTED" + self.result = {"error": "AI aborted task", "reason": analysis.get("reason")} + break + + # Dynamic sleep based on AI recommendation OR 
Edge Signal + await self._edge_aware_sleep(task_map, wait_time) + + self.status = "COMPLETED" + self.end_time = time.time() + return self.result + + except Exception as e: + logger.exception("[πŸ€– SubAgent] AI Intelligence Loop Crashed") + return await self._run_standard() + + async def _edge_aware_sleep(self, task_map, timeout): + """Wait for timeout OR until any node in task_map signals a prompt.""" + # Find all prompt events for our tasks + events = [] + with self.assistant.journal.lock: + for tid in task_map.values(): + if tid in self.assistant.journal.tasks: + events.append(self.assistant.journal.tasks[tid]["prompt_event"]) + + if not events: + await asyncio.sleep(timeout) + return + + def waiter(): + # Wait for ANY of the events to be set, or timeout + start = time.time() + while time.time() - start < timeout: + for ev in events: + if ev.is_set(): + return True + time.sleep(0.1) + return False + + # Run the multi-event wait in a thread to keep Hub event loop free + await asyncio.to_thread(waiter) + + def _check_heuristics(self, peek_results: dict) -> str: + """Detects common shell/REPL prompts in stdout to trigger early finish.""" + import re + # Patterns for bash, zsh, python, node, and generic prompts + # We look at the last ~100 characters of stdout for speed + PROMPT_PATTERNS = [ + r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$ + r">>>\s*$", # python + r"\.\.\.\s*$", # python multi-line + r">\s*$", # node/js + ] + + all_ready = True + for nid, res in peek_results.items(): + # If the task is already finished by the node, it's ready + status = res.get("status") + if status not in ["RUNNING", "TIMEOUT_PENDING"]: + continue + + stdout = res.get("stdout", "") + tail = stdout[-100:] if len(stdout) > 100 else stdout + + is_at_prompt = any(re.search(p, tail) for p in PROMPT_PATTERNS) + if not is_at_prompt: + all_ready = False + break + + return "FINISH" if all_ready and peek_results else "WAIT" + + async def _analyze_progress(self, peek_results): + 
"""Calls LLM to analyze the live stream and decide next move.""" + try: + prompt = ( + f"SYSTEM PROMPT: {self.subagent_system_prompt}\n\n" + f"CURRENT TERMINAL STATE (SWARM):\n" + f"{json.dumps(peek_results, indent=2)}\n\n" + "INSTRUCTIONS: Analyze the status. Respond ONLY with a JSON object:\n" + "{\n" + " \"action\": \"WAIT\" | \"FINISH\" | \"ABORT\" | \"EXECUTE\",\n" + " \"reason\": \"...\",\n" + " \"next_wait\": ,\n" + " \"command\": \"\",\n" + " \"node_id\": \"\"\n" + "}\n" + "Tip: Use 'EXECUTE' for branching agency (e.g. if node 1 is ready, run a new command on node 2). " + "For long tasks, set next_wait to 10-20. For quick ticks, use 3-5." + ) + response = await self.llm.acompletion(prompt=prompt, response_format={"type": "json_object"}) + return json.loads(response.choices[0].message.content) + except: + return {"action": "WAIT", "reason": "AI analysis unavailable, continuing default wait.", "next_wait": 5} + def get_elapsed(self) -> int: if not self.start_time: return 0 diff --git a/ai-hub/app/core/services/tool.py b/ai-hub/app/core/services/tool.py index 7d18e70..e106d6e 100644 --- a/ai-hub/app/core/services/tool.py +++ b/ai-hub/app/core/services/tool.py @@ -57,7 +57,7 @@ return tools - async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None) -> Any: + async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, on_event = None) -> Any: """ Executes a registered skill. 
""" @@ -71,14 +71,15 @@ if db: db_skill = db.query(models.Skill).filter(models.Skill.name == tool_name).first() if db_skill and db_skill.is_system: - return await self._execute_system_skill(db_skill, arguments) + return await self._execute_system_skill(db_skill, arguments, user_id=user_id, db=db, on_event=on_event) logger.error(f"Tool '{tool_name}' not found or handled yet.") return {"success": False, "error": "Tool not found"} - async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any]) -> Any: + async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any], user_id: str = None, db: Session = None, on_event = None) -> Any: """Routes system skill execution to a stateful SubAgent.""" from app.core.services.sub_agent import SubAgent + from app.core.providers.factory import get_llm_provider orchestrator = getattr(self._services, "orchestrator", None) if not orchestrator: @@ -86,10 +87,23 @@ assistant = orchestrator.assistant node_id = args.get("node_id") - session_id = args.get("session_id") # Explicit session if provided by AI + # node_id requirement is now handled per-skill below to support swarm/plural fields - if not node_id: - return {"success": False, "error": "node_id is required"} + # --- AI Sub-Agent Setup --- + llm_provider = None + subagent_prompt = skill.config.get("subagent_system_prompt") + + if db and user_id and subagent_prompt: + user = db.query(models.User).filter(models.User.id == user_id).first() + if user: + # Use user's preferred model, or fallback to system default + p_name = user.preferences.get("llm_provider", "gemini") + m_name = user.preferences.get("llm_model", "") + try: + llm_provider = get_llm_provider(p_name, m_name) + logger.info(f"[ToolService] AI Sub-Agent enabled using {p_name}/{m_name}") + except Exception as e: + logger.warning(f"[ToolService] Could not init LLM for sub-agent: {e}") # Define the task function and arguments for the SubAgent task_fn = None @@ -97,28 +111,55 @@ try: if 
skill.name == "mesh_terminal_control": - # Maps to TaskAssistant.dispatch_single - cmd = args.get("command") + # ... same logic ... + cmd = args.get("command", "") timeout = int(args.get("timeout", 30)) - task_fn = assistant.dispatch_single - task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": session_id} + session_id = args.get("session_id") + node_ids = args.get("node_ids") + no_abort = args.get("no_abort", False) + + if node_ids and isinstance(node_ids, list): + task_fn = assistant.dispatch_swarm + task_args = {"node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": session_id, "no_abort": no_abort} + elif node_id: + task_fn = assistant.dispatch_single + task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": session_id, "no_abort": no_abort} + else: + return {"success": False, "error": "node_id or node_ids is required"} + + elif skill.name == "mesh_wait_tasks": + timeout = int(args.get("timeout", 30)) + no_abort = args.get("no_abort", False) + task_map = args.get("task_map", {}) + + if not task_map: + return {"success": False, "error": "task_map is required"} + + if len(task_map) == 1: + nid, tid = next(iter(task_map.items())) + task_fn = assistant.wait_for_task + task_args = {"node_id": nid, "task_id": tid, "timeout": timeout, "no_abort": no_abort} + else: + task_fn = assistant.wait_for_swarm + task_args = {"task_map": task_map, "timeout": timeout, "no_abort": no_abort} elif skill.name == "browser_automation_agent": - # Maps to TaskAssistant.dispatch_browser + # ... existing logic ... 
from app.protos import agent_pb2 action_str = args.get("action", "navigate").upper() action_type = getattr(agent_pb2.BrowserAction, action_str, agent_pb2.BrowserAction.NAVIGATE) + session_id = args.get("session_id") browser_action = agent_pb2.BrowserAction( action=action_type, url=args.get("url", ""), - session_id=session_id or "" # Bridge to Browser Session + session_id=session_id or "" ) task_fn = assistant.dispatch_browser task_args = {"node_id": node_id, "action": browser_action, "session_id": session_id} elif skill.name == "mesh_file_explorer": - # Maps to TaskAssistant.ls, cat, write, rm + # ... existing logic ... action = args.get("action") path = args.get("path") explorer_sid = session_id or "__fs_explorer__" @@ -139,22 +180,20 @@ else: return {"success": False, "error": f"Unsupported action: {action}"} - if task_fn: - # Create and run the SubAgent + # Create and run the SubAgent (potentially AI-powered) sub_agent = SubAgent( - name=f"{skill.name}_{node_id}", + name=f"{skill.name}_{node_id or 'swarm'}", task_fn=task_fn, args=task_args, - retries=2 # Allow 2 retries for transient node issues + retries=0 if skill.name in ["mesh_terminal_control", "mesh_wait_tasks"] else 2, + llm_provider=llm_provider, + assistant=assistant, + subagent_system_prompt=subagent_prompt, + on_event=on_event ) res = await sub_agent.run() - # Post-process specific results to be more AI-friendly - if skill.name == "mesh_file_explorer" and args.get("action") == "list": - if isinstance(res, dict) and "files" in res: - res = self._format_ls_result(res, node_id, path) - # Standardize output for AI if isinstance(res, dict) and "error" in res: return {"success": False, "error": res["error"], "sub_agent_status": sub_agent.status} diff --git a/ai-hub/app/core/skills/definitions.py b/ai-hub/app/core/skills/definitions.py index f1ecae4..8da71f8 100644 --- a/ai-hub/app/core/skills/definitions.py +++ b/ai-hub/app/core/skills/definitions.py @@ -3,8 +3,26 @@ SYSTEM_SKILLS = [ { "name": 
"mesh_terminal_control", - "description": "Execute stateful shell commands and manage terminal sessions across the agent mesh.", - "system_prompt": "You are an expert linux terminal operator. When using this skill, you have direct access to a PTY. Format commands clearly and wait for confirmation if they are destructive.", + "description": "Execute stateful shell commands and manage terminal sessions across the agent mesh (Swarm Control).", + "system_prompt": ( + "You are a high-level Mesh Orchestrator. When executing commands:\n" + "1. **Parallel Execution**: Use 'node_ids' (plural) for simultaneous swarm sweeps.\n" + "2. **Immediate Knowledge**: Calls return as soon as the task finishes. If a task takes 1s but you set timeout=60, you get the result in 1s.\n" + "3. **Asynchronous Polling**: For background tasks, set 'no_abort=True'. If it times out, you get 'TIMEOUT_PENDING'. " + "You can then use 'mesh_wait_tasks' with 'timeout=0' to peek at progress without blocking your turn.\n" + "4. **Interactive Sub-shells**: Subsequent REPL inputs MUST use the `!RAW:` prefix.\n" + "5. **Swarm Flow**: To start a background server (e.g. iperf3 -s) and move to node 2 immediately, " + "use 'no_abort=True' and a SMALL 'timeout' (e.g. 2s). Never block your planning turn waiting for a persistent service.\n" + "6. **Privilege-Aware Commands**: Each node's 'Privilege Level' is shown in the mesh context. " + "Use it to decide how to run privileged operations:\n" + " - 'root': Run commands directly (no sudo prefix needed or available).\n" + " - 'standard user with passwordless sudo': Prepend sudo to privileged commands.\n" + " - 'standard user (sudo NOT available)': Avoid privileged ops or inform the user.\n" + "7. **iperf3 Speed Test Pattern**: " + "Step A: On the server node, run 'iperf3 -s -D' (daemon mode) with timeout=3, no_abort=True. " + "Step B: On the client node, run 'iperf3 -c -t 10' with timeout=20. " + "Node IPs are in the mesh context." 
+ ), "skill_type": "remote_grpc", "is_enabled": True, "features": ["chat", "swarm_control"], @@ -12,15 +30,57 @@ "service": "TerminalService", "method": "Execute", "capabilities": ["shell", "pty", "interactive"], - "parameters": { + "subagent_system_prompt": ( + "You are an autonomous Per-Node Monitoring Agent. Your goal is to watch the terminal output of a command and decide its state.\n" + "- If the command output shows it is clearly finished or at a prompt (e.g. >>> or $), respond with action: FINISH.\n" + "- If the output shows a prompt asking for input (e.g. [yes/no], [y/n], or 'Password:'), respond with action: EXECUTE and command: '!RAW:yes' (or appropriate response).\n" + "- If the output shows it is still processing and making progress, respond with action: WAIT.\n" + "- If the output has stopped for a long time or looks stuck in an infinite loop without useful output, respond with action: ABORT.\n" + "Respond ONLY in JSON." + ), + "parameters": { "type": "object", "properties": { - "command": {"type": "string", "description": "The shell command to execute."}, - "node_id": {"type": "string", "description": "The target node ID within the mesh."}, - "timeout": {"type": "integer", "description": "The max seconds to wait for result. Default 30. Use for long-running tasks."}, - "session_id": {"type": "string", "description": "Optional persistent session ID. Use same ID to preserve bash state (cd, env). Leave empty for parallel execution."} + "command": {"type": "string", "description": "Command to run. Use !RAW: prefix for REPL inputs."}, + "node_id": {"type": "string", "description": "Target node ID."}, + "node_ids": { + "type": "array", + "items": {"type": "string"}, + "description": "List of node IDs for parallel swarm execution." + }, + "timeout": {"type": "integer", "description": "Max seconds to wait. 
Default 30."}, + "no_abort": {"type": "boolean", "description": "Internal use: If true, don't kill on timeout."}, + "session_id": {"type": "string", "description": "Optional persistent session ID."} }, - "required": ["command", "node_id"] + "required": ["command"] + } + }, + "is_system": True + }, + { + "name": "mesh_wait_tasks", + "description": "Smartly poll or wait for background tasks.", + "system_prompt": ( + "Wait for 'TIMEOUT_PENDING' tasks. This uses an AI sub-agent to monitor progress. " + "It will return as soon as the sub-agent detects completion." + ), + "skill_type": "remote_grpc", + "is_enabled": True, + "features": ["chat", "swarm_control"], + "config": { + "subagent_system_prompt": "Monitor the provided task progress. Respond action: FINISH if done, else WAIT.", + "parameters": { + "type": "object", + "properties": { + "task_map": { + "type": "object", + "additionalProperties": {"type": "string"}, + "description": "Map of node_id -> task_id for tasks to wait on." + }, + "timeout": {"type": "integer", "description": "How much longer to wait in seconds. Default 30."}, + "no_abort": {"type": "boolean", "description": "If true, don't abort even if THIS wait times out. Default false."} + }, + "required": ["task_map"] } }, "is_system": True diff --git a/deploy_prod.sh b/deploy_prod.sh index 11403b1..f146992 100755 --- a/deploy_prod.sh +++ b/deploy_prod.sh @@ -91,5 +91,3 @@ EOF echo "Done! The new code is deployed to $HOST." -echo "CRITICAL: Run the automated Frontend Health Check now to verify production stability." 
-echo "Command: /frontend_tester" diff --git a/docs/architecture/ai_file_sync_integration.md b/docs/architecture/ai_file_sync_integration.md new file mode 100644 index 0000000..ad10069 --- /dev/null +++ b/docs/architecture/ai_file_sync_integration.md @@ -0,0 +1,82 @@ +# πŸ“ Ghost Mirror & File Sync: AI Integration Plan + +## Currently Implemented Architecture (The Baseline) + +The Cortex Swarm features a robust bidirectional file synchronization engine ("Ghost Mirror") built over the bidirectional gRPC task stream. + +### 1. Agent Node (`core/sync.py`, `core/watcher.py`) +- **Real-Time Watcher**: The Node uses `watchdog` to monitor its local workspace folder. When a user or command modifies a file, it chunks the file (`64KB` chunks) into `FilePayload` Protobufs and streams them to the Hub. +- **FS Controller**: The Node listens for `SyncControl` gRPC commands: + - **Watch Controls**: `START_WATCHING`, `STOP_WATCHING`, `LOCK` (blocks local edits). + - **Explorer Actions**: `LIST`, `READ`, `WRITE`, `DELETE`. + +### 2. AI Hub Server (`core/grpc/core/mirror.py`, `assistant.py`) +- **Ghost Mirror**: The Hub receives the `FilePayload` chunks and maintains an exact, real-time replica of the workspace on the Hub's local disk at `/app/data/mirrors/{session_id}`. +- **Mesh Explorer**: The `AssistantService` exposes `.ls()`, `.cat()`, `.write()`, and `.rm()` which send `SyncControl` commands over the network to the Node and wait for the result. +- **AI Access**: The current `mesh_file_explorer` skill uses these `AssistantService` functions, meaning **every time the AI reads a file, it does a full network round-trip to the node**, waiting for the node to read it and send it back. + +## πŸ’‘ Strategic Swarm Use Cases (Why this is powerful) + +The file sync infrastructure (`Ghost Mirror`) is incredibly powerful for the AI. 
Because the sync engine guarantees eventual consistency across assigned nodes and the central Hub mirror, it natively unlocks several advanced Swarm workflows: + +### Use Case 1: Centralized Large Scale Refactoring (Continuous Integration Flow) +When the AI is tasked with refactoring an entire codebase, it can use the `mesh_file_explorer` to apply massive multi-file changes entirely on the Hub's local mirrored repository. +- **The Flow**: The AI modifies the Hub's master files instantly. The `Ghost Mirror` automatically syncs these changes out to the specific Edge Node. +- **Testing**: Once the AI finishes the refactor, it simply calls the `execute` skill on that remote edge node (e.g., `npm run test`). The code is already there. If it fails, the AI iterates locally on the hub. +- **Benefit**: The AI operates in a clean, consistent local environment without fighting SSH streams or network latency during the heavy editing phase. + +### Use Case 2: Multi-Agent Collaboration & State Sharing +A single synced directory can act as a **live shared memory state** across multiple disjointed agent nodes that are otherwise unaware of each other. +- **The Flow**: Node A is running a backend scraper script. Node B is running a Web UI testing tool. Both are mapped to the same logical `session_id` workspace. +- **State Bridge**: Node A writes its scraped data to `shared/data.json`. The Hub's `Ghost Mirror` immediately replicates that change down to Node B. Node B's crawler detects the new file and begins processing it. +- **Benefit**: By exposing a synced folder, individual node agents can work collaboratively as in a true distributed system, passing data physically rather than relying entirely on complex gRPC signals. + +--- + +## 🎯 The Problem + +Though the AI can technically access files using the `mesh_file_explorer` skill, the current implementation is **inefficient**. 
If the AI is debugging a file in a synced workspace (`session_id`), it is wasting time doing network round-trips to the Agent Node for `ls` and `cat` commands. + +Since the Hub *already maintains a perfect local Ghost Mirror* (`/app/data/mirrors`), the AI should be querying the local Hub disk instantly. + +--- + +## πŸš€ AI Integration TODO Phase Plan + +### Phase 1: Local Mirror Fast-Paths (Zero-Latency File Explorer) +**Goal**: Reduce AI file exploration latency from ~1000ms (network) to ~1ms (local disk). + +**Implementation Blueprint (For AI reading this document):** +- **Target File**: Modifications should primarily happen in `/app/ai-hub/app/core/grpc/services/assistant.py` where `ls` and `cat` are defined. +- **Task**: Intercept `ls` and `cat` if `session_id != "__fs_explorer__"`. + - The `__fs_explorer__` session is a special session used by the UI to browse arbitrarily anywhere on the machine. This *must* still use gRPC. + - Any other UUID `session_id` means the Hub has a mirror at `/app/data/mirrors/{session_id}`. +- **Local `ls`**: + - Use `os.scandir` or `os.walk` on the local Hub path (`/app/data/mirrors/{session_id}/{path}`). + - Construct a dictionary matching the existing UI response signature: `{"path": path, "files": [{"name": "file.txt", "size": 123, "is_dir": False}, ...]}`. + - Return this instantly. Bypassing the `self.journal.register(...)` and `node.queue.put(...)` entirely. +- **Local `cat`**: + - Use Python's built-in `open(path, 'r').read()` on the local Hub mirror. + - Return `{"path": path, "content": text}`. +- **Reconciled `write`/`rm`**: + - Update the Hub's local mirror synchronously using Python's `os` and `shutil` tools. + - *Keep* the `node.queue.put(FileSyncMessage(SyncControl.WRITE))` line, but make it "fire and forget" or await it concurrently, returning Success to the AI instantly. + +### Phase 2: Active AI Sync Orchestration +**Goal**: Empower the Swarm AI to autonomously manage replication and locks across nodes. 
+ +**Implementation Blueprint (For AI reading this document):** +- **Target Files**: Create or update `/app/ai-hub/app/core/skills/definitions/mesh_sync_control.json` and map it in `/app/ai-hub/app/core/services/tool.py`. +- **Capability Signatures**: + - `start_sync(node_id: str, path: str)`: Sends `SyncControl.START_WATCHING` via `AssistantService` to instruct a new node edge to hook into the mesh. + - `lock_node(node_id: str)`: Sends `SyncControl.LOCK` to prevent a human dev from altering files while the SubAgent is running multi-file edits. + - `resync_node(node_id: str)`: Sends `SyncControl.RESYNC` to force the node to hash-check itself against the master mirror to fix desync errors naturally. + +### Phase 3: Autonomous Conflict Resolution +**Goal**: Allow the AI to act as the ultimate "git merge" authority over the distributed filesystem. + +**Implementation Blueprint (For AI reading this document):** +- **Event Tunnel**: In `/app/ai-hub/app/core/grpc/services/assistant.py` or the main task stream router, intercept `SyncStatus.RECONCILE_REQUIRED` events. +- **Action**: Instead of just warning the UI, drop an `Observation` event directly into the SubAgent's `RagPipeline` queue. + - "Warning: Node A has drifted. Hash mismatch on `/src/lib.js`." +- **New Skill**: Provide the AI with an `inspect_drift(node_id, file_path)` skill which gives a unified diff of what the Hub thinks the file looks like vs. what the Node actually has, empowering the AI to issue the decisive write. diff --git a/docs/architecture/cortex_project_todo.md b/docs/architecture/cortex_project_todo.md index 275c3d4..96135a8 100644 --- a/docs/architecture/cortex_project_todo.md +++ b/docs/architecture/cortex_project_todo.md @@ -1,72 +1,71 @@ -# πŸ“ Cortex Distributed Agent: Project TODO List +# πŸ“ Cortex Distributed Agent: Master Project Roadmap -This document tracks prioritized tasks, technical debt, and future implementations for the Cortex project. 
+This document consolidates all prioritized tasks, technical debt, swarm optimizations, and future strategic implementations for the Cortex project. -## πŸš€ High Priority (Infrastructure) +--- -### [ ] Persistent Sub-Worker Bridges (CDP/LSP) - βœ… FOUNDATIONS BUILT -- **Status**: Basic Navigation, Screenshotting, and Persistent Session state implemented. -- **Goal**: Support a professional, high-fidelity **Antigravity Browser Skill**. +## βœ… Completed Milestones (Core & Swarm Optimization) -#### 🌐 Comprehensive Browser Skill Requirements: -- **[ ] JS Console Tunnel**: Pipe `console.log/error` from the browser back to the server in real-time. -- **[ ] Network Observability**: Capture and return XHR/Fetch traffic (HAR or failed requests only) for AI debugging. -- **[ ] A11y Tree Perception**: Provide the **Accessibility Tree** (JSON) to the AI instead of just raw HTML/DOM for better semantic understanding. -- **[ ] Advanced Interactions**: Support `Hover`, `Scroll`, `Drag & Drop`, and `Multi-key` complex input. -- **[ ] EVAL Skill**: Allow the AI to inject and execute arbitrary JavaScript (`page.evaluate()`) to extract data or trigger events. -- **[ ] Smart Wait Logic**: Implement `wait_for_network_idle`, `wait_for_selector`, and custom predicates to reduce task flakiness. -- **[ ] Artifact Extraction**: Export high-definition **Videos** (chunked) and **HAR** files for audit trails. +### 1. πŸ•’ LLM Monitoring Latency (Resolved) +- **Fast-Path Pattern Heuristics**: Regex detection for instant completions. +- **Adaptive AI-Driven Polling**: AI specifies `next_check_seconds` instead of static backoff. +- **Edge Intelligence (Shift Left)**: Agent Node PTY reader actively scans output and fires prompt detection events to wake up sleeping SubAgents. -### [ ] Multi-Tenancy & Resource Isolation -- **Description**: Isolate node groups by user/tenant and enforce hardware quotas. 
-- **Why**: Allows the Main AI (Antigravity) to manage resource usage and forcefully **cancel zombie tasks** that may be hanging or orphaned, ensuring node health. +### 2. 🎭 Swarm Choreography (Resolved) +- **Branching Agency (`EXECUTE` Action)**: SubAgent natively handles cross-node orchestrations dynamically based on terminal state. -### [ ] Binary Artifact & Large Data Handling (Chunking) -- **Description**: Implement gRPC stream-based chunking for large artifacts. -- **Specific Case**: Support high-fidelity **Video Recordings** from Browser sessions (multi-GB files). -- **Requirement**: **Transparency**. The Main AI should just see a "File" result; reassembly happens at the server layer. +### 3. πŸ’Ύ PTY Memory Pressure & Logging (Resolved) +- **TaskJournal Head+Tail Bounds**: 10KB Head + 30KB Tail max memory footprint for AI context. +- **Persistence Offloading**: Nodes stream gigabyte outputs natively to temp disk files instead of expanding the application heap. +- **Client-Side Truncation**: 15KB/sec rate limit on PTY execution to protect the Orchestrator's gRPC stream from being DDOSed. +- **Graceful Shutdown**: SIGTERM/SIGINT grace periods for local node cleanup. +- **Ghost Mirror**: Workspace bidirectional synchronization fully operational. + +--- + +## πŸš€ High Priority (Infrastructure & Scalability) + +### 1. πŸ•ΈοΈ Hub GIL Contention & Scalability +*Goal: Scale horizontally to handle 100+ simultaneous connected nodes.* +- **[ ] Multiprocessing for Serialization**: Refactor `dispatch_swarm` to use `ProcessPoolExecutor` to bypass GIL for heavy Protobuf signing/serialization. +- **[ ] Async gRPC Internalization**: Migrate Orchestrator fully to `grpc.aio` to handle concurrent streaming and queue management without thread locks. +- **[ ] Sharded Registry**: Distribute connections across multiple Hub instances using Redis as a shared state/journal layer. + +### 2. 
🌐 Comprehensive Browser Skill (Antigravity CDP) +*Goal: Support a professional, high-fidelity browser interaction layer natively.* +- **[ ] JS Console & Network Tunnels**: Pipe `console.log/error` and XHR/Fetch traffic (HAR) back to the AI. +- **[ ] A11y Tree Perception**: Provide the Accessibility Tree (JSON) to the AI instead of raw DOM for semantic control. +- **[ ] Advanced Interactions**: Add Hover, Scroll, Drag & Drop, Multi-key injection, and EVAL javascript extraction capabilities. +- **[ ] Smart Wait Logic**: `wait_for_network_idle` and custom predicates to eliminate cross-node browser task flakiness. + +### 3. πŸ“¦ Binary Artifact & Large Data Chunking +*Goal: Transmit massive payloads smoothly.* +- **[ ] gRPC Chunking for Large Files**: Enable streaming of high-def browser session videos or database dumps over the task channel without hitting message size limits. + +### 4. 🏒 Multi-Tenancy & Resource Isolation +*Goal: Security and fairness.* +- **[ ] Tenant Segregation**: Isolate node groups by tenant boundary, enforce hardware quotas, and allow the Hub to forcefully reap zombie tasks orphaned on nodes. + +### 5. πŸ’Ύ Server-Side Task Persistence +*Goal: Hub resilience.* +- **[ ] DB Backend Integration**: Migrate `NodeRegistry` and `WorkPool` states from in-memory dicts to Postgres/Redis. Deferred to full system integration. + +--- + +## 🐒 Low Priority & Future Strategic Roadmap ### [ ] Architectural Refinement: Unified Worker Shim -- **Description**: Re-evaluate the "Skill" abstraction. Move towards a model where each task is a specialized worker process that decides its capability (Shell vs Playwright) at startup. -- **Goal**: Simplifies context isolation and reduces manager-thread overhead. +- Move from a Python "Skill" abstraction to isolated background worker processes per task (a dedicated Shell process, a dedicated Playwright daemon) for better fault isolation. 
-### [x] Graceful Shutdown & Local Task Persistence (Built-in) -- **Description**: Handle node interrupts (SIGTERM/SIGINT) to allow workers to finish or checkpoint. Store a local `task_history.json` on the node to recover state after crash/restart. -- **Status**: SIGTERM/SIGINT implemented in Phase 4 refactor. - -### [ ] Server-Side Registry & Task Persistence -- **Description**: Migrate `NodeRegistry` and `WorkPool` from in-memory to a persistent backend (Postgres/Redis). -- **Priority**: Deferred until **Full System Integration** phase. - -### [x] Workspace Mirroring & Efficient File Sync (Phase 5: Distributed Conflict Resolution) -- Ghost Mirror server-side patterns, real-time sync, isolation, multi-node sync, ignore filters, lock management, and browser skill integration. - -### [ ] Real-time gRPC Log Streaming -- **Description**: Bidirectional stream for live `stdout/stderr`. - ---- - -## 🐒 Low Priority / Observation - -### [ ] OS-Level Isolation (Firecracker/VNC) -- **Description**: Lightweight virtualization (microVMs) for worker execution. -- **Status**: Monitored. - -### [ ] Node Lifecycle: Auto-Updates -- **Description**: Mechanism for nodes to self-update. - -### [ ] Vertical & Horizontal Scalability -- **Description**: Migrate to a stateless server design with load balancing. - ---- - -## πŸ—ΊοΈ Future Roadmap (Strategic) +### [ ] OS-Level Isolation (Firecracker/microVMs) +- Run arbitrary AI tasks in automated, ephemeral microVMs for strict security sandboxing, instead of bare metal or standard containers. ### [ ] Advanced Scheduling & Capability Routing -- **Description**: Sophisticated scheduler to match complex constraints (GPU, Region, Priority). +- Sophisticated task scheduler matching jobs against GPU limits, regions, or specific OS hardware constraints dynamically. -### [ ] mTLS Certificate Lifecycle Management -- **Description**: Automated renewal, revocation, and rotation of node certificates. 
+### [ ] Infrastructure Automation +- **Node Auto-Updates**: Secure execution pipeline for deployed agent nodes to pull and self-update their binaries/versions. +- **mTLS Lifecycle Management**: Automated certificate renewal, revocation, and rotation. ### [ ] Immutable Audit & Compliance -- **Description**: Cryptographically signed records of every TaskRequest and TaskResponse for forensics. +- Cryptographically signed forensic logging for every `TaskRequest` and `TaskResponse` passing through the Hub. diff --git a/docs/swarm_architecture_analysis.md b/docs/swarm_architecture_analysis.md new file mode 100644 index 0000000..ba75f00 --- /dev/null +++ b/docs/swarm_architecture_analysis.md @@ -0,0 +1,87 @@ +# Swarm Control Architecture Analysis + +This document provides a deep dive into the internal mechanics of the Cortex Swarm Control system, tracing a command from the user interface to the remote agent node execution and back. + +## 1. System Components +- **Frontend (MultiNodeConsole/ChatWindow)**: The user interface for interaction. +- **AI Hub (Orchestrator)**: The central brain managing sessions, RAG, and node communication. +- **Task Assistant/Journal**: Manages task registration, signing, and state tracking. +- **Agent Node (Client)**: Remote worker executing commands via gRPC. +- **AI Sub-Agent**: Specialized autonomous loop that monitors PTY progress. + +## 2. Command Flow Trace + +### Phase 1: Request & Tool Dispatch +1. **User Input**: A user types a command or request into the chat. +2. **Main LLM Loop**: `RagPipeline` calls the LLM with the current mesh context. The LLM decides to use `mesh_terminal_control`. +3. **Tool Dispatching**: `ToolService` intercepts the request. It initializes a **Sub-Agent** to manage the lifecycle of this specific terminal task. +4. **Task Registration**: The `AssistantService` generates a unique `task_id` and registers it in the `TaskJournal`. It signs the command payload using an RSA-PSS signature to ensure integrity. 
+
+### Phase 2: Hub-to-Node Transmission (gRPC)
+5. **Queueing**: The signed task is put into the target node's outbound queue (`node.queue`).
+6. **Streaming**: The `AgentOrchestrator` service (running a bidirectional gRPC stream) pops the message and sends a `ServerTaskMessage` (Protobuf) to the connected Agent Node.
+
+### Phase 3: Remote Execution (Agent Node)
+7. **Node Reception**: The Agent Node's gRPC client receives the message and validates the signature.
+8. **Shell Execution**: The `ShellSkill` initializes a pseudo-terminal (PTY) for the command. This provides a stateful session where environment variables and working directories persist.
+9. **Real-time Feedback**: Standard output (stdout) and error (stderr) are immediately streamed back to the Hub via `TaskResult` messages.
+
+### Phase 4: Monitoring & Decision (The Sub-Agent)
+10. **State Tracking**: `grpc_server.py` on the Hub receives the output chunks and updates the `TaskJournal`. It also broadcasts these via WebSockets for the UI's real-time terminal display.
+11. **AI Analysis**: The **Sub-Agent** on the Hub runs in parallel. Every few seconds, it evaluates the `thought_history` and current `stdout` of the task.
+12. **Branching Agency**: The Sub-Agent determines if the task is `FINISH`, `WAIT`, `ABORT`, or `EXECUTE`. If `EXECUTE` is chosen, the Sub-Agent can launch **new commands** on any node in the swarm (e.g., pivot to Node-B after Node-A is ready).
+13. **Autonomy**: It returns the final result once the entire coordinated chain of completion is detected.
+
+### Phase 5: Result Aggregation
+14. **Main Response**: `RagPipeline` receives the Sub-Agent's final report and feeds it back to the main LLM to generate the final user-facing answer.
+
+## 3. 
Architecture Diagram + +```mermaid +sequenceDiagram + participant User as πŸ’» User (UI) + participant Hub as 🧠 AI Hub (Orchestrator) + participant Sub as πŸ€– Sub-Agent (Monitor) + participant Node as πŸ“‘ Agent Node (PTY) + + User->>Hub: "List files on Node-A" + Hub->>Hub: LLM: Use mesh_terminal_control + Hub->>Sub: Instantiate Sub-Agent + Sub->>Hub: dispatch_single(cmd, node_a) + Hub->>Hub: TaskJournal.register(tid) + Hub->>Node: gRPC: TaskRequest (Signed) + + loop Monitoring Loop + Node-->>Hub: gRPC: TaskResult (stdout) + Hub-->>User: WS: task_stdout (Live Terminal) + Sub->>Hub: peek_journal(tid) + Sub->>Sub: LLM Decision: WAIT/FINISH/EXECUTE + opt Branching Action + Sub->>Hub: dispatch_single(new_cmd, node_b) + Hub->>Node: gRPC: New Task (Signed) + end + end + + Sub->>Hub: Task Done + Hub->>User: "Here is the list of files..." +``` + +## 4. Performance & Bottlenecks Analysis + +### Current Bottlenecks +1. **Hub Network Serialization**: For massive swarms (100+ nodes), the `ThreadPoolExecutor` in `dispatch_swarm` may encounter GIL contention or network I/O limits on the Hub. +2. **LLM Latency**: The Sub-Agent's "Observation Loop" relies on repeated LLM calls to detect task completion. This adds cost and latency (1-3 seconds per "intelligence" tick). +3. **PTY Memory**: Large terminal buffers (scrollback) are stored in memory on both the Hub and the Node. Extremely long-running commands with massive output can lead to memory pressure. + +### Limitations +1. **Synchronous Tool Wait**: The `RagPipeline` currently blocks while waiting for the `ToolService` to return. This prevents the user from asking follow-up questions *while* a background task is running, unless `no_abort=True` is used for asynchronous polling. +2. **Context Window Limits**: Passing full terminal history back to the LLM for every monitoring tick can quickly saturate the context window of smaller models. + +### Potential Issues +1. 
**Zombie Tasks**: If a node disconnects abruptly, the PTY process on the node might continue to run (dangling process) if the `TaskCancel` message fails to reach it. +2. **Race Conditions**: In `dispatch_swarm`, results are returned in a dictionary keyed by `node_id`. If multiple tasks are dispatched to the same node in very rapid succession, `session_id` conflicts must be avoided (mitigated by using unique PTY sessions). + +## 5. Future Recommendations +- **Edge Intelligence**: Move the terminal monitoring logic (Sub-Agent) to the local Agent Node to reduce Hub-to-Node traffic and Hub-side LLM costs. +- **WebSocket Binary Compression**: Compress terminal output before sending it to the browser to improve performance on high-latency connections. +- **Persistence Layer**: Move `TaskJournal` state to Redis or a database to allow Hub restarts without losing task monitoring state. diff --git a/ui/client-app/src/components/ChatArea.js b/ui/client-app/src/components/ChatArea.js index 73396c3..90ebb82 100644 --- a/ui/client-app/src/components/ChatArea.js +++ b/ui/client-app/src/components/ChatArea.js @@ -38,9 +38,9 @@ }, [chatHistory]); return ( -
+
{/* Scrollable ChatWindow */} -
+
diff --git a/ui/client-app/src/components/ChatWindow.css b/ui/client-app/src/components/ChatWindow.css index 910f1c5..4956b4f 100644 --- a/ui/client-app/src/components/ChatWindow.css +++ b/ui/client-app/src/components/ChatWindow.css @@ -102,4 +102,36 @@ .assistant-message strong { font-weight: 700; color: #6366f1; +} + +/* Sub-agent thought blocks */ +.thought-panel blockquote { + border-left: 2px solid rgba(99, 102, 241, 0.4) !important; + background: rgba(99, 102, 241, 0.05) !important; + margin: 0.75rem 0 0.75rem 1rem !important; + padding: 0.6rem 0.8rem !important; + border-radius: 0.5rem !important; + font-size: 0.7rem !important; + line-height: 1.4 !important; + color: #4f46e5 !important; +} + +.dark .thought-panel blockquote { + color: #818cf8 !important; + background: rgba(129, 140, 248, 0.05) !important; +} + +.thought-panel blockquote p { + margin: 0 !important; +} + +.thought-panel blockquote strong { + color: #4338ca; + text-transform: uppercase; + letter-spacing: 0.025em; + font-size: 0.65rem; +} + +.dark .thought-panel blockquote strong { + color: #a5b4fc; } \ No newline at end of file diff --git a/ui/client-app/src/components/ChatWindow.js b/ui/client-app/src/components/ChatWindow.js index 5a9d002..0f4859c 100644 --- a/ui/client-app/src/components/ChatWindow.js +++ b/ui/client-app/src/components/ChatWindow.js @@ -22,6 +22,12 @@ } }, [message.audioBlob]); + // Removed auto-expand behavior so AI 'think'/'tool' traces default to collapsed + // unless explicitly requested by the user, improving readability of the main answer. 
+ useEffect(() => { + // Left empty purposely + }, [message.reasoning]); + // Handle exclusive playback: stop if someone else starts playing useEffect(() => { if (activePlayingId && activePlayingId !== currentMsgId && isPlaying) { @@ -88,8 +94,8 @@ } } }; - const assistantMessageClasses = `p-4 rounded-2xl shadow-lg max-w-[85%] assistant-message mr-auto border border-gray-300 dark:border-gray-700/50 text-gray-900 dark:text-gray-100`; - const userMessageClasses = `max-w-[80%] p-4 rounded-2xl shadow-md text-white ml-auto user-message-container`; + const assistantMessageClasses = `p-4 rounded-2xl shadow-lg max-w-[95%] assistant-message mr-auto border border-gray-300 dark:border-gray-700/50 text-gray-900 dark:text-gray-100`; + const userMessageClasses = `max-w-[90%] p-4 rounded-2xl shadow-md text-white ml-auto user-message-container`; const formatTime = (iso) => { if (!iso) return ''; @@ -118,6 +124,14 @@ > β–Ά {message.status && message.status.startsWith("Thought for") ? message.status.toUpperCase() : "THOUGHT TRACE"} + {message.reasoning && (() => { + const count = (message.reasoning.match(/Sub-Agent \[/g) || []).length; + return count > 0 ? ( + + {count} steps + + ) : null; + })()}
{ +const TerminalNodeItem = ({ nodeId, stats, onMount, onUnmount, nodeConfig, isAIProcessing, thoughtHistory = [] }) => { const terminalRef = useRef(null); const xtermRef = useRef(null); const fitAddonRef = useRef(null); + const [isDetailsExpanded, setIsDetailsExpanded] = useState(false); const sandbox = nodeConfig?.skill_config?.shell?.sandbox || {}; const mode = sandbox.mode || 'PASSIVE'; const isStrict = mode === 'STRICT'; // UI Feedback: boundary color & pulse logic - // AI Activity pulsing gives life to the interface when things are happening. const borderClass = isStrict ? (isAIProcessing ? 'border-red-500/80 ring-1 ring-red-500/40 animate-[pulse-red_2s_infinite]' : 'border-red-900/50') : (isAIProcessing ? 'border-blue-500/80 ring-1 ring-blue-500/40 animate-[pulse-blue_2s_infinite]' : 'border-blue-900/50'); @@ -23,15 +23,14 @@ const statusDotClass = isStrict ? 'bg-rose-500' : 'bg-blue-500'; useEffect(() => { - // Initialize Xterm const xterm = new Terminal({ theme: { - background: '#030712', // Slightly deeper than 0d1117 for contrast + background: '#030712', foreground: '#e6edf3', cursor: isAIProcessing ? '#388bfd' : '#22c55e', selectionBackground: '#388bfd', }, - fontSize: 12, // Slightly tighter for multi-view + fontSize: 12, fontFamily: 'Menlo, Monaco, "Courier New", monospace', cursorBlink: isAIProcessing, cursorStyle: 'block', @@ -44,12 +43,10 @@ const fitAddon = new FitAddon(); xterm.loadAddon(fitAddon); xterm.open(terminalRef.current); - setTimeout(() => fitAddon.fit(), 10); xtermRef.current = xterm; fitAddonRef.current = fitAddon; - onMount(nodeId, xterm); const observer = new ResizeObserver(() => fitAddon.fit()); @@ -62,29 +59,80 @@ }; }, [nodeId]); + const latestThought = thoughtHistory.length > 0 ? thoughtHistory[thoughtHistory.length - 1] : null; + return (
- {/* compact Node Header */}
- {/* Mode Indicator Overlay Gradient */}
-
{nodeId} - +
C: {stats?.cpu_usage_percent?.toFixed(1) || '0.0'}% M: {stats?.memory_usage_percent?.toFixed(1) || '0.0'}%
- {/* Terminal Host */} +
- {isAIProcessing && ( + + {/* Internal Reasoning Details Panel */} +
+
+ +
+ Autonomous Analysis Trace +
+ +
+
+ {thoughtHistory.length === 0 ? ( +
+
Idle Monitoring
+
Waiting for agentic activity...
+
+ ) : ( + thoughtHistory.map((t, i) => ( +
+
+ {new Date(t.time * 1000).toLocaleTimeString([], { hour12: false, hour: '2-digit', minute: '2-digit', second: '2-digit' })} +
+
+ {t.thought} +
+
+ )) + )} +
+
+ + {/* Minimalist Live Analysis Notice (Non-intrusive when collapsed) */} + {isAIProcessing && latestThought && !isDetailsExpanded && ( +
+
+ + + + + {latestThought.thought} +
+
+ )} + + {isAIProcessing && !latestThought && !isDetailsExpanded && (
@@ -109,69 +157,85 @@ ); }; -const MultiNodeConsole = ({ attachedNodeIds, nodes, isAIProcessing }) => { +const MultiNodeConsole = ({ attachedNodeIds, nodes, isAIProcessing, isExpanded, onToggleExpand }) => { const [nodeStats, setNodeStats] = useState({}); // node_id -> stats object + const [nodeHistory, setNodeHistory] = useState({}); // node_id -> array of {time, thought} const [connected, setConnected] = useState(false); const wsRef = useRef(null); - const xtermsRef = useRef({}); // nodeId -> Terminal instance + const xtermsRef = useRef({}); useEffect(() => { if (!attachedNodeIds || attachedNodeIds.length === 0) return; let reconnectTimer; - let isClosing = false; // Flag to prevent reconnect on intentional close + let isClosing = false; const connect = () => { if (isClosing) return; - console.log("[πŸ“Ά] Connecting to Agent Mesh Stream (Multiplexed)..."); const ws = new WebSocket(getNodeStreamUrl()); wsRef.current = ws; - ws.onopen = () => { - console.log("[πŸ“Ά] Mesh Stream Connected"); - setConnected(true); - }; + ws.onopen = () => setConnected(true); ws.onmessage = (event) => { if (isClosing) return; try { const msg = JSON.parse(event.data); - // Filter logic... 
if (msg.node_id && !attachedNodeIds.includes(msg.node_id)) return; - // Update stats if present if ((msg.event === 'mesh_heartbeat' || msg.event === 'heartbeat')) { - if (msg.data?.stats) { - setNodeStats(prev => ({ ...prev, [msg.node_id]: msg.data.stats })); - } - if (msg.data?.nodes) { - msg.data.nodes.forEach(n => { - setNodeStats(prev => ({ ...prev, [n.node_id]: n.stats })); - }); - } + if (msg.data?.stats) setNodeStats(prev => ({ ...prev, [msg.node_id]: msg.data.stats })); + if (msg.data?.nodes) msg.data.nodes.forEach(n => setNodeStats(prev => ({ ...prev, [n.node_id]: n.stats }))); return; } + + // Handle Sub-Agent Thoughts + if (msg.event === 'subagent_thought') { + setNodeHistory(prev => { + const history = prev[msg.node_id] || []; + // Avoid exact duplicates back-to-back + if (history.length > 0 && history[history.length - 1].thought === msg.data) return prev; + const newEntry = { time: Date.now() / 1000, thought: msg.data }; + return { ...prev, [msg.node_id]: [...history, newEntry] }; + }); + return; + } + const xterm = xtermsRef.current[msg.node_id]; if (xterm) { switch (msg.event) { case 'task_assigned': - if (msg.data.command) xterm.write(`\x1b[38;5;33m\x1b[1m$ ${msg.data.command}\x1b[0m\r\n`); + if (msg.data.command) { + if (msg.data.command.includes('__CORTEX_FIN_SH_')) break; + if (msg.data.command.startsWith('!RAW:')) { + xterm.write(`\x1b[38;5;36m${msg.data.command.slice(5)}\x1b[0m\r\n`); + } else { + xterm.write(`\x1b[38;5;33m\x1b[1m$ ${msg.data.command}\x1b[0m\r\n`); + } + // NEW: Clear thought history when a new bash command starts to maintain relevance + setNodeHistory(prev => ({ ...prev, [msg.node_id]: [] })); + } break; - case 'task_stdout': xterm.write(msg.data); break; - case 'skill_event': if (msg.data?.type === 'output') xterm.write(msg.data.data); break; + case 'task_stdout': + case 'skill_event': + let data = msg.event === 'skill_event' ? 
(msg.data?.data || msg.data?.terminal_out) : msg.data; + if (data && typeof data === 'string') { + const stealthData = data.replace(/.*__CORTEX_FIN_SH_.*[\r\n]*/g, ''); + if (stealthData) xterm.write(stealthData); + } else if (data) xterm.write(data); + break; case 'browser_event': xterm.write(`\x1b[90m${msg.data.type === 'console' ? 'πŸ–₯️' : '🌐'} ${msg.data.text || msg.data.url}\x1b[0m\r\n`); break; } + // Always scroll to bottom on new output + xterm.scrollToBottom(); } } catch (e) { } }; ws.onclose = () => { if (!isClosing) { - console.warn("[πŸ“Ά] Mesh Stream Disconnected. Reconnecting in 3s..."); setConnected(false); reconnectTimer = setTimeout(connect, 3000); - } else { - console.log("[πŸ“Ά] Mesh Stream Closed Intentionally."); } }; }; @@ -183,21 +247,15 @@ if (wsRef.current) wsRef.current.close(); clearTimeout(reconnectTimer); }; - }, [JSON.stringify(attachedNodeIds)]); // Static dependency check + }, [JSON.stringify(attachedNodeIds)]); - const handleMount = (nodeId, xterm) => { - xtermsRef.current[nodeId] = xterm; - }; - - const handleUnmount = (nodeId) => { - delete xtermsRef.current[nodeId]; - }; + const handleMount = (nodeId, xterm) => { xtermsRef.current[nodeId] = xterm; }; + const handleUnmount = (nodeId) => { delete xtermsRef.current[nodeId]; }; if (!attachedNodeIds || attachedNodeIds.length === 0) return null; return (
- {/* Premium AI Observation Notice Banner */}
@@ -210,23 +268,32 @@
- {/* Swarm Status Bar */}
NODE_EXECUTION_SWARM
- - Attached: {attachedNodeIds.length} - - - Live Stream - + {onToggleExpand && ( + + )} + Attached: {attachedNodeIds.length} + Live Stream
- {/* Multi-Terminal Display */}
{attachedNodeIds.map(nodeId => ( n.node_id === nodeId)} isAIProcessing={isAIProcessing} + thoughtHistory={nodeHistory[nodeId]} /> ))}
@@ -245,4 +313,3 @@ }; export default MultiNodeConsole; - diff --git a/ui/client-app/src/pages/SwarmControlPage.js b/ui/client-app/src/pages/SwarmControlPage.js index 9c30a5b..bd62e37 100644 --- a/ui/client-app/src/pages/SwarmControlPage.js +++ b/ui/client-app/src/pages/SwarmControlPage.js @@ -70,6 +70,47 @@ const [hasLoadedDefaults, setHasLoadedDefaults] = useState(false); const [isInitiatingSync, setIsInitiatingSync] = useState(false); const [showFileExplorer, setShowFileExplorer] = useState(true); + const [isConsoleExpanded, setIsConsoleExpanded] = useState(false); + const [consoleHeight, setConsoleHeight] = useState(256); // Default 64 * 4px = 256px + const [isDraggingConsole, setIsDraggingConsole] = useState(false); + const isDraggingConsoleRef = useRef(false); + + // Handle Dragging Console Resizer explicitly + useEffect(() => { + const handleMouseMove = (e) => { + if (!isDraggingConsoleRef.current) return; + e.preventDefault(); + const newHeight = window.innerHeight - e.clientY; + const clampedHeight = Math.max(100, Math.min(window.innerHeight * 0.9, newHeight)); + setConsoleHeight(clampedHeight); + }; + + const handleMouseUp = () => { + if (isDraggingConsoleRef.current) { + isDraggingConsoleRef.current = false; + setIsDraggingConsole(false); + document.body.style.cursor = 'default'; + // Auto-fit xterm when dragged + window.dispatchEvent(new Event('resize')); + } + }; + + window.addEventListener('mousemove', handleMouseMove); + window.addEventListener('mouseup', handleMouseUp); + + return () => { + window.removeEventListener('mousemove', handleMouseMove); + window.removeEventListener('mouseup', handleMouseUp); + }; + }, []); + + const handleConsoleDragStart = (e) => { + e.preventDefault(); // Prevents text selection while dragging + e.stopPropagation(); + isDraggingConsoleRef.current = true; + setIsDraggingConsole(true); + document.body.style.cursor = 'row-resize'; + }; // M6: Persistence - if we have an active config, populate the form with it when modal 
opens useEffect(() => { @@ -266,6 +307,11 @@ return (
+ {/* Invisible overlay to catch events across the entire screen during fast drag */} + {isDraggingConsole && ( +
+ )} + -
+
{/* Chat Area & Header */}
@@ -440,11 +486,25 @@ {/* Antigravity Console (M6) */} {showConsole && attachedNodeIds.length > 0 && ( -
+
+ {!isConsoleExpanded && ( +
+
+
+ )} setIsConsoleExpanded(!isConsoleExpanded)} />
)}