diff --git a/agent-node/agent_node/skills/browser.py b/agent-node/agent_node/skills/browser.py deleted file mode 100644 index 1c19b9b..0000000 --- a/agent-node/agent_node/skills/browser.py +++ /dev/null @@ -1,150 +0,0 @@ -import threading -import queue -import time -import json -from playwright.sync_api import sync_playwright -from agent_node.skills.base import BaseSkill -from protos import agent_pb2 - -class BrowserSkill(BaseSkill): - """The 'Antigravity Bridge': Persistent Browser Skill using a dedicated Actor thread.""" - def __init__(self, sync_mgr=None): - self.task_queue = queue.Queue() - self.sessions = {} # session_id -> { "context": Context, "page": Page } - self.sync_mgr = sync_mgr - self.lock = threading.Lock() - threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start() - - def _setup_listeners(self, sid, page, on_event): - """Tunnels browser internal events back to the Orchestrator.""" - if not on_event: return - - # Live Console Redirector - page.on("console", lambda msg: on_event(agent_pb2.BrowserEvent( - session_id=sid, console_msg=agent_pb2.ConsoleMessage( - level=msg.type, text=msg.text, timestamp_ms=int(time.time()*1000) - ) - ))) - - # Live Network Redirector - page.on("requestfinished", lambda req: on_event(agent_pb2.BrowserEvent( - session_id=sid, network_req=agent_pb2.NetworkRequest( - method=req.method, url=req.url, status=req.response().status if req.response() else 0, - resource_type=req.resource_type, latency_ms=0 - ) - ))) - - # Live Download Redirector - page.on("download", lambda download: self._handle_download(sid, download)) - - def _handle_download(self, sid, download): - """Saves browser downloads directly into the synchronized session workspace.""" - import os - with self.lock: - sess = self.sessions.get(sid) - if sess and sess.get("download_dir"): - os.makedirs(sess["download_dir"], exist_ok=True) - target = os.path.join(sess["download_dir"], download.suggested_filename) - print(f" [๐ŸŒ๐Ÿ“ฅ] Browser 
Download Sync: {download.suggested_filename} -> {target}") - download.save_as(target) - - def _browser_actor(self): - """Serializes all Playwright operations on a single dedicated thread.""" - print("[๐ŸŒ] Browser Actor Starting...", flush=True) - pw = None - browser = None - try: - pw = sync_playwright().start() - # 12-Factor/Container Optimization: Standard non-sandbox arguments - browser = pw.chromium.launch(headless=True, args=[ - '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu' - ]) - print("[๐ŸŒ] Browser Engine Online.", flush=True) - except Exception as e: - print(f"[!] Browser Actor Startup Fail: {e}", flush=True) - if pw: pw.stop() - return - - while True: - try: - item = self.task_queue.get() - if item is None: # Sentinel for shutdown - print("[๐ŸŒ] Browser Actor Shutting Down...", flush=True) - break - - task, sandbox, on_complete, on_event = item - action = task.browser_action - sid = action.session_id or "default" - - with self.lock: - if sid not in self.sessions: - # Phase 4: Mount workspace for downloads/uploads - download_dir = None - if self.sync_mgr and task.session_id: - download_dir = self.sync_mgr.get_session_dir(task.session_id) - print(f" [๐ŸŒ๐Ÿ“] Mapping Browser Context to: {download_dir}") - - ctx = browser.new_context(accept_downloads=True) - pg = ctx.new_page() - self._setup_listeners(sid, pg, on_event) - self.sessions[sid] = {"context": ctx, "page": pg, "download_dir": download_dir} - - page = self.sessions[sid]["page"] - print(f" [๐ŸŒ] Browser Actor Processing: {agent_pb2.BrowserAction.ActionType.Name(action.action)} | Session: {sid}", flush=True) - - res_data = {} - # State-Machine Logic for Actions - if action.action == agent_pb2.BrowserAction.NAVIGATE: - page.goto(action.url, wait_until="commit") - elif action.action == agent_pb2.BrowserAction.CLICK: - page.click(action.selector) - elif action.action == agent_pb2.BrowserAction.TYPE: - page.fill(action.selector, action.text) - elif 
action.action == agent_pb2.BrowserAction.SCREENSHOT: - res_data["snapshot"] = page.screenshot() - elif action.action == agent_pb2.BrowserAction.GET_DOM: - res_data["dom_content"] = page.content() - elif action.action == agent_pb2.BrowserAction.HOVER: - page.hover(action.selector) - elif action.action == agent_pb2.BrowserAction.SCROLL: - page.mouse.wheel(x=0, y=action.y) - elif action.action == agent_pb2.BrowserAction.EVAL: - res_data["eval_result"] = str(page.evaluate(action.text)) - elif action.action == agent_pb2.BrowserAction.GET_A11Y: - res_data["a11y_tree"] = json.dumps(page.accessibility.snapshot()) - elif action.action == agent_pb2.BrowserAction.CLOSE: - with self.lock: - sess = self.sessions.pop(sid, None) - if sess: sess["context"].close() - - # Results Construction - br_res = agent_pb2.BrowserResponse( - url=page.url, title=page.title(), - snapshot=res_data.get("snapshot", b""), - dom_content=res_data.get("dom_content", ""), - a11y_tree=res_data.get("a11y_tree", ""), - eval_result=res_data.get("eval_result", "") - ) - on_complete(task.task_id, {"status": 1, "browser_result": br_res}, task.trace_id) - except Exception as e: - print(f" [!] 
Browser Actor Error: {e}", flush=True) - on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id) - - # Cleanup on loop exit - print("[๐ŸŒ] Cleaning up Browser Engine...", flush=True) - with self.lock: - for s in self.sessions.values(): - try: s["context"].close() - except: pass - self.sessions.clear() - if browser: browser.close() - if pw: pw.stop() - - def execute(self, task, sandbox, on_complete, on_event=None): - self.task_queue.put((task, sandbox, on_complete, on_event)) - - def cancel(self, task_id): return False - - def shutdown(self): - """Triggers graceful shutdown of the browser engine.""" - self.task_queue.put(None) diff --git a/agent-node/agent_node/skills/file.py b/agent-node/agent_node/skills/file.py deleted file mode 100644 index a8bd080..0000000 --- a/agent-node/agent_node/skills/file.py +++ /dev/null @@ -1,77 +0,0 @@ -import os -import json -import logging -from agent_node.skills.base import BaseSkill - -logger = logging.getLogger(__name__) - -class FileSkill(BaseSkill): - """Provides file system navigation and inspection capabilities.""" - - def __init__(self, sync_mgr=None): - self.sync_mgr = sync_mgr - - def execute(self, task, sandbox, on_complete, on_event=None): - """ - Executes a file-related task (list, stats). - Payload JSON: { "action": "list", "path": "...", "recursive": false } - """ - try: - payload = json.loads(task.payload_json) - action = payload.get("action", "list") - path = payload.get("path", ".") - - # 1. Sandbox Jail Check - # (In a real implementation, we'd use sandbox.check_path(path)) - # For now, we'll assume the node allows browsing its root or session dir. 
- - if action == "list": - result = self._list_dir(path, payload.get("recursive", False)) - on_complete(task.task_id, {"status": 1, "stdout": json.dumps(result)}, task.trace_id) - else: - on_complete(task.task_id, {"status": 0, "stderr": f"Unknown action: {action}"}, task.trace_id) - - except Exception as e: - logger.error(f"[FileSkill] Task {task.task_id} failed: {e}") - on_complete(task.task_id, {"status": 0, "stderr": str(e)}, task.trace_id) - - def _list_dir(self, path, recursive=False): - """Lists directory contents with metadata.""" - if not os.path.exists(path): - return {"error": "Path not found"} - - items = [] - if recursive: - for root, dirs, files in os.walk(path): - for name in dirs + files: - abs_path = os.path.join(root, name) - rel_path = os.path.relpath(abs_path, path) - st = os.stat(abs_path) - items.append({ - "name": name, - "path": rel_path, - "is_dir": os.path.isdir(abs_path), - "size": st.st_size, - "mtime": st.st_mtime - }) - else: - for name in os.listdir(path): - abs_path = os.path.join(path, name) - st = os.stat(abs_path) - items.append({ - "name": name, - "is_dir": os.path.isdir(abs_path), - "size": st.st_size, - "mtime": st.st_mtime - }) - - return { - "root": os.path.abspath(path), - "items": sorted(items, key=lambda x: (not x["is_dir"], x["name"])) - } - - def cancel(self, task_id): - return False # Listing is usually fast, no cancellation needed - - def shutdown(self): - pass diff --git a/agent-node/agent_node/skills/manager.py b/agent-node/agent_node/skills/manager.py index 47555c2..31ec258 100644 --- a/agent-node/agent_node/skills/manager.py +++ b/agent-node/agent_node/skills/manager.py @@ -1,8 +1,8 @@ import threading +import os +import importlib.util from concurrent import futures -from agent_node.skills.shell import ShellSkill -from agent_node.skills.browser import BrowserSkill -from agent_node.skills.file import FileSkill +from agent_node.skills.base import BaseSkill from agent_node.config import MAX_SKILL_WORKERS class 
SkillManager: @@ -11,32 +11,79 @@ self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker") self.active_tasks = {} # task_id -> future self.sync_mgr = sync_mgr - self.skills = { - "shell": ShellSkill(sync_mgr=sync_mgr), - "browser": BrowserSkill(sync_mgr=sync_mgr), - "file": FileSkill(sync_mgr=sync_mgr) - } + self.skills = self._discover_skills(sync_mgr) self.max_workers = max_workers self.lock = threading.Lock() + def _discover_skills(self, sync_mgr): + """Scans the skills/ directory for logic.py and loads skill implementations.""" + # Find project root (assumes /app/agent_node/skills/manager.py) + project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) + skills_dir = os.path.join(project_root, "skills") + + discovered = {} + if not os.path.exists(skills_dir): + print(f" [๐Ÿ”งโš ๏ธ] Skills directory not found: {skills_dir}") + return discovered + + for skill_dir in os.listdir(skills_dir): + item_path = os.path.join(skills_dir, skill_dir) + if os.path.isdir(item_path): + logic_py = os.path.join(item_path, "logic.py") + if os.path.exists(logic_py): + # Dynamic import + try: + spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Find the first class that inherits from BaseSkill + for attr_name in dir(module): + attr = getattr(module, attr_name) + if isinstance(attr, type) and issubclass(attr, BaseSkill) and attr is not BaseSkill: + # We map the internal skill name (e.g. 
mesh_terminal_control) + # if we can find it in the module or assume it based on folder name + # For backward compatibility with task_type routing, we map common ones + instance = attr(sync_mgr=sync_mgr) + discovered[skill_dir] = instance + # Also map legacy names for the routing engine below + if "terminal" in skill_dir or "shell" in skill_dir: + discovered["shell"] = instance + if "browser" in skill_dir: + discovered["browser"] = instance + if "file" in skill_dir: + discovered["file"] = instance + break + except Exception as e: + print(f" [๐Ÿ”งโŒ] Failed to load skill from {logic_py}: {e}") + + print(f" [๐Ÿ”ง] Discovered skills: {list(discovered.keys())}") + return discovered + def submit(self, task, sandbox, on_complete, on_event=None): """Routes a task to the appropriate skill and submits it to the thread pool.""" # --- 0. Transparent TTY Bypass (Gaming Performance) --- # Keystrokes and Resizes should NEVER wait for a thread or be blocked by sandbox - if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event): - return True, "Accepted (Transparent)" + if "shell" in self.skills and hasattr(self.skills["shell"], "handle_transparent_tty"): + if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event): + return True, "Accepted (Transparent)" with self.lock: if len(self.active_tasks) >= self.max_workers: return False, "Node Capacity Reached" # 1. Routing Engine + skill = None if task.HasField("browser_action"): - skill = self.skills["browser"] + skill = self.skills.get("browser") elif task.task_type == "file": - skill = self.skills["file"] + skill = self.skills.get("file") else: - skill = self.skills["shell"] + # Default to the one that looks like a shell + skill = self.skills.get("shell") + + if not skill: + return False, f"Target skill not available for task type: {task.task_type}" # 2. 
Execution submission future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event) @@ -66,8 +113,8 @@ """Triggers shutdown for all skills and the worker pool.""" print("[๐Ÿ”ง] Shutting down Skill Manager...") with self.lock: - for name, skill in self.skills.items(): - print(f" [๐Ÿ”ง] Shutting down skill: {name}") + # Use set to avoid shutting down the same instance multiple times due to alias mapping + for skill in set(self.skills.values()): skill.shutdown() # Shutdown thread pool self.executor.shutdown(wait=True) diff --git a/agent-node/agent_node/skills/shell.py b/agent-node/agent_node/skills/shell.py deleted file mode 100644 index 363f343..0000000 --- a/agent-node/agent_node/skills/shell.py +++ /dev/null @@ -1,410 +0,0 @@ -import os -import pty -import select -import threading -import time -import termios -import struct -import fcntl -import tempfile -from .base import BaseSkill -from protos import agent_pb2 - -class ShellSkill(BaseSkill): - """Admin Console Skill: Persistent stateful Bash via PTY.""" - def __init__(self, sync_mgr=None): - self.sync_mgr = sync_mgr - self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...} - self.lock = threading.Lock() - - # Phase 3: Prompt Patterns for Edge Intelligence - self.PROMPT_PATTERNS = [ - r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$", # bash/zsh: user@host:~$ - r">>>\s*$", # python - r"\.\.\.\s*$", # python multi-line - r">\s*$", # node/js - ] - - # --- M7: Idle Session Reaper --- - # Automatically kills dormant bash processes to free up system resources. 
- self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper") - self.reaper_thread.start() - - def _session_reaper(self): - """Background thread that cleans up unused PTY sessions.""" - while True: - time.sleep(60) - with self.lock: - now = time.time() - for sid, sess in list(self.sessions.items()): - # Avoid reaping currently active tasks - if sess.get("active_task"): - continue - - # 10 minute idle timeout - if now - sess.get("last_activity", 0) > 600: - print(f" [๐Ÿš๐Ÿงน] Reaping idle shell session: {sid}") - try: - os.close(sess["fd"]) - os.kill(sess["pid"], 9) - except: pass - self.sessions.pop(sid, None) - - def _ensure_session(self, session_id, cwd, on_event): - with self.lock: - if session_id in self.sessions: - self.sessions[session_id]["last_activity"] = time.time() - return self.sessions[session_id] - - print(f" [๐Ÿš] Initializing Persistent Shell Session: {session_id}") - # Spawn bash in a pty - pid, fd = pty.fork() - if pid == 0: # Child - # Environment prep - os.environ["TERM"] = "xterm-256color" - - # Change to CWD - if cwd and os.path.exists(cwd): - os.chdir(cwd) - - # Launch shell - os.execv("/bin/bash", ["/bin/bash", "--login"]) - - # Parent - # Set non-blocking - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - - sess = { - "fd": fd, - "pid": pid, - "last_activity": time.time(), - "buffer_file": None, - "tail_buffer": "", - "active_task": None - } - - def reader(): - while True: - try: - r, _, _ = select.select([fd], [], [], 0.1) - if fd in r: - data = os.read(fd, 4096) - if not data: break - - decoded = data.decode("utf-8", errors="replace") - - # Streaming/Sync logic (Detect completion marker) - with self.lock: - active_tid = sess.get("active_task") - marker = sess.get("marker") - if active_tid and marker and sess.get("buffer_file"): - # Phase 2: Persistence Offloading - # Write directly to disk instead of heap memory - sess["buffer_file"].write(decoded) - 
sess["buffer_file"].flush() - - # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning - sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:] - - if marker in sess["tail_buffer"]: - # Marker found! Extract exit code - try: - # The tail buffer has the marker - after_marker = sess["tail_buffer"].split(marker)[1].strip().split() - exit_code = int(after_marker[0]) if after_marker else 0 - - # Formulate final stdout summary from the disk file - bf = sess["buffer_file"] - bf.seek(0, 2) - file_len = bf.tell() - - HEAD, TAIL = 10_000, 30_000 - if file_len > HEAD + TAIL: - bf.seek(0) - head_str = bf.read(HEAD) - bf.seek(file_len - TAIL) - tail_str = bf.read() - omitted = file_len - HEAD - TAIL - pure_stdout = head_str + f"\n\n[... {omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str - else: - bf.seek(0) - pure_stdout = bf.read() - - # Slice off the marker string and anything after it from the final result - pure_stdout = pure_stdout.split(marker)[0] - - sess["result"]["stdout"] = pure_stdout - sess["result"]["status"] = 1 if exit_code == 0 else 2 - - # Close the file handle (leaves file on disk) - sess["buffer_file"].close() - sess["buffer_file"] = None - - sess["event"].set() - decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout - except Exception as e: - print(f" [๐Ÿšโš ๏ธ] Marker parsing failed: {e}") - sess["event"].set() - - # Stream terminal output back (with stealth filtering) - if on_event: - stealth_out = decoded - if "__CORTEX_FIN_SH_" in decoded: - import re - # We remove any line that contains our internal marker to hide plumbing from user. - # This covers both the initial command echo and the exit code output. - stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', decoded) - - if stealth_out: - # Phase 3: Client-Side Truncation (Stream Rate Limiting) - # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC. 
- # The full output is still safely written to the tempfile on disk. - with self.lock: - now = time.time() - if now - sess.get("stream_window_start", 0) > 1.0: - sess["stream_window_start"] = now - sess["stream_bytes_sent"] = 0 - dropped = sess.get("stream_dropped_bytes", 0) - if dropped > 0: - drop_msg = f"\n[... {dropped:,} bytes truncated from live stream ...]\n" - event = agent_pb2.SkillEvent( - session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg - ) - on_event(agent_pb2.ClientTaskMessage(skill_event=event)) - sess["stream_dropped_bytes"] = 0 - - if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 15_000: - sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out) - else: - sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out) - event = agent_pb2.SkillEvent( - session_id=session_id, - task_id=sess.get("active_task") or "", - terminal_out=stealth_out - ) - on_event(agent_pb2.ClientTaskMessage(skill_event=event)) - - # EDGE INTELLIGENCE: Proactively signal prompt detection - # We only check for prompts if we are actively running a task and haven't found the marker yet. 
- if active_tid and not sess["event"].is_set(): - import re - tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"] - for pattern in self.PROMPT_PATTERNS: - if re.search(pattern, tail): - # Send specific prompt signal - # Use last 20 chars as the 'prompt' hint - p_hint = tail[-20:].strip() - prompt_event = agent_pb2.SkillEvent( - session_id=session_id, - task_id=active_tid, - prompt=p_hint - ) - on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event)) - break - except (EOFError, OSError): - break - - # Thread Cleanup - print(f" [๐Ÿš] Shell Session Terminated: {session_id}") - with self.lock: - self.sessions.pop(session_id, None) - - t = threading.Thread(target=reader, daemon=True, name=f"ShellReader-{session_id}") - t.start() - sess["thread"] = t - - self.sessions[session_id] = sess - return sess - - - def handle_transparent_tty(self, task, on_complete, on_event=None): - """Processes raw TTY/Resize events synchronously (bypasses threadpool/sandbox).""" - cmd = task.payload_json - session_id = task.session_id or "default-session" - try: - import json - if cmd.startswith('{') and cmd.endswith('}'): - raw_payload = json.loads(cmd) - - # 1. Raw Keystroke forward - if isinstance(raw_payload, dict) and "tty" in raw_payload: - raw_bytes = raw_payload["tty"] - sess = self._ensure_session(session_id, None, on_event) - os.write(sess["fd"], raw_bytes.encode("utf-8")) - on_complete(task.task_id, {"stdout": "", "status": 0}, task.trace_id) - return True - - # 2. 
Window Resize - if isinstance(raw_payload, dict) and raw_payload.get("action") == "resize": - cols = raw_payload.get("cols", 80) - rows = raw_payload.get("rows", 24) - sess = self._ensure_session(session_id, None, on_event) - import termios, struct, fcntl - s = struct.pack('HHHH', rows, cols, 0, 0) - fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, s) - print(f" [๐Ÿš] Terminal Resized to {cols}x{rows}") - on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id) - return True - except Exception as pe: - print(f" [๐Ÿš] Transparent TTY Fail: {pe}") - return False - - def execute(self, task, sandbox, on_complete, on_event=None): - """Dispatches command string to the persistent PTY shell and WAITS for completion.""" - session_id = task.session_id or "default-session" - tid = task.task_id - try: - cmd = task.payload_json - - # --- Legacy Full-Command Execution (Sandboxed) --- - allowed, status_msg = sandbox.verify(cmd) - if not allowed: - err_msg = f"\r\n[System] Command blocked: {status_msg}\r\n" - if on_event: - event = agent_pb2.SkillEvent( - session_id=session_id, task_id=tid, - terminal_out=err_msg - ) - on_event(agent_pb2.ClientTaskMessage(skill_event=event)) - - return on_complete(tid, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id) - - # Resolve CWD jail - cwd = None - if self.sync_mgr and task.session_id: - cwd = self.sync_mgr.get_session_dir(task.session_id) - elif sandbox.policy.get("WORKING_DIR_JAIL"): - cwd = sandbox.policy["WORKING_DIR_JAIL"] - if not os.path.exists(cwd): - try: os.makedirs(cwd, exist_ok=True) - except: pass - - # Handle Session Persistent Process - sess = self._ensure_session(session_id, cwd, on_event) - - # Check for RAW mode first (bypasses busy check for interactive control) - is_raw = cmd.startswith("!RAW:") - if is_raw: - input_str = cmd[5:] + "\n" - print(f" [๐ŸšโŒจ๏ธ] RAW Input Injection: {input_str.strip()}") - os.write(sess["fd"], input_str.encode("utf-8")) - return 
on_complete(tid, {"stdout": "INJECTED", "status": 1}, task.trace_id) - - # --- 0. Busy Check: Serialize access to the PTY for standard commands --- - with self.lock: - if sess.get("active_task"): - curr_tid = sess.get("active_task") - return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 2}, task.trace_id) - - # --- Blocking Wait Logic --- - # --- Blocking Wait Logic --- - marker_id = int(time.time()) - marker = f"__CORTEX_FIN_SH_{marker_id}__" - event = threading.Event() - result_container = {"stdout": "", "status": 1} # 1 = Success by default (node.py convention) - - # Register waiter in session state - with self.lock: - sess["active_task"] = tid - sess["marker"] = marker - sess["event"] = event - # Create a persistent tempfile for stdout instead of RAM buffer - sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False) - sess["tail_buffer"] = "" - sess["result"] = result_container - sess["cancel_event"] = threading.Event() - - # Input injection: execute command then echo marker and exit code - try: - # 12-factor bash: ( cmd ) ; echo marker $? - # We use "" concatenation in the echo command to ensure the marker literal - # DOES NOT appear in the PTY input echo, preventing premature completion. 
- full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n" - os.write(sess["fd"], full_input.encode("utf-8")) - - # Wait for completion (triggered by reader) OR cancellation - timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0 - start_time = time.time() - while time.time() - start_time < timeout: - # Check for completion (reader found marker) - if event.is_set(): - return on_complete(tid, result_container, task.trace_id) - - # Check for cancellation (HUB sent cancel) - if sess["cancel_event"].is_set(): - print(f" [๐Ÿš๐Ÿ›‘] Task {tid} cancelled on node.") - return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id) - - # Sleep slightly to avoid busy loop - time.sleep(0.1) - - # Timeout Case - print(f" [๐Ÿšโš ๏ธ] Task {tid} timed out on node.") - with self.lock: - if sess.get("buffer_file"): - try: - sess["buffer_file"].seek(0, 2) - file_len = sess["buffer_file"].tell() - HEAD, TAIL = 10_000, 30_000 - if file_len > HEAD + TAIL: - sess["buffer_file"].seek(0) - head_str = sess["buffer_file"].read(HEAD) - sess["buffer_file"].seek(file_len - TAIL) - tail_str = sess["buffer_file"].read() - omitted = file_len - HEAD - TAIL - partial_out = head_str + f"\n\n[... 
{omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str - else: - sess["buffer_file"].seek(0) - partial_out = sess["buffer_file"].read() - except: - partial_out = "" - else: - partial_out = "" - - on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id) - - finally: - # Cleanup session task state - with self.lock: - if sess.get("active_task") == tid: - if sess.get("buffer_file"): - try: - sess["buffer_file"].close() - except: pass - sess["buffer_file"] = None - sess["active_task"] = None - sess["marker"] = None - sess["event"] = None - sess["result"] = None - sess["cancel_event"] = None - - except Exception as e: - print(f" [๐ŸšโŒ] Execute Error for {tid}: {e}") - on_complete(tid, {"stderr": str(e), "status": 2}, task.trace_id) - - def cancel(self, task_id: str): - """Cancels an active task โ€” for persistent shell, this sends a SIGINT (Ctrl+C).""" - with self.lock: - for sid, sess in self.sessions.items(): - if sess.get("active_task") == task_id: - print(f"[๐Ÿ›‘] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}") - # Write \x03 (Ctrl+C) to the master FD - os.write(sess["fd"], b"\x03") - # Break the wait loop in execute thread - if sess.get("cancel_event"): - sess["cancel_event"].set() - return True - - - def shutdown(self): - """Cleanup: Terminates all persistent shells.""" - with self.lock: - for sid, sess in list(self.sessions.items()): - print(f"[๐Ÿ›‘] Cleaning up persistent shell: {sid}") - try: os.close(sess["fd"]) - except: pass - # kill pid - try: os.kill(sess["pid"], 9) - except: pass - self.sessions.clear() diff --git a/ai-hub/app/api/routes/tts.py b/ai-hub/app/api/routes/tts.py index ac34ebb..6668c1b 100644 --- a/ai-hub/app/api/routes/tts.py +++ b/ai-hub/app/api/routes/tts.py @@ -90,8 +90,8 @@ current_text = text.strip() while current_text: - # Target size: 80 for first (speed), 300 for rest (buffer) - target_size = 80 if is_first else 300 + # Target 
size: 80 for first (speed), 180 for rest (buffer) + target_size = 80 if is_first else 180 if len(current_text) <= target_size: chunks.append(current_text) break @@ -152,7 +152,7 @@ # Then stream the remaining chunks using parallel fetching but sequential yielding import asyncio - semaphore = asyncio.Semaphore(1) # Strict lock for Beta TTS stability + semaphore = asyncio.Semaphore(2) # Increased to 2 for better overlap async def fetch_chunk(text_chunk, idx): retries = 3 diff --git a/ai-hub/app/api/schemas.py b/ai-hub/app/api/schemas.py index 3a979e5..1307489 100644 --- a/ai-hub/app/api/schemas.py +++ b/ai-hub/app/api/schemas.py @@ -85,6 +85,8 @@ is_enabled: bool = True features: List[str] = Field(default_factory=lambda: ["chat"]) is_system: bool = False + extra_metadata: dict = Field(default_factory=dict) + preview_markdown: Optional[str] = None class SkillCreate(SkillBase): pass @@ -98,6 +100,8 @@ is_enabled: Optional[bool] = None features: Optional[List[str]] = None is_system: Optional[bool] = None + extra_metadata: Optional[dict] = None + preview_markdown: Optional[str] = None class SkillResponse(SkillBase): id: int diff --git a/ai-hub/app/config.py b/ai-hub/app/config.py index 58c34cf..f1caef3 100644 --- a/ai-hub/app/config.py +++ b/ai-hub/app/config.py @@ -160,7 +160,7 @@ self.DEEPSEEK_MODEL_NAME = self.LLM_PROVIDERS.get("deepseek", {}).get("model") or \ get_from_yaml(["llm_providers", "deepseek_model_name"]) or "deepseek-chat" self.GEMINI_MODEL_NAME = self.LLM_PROVIDERS.get("gemini", {}).get("model") or \ - get_from_yaml(["llm_providers", "gemini_model_name"]) or "gemini-2.5-flash" + get_from_yaml(["llm_providers", "gemini_model_name"]) or "gemini-1.5-flash" # 2. 
Resolve Vector / Embedding self.FAISS_INDEX_PATH: str = os.getenv("FAISS_INDEX_PATH") or \ diff --git a/ai-hub/app/core/orchestration/architect.py b/ai-hub/app/core/orchestration/architect.py index 3ffc52b..767ef75 100644 --- a/ai-hub/app/core/orchestration/architect.py +++ b/ai-hub/app/core/orchestration/architect.py @@ -15,7 +15,7 @@ def __init__(self, context_manager: Optional[ContextManager] = None): self.memory = context_manager or ContextManager() - self.stream = StreamProcessor() + self.stream = None # Created during run() async def run( self, @@ -37,7 +37,8 @@ # 1. Initialize Context & Messages messages = self.memory.prepare_initial_messages( question, context_chunks, history, feature_name, mesh_context, sync_workspace_id, - db=db, user_id=user_id, prompt_service=prompt_service, prompt_slug=prompt_slug + db=db, user_id=user_id, prompt_service=prompt_service, prompt_slug=prompt_slug, + tools=tools ) # 2. Setup Mesh Observation @@ -50,9 +51,13 @@ safety = SafetyGuard(db, session_id) # 4. Main Autonomous Loop + from .profiles import get_profile + profile = get_profile(feature_name) + self.stream = StreamProcessor(profile=profile) + turn = 0 + try: - turn = 0 - while turn < 500: + while turn < profile.autonomous_limit: turn += 1 self.stream.reset_turn() @@ -64,7 +69,8 @@ # B. Turn Start Heartbeat self._update_turn_marker(messages, turn) - yield {"type": "status", "content": f"Turn {turn}: architecting next step..."} + if profile.show_heartbeat: + yield {"type": "status", "content": f"Turn {turn}: architecting next step..."} # C. 
LLM Call prediction = await self._call_llm(llm_provider, messages, tools) @@ -93,13 +99,49 @@ async for event in self.stream.process_chunk(c, turn): if event["type"] == "content": accumulated_content += event["content"] if event["type"] == "reasoning": accumulated_reasoning += event["content"] - yield event + + if not profile.buffer_content: + yield event + else: + # In buffered mode (voice), we yield reasoning immediately but hold content + if event["type"] == "reasoning": + yield event # Tool delta accumulation self._accumulate_tool_calls(delta, tool_calls_map) # E. Branch: Tools or Exit? + # Heartbeat Fallback: If no content was sent but tools are being called, force a bridge sentence + if tool_calls_map and not self.stream.header_sent and not profile.silent_stream: + fallback_text = f"Strategy: Executing orchestrated tasks in progress..." + for event in self.stream.process_chunk(fallback_text, turn): + if event["type"] == "content": accumulated_content += event["content"] + yield event + + # E. Branch: Tools or Exit? if not tool_calls_map: + # Final Turn: Yield the accumulated content if it was empty + if not accumulated_content.strip(): + import re + fallback = "I've completed the requested task." + if accumulated_reasoning: + fallback = "Analysis finished. Please review the results above." 
+ + # In voice mode (buffered), we apply specialized stripping + if profile.buffer_content: + content_to_yield = fallback + yield {"type": "content", "content": content_to_yield} + else: + # In chat mode, just send the fallback if no content ever came through + yield {"type": "content", "content": fallback} + elif profile.buffer_content: + # Standard buffered yield + import re + content_to_yield = accumulated_content + for pattern in profile.strip_headers: + content_to_yield = re.sub(pattern, "", content_to_yield, flags=re.IGNORECASE) + yield {"type": "content", "content": content_to_yield.strip()} + # Watchdog Check if safety.should_activate_watchdog(self._get_assistant(tool_service), sync_workspace_id): yield {"type": "status", "content": "Watchdog: tasks remain. continuing..."} diff --git a/ai-hub/app/core/orchestration/memory.py b/ai-hub/app/core/orchestration/memory.py index 624a103..c2e1e68 100644 --- a/ai-hub/app/core/orchestration/memory.py +++ b/ai-hub/app/core/orchestration/memory.py @@ -3,49 +3,6 @@ from sqlalchemy.orm import Session from app.db import models -PROMPT_TEMPLATE = """You are the Cortex AI Assistant, the **Master-Architect** of a decentralized agent mesh. - -## ๐Ÿ—๏ธ Orchestration Strategy (The Master-Worker Pattern): -- **Master Control**: YOU are the brain. You define every project step based on intelligence reports from your field agents. -- **Atomic Operations**: Assign ONLY atomic, self-contained tasks. -- **Intelligence Reports**: Sub-agents will return a `REPORT` summarizing their findings. Use this distilled intelligence as your primary source of truth for the NEXT step. -- **Visible Reasoning**: You MUST provide textual analysis/strategy at the start of EVERY turn. - -## ๐Ÿš€ Execution Mandate: -- **Perpetual Pursuit**: DO NOT stop until the user's objective is achieved. -- **No Idle Turns**: If a sub-goal is reached, immediately pivot to the next atomic task. 
-- **NO SILENT ACTIONS**: You are **FORBIDDEN** from calling a tool without first providing at least one sentence of **plain text** analysis/strategy. - -## โœ๏ธ Interaction Format (MANDATORY): -1. **BRIDGE ANALYSIS (Plain Text)**: At the start of EVERY turn, write 1-2 sentences of auditable analysis. - - *Example*: "The previous report confirms the node is offline. I will check test-node-2 next." - - **Crucial**: This content MUST be outside of `` tags so it is visible to the user as your "Master Strategy". Use `` only for raw technical brainstorming. -2. **ACT**: Call the single atomic tool required for your plan. - -## ๐Ÿ“‚ Infrastructure & Ghost Mirror: -- **Node Sync Path**: All synced files are at `/tmp/cortex-sync/{{session_id}}/` on agent nodes. -- **Hub Mirror**: Use `mesh_file_explorer` with `session_id` to read/list files from the central mirror (~1ms speed). -- **Source of Truth**: Maintain `.ai_todo.md` in the workspace. Mark items `[COMPLETED]` only after verification. - -Infrastructure Context (Mesh): -{mesh_context} - -User Question: {question} - -Answer:""" - -VOICE_PROMPT_TEMPLATE = """You are a conversational voice assistant. -Keep your responses short, natural, and helpful. -Avoid using technical jargon or listing technical infrastructure details unless specifically asked. -Focus on being a friendly companion. 
- -Conversation History: -{chat_history} - -User Question: {question} - -Answer:""" - class ContextManager: """Handles prompt assembly and conversation history management.""" @@ -56,12 +13,16 @@ def prepare_initial_messages(self, question: str, context_chunks: List[Any], history: List[models.Message], feature_name: str, mesh_context: str, sync_workspace_id: Optional[str], db: Optional[Session] = None, user_id: Optional[str] = None, - prompt_service: Any = None, prompt_slug: str = "rag-pipeline") -> List[Dict[str, str]]: + prompt_service: Any = None, prompt_slug: str = "rag-pipeline", + tools: List[Dict[str, Any]] = None) -> List[Dict[str, str]]: + + from .profiles import get_profile + profile = get_profile(feature_name) history_text = self.history_formatter(history) context_text = self.context_postprocessor(context_chunks) - template = PROMPT_TEMPLATE if feature_name != "voice" else VOICE_PROMPT_TEMPLATE + template = profile.template # dynamic prompt override if prompt_service and db and user_id: @@ -75,12 +36,23 @@ mesh_info += f"\nActive Ghost Mirror Session ID: {sync_workspace_id}\n" mesh_info += f"Instruction: When using `mesh_file_explorer` or `mesh_sync_control`, you MUST use this exactly as the `session_id` (or leave it blank/use 'current' which I will resolve for you).\n" - system_prompt = template.format( - question=question, - context=context_text, - chat_history=history_text, - mesh_context=mesh_info - ) + # List of actual tools for strict enforcement + available_skills = [t["function"]["name"] for t in (tools or [])] + skill_list_str = ", ".join(available_skills) if available_skills else "NONE" + + # Robust string replacement to avoid crashing on braces in context/history + system_prompt = template.replace("{question}", str(question)) + system_prompt = system_prompt.replace("{context}", str(context_text)) + system_prompt = system_prompt.replace("{chat_history}", str(history_text)) + system_prompt = system_prompt.replace("{mesh_context}", str(mesh_info)) 
+ + # Handle escaped session_id marker in DEFAULT_PROMPT_TEMPLATE + system_prompt = system_prompt.replace("{{session_id}}", "{session_id}") + + # Enforce Tool-belt awareness + system_prompt += f"\n\n## ๐Ÿ› ๏ธ ACTIVE TOOL-BELT (FORBIDDEN to mention others):\n" + system_prompt += f"You have ONLY these {len(available_skills)} calibrated tools: [{skill_list_str}].\n" + system_prompt += "If a user asks about your capabilities, you MUST only list these specific tools. DO NOT promise Google Calendar, Wolfram Alpha, or any external integrations not in this list." return [ {"role": "system", "content": system_prompt}, @@ -131,7 +103,14 @@ return "\n\n".join(contexts) or "No context provided." def _default_history_formatter(self, history: List[models.Message]) -> str: - return "\n".join( - f"{'Human' if msg.sender == 'user' else 'Assistant'}: {msg.content}" - for msg in history - ) + lines = [] + for msg in history: + role = "Human" if msg.sender == "user" else "Assistant" + content = msg.content or "" + + # If assistant message is empty, try to summarize its action + if role == "Assistant" and not content: + content = "[Action: Calling tools or internal reasoning...]" + + lines.append(f"{role}: {content}") + return "\n".join(lines) diff --git a/ai-hub/app/core/orchestration/profiles.py b/ai-hub/app/core/orchestration/profiles.py new file mode 100644 index 0000000..338b4c8 --- /dev/null +++ b/ai-hub/app/core/orchestration/profiles.py @@ -0,0 +1,151 @@ +from typing import List, Optional +import re + +# --- Template Definitions (Internal) --- + +DEFAULT_PROMPT_TEMPLATE = """You are the Cortex AI Assistant, the **Master-Architect** of a decentralized agent mesh. + +## ๐Ÿ—๏ธ Orchestration Strategy (The Master-Worker Pattern): +- **Master Control**: YOU are the brain. You define every project step based on intelligence reports from your field agents. +- **Atomic Operations**: Assign ONLY atomic, self-contained tasks. 
+- **Intelligence Reports**: Sub-agents will return a `REPORT` summarizing their findings. Use this distilled intelligence as your primary source of truth for the NEXT step. +- **Visible Reasoning**: You MUST provide textual analysis/strategy at the start of EVERY turn. +- **Fixed Tool-belt**: You ONLY have the tools literally provided in your tool-belt. DO NOT hallucinate tools like `mesh_tool_explorer`, `list_available_tools`, or any `` tags. Use standard function calling. + +## ๐Ÿš€ Execution Mandate: +- **Perpetual Pursuit**: DO NOT stop until the user's objective is achieved. +- **No Idle Turns**: If a sub-goal is reached, immediately pivot to the next atomic task. +- **Direct Terminal Answer**: If you possess the information to answer a user's question directly without tools (e.g., questions about your identity or known capabilities), provide the answer and **TERMINATE** the orchestration loop by omitting any further tool calls. +- **NO SILENT ACTIONS**: You are **FORBIDDEN** from calling a tool without first providing at least one sentence of **plain text** analysis/strategy. + +## โœ๏ธ Interaction Format (MANDATORY): +1. **BRIDGE ANALYSIS (Plain Text)**: At the start of EVERY turn, write 1-2 sentences of auditable analysis. +2. **ACT**: Call the single atomic tool required for your plan. + +## ๐Ÿ“‚ Infrastructure & Ghost Mirror: +- **Node Sync Path**: All synced files are at `/tmp/cortex-sync/{{session_id}}/` on agent nodes. +- **Hub Mirror**: Use `mesh_file_explorer` with `session_id` to read/list files from the central mirror (~1ms speed). + +Infrastructure Context (Mesh): +{mesh_context} + +RAG Context: +{context} + +Conversation History: +{chat_history} + +User Question: {question} + +Answer:""" + +VOICE_PROMPT_TEMPLATE = """You are a conversational voice assistant. +Keep your responses short, natural, and friendly. + +## ๐ŸŽค Voice Interaction Rules: +- **Final Result Only**: You are a voice assistant. ONLY the final, direct answer will be vocalized. 
Avoid speaking during intermediate tool-calling turns. +- **Direct Responses**: Do NOT provide any "Master-Architect" analysis, bridging sentences, or internal strategy. JUST provide the final content for the user. +- **Fixed Tool-belt**: You ONLY have access to the tools literally provided to you. DO NOT hallucinate external 'Agents' or 'Tool Explorers' (e.g. `mesh_tool_explorer`). +- **Final Spoken Result**: In your final turn (after any tool calls), you MUST provide a direct, concise summary or answer for the user to hear (e.g., "I've checked the servers; all systems are green."). +- **Outside Thinking Tags**: You MUST provide your final spoken answer OUTSIDE of any `` tags. Anything inside tags is used for internal processing and will NOT be spoken to the user. +- **Conversational Tone**: Focus on rhythm and prosody. + +Current Infrastructure: {mesh_context} + +Conversation History: +{chat_history} + +User Question: {question} + +Answer:""" + +# --- Profile Definitions --- + +class FeatureProfile: + """ + Defines the behavior of the Orchestrator for specific features + (e.g., chat vs voice vs autonomous swarm). 
+ """ + def __init__( + self, + name: str, + template: str, + silent_stream: bool = False, + show_heartbeat: bool = True, + buffer_content: bool = False, + strip_headers: List[str] = [], + default_prompt_slug: str = "rag-pipeline", + include_mesh_context: bool = True, + autonomous_limit: int = 500 + ): + self.name = name + self.template = template + self.silent_stream = silent_stream + self.show_heartbeat = show_heartbeat + self.buffer_content = buffer_content + self.strip_headers = strip_headers + self.default_prompt_slug = default_prompt_slug + self.include_mesh_context = include_mesh_context + self.autonomous_limit = autonomous_limit + +# Central Registry for Interaction Modes +PROFILES = { + "default": FeatureProfile( + name="chat", + template=DEFAULT_PROMPT_TEMPLATE + ), + "chat": FeatureProfile( + name="chat", + template=DEFAULT_PROMPT_TEMPLATE + ), + "swarm_control": FeatureProfile( + name="swarm_control", + template=DEFAULT_PROMPT_TEMPLATE + ), + "swarm": FeatureProfile( + name="swarm_control", + template=DEFAULT_PROMPT_TEMPLATE + ), + "workflow": FeatureProfile( + name="workflow", + template=DEFAULT_PROMPT_TEMPLATE + ), + "voice": FeatureProfile( + name="voice", + template=VOICE_PROMPT_TEMPLATE, + silent_stream=True, + show_heartbeat=False, + buffer_content=True, + strip_headers=[ + r"###\s+๐Ÿ›ฐ๏ธ\s+\*\*\[Turn\s+\d+\]\s+Master-Architect\s+Analysis\*\*", + r"๐Ÿ›ฐ๏ธ\s+\[Turn\s+\d+\]\s+Master-Architect\s+Analysis", + r"Turn\s+\d+:\s+architecting\s+next\s+step\.\.\." + ], + default_prompt_slug="voice-pipeline", + include_mesh_context=False, + autonomous_limit=10 + ), + "voice_chat": FeatureProfile( + name="voice", + template=VOICE_PROMPT_TEMPLATE, + silent_stream=True, + show_heartbeat=False, + buffer_content=True, + strip_headers=[ + r"###\s+๐Ÿ›ฐ๏ธ\s+\*\*\[Turn\s+\d+\]\s+Master-Architect\s+Analysis\*\*", + r"๐Ÿ›ฐ๏ธ\s+\[Turn\s+\d+\]\s+Master-Architect\s+Analysis", + r"Turn\s+\d+:\s+architecting\s+next\s+step\.\.\." 
+ ], + default_prompt_slug="voice-pipeline", + include_mesh_context=False, + autonomous_limit=10 + ) +} + +def get_profile(name: str) -> FeatureProfile: + """Retrieves the interaction profile for a given feature name.""" + return PROFILES.get(name, PROFILES["default"]) + +def get_allowed_features() -> List[str]: + """Returns a list of all strictly supported feature names.""" + return [k for k in PROFILES.keys() if k != "default"] diff --git a/ai-hub/app/core/orchestration/stream.py b/ai-hub/app/core/orchestration/stream.py index 4cfb1d6..e81dc34 100644 --- a/ai-hub/app/core/orchestration/stream.py +++ b/ai-hub/app/core/orchestration/stream.py @@ -4,10 +4,11 @@ class StreamProcessor: """Handles logical processing of LLM streams: thinking tags and content routing.""" - def __init__(self): + def __init__(self, profile: Any): self._in_thinking_tag = False self.tag_buffer = "" self.header_sent = False + self.profile = profile def reset_turn(self): self.header_sent = False @@ -75,13 +76,17 @@ self.tag_buffer = "" def _apply_turn_header(self, text: str, header: str) -> Optional[str]: - if not text or self.header_sent: + if self.profile.silent_stream: + # Aggressively strip any hallucinated headers defined in the profile + for pattern in self.profile.strip_headers: + text = re.sub(pattern, "", text, flags=re.IGNORECASE) return text + + if not text: + return text + + if not self.header_sent: + self.header_sent = True + return header + text - # Idempotency check - has_existing = re.search(r"Turn\s+\d+|Master-Architect\s+Analysis|###\s+๐Ÿ›ฐ๏ธ", text, re.IGNORECASE) - if not has_existing: - text = "\n\n" + header + text - - self.header_sent = True return text diff --git a/ai-hub/app/core/services/rag.py b/ai-hub/app/core/services/rag.py index 34797be..e81c465 100644 --- a/ai-hub/app/core/services/rag.py +++ b/ai-hub/app/core/services/rag.py @@ -6,6 +6,7 @@ from app.core.retrievers.base_retriever import Retriever from app.core.providers.factory import get_llm_provider from 
app.core.orchestration import Architect +from app.core.orchestration.profiles import get_profile class RAGService: """ @@ -91,9 +92,10 @@ tools = [] if self.tool_service: tools = self.tool_service.get_available_tools(db, session.user_id, feature=session.feature_name) - + + profile = get_profile(session.feature_name) mesh_context = "" - if session.attached_node_ids: + if session.attached_node_ids and profile.include_mesh_context: nodes = db.query(models.AgentNode).filter(models.AgentNode.node_id.in_(session.attached_node_ids)).all() if nodes: mesh_context = "Attached Agent Nodes (Infrastructure):\n" @@ -174,7 +176,7 @@ sync_workspace_id = session.sync_workspace_id, session_id = session_id, feature_name = session.feature_name, - prompt_slug = "rag-pipeline" + prompt_slug = profile.default_prompt_slug ): if event["type"] == "content": full_answer += event["content"] diff --git a/ai-hub/app/core/services/sub_agent.py b/ai-hub/app/core/services/sub_agent.py index 399deb5..bb41e1e 100644 --- a/ai-hub/app/core/services/sub_agent.py +++ b/ai-hub/app/core/services/sub_agent.py @@ -13,15 +13,13 @@ Handles execution, result accumulation, and state monitoring. 
""" def __init__(self, name: str, task_fn, args: dict, retries: int = 1, - llm_provider=None, assistant=None, subagent_system_prompt: str = None, - on_event=None): + llm_provider=None, assistant=None, on_event=None): self.name = name self.task_fn = task_fn self.args = args self.retries = retries self.llm = llm_provider self.assistant = assistant - self.subagent_system_prompt = subagent_system_prompt self.on_event = on_event self.status = "PENDING" self.result = None diff --git a/ai-hub/app/core/services/tool.py b/ai-hub/app/core/services/tool.py index ce7c119..9943d37 100644 --- a/ai-hub/app/core/services/tool.py +++ b/ai-hub/app/core/services/tool.py @@ -46,11 +46,17 @@ if any(t["function"]["name"] == ds.name for t in tools): continue + # M3: Use the public description, but append internal AI instructions if available + # This makes the "system prompt" invisible to end users but fully visible to the Orchestrator. + description = ds.description or "" + if ds.system_prompt: + description += f"\n\nInternal Intelligence Protocol:\n{ds.system_prompt}" + tools.append({ "type": "function", "function": { "name": ds.name, - "description": ds.description, + "description": description, "parameters": ds.config.get("parameters", {}) } }) @@ -81,6 +87,7 @@ from app.core.services.sub_agent import SubAgent from app.core.providers.factory import get_llm_provider + llm_provider = None orchestrator = getattr(self._services, "orchestrator", None) if not orchestrator: return {"success": False, "error": "Orchestrator not available"} @@ -98,23 +105,32 @@ logger.info(f"[ToolService] Executing {skill.name} on {node_id or 'swarm'} (Resolved Session: {resolved_sid})") - # --- AI Sub-Agent Setup --- - llm_provider = None - subagent_prompt_base = skill.config.get("subagent_system_prompt", "") - - # Inject Mesh Context for Atomic Monitoring - mesh_instruction = f"\n\n## Atomic Step Environment:\n- Node Sync Path: `/tmp/cortex-sync/{resolved_sid}/`.\n- Mission: Execute the requested command and 
monitor for completion. Do not make autonomous decisions; return the final output to the Master Architect." - subagent_prompt = subagent_prompt_base + mesh_instruction - - if db and user_id and subagent_prompt: + if db and user_id: user = db.query(models.User).filter(models.User.id == user_id).first() if user: # Use user's preferred model, or fallback to system default p_name = user.preferences.get("llm_provider", "gemini") m_name = user.preferences.get("llm_model", "") + + # Fetch provider-specific keys from user or system defaults + llm_prefs = user.preferences.get("llm", {}).get("providers", {}).get(p_name, {}) + user_service = getattr(self._services, "user_service", None) + + if (not llm_prefs or not llm_prefs.get("api_key") or "*" in str(llm_prefs.get("api_key"))) and user_service: + system_prefs = user_service.get_system_settings(db) + system_provider_prefs = system_prefs.get("llm", {}).get("providers", {}).get(p_name, {}) + if system_provider_prefs: + merged = system_provider_prefs.copy() + if llm_prefs: merged.update({k: v for k, v in llm_prefs.items() if v}) + llm_prefs = merged + + api_key_override = llm_prefs.get("api_key") + actual_m_name = m_name or llm_prefs.get("model", "") + kwargs = {k: v for k, v in llm_prefs.items() if k not in ["api_key", "model"]} + try: - llm_provider = get_llm_provider(p_name, m_name) - logger.info(f"[ToolService] AI Sub-Agent enabled using {p_name}/{m_name}") + llm_provider = get_llm_provider(p_name, model_name=actual_m_name, api_key_override=api_key_override, **kwargs) + logger.info(f"[ToolService] AI Sub-Agent enabled using {p_name}/{actual_m_name}") except Exception as e: logger.warning(f"[ToolService] Could not init LLM for sub-agent: {e}") @@ -227,7 +243,6 @@ retries=0 if skill.name in ["mesh_terminal_control", "mesh_wait_tasks"] else 2, llm_provider=llm_provider, assistant=assistant, - subagent_system_prompt=subagent_prompt, on_event=on_event ) res = await sub_agent.run() diff --git 
a/ai-hub/app/core/skills/bootstrap.py b/ai-hub/app/core/skills/bootstrap.py index fc8cfb3..6163b35 100644 --- a/ai-hub/app/core/skills/bootstrap.py +++ b/ai-hub/app/core/skills/bootstrap.py @@ -30,9 +30,10 @@ existing.description = skill_def.get("description") existing.skill_type = skill_def.get("skill_type") existing.config = skill_def.get("config") - existing.system_prompt = skill_def.get("system_prompt") existing.is_enabled = skill_def.get("is_enabled", True) existing.features = skill_def.get("features", ["chat"]) + existing.extra_metadata = skill_def.get("extra_metadata", {}) + existing.preview_markdown = skill_def.get("preview_markdown") existing.is_system = True existing.owner_id = admin.id else: @@ -41,10 +42,11 @@ name=skill_def["name"], description=skill_def.get("description"), skill_type=skill_def.get("skill_type"), - config=skill_def["config"], - system_prompt=skill_def.get("system_prompt"), + config=skill_def.get("config", {}), is_enabled=skill_def.get("is_enabled", True), features=skill_def.get("features", ["chat"]), + extra_metadata=skill_def.get("extra_metadata", {}), + preview_markdown=skill_def.get("preview_markdown"), is_system=True, owner_id=admin.id ) diff --git a/ai-hub/app/core/skills/definitions.py b/ai-hub/app/core/skills/definitions.py index 35220c6..03f87de 100644 --- a/ai-hub/app/core/skills/definitions.py +++ b/ai-hub/app/core/skills/definitions.py @@ -1,214 +1,14 @@ # app/core/skills/definitions.py +import os +from .loader import load_skills_from_directory +from app.core.orchestration.profiles import get_allowed_features -SYSTEM_SKILLS = [ - { - "name": "mesh_terminal_control", - "description": "Execute stateful shell commands and manage terminal sessions across the agent mesh (Swarm Control).", - "system_prompt": ( - "You are a high-level Mesh Orchestrator. When executing commands:\n" - "1. **Parallel Execution**: Use 'node_ids' (plural) for simultaneous swarm sweeps.\n" - "2. 
**Immediate Knowledge**: Calls return as soon as the task finishes. If a task takes 1s but you set timeout=60, you get the result in 1s.\n" - "3. **Asynchronous Polling**: For background tasks, set 'no_abort=True'. If it times out, you get 'TIMEOUT_PENDING'. " - "You can then use 'mesh_wait_tasks' with 'timeout=0' to peek at progress without blocking your turn.\n" - "4. **Interactive Sub-shells**: Subsequent REPL inputs MUST use the `!RAW:` prefix.\n" - "5. **Swarm Flow**: To start a background server (e.g. iperf3 -s) and move to node 2 immediately, " - "use 'no_abort=True' and a SMALL 'timeout' (e.g. 2s). Never block your planning turn waiting for a persistent service.\n" - "6. **Privilege-Aware Commands**: Each node's 'Privilege Level' is shown in the mesh context. " - "Use it to decide how to run privileged operations:\n" - " - 'root': Run commands directly (no sudo prefix needed or available).\n" - " - 'standard user with passwordless sudo': Prepend sudo to privileged commands.\n" - " - 'standard user (sudo NOT available)': Avoid privileged ops or inform the user.\n" - "7. **iperf3 Speed Test Pattern**: " - "Step A: On the server node, run 'iperf3 -s -D' (daemon mode) with timeout=3, no_abort=True. " - "Step B: On the client node, run 'iperf3 -c -t 10' with timeout=20. " - "Node IPs are in the mesh context." - ), - "skill_type": "remote_grpc", - "is_enabled": True, - "features": ["chat", "swarm_control"], - "config": { - "service": "TerminalService", - "method": "Execute", - "capabilities": ["shell", "pty", "interactive"], - "subagent_system_prompt": ( - "You are an autonomous Per-Node Monitoring Agent. Your goal is to watch the terminal output of a command and decide its state.\n" - "- If the command output shows it is clearly finished or at a prompt (e.g. >>> or $), respond with action: FINISH.\n" - "- If the output shows a prompt asking for input (e.g. 
[yes/no], [y/n], or 'Password:'), respond with action: EXECUTE and command: '!RAW:yes' (or appropriate response).\n" - "- If the output shows it is still processing and making progress, respond with action: WAIT.\n" - "- If the output has stopped for a long time or looks stuck in an infinite loop without useful output, respond with action: ABORT.\n" - "Respond ONLY in JSON." - ), - "parameters": { - "type": "object", - "properties": { - "command": {"type": "string", "description": "Command to run. Use !RAW: prefix for REPL inputs."}, - "node_id": {"type": "string", "description": "Target node ID."}, - "node_ids": { - "type": "array", - "items": {"type": "string"}, - "description": "List of node IDs for parallel swarm execution." - }, - "timeout": {"type": "integer", "description": "Max seconds to wait. Default 30."}, - "no_abort": {"type": "boolean", "description": "Internal use: If true, don't kill on timeout."}, - "session_id": {"type": "string", "description": "Optional persistent session ID."} - }, - "required": ["command"] - } - }, - "is_system": True - }, - { - "name": "mesh_wait_tasks", - "description": "Smartly poll or wait for background tasks.", - "system_prompt": ( - "Wait for 'TIMEOUT_PENDING' tasks. This uses an AI sub-agent to monitor progress. " - "It will return as soon as the sub-agent detects completion." - ), - "skill_type": "remote_grpc", - "is_enabled": True, - "features": ["chat", "swarm_control"], - "config": { - "subagent_system_prompt": "Monitor the provided task progress. Respond action: FINISH if done, else WAIT.", - "parameters": { - "type": "object", - "properties": { - "task_map": { - "type": "object", - "additionalProperties": {"type": "string"}, - "description": "Map of node_id -> task_id for tasks to wait on." - }, - "timeout": {"type": "integer", "description": "How much longer to wait in seconds. Default 30."}, - "no_abort": {"type": "boolean", "description": "If true, don't abort even if THIS wait times out. 
Default false."} - }, - "required": ["task_map"] - } - }, - "is_system": True - }, - { - "name": "browser_automation_agent", - "description": "Perform web browsing, form filling, and UI testing on remote agent nodes using Playwright.", - "system_prompt": "You are an AI browsing assistant. Use the Playwright tool to navigate pages, extract information, and interact with web elements. Always provide reasoning for your actions.", - "skill_type": "remote_grpc", - "is_enabled": True, - "features": ["chat", "workflow", "swarm_control"], - "config": { - "service": "BrowserService", - "method": "Navigate", - "capabilities": ["browser", "screenshot", "click"], - "parameters": { - "type": "object", - "properties": { - "url": {"type": "string", "description": "The URL to navigate to."}, - "action": {"type": "string", "enum": ["navigate", "click", "type", "screenshot"], "description": "The browser action to perform."}, - "node_id": {"type": "string", "description": "The target node ID."}, - "session_id": {"type": "string", "description": "Optional session ID to persist browser state (cookies, login)."} - }, - "required": ["url", "action", "node_id"] - } - }, - "is_system": True - }, - { - "name": "voice_interaction_handler", - "description": "Handle real-time voice interruptions, tone analysis, and speech-to-speech feedback loops.", - "system_prompt": "You are a voice-first AI. Keep your responses concise and conversational. 
Focus on natural prosody and handle interruptions gracefully.", - "skill_type": "local", - "is_enabled": True, - "features": ["voice"], - "config": { - "interaction_mode": "speech-to-speech", - "latency_target": 300, - "parameters": { - "type": "object", - "properties": { - "mode": {"type": "string", "enum": ["active", "passive"], "description": "Voice interaction mode."} - } - } - }, - "is_system": True - }, - { - "name": "mesh_file_explorer", - "description": "List, read, and manipulate files within the decentralized mesh synchronization system.", - "system_prompt": ( - "You are a file management assistant. Use this tool for high-performance file operations:\n" - "1. **`list`**: Explore directories. If a 'session_id' is provided, it uses the zero-latency Hub mirror.\n" - "2. **`read`**: Fetch file content. Uses local Hub mirror fast-path if available.\n" - "3. **`write`**: Synchronously update Hub mirror and background push to node.\n" - "4. **`delete`**: Remove from Hub and dispatch remote delete.\n" - "Always include 'session_id' for improved performance unless you need to bypass the ghost mirror." 
- ), - "skill_type": "local", - "is_enabled": True, - "features": ["chat", "workflow", "swarm_control"], - "config": { - "internal_module": "app.core.grpc.core.mirror", - "actions": ["list", "read", "write", "delete"], - "parameters": { - "type": "object", - "properties": { - "action": {"type": "string", "enum": ["list", "read", "write", "delete"], "description": "File system action."}, - "path": {"type": "string", "description": "Relative path to the file/directory."}, - "node_id": {"type": "string", "description": "The target node ID."}, - "content": {"type": "string", "description": "Optional content for write action."}, - "session_id": {"type": "string", "description": "Target sync session workspace."} - }, - "required": ["action", "path", "node_id"] - } - }, +# Get the base directory relative to this file +# This assumes the project structure: +# /app/ +# skills/ +# ai-hub/app/core/skills/definitions.py +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../")) +SKILLS_DIR = os.path.join(PROJECT_ROOT, "skills") - "is_system": True - }, - { - "name": "mesh_sync_control", - "description": "Manage replication, synchronization, and locks across nodes in the decentralized ghost mirror filesystem.", - "system_prompt": ( - "Use this tool to manage the synchronization state of files across the swarm.\n" - "1. **`start`**: Instruct a node to begin watching and syncing a local directory.\n" - "2. **`lock`**: Disable user-side file watcher on a node. Use this BEFORE starting multi-file refactors to prevent race conditions.\n" - "3. **`unlock`**: Restore user-side sync after an AI refactor is complete.\n" - "4. **`resync`**: Force a node to perform a full hash-based reconciliation against the master mirror on the Hub." 
- ), - "skill_type": "local", - "is_enabled": True, - "features": ["chat", "workflow", "swarm_control"], - "config": { - "actions": ["start", "stop", "lock", "unlock", "resync"], - "parameters": { - "type": "object", - "properties": { - "action": {"type": "string", "enum": ["start", "stop", "lock", "unlock", "resync"], "description": "Control action."}, - "node_id": {"type": "string", "description": "Target node ID."}, - "session_id": {"type": "string", "description": "The workspace session ID to manage."}, - "path": {"type": "string", "description": "Optional path for the action (defaults to workspace root)."} - }, - "required": ["action", "node_id", "session_id"] - } - }, - "is_system": True - }, - { - "name": "mesh_inspect_drift", - "description": "Perform a deep comparison between the Hub's local record and a node's physical file state.", - "system_prompt": ( - "Use this tool when you suspect the Hub mirror is out of sync with an edge node.\n" - "It will return a unified diff showing exactly what changed on the remote node vs your local Hub copy." 
- ), - "skill_type": "local", - "is_enabled": True, - "features": ["chat", "workflow", "swarm_control"], - "config": { - "parameters": { - "type": "object", - "properties": { - "node_id": {"type": "string", "description": "Target node ID."}, - "path": {"type": "string", "description": "Relative path to the file to inspect."}, - "session_id": {"type": "string", "description": "The workspace session ID."} - }, - "required": ["node_id", "path", "session_id"] - } - }, - "is_system": True - } -] +SYSTEM_SKILLS = load_skills_from_directory(SKILLS_DIR) diff --git a/ai-hub/app/core/skills/loader.py b/ai-hub/app/core/skills/loader.py new file mode 100644 index 0000000..10e403d --- /dev/null +++ b/ai-hub/app/core/skills/loader.py @@ -0,0 +1,64 @@ +import os +import yaml +from typing import List, Dict + +def load_skills_from_directory(directory: str) -> List[Dict]: + skills = [] + if not os.path.exists(directory): + return skills + + for item in os.listdir(directory): + item_path = os.path.join(directory, item) + if os.path.isdir(item_path): + skill_md_path = os.path.join(item_path, "SKILL.md") + if os.path.exists(skill_md_path): + with open(skill_md_path, "r") as f: + content = f.read() + + # Simple frontmatter parser + if content.startswith("---"): + parts = content.split("---", 2) + if len(parts) >= 3: + try: + frontmatter = yaml.safe_load(parts[1]) + body = parts[2].strip() + + skill_def = frontmatter + + # Split body into Documentation and AI Instructions + # Convention: Use common headers or tags to denote the start of AI instructions + human_docs = body + ai_instructions = "" + + separators = ["# AI Instructions", "# Intelligence Protocol", ""] + for sep in separators: + if sep in body: + split_parts = body.split(sep, 1) + human_docs = split_parts[0].strip() + ai_instructions = split_parts[1].strip() + break + + skill_def["preview_markdown"] = human_docs + skill_def["system_prompt"] = ai_instructions + + # Ensure metadata exists + if "extra_metadata" not in skill_def: 
+ skill_def["extra_metadata"] = {} + + # Handle common OpenClaw metadata locations + if "emoji" in frontmatter and "emoji" not in skill_def["extra_metadata"]: + skill_def["extra_metadata"]["emoji"] = frontmatter["emoji"] + + # Validation: Ensure features match platform profiles + from app.core.orchestration.profiles import get_allowed_features + allowed = get_allowed_features() + skill_features = skill_def.get("features", []) + if not all(f in allowed for f in skill_features): + invalid = [f for f in skill_features if f not in allowed] + # Only log warning, don't crash, to remain developer-friendly + print(f"โš ๏ธ Warning in {skill_def.get('name')}: features {invalid} are not registered in profiles.py. They will use the default 'chat' behavior.") + + skills.append(skill_def) + except Exception as e: + print(f"Error parsing {skill_md_path}: {e}") + return skills diff --git a/ai-hub/app/db/migrate.py b/ai-hub/app/db/migrate.py index c14be17..46d0c3f 100644 --- a/ai-hub/app/db/migrate.py +++ b/ai-hub/app/db/migrate.py @@ -158,6 +158,8 @@ ("features", "TEXT DEFAULT '[\"chat\"]'"), ("is_system", "INTEGER DEFAULT 0"), ("skill_type", "TEXT DEFAULT 'local'"), + ("extra_metadata", "TEXT DEFAULT '{}'"), + ("preview_markdown", "TEXT"), ] for col_name, col_type in skill_required_columns: if col_name not in skill_columns: diff --git a/ai-hub/app/db/models.py b/ai-hub/app/db/models.py index cadd9e7..2dce0b3 100644 --- a/ai-hub/app/db/models.py +++ b/ai-hub/app/db/models.py @@ -276,13 +276,18 @@ config = Column(JSON, default={}, nullable=True) # Extended properties - system_prompt = Column(String, nullable=True) + system_prompt = Column(Text, nullable=True) # Internal Prompt for the AI is_enabled = Column(Boolean, default=True) features = Column(JSON, default=["chat"], nullable=True) # e.g. 
["chat", "voice"] owner_id = Column(String, ForeignKey('users.id'), nullable=False) is_system = Column(Boolean, default=False) + # Store the full SKILL.md body for UI display + preview_markdown = Column(Text, nullable=True) + # Store OpenClaw-style metadata (emoji, tags, etc) + extra_metadata = Column(JSON, default={}, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) owner = relationship("User") diff --git a/docker-compose.test-nodes.yml b/docker-compose.test-nodes.yml index 5dfdd65..4ad80ac 100644 --- a/docker-compose.test-nodes.yml +++ b/docker-compose.test-nodes.yml @@ -18,6 +18,8 @@ cap_add: - NET_ADMIN privileged: true + volumes: + - ./skills:/app/skills:ro test-node-2: build: @@ -35,3 +37,5 @@ cap_add: - NET_ADMIN privileged: true + volumes: + - ./skills:/app/skills:ro diff --git a/docker-compose.yml b/docker-compose.yml index 8e641d8..f50b281 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,6 +43,7 @@ volumes: - ai_hub_data:/app/data:rw - ./agent-node:/app/agent-node-source:ro + - ./skills:/app/skills:ro deploy: resources: limits: diff --git a/skills/browser-automation-agent/SKILL.md b/skills/browser-automation-agent/SKILL.md new file mode 100644 index 0000000..30968ee --- /dev/null +++ b/skills/browser-automation-agent/SKILL.md @@ -0,0 +1,48 @@ +--- +name: browser_automation_agent +emoji: "๐ŸŒ" +description: Perform web browsing, form filling, and UI testing on remote agent nodes + using Playwright. +skill_type: remote_grpc +is_enabled: true +features: +- chat +- workflow +- swarm_control +config: + service: BrowserService + method: Navigate + capabilities: + - browser + - screenshot + - click + parameters: + type: object + properties: + url: + type: string + description: The URL to navigate to. + action: + type: string + enum: + - navigate + - click + - type + - screenshot + description: The browser action to perform. + node_id: + type: string + description: The target node ID. 
class BrowserSkill(BaseSkill):
    """The 'Antigravity Bridge': Persistent Browser Skill using a dedicated Actor thread.

    Playwright's sync API is not thread-safe, so every browser operation is
    serialized through ``task_queue`` onto a single long-lived actor thread.
    ``sessions`` maps a logical session id to its browser context/page so that
    cookies and login state survive across tasks.
    """

    def __init__(self, sync_mgr=None):
        # Queue of (task, sandbox, on_complete, on_event) tuples;
        # a None item is the shutdown sentinel (see shutdown()).
        self.task_queue = queue.Queue()
        self.sessions = {}  # session_id -> { "context": Context, "page": Page, "download_dir": str|None }
        self.sync_mgr = sync_mgr  # optional workspace sync manager used to place downloads
        self.lock = threading.Lock()
        threading.Thread(target=self._browser_actor, daemon=True, name="BrowserActor").start()

    def _setup_listeners(self, sid, page, on_event):
        """Tunnels browser internal events back to the Orchestrator."""
        if not on_event: return

        # Live Console Redirector
        page.on("console", lambda msg: on_event(agent_pb2.BrowserEvent(
            session_id=sid, console_msg=agent_pb2.ConsoleMessage(
                level=msg.type, text=msg.text, timestamp_ms=int(time.time()*1000)
            )
        )))

        # Live Network Redirector
        # NOTE(review): latency_ms is hard-coded to 0 — presumably not measured yet; confirm.
        page.on("requestfinished", lambda req: on_event(agent_pb2.BrowserEvent(
            session_id=sid, network_req=agent_pb2.NetworkRequest(
                method=req.method, url=req.url, status=req.response().status if req.response() else 0,
                resource_type=req.resource_type, latency_ms=0
            )
        )))

        # Live Download Redirector
        page.on("download", lambda download: self._handle_download(sid, download))

    def _handle_download(self, sid, download):
        """Saves browser downloads directly into the synchronized session workspace."""
        import os
        # Only the sessions lookup needs the lock; keep the (potentially slow)
        # disk write outside it so other sessions are not blocked.
        with self.lock:
            sess = self.sessions.get(sid)
        if sess and sess.get("download_dir"):
            os.makedirs(sess["download_dir"], exist_ok=True)
            target = os.path.join(sess["download_dir"], download.suggested_filename)
            print(f" [🌐📥] Browser Download Sync: {download.suggested_filename} -> {target}")
            download.save_as(target)

    def _browser_actor(self):
        """Serializes all Playwright operations on a single dedicated thread."""
        print("[🌐] Browser Actor Starting...", flush=True)
        pw = None
        browser = None
        try:
            pw = sync_playwright().start()
            # 12-Factor/Container Optimization: Standard non-sandbox arguments
            browser = pw.chromium.launch(headless=True, args=[
                '--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage', '--disable-gpu'
            ])
            print("[🌐] Browser Engine Online.", flush=True)
        except Exception as e:
            # NOTE(review): if startup fails, queued tasks will never be
            # answered (callers see no completion) — confirm the hub times out.
            print(f"[!] Browser Actor Startup Fail: {e}", flush=True)
            if pw: pw.stop()
            return

        while True:
            try:
                item = self.task_queue.get()
                if item is None:  # Sentinel for shutdown
                    print("[🌐] Browser Actor Shutting Down...", flush=True)
                    break

                task, sandbox, on_complete, on_event = item
                action = task.browser_action
                sid = action.session_id or "default"

                with self.lock:
                    if sid not in self.sessions:
                        # Phase 4: Mount workspace for downloads/uploads
                        download_dir = None
                        if self.sync_mgr and task.session_id:
                            download_dir = self.sync_mgr.get_session_dir(task.session_id)
                            print(f" [🌐📁] Mapping Browser Context to: {download_dir}")

                        ctx = browser.new_context(accept_downloads=True)
                        pg = ctx.new_page()
                        self._setup_listeners(sid, pg, on_event)
                        self.sessions[sid] = {"context": ctx, "page": pg, "download_dir": download_dir}

                page = self.sessions[sid]["page"]
                print(f" [🌐] Browser Actor Processing: {agent_pb2.BrowserAction.ActionType.Name(action.action)} | Session: {sid}", flush=True)

                res_data = {}
                # State-Machine Logic for Actions
                if action.action == agent_pb2.BrowserAction.NAVIGATE:
                    page.goto(action.url, wait_until="commit")
                elif action.action == agent_pb2.BrowserAction.CLICK:
                    page.click(action.selector)
                elif action.action == agent_pb2.BrowserAction.TYPE:
                    page.fill(action.selector, action.text)
                elif action.action == agent_pb2.BrowserAction.SCREENSHOT:
                    res_data["snapshot"] = page.screenshot()
                elif action.action == agent_pb2.BrowserAction.GET_DOM:
                    res_data["dom_content"] = page.content()
                elif action.action == agent_pb2.BrowserAction.HOVER:
                    page.hover(action.selector)
                elif action.action == agent_pb2.BrowserAction.SCROLL:
                    page.mouse.wheel(x=0, y=action.y)
                elif action.action == agent_pb2.BrowserAction.EVAL:
                    res_data["eval_result"] = str(page.evaluate(action.text))
                elif action.action == agent_pb2.BrowserAction.GET_A11Y:
                    res_data["a11y_tree"] = json.dumps(page.accessibility.snapshot())
                elif action.action == agent_pb2.BrowserAction.CLOSE:
                    # BUGFIX: previously this branch closed the context and then
                    # fell through to the response construction below, which calls
                    # page.title() on the now-closed page — raising and turning a
                    # successful close into a reported task failure.  Complete the
                    # task here and skip the page-based response entirely.
                    with self.lock:
                        sess = self.sessions.pop(sid, None)
                    if sess:
                        sess["context"].close()
                    on_complete(task.task_id, {"status": 1}, task.trace_id)
                    continue

                # Results Construction
                br_res = agent_pb2.BrowserResponse(
                    url=page.url, title=page.title(),
                    snapshot=res_data.get("snapshot", b""),
                    dom_content=res_data.get("dom_content", ""),
                    a11y_tree=res_data.get("a11y_tree", ""),
                    eval_result=res_data.get("eval_result", "")
                )
                on_complete(task.task_id, {"status": 1, "browser_result": br_res}, task.trace_id)
            except Exception as e:
                print(f" [!] Browser Actor Error: {e}", flush=True)
                on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)

        # Cleanup on loop exit
        print("[🌐] Cleaning up Browser Engine...", flush=True)
        with self.lock:
            for s in self.sessions.values():
                try: s["context"].close()
                except Exception: pass
            self.sessions.clear()
        if browser: browser.close()
        if pw: pw.stop()

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Enqueues the task for the actor thread; returns immediately."""
        self.task_queue.put((task, sandbox, on_complete, on_event))

    def cancel(self, task_id):
        # Playwright actions are short and serialized; cancellation unsupported.
        return False

    def shutdown(self):
        """Triggers graceful shutdown of the browser engine."""
        self.task_queue.put(None)
class FileSkill(BaseSkill):
    """Provides file system navigation and inspection capabilities."""

    def __init__(self, sync_mgr=None):
        # Optional sync manager; kept for constructor parity with other skills.
        self.sync_mgr = sync_mgr

    def execute(self, task, sandbox, on_complete, on_event=None):
        """
        Executes a file-related task (list, stats).
        Payload JSON: { "action": "list", "path": "...", "recursive": false }

        Completion status follows the node.py convention used by the other
        skills in this tree: 1 = success, 2 = error.
        """
        try:
            payload = json.loads(task.payload_json)
            action = payload.get("action", "list")
            path = payload.get("path", ".")

            # 1. Sandbox Jail Check
            # (In a real implementation, we'd use sandbox.check_path(path))
            # For now, we'll assume the node allows browsing its root or session dir.

            if action == "list":
                # Note: a missing path is reported in-band as {"error": ...}
                # with status 1 — the listing call itself succeeded.
                result = self._list_dir(path, payload.get("recursive", False))
                on_complete(task.task_id, {"status": 1, "stdout": json.dumps(result)}, task.trace_id)
            else:
                # CONSISTENCY FIX: error completions previously used status 0,
                # while ShellSkill/BrowserSkill use 2 for errors (1 = success).
                on_complete(task.task_id, {"status": 2, "stderr": f"Unknown action: {action}"}, task.trace_id)

        except Exception as e:
            logger.error(f"[FileSkill] Task {task.task_id} failed: {e}")
            on_complete(task.task_id, {"status": 2, "stderr": str(e)}, task.trace_id)

    def _list_dir(self, path, recursive=False):
        """Lists directory contents with metadata.

        Returns {"root": <abs path>, "items": [...]} with directories sorted
        before files; recursive entries additionally carry a "path" key
        relative to *path*.  Returns {"error": ...} when *path* is missing.
        """
        if not os.path.exists(path):
            return {"error": "Path not found"}

        items = []
        if recursive:
            for root, dirs, files in os.walk(path):
                for name in dirs + files:
                    abs_path = os.path.join(root, name)
                    rel_path = os.path.relpath(abs_path, path)
                    # ROBUSTNESS: skip entries that vanish mid-walk or are
                    # broken symlinks instead of failing the whole listing.
                    try:
                        st = os.stat(abs_path)
                    except OSError:
                        continue
                    items.append({
                        "name": name,
                        "path": rel_path,
                        "is_dir": os.path.isdir(abs_path),
                        "size": st.st_size,
                        "mtime": st.st_mtime
                    })
        else:
            for name in os.listdir(path):
                abs_path = os.path.join(path, name)
                try:
                    st = os.stat(abs_path)
                except OSError:
                    continue
                items.append({
                    "name": name,
                    "is_dir": os.path.isdir(abs_path),
                    "size": st.st_size,
                    "mtime": st.st_mtime
                })

        return {
            "root": os.path.abspath(path),
            # Directories first, then alphabetical within each group.
            "items": sorted(items, key=lambda x: (not x["is_dir"], x["name"]))
        }

    def cancel(self, task_id):
        return False  # Listing is usually fast, no cancellation needed

    def shutdown(self):
        pass
+ path: + type: string + description: Relative path to the file to inspect. + session_id: + type: string + description: The workspace session ID. + required: + - node_id + - path + - session_id +is_system: true +--- + +# Mesh Inspect Drift + +Use this tool when you suspect the Hub mirror is out of sync with an edge node. +It will return a unified diff showing exactly what changed on the remote node vs your local Hub copy. diff --git a/skills/mesh-sync-control/SKILL.md b/skills/mesh-sync-control/SKILL.md new file mode 100644 index 0000000..5d9e2c5 --- /dev/null +++ b/skills/mesh-sync-control/SKILL.md @@ -0,0 +1,53 @@ +--- +name: mesh_sync_control +emoji: "๐Ÿ”„" +description: Manage replication, synchronization, and locks across nodes in the decentralized + ghost mirror filesystem. +skill_type: local +is_enabled: true +features: +- chat +- workflow +- swarm_control +config: + actions: + - start + - stop + - lock + - unlock + - resync + parameters: + type: object + properties: + action: + type: string + enum: + - start + - stop + - lock + - unlock + - resync + description: Control action. + node_id: + type: string + description: Target node ID. + session_id: + type: string + description: The workspace session ID to manage. + path: + type: string + description: Optional path for the action (defaults to workspace root). + required: + - action + - node_id + - session_id +is_system: true +--- + +# Mesh Sync Control + +Use this tool to manage the synchronization state of files across the swarm. +1. **`start`**: Instruct a node to begin watching and syncing a local directory. +2. **`lock`**: Disable user-side file watcher on a node. Use this BEFORE starting multi-file refactors to prevent race conditions. +3. **`unlock`**: Restore user-side sync after an AI refactor is complete. +4. **`resync`**: Force a node to perform a full hash-based reconciliation against the master mirror on the Hub. 
diff --git a/skills/mesh-terminal-control/SKILL.md b/skills/mesh-terminal-control/SKILL.md new file mode 100644 index 0000000..e1ecf35 --- /dev/null +++ b/skills/mesh-terminal-control/SKILL.md @@ -0,0 +1,62 @@ +--- +name: mesh_terminal_control +emoji: "๐Ÿ–ฅ๏ธ" +description: Execute stateful shell commands and manage terminal sessions across the + agent mesh (Swarm Control). +skill_type: remote_grpc +is_enabled: true +features: +- chat +- swarm_control +config: + service: TerminalService + method: Execute + capabilities: + - shell + - pty + - interactive + parameters: + type: object + properties: + command: + type: string + description: 'Command to run. Use !RAW: prefix for REPL inputs.' + node_id: + type: string + description: Target node ID. + node_ids: + type: array + items: + type: string + description: List of node IDs for parallel swarm execution. + timeout: + type: integer + description: Max seconds to wait. Default 30. + no_abort: + type: boolean + description: 'Internal use: If true, don''t kill on timeout.' + session_id: + type: string + description: Optional persistent session ID. + required: + - command +is_system: true +--- + +# Documentation + +This capability allows the orchestrator to interact with terminal sessions on remote nodes. It supports stateful REPLs, parallel execution across multiple nodes, and background task management. + +# AI Instructions + +You are a high-level Mesh Orchestrator. When executing commands: +1. **Parallel Execution**: Use 'node_ids' (plural) for simultaneous swarm sweeps. +2. **Immediate Knowledge**: Calls return as soon as the task finishes. If a task takes 1s but you set timeout=60, you get the result in 1s. +3. **Asynchronous Polling**: For background tasks, set 'no_abort=True'. If it times out, you get 'TIMEOUT_PENDING'. You can then use 'mesh_wait_tasks' with 'timeout=0' to peek at progress without blocking your turn. +4. **Interactive Sub-shells**: Subsequent REPL inputs MUST use the `!RAW:` prefix. +5. 
class ShellSkill(BaseSkill):
    """Admin Console Skill: Persistent stateful Bash via PTY.

    One forked bash process per logical session id.  A background reader
    thread per session streams PTY output to the hub, detects a per-task
    completion marker to resolve blocking execute() calls, and a reaper
    thread kills sessions idle for more than 10 minutes.
    """
    def __init__(self, sync_mgr=None):
        self.sync_mgr = sync_mgr  # resolves per-session working directories
        self.sessions = {}  # session_id -> {fd, pid, thread, last_activity, ...}
        self.lock = threading.Lock()

        # Phase 3: Prompt Patterns for Edge Intelligence
        self.PROMPT_PATTERNS = [
            r"[\r\n].*[@\w\.\-]+:.*[#$]\s*$",  # bash/zsh: user@host:~$
            r">>>\s*$",                         # python
            r"\.\.\.\s*$",                      # python multi-line
            r">\s*$",                           # node/js
        ]

        # --- M7: Idle Session Reaper ---
        # Automatically kills dormant bash processes to free up system resources.
        self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper")
        self.reaper_thread.start()

    def _session_reaper(self):
        """Background thread that cleans up unused PTY sessions."""
        while True:
            time.sleep(60)
            with self.lock:
                now = time.time()
                for sid, sess in list(self.sessions.items()):
                    # Avoid reaping currently active tasks
                    if sess.get("active_task"):
                        continue

                    # 10 minute idle timeout
                    if now - sess.get("last_activity", 0) > 600:
                        print(f" [🐚🧹] Reaping idle shell session: {sid}")
                        # FIX: narrow bare except -> except Exception so
                        # KeyboardInterrupt/SystemExit are not swallowed.
                        try:
                            os.close(sess["fd"])
                            os.kill(sess["pid"], 9)
                        except Exception: pass
                        self.sessions.pop(sid, None)

    def _ensure_session(self, session_id, cwd, on_event):
        """Returns the session dict for *session_id*, forking bash on first use.

        Also spawns the per-session reader thread that pumps PTY output,
        performs marker detection and rate-limited streaming back to the hub.
        """
        with self.lock:
            if session_id in self.sessions:
                self.sessions[session_id]["last_activity"] = time.time()
                return self.sessions[session_id]

            print(f" [🐚] Initializing Persistent Shell Session: {session_id}")
            # Spawn bash in a pty
            pid, fd = pty.fork()
            if pid == 0:  # Child
                # Environment prep
                os.environ["TERM"] = "xterm-256color"

                # Change to CWD
                if cwd and os.path.exists(cwd):
                    os.chdir(cwd)

                # Launch shell (never returns)
                os.execv("/bin/bash", ["/bin/bash", "--login"])

            # Parent
            # Set non-blocking so the reader's select/read loop never stalls
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

            sess = {
                "fd": fd,
                "pid": pid,
                "last_activity": time.time(),
                "buffer_file": None,   # disk-backed stdout buffer for the active task
                "tail_buffer": "",     # last 4KB in RAM for marker/prompt scanning
                "active_task": None
            }

            def reader():
                # Pumps PTY output until EOF; owns marker detection and streaming.
                while True:
                    try:
                        r, _, _ = select.select([fd], [], [], 0.1)
                        if fd in r:
                            data = os.read(fd, 4096)
                            if not data: break

                            decoded = data.decode("utf-8", errors="replace")

                            # Streaming/Sync logic (Detect completion marker)
                            with self.lock:
                                active_tid = sess.get("active_task")
                                marker = sess.get("marker")
                                if active_tid and marker and sess.get("buffer_file"):
                                    # Phase 2: Persistence Offloading
                                    # Write directly to disk instead of heap memory
                                    sess["buffer_file"].write(decoded)
                                    sess["buffer_file"].flush()

                                    # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning
                                    sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:]

                                    if marker in sess["tail_buffer"]:
                                        # Marker found! Extract exit code
                                        try:
                                            # The tail buffer has the marker
                                            after_marker = sess["tail_buffer"].split(marker)[1].strip().split()
                                            exit_code = int(after_marker[0]) if after_marker else 0

                                            # Formulate final stdout summary from the disk file
                                            bf = sess["buffer_file"]
                                            bf.seek(0, 2)
                                            file_len = bf.tell()

                                            # Head/tail truncation keeps large outputs bounded;
                                            # the full output remains on disk at bf.name.
                                            HEAD, TAIL = 10_000, 30_000
                                            if file_len > HEAD + TAIL:
                                                bf.seek(0)
                                                head_str = bf.read(HEAD)
                                                bf.seek(file_len - TAIL)
                                                tail_str = bf.read()
                                                omitted = file_len - HEAD - TAIL
                                                pure_stdout = head_str + f"\n\n[... {omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str
                                            else:
                                                bf.seek(0)
                                                pure_stdout = bf.read()

                                            # Slice off the marker string and anything after it from the final result
                                            pure_stdout = pure_stdout.split(marker)[0]

                                            sess["result"]["stdout"] = pure_stdout
                                            sess["result"]["status"] = 1 if exit_code == 0 else 2

                                            # Close the file handle (leaves file on disk)
                                            sess["buffer_file"].close()
                                            sess["buffer_file"] = None

                                            sess["event"].set()
                                            decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout
                                        except Exception as e:
                                            # Wake the waiter anyway so execute() doesn't hang until timeout.
                                            print(f" [🐚⚠️] Marker parsing failed: {e}")
                                            sess["event"].set()

                            # Stream terminal output back (with stealth filtering)
                            if on_event:
                                stealth_out = decoded
                                if "__CORTEX_FIN_SH_" in decoded:
                                    import re
                                    # We remove any line that contains our internal marker to hide plumbing from user.
                                    # This covers both the initial command echo and the exit code output.
                                    stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', decoded)

                                if stealth_out:
                                    # Phase 3: Client-Side Truncation (Stream Rate Limiting)
                                    # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC.
                                    # The full output is still safely written to the tempfile on disk.
                                    with self.lock:
                                        now = time.time()
                                        if now - sess.get("stream_window_start", 0) > 1.0:
                                            # New 1-second window: reset counters and, if bytes were
                                            # dropped in the previous window, tell the user how many.
                                            sess["stream_window_start"] = now
                                            sess["stream_bytes_sent"] = 0
                                            dropped = sess.get("stream_dropped_bytes", 0)
                                            if dropped > 0:
                                                drop_msg = f"\n[... {dropped:,} bytes truncated from live stream ...]\n"
                                                event = agent_pb2.SkillEvent(
                                                    session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg
                                                )
                                                on_event(agent_pb2.ClientTaskMessage(skill_event=event))
                                                sess["stream_dropped_bytes"] = 0

                                        if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 15_000:
                                            sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out)
                                        else:
                                            sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out)
                                            event = agent_pb2.SkillEvent(
                                                session_id=session_id,
                                                task_id=sess.get("active_task") or "",
                                                terminal_out=stealth_out
                                            )
                                            on_event(agent_pb2.ClientTaskMessage(skill_event=event))

                                # EDGE INTELLIGENCE: Proactively signal prompt detection
                                # We only check for prompts if we are actively running a task and haven't found the marker yet.
                                if active_tid and not sess["event"].is_set():
                                    import re
                                    tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"]
                                    for pattern in self.PROMPT_PATTERNS:
                                        if re.search(pattern, tail):
                                            # Send specific prompt signal
                                            # Use last 20 chars as the 'prompt' hint
                                            p_hint = tail[-20:].strip()
                                            prompt_event = agent_pb2.SkillEvent(
                                                session_id=session_id,
                                                task_id=active_tid,
                                                prompt=p_hint
                                            )
                                            on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event))
                                            break
                    except (EOFError, OSError):
                        break

                # Thread Cleanup (shell exited or fd closed)
                print(f" [🐚] Shell Session Terminated: {session_id}")
                with self.lock:
                    self.sessions.pop(session_id, None)

            t = threading.Thread(target=reader, daemon=True, name=f"ShellReader-{session_id}")
            t.start()
            sess["thread"] = t

            self.sessions[session_id] = sess
            return sess

    def handle_transparent_tty(self, task, on_complete, on_event=None):
        """Processes raw TTY/Resize events synchronously (bypasses threadpool/sandbox).

        Returns True when the payload was a recognized raw-TTY JSON message
        and was handled; False otherwise so the caller can fall back to
        normal execute() dispatch.
        """
        cmd = task.payload_json
        session_id = task.session_id or "default-session"
        try:
            import json
            if cmd.startswith('{') and cmd.endswith('}'):
                raw_payload = json.loads(cmd)

                # 1. Raw Keystroke forward
                if isinstance(raw_payload, dict) and "tty" in raw_payload:
                    raw_bytes = raw_payload["tty"]
                    sess = self._ensure_session(session_id, None, on_event)
                    os.write(sess["fd"], raw_bytes.encode("utf-8"))
                    on_complete(task.task_id, {"stdout": "", "status": 0}, task.trace_id)
                    return True

                # 2. Window Resize
                if isinstance(raw_payload, dict) and raw_payload.get("action") == "resize":
                    cols = raw_payload.get("cols", 80)
                    rows = raw_payload.get("rows", 24)
                    sess = self._ensure_session(session_id, None, on_event)
                    # (termios/struct/fcntl are module-level imports; the
                    # redundant local import was removed.)
                    s = struct.pack('HHHH', rows, cols, 0, 0)
                    fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, s)
                    print(f" [🐚] Terminal Resized to {cols}x{rows}")
                    on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id)
                    return True
        except Exception as pe:
            print(f" [🐚] Transparent TTY Fail: {pe}")
        return False

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Dispatches command string to the persistent PTY shell and WAITS for completion."""
        session_id = task.session_id or "default-session"
        tid = task.task_id
        try:
            cmd = task.payload_json

            # --- Legacy Full-Command Execution (Sandboxed) ---
            allowed, status_msg = sandbox.verify(cmd)
            if not allowed:
                err_msg = f"\r\n[System] Command blocked: {status_msg}\r\n"
                if on_event:
                    event = agent_pb2.SkillEvent(
                        session_id=session_id, task_id=tid,
                        terminal_out=err_msg
                    )
                    on_event(agent_pb2.ClientTaskMessage(skill_event=event))

                return on_complete(tid, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id)

            # Resolve CWD jail
            cwd = None
            if self.sync_mgr and task.session_id:
                cwd = self.sync_mgr.get_session_dir(task.session_id)
            elif sandbox.policy.get("WORKING_DIR_JAIL"):
                cwd = sandbox.policy["WORKING_DIR_JAIL"]
                if not os.path.exists(cwd):
                    try: os.makedirs(cwd, exist_ok=True)
                    except Exception: pass

            # Handle Session Persistent Process
            sess = self._ensure_session(session_id, cwd, on_event)

            # Check for RAW mode first (bypasses busy check for interactive control)
            is_raw = cmd.startswith("!RAW:")
            if is_raw:
                input_str = cmd[5:] + "\n"
                print(f" [🐚⌨️] RAW Input Injection: {input_str.strip()}")
                os.write(sess["fd"], input_str.encode("utf-8"))
                return on_complete(tid, {"stdout": "INJECTED", "status": 1}, task.trace_id)

            # --- 0. Busy Check: Serialize access to the PTY for standard commands ---
            with self.lock:
                if sess.get("active_task"):
                    curr_tid = sess.get("active_task")
                    return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 2}, task.trace_id)

            # --- Blocking Wait Logic ---
            # Generate a per-task completion marker keyed on the wall clock.
            marker_id = int(time.time())
            marker = f"__CORTEX_FIN_SH_{marker_id}__"
            event = threading.Event()
            result_container = {"stdout": "", "status": 1}  # 1 = Success by default (node.py convention)

            # Register waiter in session state
            with self.lock:
                sess["active_task"] = tid
                sess["marker"] = marker
                sess["event"] = event
                # Create a persistent tempfile for stdout instead of RAM buffer
                sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False)
                sess["tail_buffer"] = ""
                sess["result"] = result_container
                sess["cancel_event"] = threading.Event()

            # Input injection: execute command then echo marker and exit code
            try:
                # 12-factor bash: ( cmd ) ; echo marker $?
                # We use "" concatenation in the echo command to ensure the marker literal
                # DOES NOT appear in the PTY input echo, preventing premature completion.
                full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n"
                os.write(sess["fd"], full_input.encode("utf-8"))

                # Wait for completion (triggered by reader) OR cancellation
                timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0
                start_time = time.time()
                while time.time() - start_time < timeout:
                    # Check for completion (reader found marker)
                    if event.is_set():
                        return on_complete(tid, result_container, task.trace_id)

                    # Check for cancellation (HUB sent cancel)
                    if sess["cancel_event"].is_set():
                        print(f" [🐚🛑] Task {tid} cancelled on node.")
                        return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id)

                    # Sleep slightly to avoid busy loop
                    time.sleep(0.1)

                # Timeout Case: return whatever partial output was captured,
                # with the same head/tail truncation as the success path.
                print(f" [🐚⚠️] Task {tid} timed out on node.")
                with self.lock:
                    if sess.get("buffer_file"):
                        try:
                            sess["buffer_file"].seek(0, 2)
                            file_len = sess["buffer_file"].tell()
                            HEAD, TAIL = 10_000, 30_000
                            if file_len > HEAD + TAIL:
                                sess["buffer_file"].seek(0)
                                head_str = sess["buffer_file"].read(HEAD)
                                sess["buffer_file"].seek(file_len - TAIL)
                                tail_str = sess["buffer_file"].read()
                                omitted = file_len - HEAD - TAIL
                                partial_out = head_str + f"\n\n[... {omitted:,} bytes omitted (full timeout output saved to {sess['buffer_file'].name}) ...]\n\n" + tail_str
                            else:
                                sess["buffer_file"].seek(0)
                                partial_out = sess["buffer_file"].read()
                        except Exception:
                            partial_out = ""
                    else:
                        partial_out = ""

                on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id)

            finally:
                # Cleanup session task state so the session can accept new commands
                with self.lock:
                    if sess.get("active_task") == tid:
                        if sess.get("buffer_file"):
                            try:
                                sess["buffer_file"].close()
                            except Exception: pass
                            sess["buffer_file"] = None
                        sess["active_task"] = None
                        sess["marker"] = None
                        sess["event"] = None
                        sess["result"] = None
                        sess["cancel_event"] = None

        except Exception as e:
            print(f" [🐚❌] Execute Error for {tid}: {e}")
            on_complete(tid, {"stderr": str(e), "status": 2}, task.trace_id)

    def cancel(self, task_id: str):
        """Cancels an active task — for persistent shell, this sends a SIGINT (Ctrl+C)."""
        with self.lock:
            for sid, sess in self.sessions.items():
                if sess.get("active_task") == task_id:
                    print(f"[🛑] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}")
                    # Write \x03 (Ctrl+C) to the master FD
                    os.write(sess["fd"], b"\x03")
                    # Break the wait loop in execute thread
                    if sess.get("cancel_event"):
                        sess["cancel_event"].set()
                    return True
        # FIX: explicit False (was implicit None) to match the bool contract
        # of the other skills' cancel() implementations.
        return False

    def shutdown(self):
        """Cleanup: Terminates all persistent shells."""
        with self.lock:
            for sid, sess in list(self.sessions.items()):
                print(f"[🛑] Cleaning up persistent shell: {sid}")
                try: os.close(sess["fd"])
                except Exception: pass
                # kill pid
                try: os.kill(sess["pid"], 9)
                except Exception: pass
            self.sessions.clear()
+skill_type: remote_grpc +is_enabled: true +features: +- chat +- swarm_control +config: + parameters: + type: object + properties: + task_map: + type: object + additionalProperties: + type: string + description: Map of node_id -> task_id for tasks to wait on. + timeout: + type: integer + description: How much longer to wait in seconds. Default 30. + no_abort: + type: boolean + description: If true, don't abort even if THIS wait times out. Default false. + required: + - task_map +is_system: true +--- + +# Documentation + +Allows the orchestrator to poll the status of background tasks that were started with `no_abort=True`. + +# AI Instructions + +Wait for 'TIMEOUT_PENDING' tasks. This uses an AI sub-agent to monitor progress. It will return as soon as the sub-agent detects completion. diff --git a/skills/voice-interaction-handler/SKILL.md b/skills/voice-interaction-handler/SKILL.md new file mode 100644 index 0000000..d06f903 --- /dev/null +++ b/skills/voice-interaction-handler/SKILL.md @@ -0,0 +1,27 @@ +--- +name: voice_interaction_handler +emoji: "๐ŸŽค" +description: Handle real-time voice interruptions, tone analysis, and speech-to-speech + feedback loops. +skill_type: local +is_enabled: true +features: +- voice +config: + interaction_mode: speech-to-speech + latency_target: 300 + parameters: + type: object + properties: + mode: + type: string + enum: + - active + - passive + description: Voice interaction mode. +is_system: true +--- + +# Voice Interaction Handler + +You are a voice-first AI. Keep your responses concise and conversational. Focus on natural prosody and handle interruptions gracefully. 
diff --git a/ui/client-app/src/hooks/useVoiceChat.js b/ui/client-app/src/hooks/useVoiceChat.js index 7c53f3d..9f13faa 100644 --- a/ui/client-app/src/hooks/useVoiceChat.js +++ b/ui/client-app/src/hooks/useVoiceChat.js @@ -355,6 +355,11 @@ } }; + if (!text || !text.trim()) { + console.warn("No text to synthesize, skipping audio playback."); + return; + } + await streamSpeech(text, onChunkReceived, onStreamDone, localActivePrefs.tts); } catch (err) { @@ -443,6 +448,8 @@ } }; + if (!text || !text.trim()) return; + await streamSpeech(text, onData, onDone, localActivePrefs.tts); } catch (err) { console.error("Manual synthesis failed", err); diff --git a/ui/client-app/src/index.css b/ui/client-app/src/index.css index bd6213e..a90ea32 100644 --- a/ui/client-app/src/index.css +++ b/ui/client-app/src/index.css @@ -1,3 +1,47 @@ @tailwind base; @tailwind components; -@tailwind utilities; \ No newline at end of file +@tailwind utilities; + +.markdown-preview h1 { + @apply text-2xl font-bold mb-4 mt-6 text-gray-900 dark:text-white; +} + +.markdown-preview h2 { + @apply text-xl font-bold mb-3 mt-5 text-gray-900 dark:text-white; +} + +.markdown-preview h3 { + @apply text-lg font-bold mb-2 mt-4 text-gray-800 dark:text-gray-100; +} + +.markdown-preview p { + @apply mb-4 leading-relaxed; +} + +.markdown-preview ul { + @apply list-disc list-inside mb-4 ml-4; +} + +.markdown-preview ol { + @apply list-decimal list-inside mb-4 ml-4; +} + +.markdown-preview li { + @apply mb-1; +} + +.markdown-preview code { + @apply bg-gray-100 dark:bg-gray-800 px-1.5 py-0.5 rounded-md font-mono text-[0.85em] text-indigo-600 dark:text-indigo-400 border border-gray-200 dark:border-gray-700; +} + +.markdown-preview pre { + @apply bg-gray-900 border border-gray-800 rounded-xl p-4 mb-6 overflow-x-auto; +} + +.markdown-preview pre code { + @apply bg-transparent border-none p-0 text-gray-300; +} + +.markdown-preview strong { + @apply font-black text-gray-900 dark:text-white; +} \ No newline at end of file diff 
--git a/ui/client-app/src/pages/SkillsPage.js b/ui/client-app/src/pages/SkillsPage.js index 2738d32..29e3586 100644 --- a/ui/client-app/src/pages/SkillsPage.js +++ b/ui/client-app/src/pages/SkillsPage.js @@ -1,10 +1,15 @@ -import React, { useState, useEffect } from 'react'; +import React, { useState, useEffect, useMemo } from 'react'; +import ReactMarkdown from 'react-markdown'; import { getSkills, createSkill, updateSkill, deleteSkill } from '../services/apiService'; export default function SkillsPage({ user, Icon }) { const [skills, setSkills] = useState([]); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); + const [searchQuery, setSearchQuery] = useState(''); + const [activeFilter, setActiveFilter] = useState('all'); // all, system, mine, group + const [viewingDoc, setViewingDoc] = useState(null); + const [showRawDoc, setShowRawDoc] = useState(false); const [isModalOpen, setIsModalOpen] = useState(false); const [editingSkill, setEditingSkill] = useState(null); @@ -17,8 +22,11 @@ is_system: false, system_prompt: '', is_enabled: true, - features: ['chat'] + features: ['chat'], + extra_metadata: { emoji: 'โš™๏ธ' }, + preview_markdown: '' }); + const [showAdvanced, setShowAdvanced] = useState(false); const fetchSkills = async () => { try { @@ -37,6 +45,27 @@ fetchSkills(); }, []); + const filteredSkills = useMemo(() => { + return skills.filter(skill => { + const matchesSearch = skill.name.toLowerCase().includes(searchQuery.toLowerCase()) || + (skill.description || '').toLowerCase().includes(searchQuery.toLowerCase()); + + if (!matchesSearch) return false; + if (activeFilter === 'all') return true; + if (activeFilter === 'system') return skill.is_system; + if (activeFilter === 'mine') return !skill.is_system && skill.owner_id === user?.id; + if (activeFilter === 'group') return skill.group_id && !skill.is_system; + return true; + }); + }, [skills, searchQuery, activeFilter, user]); + + const stats = useMemo(() => ({ + total: 
skills.length, + system: skills.filter(s => s.is_system).length, + mine: skills.filter(s => !s.is_system && s.owner_id === user?.id).length, + enabled: skills.filter(s => s.is_enabled).length + }), [skills, user]); + const openModal = (skill = null) => { if (skill) { setEditingSkill(skill); @@ -48,7 +77,9 @@ is_system: skill.is_system, system_prompt: skill.system_prompt || '', is_enabled: skill.is_enabled ?? true, - features: skill.features || ['chat'] + features: skill.features || ['chat'], + extra_metadata: skill.extra_metadata || { emoji: 'โš™๏ธ' }, + preview_markdown: skill.preview_markdown || '' }); } else { setEditingSkill(null); @@ -60,14 +91,16 @@ is_system: false, system_prompt: '', is_enabled: true, - features: ['chat'] + features: ['chat'], + extra_metadata: { emoji: 'โš™๏ธ' }, + preview_markdown: '' }); } setIsModalOpen(true); }; const handleClone = (skill) => { - setEditingSkill(null); // Force it to act like a 'Create' + setEditingSkill(null); setFormData({ name: `${skill.name}_clone`, description: skill.description || '', @@ -76,7 +109,9 @@ is_system: false, system_prompt: skill.system_prompt || '', is_enabled: true, - features: skill.features || ['chat'] + features: skill.features || ['chat'], + extra_metadata: skill.extra_metadata || { emoji: 'โš™๏ธ' }, + preview_markdown: skill.preview_markdown || '' }); setIsModalOpen(true); }; @@ -84,6 +119,8 @@ const closeModal = () => { setIsModalOpen(false); setEditingSkill(null); + setViewingDoc(null); + setShowRawDoc(false); }; const handleSave = async () => { @@ -97,14 +134,8 @@ } const payload = { - name: formData.name, - description: formData.description, - skill_type: formData.skill_type, - config: configObj, - is_system: formData.is_system, - system_prompt: formData.system_prompt, - is_enabled: formData.is_enabled, - features: formData.features + ...formData, + config: configObj }; if (editingSkill) { @@ -131,236 +162,392 @@ const isAdmin = user?.role === 'admin'; + // --- Components --- + + const 
SidebarItem = ({ id, label, icon, count, active }) => ( + + ); + return ( -
-
-
-

- Skills & Workflows +
+ {/* --- Sidebar --- */} +
+
+

+ Cortex Skills

-

- Create, manage, and share AI capabilities and workflows. -

+

Foundational Layer

- + +
+ + + + s.group_id).length} active={activeFilter === 'group'} /> +
+ +
+ +
-
- {loading ? ( -
-
+ {/* --- Main Content --- */} +
+ {/* --- Top Navbar --- */} +
+
+ + setSearchQuery(e.target.value)} + className="w-full bg-gray-100/50 dark:bg-gray-700/50 border-none rounded-2xl py-3 pl-12 pr-4 focus:ring-2 focus:ring-indigo-500 outline-none transition-all placeholder-gray-400 text-sm" + />
- ) : error ? ( -
{error}
- ) : ( -
- {skills.map((skill) => ( -
-
-

- {skill.name} - {skill.is_system && ( - System - )} - {skill.group_id && !skill.is_system && ( - Group - )} - {!skill.is_enabled && ( - Disabled - )} -

-
-
+ + {/* --- Skill Grid --- */} +
+ {loading ? ( +
+
+

Syncing

+
+ ) : error ? ( +
+
+ +
+

Access Denied

+

{error}

+
+ ) : ( +
+ {filteredSkills.map((skill) => ( +
+
+
+ {skill.extra_metadata?.emoji || "โš™๏ธ"} +
+
+

+ {skill.name} +

+
+ {skill.is_system && ( + Core + )} + {skill.skill_type} +
+
+
+

+ {skill.description || "No manifesto defined for this skill."} +

+ +
+ {(skill.features || []).map(f => ( + + {f} + + ))} +
+ +
+
+ {(isAdmin || skill.owner_id === user?.id) && ( + + )} + + {(isAdmin || skill.owner_id === user?.id) && ( + + )} +
+ + - {(isAdmin || skill.owner_id === user?.id) && ( - <> - - - +
+
+ ))} +
+ )} +
+
+ + {/* --- Modals (Logic Edit / Docs View) --- */} + {(isModalOpen || viewingDoc) && ( +
+
+ {/* Modal Header */} +
+
+
+ {isModalOpen ? (formData.extra_metadata?.emoji || "โš™๏ธ") : (viewingDoc?.extra_metadata?.emoji || "๐Ÿ“š")} +
+
+

+ {isModalOpen + ? (editingSkill ? `Engineering: ${formData.name}` : "Create New Pattern") + : `Documentation: ${viewingDoc?.name}` + } +

+
+

+ Skill Library Protocol + +

+ {isModalOpen && ( + )}
- -

- {skill.description || "No description provided."} -

- -
- {(skill.features || []).map(f => ( - - {f} - - ))} -
- -
- - TYPE: {skill.skill_type} - - Created: {new Date(skill.created_at).toLocaleDateString()} -
- ))} - {skills.length === 0 && ( -
- -

No skills found.

-

Create your first skill to extend the AI's capabilities.

-
- )} -
- )} -
- - {isModalOpen && ( -
-
-
-

- - {editingSkill ? 'Edit Skill Configuration' : 'Create New Skill'} -

-
-
-
-
- - setFormData({ ...formData, name: e.target.value })} - className="w-full bg-gray-50 dark:bg-gray-700 border border-gray-300 dark:border-gray-600 rounded-lg px-4 py-2 focus:ring-2 focus:ring-indigo-500 focus:border-transparent outline-none transition-all placeholder-gray-400" - placeholder="e.g. github_search" - /> -
-
- - -

- {formData.skill_type === 'local' && "Runs directly on the Cortex Hub server."} - {formData.skill_type === 'mcp' && "Connects to an external Model Context Protocol server."} - {formData.skill_type === 'remote_grpc' && "Dispatches commands to an attached Agent Node via gRPC."} -

-
-
- -
- -