diff --git a/agent-node/Dockerfile b/agent-node/Dockerfile index 492cba7..9b147d6 100644 --- a/agent-node/Dockerfile +++ b/agent-node/Dockerfile @@ -41,4 +41,4 @@ COPY . . # Run the node -CMD ["python", "-m", "agent_node.main"] +CMD ["python", "src/agent_node/main.py"] diff --git a/agent-node/README.md b/agent-node/README.md new file mode 100644 index 0000000..52a82b8 --- /dev/null +++ b/agent-node/README.md @@ -0,0 +1,26 @@ +# Cortex Agent Node + +This bundle contains the **Cortex Agent Node**, lightweight, modular software that securely connects physical computing resources to the Cortex Hub. + +## Directory Structure +``` +. +├── bootstrap_installer.py # Zero-dependency daemon/upgrade installer +├── install_service.py # Background service utilities (macOS/Linux) +├── requirements.txt # Core dependencies +├── src/ # Primary source code +│ ├── agent_node/ # Hub communication & task execution logic +│ ├── protos/ # gRPC protocol stubs +│ └── shared_core/ # Cross-module shared utilities +``` + +## Running the Node + +### Automated Background Service (Recommended) +To run the node seamlessly in the background (surviving system reboots): +```bash +python3 bootstrap_installer.py --daemon +``` + +### Foreground Usage +Run `python3 src/agent_node/main.py` directly if you want to watch the logs in your terminal. 
diff --git a/agent-node/agent_node/__init__.py b/agent-node/agent_node/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/agent-node/agent_node/__init__.py +++ /dev/null diff --git a/agent-node/agent_node/config.py b/agent-node/agent_node/config.py deleted file mode 100644 index f321761..0000000 --- a/agent-node/agent_node/config.py +++ /dev/null @@ -1,63 +0,0 @@ -import os -import platform -import yaml - -# Path to the generated config file in the bundled distribution -CONFIG_PATH = "agent_config.yaml" - -# Default values -_defaults = { - "node_id": "agent-node-007", - "node_description": "Modular Stateful Node", - "hub_url": "https://ai.jerxie.com", - "grpc_endpoint": "localhost:50051", - "auth_token": os.getenv("AGENT_AUTH_TOKEN", "cortex-secret-shared-key"), - "sync_root": "/tmp/cortex-sync", - "tls": True, - "max_skill_workers": 10, - "health_report_interval": 10, - "auto_update": True, - "update_check_interval": 300, -} - -# 1. Load from YAML if present -_config = _defaults.copy() -if os.path.exists(CONFIG_PATH): - try: - with open(CONFIG_PATH, 'r') as f: - yaml_config = yaml.safe_load(f) or {} - _config.update(yaml_config) - print(f"[*] Loaded node configuration from {CONFIG_PATH}") - except Exception as e: - print(f"[!] Error loading {CONFIG_PATH}: {e}") - -# 2. Override with Environment Variables (12-Factor style) -NODE_ID = os.getenv("AGENT_NODE_ID", _config["node_id"]) -NODE_DESC = os.getenv("AGENT_NODE_DESC", _config["node_description"]) -SERVER_HOST_PORT = os.getenv("GRPC_ENDPOINT", _config["grpc_endpoint"]) # e.g. 
"ai.jerxie.com:50051" -AUTH_TOKEN = os.getenv("AGENT_AUTH_TOKEN", _config["auth_token"]) -SYNC_DIR = os.getenv("CORTEX_SYNC_DIR", _config["sync_root"]) -TLS_ENABLED = os.getenv("AGENT_TLS_ENABLED", str(_config["tls"])).lower() == 'true' - -HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", _config["health_report_interval"])) -MAX_SKILL_WORKERS = int(os.getenv("MAX_SKILL_WORKERS", _config["max_skill_workers"])) - -DEBUG_GRPC = os.getenv("DEBUG_GRPC", "false").lower() == "true" -SECRET_KEY = os.getenv("AGENT_SECRET_KEY", _config.get("secret_key", "dev-secret-key-1337")) - -# Auto-update settings -HUB_URL = os.getenv("AGENT_HUB_URL", _config.get("hub_url", "https://ai.jerxie.com")) -AUTO_UPDATE = os.getenv("AGENT_AUTO_UPDATE", str(_config.get("auto_update", True))).lower() == "true" -UPDATE_CHECK_INTERVAL = int(os.getenv("AGENT_UPDATE_CHECK_INTERVAL", _config.get("update_check_interval", 300))) - -# These are still available but likely replaced by AUTH_TOKEN / TLS_ENABLED logic -CERT_CA = os.getenv("CERT_CA", "certs/ca.crt") -CERT_CLIENT_CRT = os.getenv("CERT_CLIENT_CRT", "certs/client.crt") -CERT_CLIENT_KEY = os.getenv("CERT_CLIENT_KEY", "certs/client.key") - -# FS Explorer root - Priority: Env > YAML > HOME > / -FS_ROOT = os.getenv("CORTEX_FS_ROOT", _config.get("fs_root")) -if not FS_ROOT: - # Default to user home on unix systems to avoid showing root directory by default - FS_ROOT = os.path.expanduser("~") if platform.system() in ["Darwin", "Linux"] else "C:\\" - diff --git a/agent-node/agent_node/core/__init__.py b/agent-node/agent_node/core/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/agent-node/agent_node/core/__init__.py +++ /dev/null diff --git a/agent-node/agent_node/core/sandbox.py b/agent-node/agent_node/core/sandbox.py deleted file mode 100644 index 8fcfed5..0000000 --- a/agent-node/agent_node/core/sandbox.py +++ /dev/null @@ -1,32 +0,0 @@ -from protos import agent_pb2 - -class SandboxEngine: - """Core Security Engine for 
Local Command Verification.""" - def __init__(self): - self.policy = None - - def sync(self, p): - """Syncs the latest policy from the Orchestrator.""" - self.policy = { - "MODE": "STRICT" if p.mode == agent_pb2.SandboxPolicy.STRICT else "PERMISSIVE", - "ALLOWED": list(p.allowed_commands), - "DENIED": list(p.denied_commands), - "SENSITIVE": list(p.sensitive_commands), - "WORKING_DIR_JAIL": p.working_dir_jail - } - - def verify(self, command_str): - """Verifies if a command string is allowed under the current policy.""" - if not self.policy: return False, "No Policy" - - parts = (command_str or "").strip().split() - if not parts: return False, "Empty" - - base_cmd = parts[0] - if base_cmd in self.policy["DENIED"]: - return False, f"Forbidden command: {base_cmd}" - - if self.policy["MODE"] == "STRICT" and base_cmd not in self.policy["ALLOWED"]: - return False, f"Command '{base_cmd}' not whitelisted" - - return True, "OK" diff --git a/agent-node/agent_node/core/sync.py b/agent-node/agent_node/core/sync.py deleted file mode 100644 index f130762..0000000 --- a/agent-node/agent_node/core/sync.py +++ /dev/null @@ -1,119 +0,0 @@ -import os -import hashlib -from agent_node.config import SYNC_DIR -from protos import agent_pb2 - -class NodeSyncManager: - """Handles local filesystem synchronization on the Agent Node.""" - def __init__(self, base_sync_dir=SYNC_DIR): - self.base_sync_dir = base_sync_dir - if not os.path.exists(self.base_sync_dir): - os.makedirs(self.base_sync_dir, exist_ok=True) - - def get_session_dir(self, session_id: str, create: bool = False) -> str: - """Returns the unique identifier directory for this session's sync.""" - path = os.path.join(self.base_sync_dir, session_id) - if create: - os.makedirs(path, exist_ok=True) - return path - - def purge(self, session_id: str): - """Completely removes a session's sync directory from the node.""" - path = os.path.join(self.base_sync_dir, session_id) - if os.path.exists(path): - import shutil - shutil.rmtree(path) 
- print(f" [📁🧹] Node sync directory deleted: {session_id}") - - def cleanup_unused_sessions(self, active_session_ids: list): - """Removes any session directories that are no longer active on the server.""" - if not os.path.exists(self.base_sync_dir): - return - - import shutil - active_set = set(active_session_ids) - for session_id in os.listdir(self.base_sync_dir): - if session_id.startswith("session-") and session_id not in active_set: - path = os.path.join(self.base_sync_dir, session_id) - if os.path.isdir(path): - shutil.rmtree(path) - print(f" [📁🧹] Proactively purged unused session directory: {session_id}") - - def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest) -> list: - """Compares local files with the server manifest and returns paths needing update.""" - session_dir = self.get_session_dir(session_id, create=True) - print(f"[📁] Reconciling Sync Directory: {session_dir}") - - from shared_core.ignore import CortexIgnore - ignore_filter = CortexIgnore(session_dir) - expected_paths = {f.path for f in manifest.files} - - # 1. 
Purge extraneous local files and directories (handles Deletions) - for root, dirs, files in os.walk(session_dir, topdown=False): - for name in files: - abs_path = os.path.join(root, name) - rel_path = os.path.relpath(abs_path, session_dir) - if rel_path in [".cortexignore", ".gitignore"]: continue - if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): - try: - os.remove(abs_path) - print(f" [📁🗑️] Deleted extraneous local file: {rel_path}") - except Exception as e: - print(f" [⚠️] Failed to delete file {rel_path}: {e}") - - for name in dirs: - abs_path = os.path.join(root, name) - rel_path = os.path.relpath(abs_path, session_dir) - if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): - try: - if not os.listdir(abs_path): - os.rmdir(abs_path) - except Exception: - pass - - needs_update = [] - for file_info in manifest.files: - target_path = os.path.join(session_dir, file_info.path) - - if file_info.is_dir: - os.makedirs(target_path, exist_ok=True) - continue - - # File Check - if not os.path.exists(target_path): - needs_update.append(file_info.path) - else: - # Hash comparison - with open(target_path, "rb") as f: - actual_hash = hashlib.sha256(f.read()).hexdigest() - if actual_hash != file_info.hash: - print(f" [⚠️] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})") - needs_update.append(file_info.path) - - return needs_update - - def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool: - """Writes a file chunk to the local session directory.""" - session_dir = self.get_session_dir(session_id, create=True) - target_path = os.path.normpath(os.path.join(session_dir, payload.path)) - - if not target_path.startswith(session_dir): - return False # Path traversal guard - - os.makedirs(os.path.dirname(target_path), exist_ok=True) - - mode = "ab" if payload.chunk_index > 0 else "wb" - with open(target_path, mode) as f: - f.write(payload.chunk) - - if 
payload.is_final and payload.hash: - return self._verify(target_path, payload.hash) - return True - - def _verify(self, path, expected_hash): - with open(path, "rb") as f: - actual = hashlib.sha256(f.read()).hexdigest() - if actual != expected_hash: - print(f"[⚠️] Sync Hash Mismatch for {path}") - return False - return True diff --git a/agent-node/agent_node/core/updater.py b/agent-node/agent_node/core/updater.py deleted file mode 100644 index 7b9562b..0000000 --- a/agent-node/agent_node/core/updater.py +++ /dev/null @@ -1,134 +0,0 @@ -""" -Auto-Update Trigger for Cortex Agent Node. - -Detects when the running agent is behind the hub's version and -delegates to bootstrap_installer.py to perform the update — the same -program used for Day 0 installation. - -Both bootstrap and version bump follow the exact same code path: - bootstrap_installer.py → download → extract → install deps → launch - -Channel: Stable HTTP REST only. No gRPC/proto. This contract is frozen. -""" - -import os -import sys -import time -import json -import logging -import threading -import subprocess -import urllib.request - -logger = logging.getLogger(__name__) - -_HUB_HTTP_URL: str = "" -_AUTH_TOKEN: str = "" -_CHECK_INTERVAL_SECS: int = 300 - -# bootstrap_installer.py lives at the agent-node root (two levels up from here) -_AGENT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) -_VERSION_FILE = os.path.join(_AGENT_ROOT, "VERSION") -_BOOTSTRAPPER = os.path.join(_AGENT_ROOT, "bootstrap_installer.py") - - -def _read_local_version() -> str: - try: - with open(_VERSION_FILE) as f: - return f.read().strip() - except FileNotFoundError: - return "0.0.0" - - -def _fetch_remote_version() -> str | None: - url = f"{_HUB_HTTP_URL}/api/v1/agent/version" - try: - req = urllib.request.Request(url, headers={"X-Agent-Token": _AUTH_TOKEN}) - with urllib.request.urlopen(req, timeout=10) as resp: - return json.loads(resp.read().decode()).get("version") - except Exception as e: - 
logger.warning(f"[Updater] Version check failed: {e}") - return None - - -def _version_tuple(v: str): - try: - return tuple(int(x) for x in v.split(".")) - except Exception: - return (0, 0, 0) - - -def _apply_update_via_bootstrapper(): - """ - Delegates to bootstrap_installer.py --update-only — the same code path - as Day 0 installation — then restarts this process. - Does not return on success. - """ - if not os.path.exists(_BOOTSTRAPPER): - logger.error(f"[Updater] bootstrap_installer.py not found at {_BOOTSTRAPPER}") - return False - - logger.info("[Updater] ⬇️ Delegating update to bootstrap_installer.py ...") - result = subprocess.run( - [sys.executable, _BOOTSTRAPPER, - "--hub", _HUB_HTTP_URL, - "--token", _AUTH_TOKEN, - "--update-only", - "--install-dir", _AGENT_ROOT], - cwd=_AGENT_ROOT - ) - - if result.returncode == 0: - logger.info("[Updater] ✅ Update applied. Restarting agent process...") - sys.stdout.flush() - sys.stderr.flush() - os.execv(sys.executable, [sys.executable] + sys.argv) # in-place restart, no return - else: - logger.error(f"[Updater] bootstrap_installer.py failed (exit {result.returncode}). Continuing with current version.") - return False - - -def check_and_update_once(): - """ - Single version check against the hub. If a newer version is available, - triggers bootstrap_installer.py and restarts (does not return if applied). 
- """ - local = _read_local_version() - logger.info(f"[Updater] Local version: {local}") - - remote = _fetch_remote_version() - if remote is None: - logger.info("[Updater] Hub unreachable — skipping update check.") - return - - logger.info(f"[Updater] Remote version: {remote}") - - if _version_tuple(remote) <= _version_tuple(local): - logger.info("[Updater] ✅ Already up to date.") - return - - logger.info(f"[Updater] 🆕 Update available: {local} → {remote}") - _apply_update_via_bootstrapper() # does not return on success - - -def start_background_updater(): - """Starts a daemon thread that periodically checks for new versions.""" - def _loop(): - while True: - time.sleep(_CHECK_INTERVAL_SECS) - try: - check_and_update_once() - except Exception as e: - logger.error(f"[Updater] Background check error: {e}") - - t = threading.Thread(target=_loop, daemon=True, name="AutoUpdater") - t.start() - logger.info(f"[Updater] Background updater started (interval: {_CHECK_INTERVAL_SECS}s)") - - -def init(hub_http_url: str, auth_token: str, check_interval_secs: int = 300): - """Initialize with hub connection details. 
Call before any other function.""" - global _HUB_HTTP_URL, _AUTH_TOKEN, _CHECK_INTERVAL_SECS - _HUB_HTTP_URL = hub_http_url.rstrip("/") - _AUTH_TOKEN = auth_token - _CHECK_INTERVAL_SECS = check_interval_secs diff --git a/agent-node/agent_node/core/watcher.py b/agent-node/agent_node/core/watcher.py deleted file mode 100644 index 4946f53..0000000 --- a/agent-node/agent_node/core/watcher.py +++ /dev/null @@ -1,112 +0,0 @@ - -import time -import os -import hashlib -from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler -from shared_core.ignore import CortexIgnore -from protos import agent_pb2 - -class SyncEventHandler(FileSystemEventHandler): - """Listens for FS events and triggers gRPC delta pushes.""" - def __init__(self, session_id, root_path, callback): - self.session_id = session_id - self.root_path = root_path - self.callback = callback - self.ignore_filter = CortexIgnore(root_path) - self.last_sync = {} # path -> last_hash - self.locked = False - - def on_modified(self, event): - if not event.is_directory: - self._process_change(event.src_path) - - def on_created(self, event): - if not event.is_directory: - self._process_change(event.src_path) - - def on_moved(self, event): - # Simplification: treat move as a delete and create, or just process the dest - self._process_change(event.dest_path) - - def _process_change(self, abs_path): - if self.locked: - return # Block all user edits when session is locked - - rel_path = os.path.normpath(os.path.relpath(abs_path, self.root_path)) - - # Phase 3: Dynamic reload if .cortexignore / .gitignore changed - if rel_path in [".cortexignore", ".gitignore"]: - print(f" [*] Reloading Ignore Filter for {self.session_id}") - self.ignore_filter = CortexIgnore(self.root_path) - - if self.ignore_filter.is_ignored(rel_path): - return - - try: - with open(abs_path, "rb") as f: - content = f.read() - file_hash = hashlib.sha256(content).hexdigest() - - if self.last_sync.get(rel_path) == file_hash: - 
return # No actual change - - self.last_sync[rel_path] = file_hash - print(f" [📁📤] Detected Change: {rel_path}") - - # Chunk and Send - chunk_size = 64 * 1024 - for i in range(0, len(content), chunk_size): - chunk = content[i:i + chunk_size] - is_final = i + chunk_size >= len(content) - payload = agent_pb2.FilePayload( - path=rel_path, - chunk=chunk, - chunk_index=i // chunk_size, - is_final=is_final, - hash=file_hash if is_final else "" - ) - self.callback(self.session_id, payload) - except Exception as e: - print(f" [!] Watcher Error for {rel_path}: {e}") - -class WorkspaceWatcher: - """Manages FS observers for active synchronization.""" - def __init__(self, callback): - self.callback = callback - self.observers = {} # session_id -> (observer, handler) - - def set_lock(self, session_id, locked=True): - if session_id in self.observers: - print(f"[*] Workspace LOCK for {session_id}: {locked}") - self.observers[session_id][1].locked = locked - - def start_watching(self, session_id, root_path): - if session_id in self.observers: - self.stop_watching(session_id) - - print(f"[*] Starting Watcher for Session {session_id} at {root_path}") - os.makedirs(root_path, exist_ok=True) - - handler = SyncEventHandler(session_id, root_path, self.callback) - observer = Observer() - observer.schedule(handler, root_path, recursive=True) - observer.start() - self.observers[session_id] = (observer, handler) - - def stop_watching(self, session_id): - if session_id in self.observers: - print(f"[*] Stopping Watcher for Session {session_id}") - obs, _ = self.observers[session_id] - obs.stop() - obs.join() - del self.observers[session_id] - - def get_watch_path(self, session_id): - if session_id in self.observers: - return self.observers[session_id][1].root_path - return None - - def shutdown(self): - for sid in list(self.observers.keys()): - self.stop_watching(sid) diff --git a/agent-node/agent_node/main.py b/agent-node/agent_node/main.py deleted file mode 100644 index 02faf4d..0000000 --- 
a/agent-node/agent_node/main.py +++ /dev/null @@ -1,45 +0,0 @@ -import sys -import os - -# Add root to path to find protos and other packages -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -import signal -from agent_node.node import AgentNode -from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL -from agent_node.core import updater - -def main(): - print(f"[*] Starting Agent Node: {NODE_ID}...") - - # 0. Auto-Update Check (before anything else — if we're behind, restart now) - if AUTO_UPDATE: - updater.init(hub_http_url=HUB_URL, auth_token=SECRET_KEY, check_interval_secs=UPDATE_CHECK_INTERVAL) - updater.check_and_update_once() # May restart process — does not return if update applied - - # 1. Initialization - node = AgentNode() - - # 2. Signal Handling for Graceful Shutdown - def handle_exit(sig, frame): - node.stop() - sys.exit(0) - - signal.signal(signal.SIGINT, handle_exit) - signal.signal(signal.SIGTERM, handle_exit) - - # Handshake: Sync configuration and Sandbox Policy - node.sync_configuration() - - # 3. Background: Start health reporting (Heartbeats) - node.start_health_reporting() - - # 4. Background: Periodic auto-update checks - if AUTO_UPDATE: - updater.start_background_updater() - - # 5. 
Foreground: Run Persistent Task Stream - node.run_task_stream() - -if __name__ == '__main__': - main() diff --git a/agent-node/agent_node/node.py b/agent-node/agent_node/node.py deleted file mode 100644 index 4b6efac..0000000 --- a/agent-node/agent_node/node.py +++ /dev/null @@ -1,643 +0,0 @@ -import threading -import queue -import time -import sys -import os -import hashlib -import logging -import psutil -from protos import agent_pb2, agent_pb2_grpc - -logger = logging.getLogger(__name__) -from agent_node.skills.manager import SkillManager -from agent_node.core.sandbox import SandboxEngine -from agent_node.core.sync import NodeSyncManager -from agent_node.core.watcher import WorkspaceWatcher -from agent_node.utils.auth import verify_task_signature -from agent_node.utils.network import get_secure_stub -from agent_node.config import NODE_ID, NODE_DESC, AUTH_TOKEN, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS, DEBUG_GRPC, FS_ROOT - - -class AgentNode: - """The 'Agent Core': Orchestrates Local Skills and Maintains gRPC Connection.""" - def __init__(self, node_id=NODE_ID): - self.node_id = node_id - self.sandbox = SandboxEngine() - self.sync_mgr = NodeSyncManager() - self.skills = SkillManager(max_workers=MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr) - self.watcher = WorkspaceWatcher(self._on_sync_delta) - self.task_queue = queue.Queue() - self.stub = get_secure_stub() - - def _collect_capabilities(self) -> dict: - """Collect hardware metadata to advertise at registration.""" - import platform - import subprocess - import socket - import os - - caps = { - "shell": "v1", - "browser": "playwright-sync-bridge", - "arch": platform.machine(), # e.g. 
x86_64, arm64, aarch64 - "os": platform.system().lower(), # linux, darwin, windows - "os_release": platform.release(), - } - - # Privilege Detection - # is_root: True if UID 0 (Linux/macOS) — no sudo needed at all - # has_sudo: True if sudo is installed AND available passwordlessly - try: - caps["is_root"] = (os.getuid() == 0) - except AttributeError: - # Windows — os.getuid() doesn't exist; approximate via admin check - try: - import ctypes - caps["is_root"] = bool(ctypes.windll.shell32.IsUserAnAdmin()) - except Exception: - caps["is_root"] = False - - if caps.get("is_root"): - caps["has_sudo"] = False # Root doesn't need sudo - else: - # Check if passwordless sudo is available - try: - r = subprocess.run( - ["sudo", "-n", "true"], - capture_output=True, timeout=3 - ) - caps["has_sudo"] = (r.returncode == 0) - except Exception: - caps["has_sudo"] = False - - # Local IP Detection (best effort) - try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.settimeout(0) - # Doesn't even have to be reachable - s.connect(('10.254.254.254', 1)) - caps["local_ip"] = s.getsockname()[0] - s.close() - except Exception: - caps["local_ip"] = "unknown" - - # GPU Detection — try nvidia-smi first, then check for Apple GPU - try: - result = subprocess.run( - ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"], - capture_output=True, text=True, timeout=5 - ) - if result.returncode == 0 and result.stdout.strip(): - gpu_lines = result.stdout.strip().split("\n") - caps["gpu"] = gpu_lines[0].strip() # e.g. 
"NVIDIA GeForce RTX 3080, 10240" - caps["gpu_count"] = str(len(gpu_lines)) - else: - caps["gpu"] = "none" - except Exception: - # No nvidia-smi — check if Apple Silicon (arm64 + darwin) - if caps["os"] == "darwin" and "arm" in caps["arch"].lower(): - caps["gpu"] = "apple-silicon" - else: - caps["gpu"] = "none" - - return caps - - def sync_configuration(self): - """Initial handshake to retrieve policy and metadata.""" - print(f"[*] Handshake with Orchestrator: {self.node_id}") - caps = self._collect_capabilities() - print(f"[*] Capabilities: {caps}") - - # Protobuf capabilities is map — all values must be strings - caps_str = {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()} - - reg_req = agent_pb2.RegistrationRequest( - node_id=self.node_id, - auth_token=AUTH_TOKEN, - node_description=NODE_DESC, - capabilities=caps_str - ) - - - try: - res = self.stub.SyncConfiguration(reg_req) - if res.success: - self.sandbox.sync(res.policy) - print("[OK] Sandbox Policy Synced.") - else: - print(f"[!] Rejection: {res.error_message}") - sys.exit(1) - except Exception as e: - print(f"[!] 
Connection Fail: {e}") - sys.exit(1) - - def start_health_reporting(self): - """Streaming node metrics to the orchestrator for load balancing.""" - def _report(): - while True: - try: - def _gen(): - while True: - ids = self.skills.get_active_ids() - # Collection - cpu = psutil.cpu_percent(interval=1.0) - per_core = psutil.cpu_percent(percpu=True) - - vmem = psutil.virtual_memory() - mem_percent = vmem.percent - - # GB conversion - used_gb = vmem.used / (1024**3) - total_gb = vmem.total / (1024**3) - avail_gb = vmem.available / (1024**3) - - # Freq & Load - try: - freq = psutil.cpu_freq().current - except: - freq = 0 - - try: - load = list(os.getloadavg()) - except: - load = [0.0, 0.0, 0.0] - - yield agent_pb2.Heartbeat( - node_id=self.node_id, - cpu_usage_percent=cpu, - memory_usage_percent=mem_percent, - active_worker_count=len(ids), - max_worker_capacity=MAX_SKILL_WORKERS, - running_task_ids=ids, - cpu_count=psutil.cpu_count(), - memory_used_gb=used_gb, - memory_total_gb=total_gb, - # M6 Fields - cpu_usage_per_core=per_core, - cpu_freq_mhz=freq, - memory_available_gb=avail_gb, - load_avg=load - ) - time.sleep(max(0, HEALTH_REPORT_INTERVAL - 1.0)) - - # Consume the heartbeat stream to keep it alive - for response in self.stub.ReportHealth(_gen()): - # We don't strictly need the server time, but it confirms a round-trip - pass - except Exception as e: - print(f"[!] Health reporting interrupted: {e}. 
Retrying in 5s...") - time.sleep(5) - - # Non-blocking thread for health heartbeat - threading.Thread(target=_report, daemon=True, name=f"Health-{self.node_id}").start() - - def run_task_stream(self): - """Main Persistent Bi-directional Stream for Task Management with Reconnection.""" - while True: - try: - def _gen(): - # Initial announcement for routing identity - announce_msg = agent_pb2.ClientTaskMessage( - announce=agent_pb2.NodeAnnounce(node_id=self.node_id) - ) - if DEBUG_GRPC: - print(f"[*] [DEBUG-gRPC] OUTBOUND: announce | {announce_msg}", flush=True) - yield announce_msg - - while True: - out_msg = self.task_queue.get() - if DEBUG_GRPC: - kind = out_msg.WhichOneof('payload') - if kind == 'file_sync' and out_msg.file_sync.HasField('file_data'): - print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} (file_data, path={out_msg.file_sync.file_data.path}, size={len(out_msg.file_sync.file_data.chunk)})", flush=True) - elif kind == 'skill_event' and out_msg.skill_event.WhichOneof('data') == 'terminal_out': - print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} (terminal_out, size={len(out_msg.skill_event.terminal_out)})", flush=True) - else: - print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} | {out_msg}", flush=True) - yield out_msg - - responses = self.stub.TaskStream(_gen()) - print(f"[*] Task Stream Online: {self.node_id}", flush=True) - - for msg in responses: - self._process_server_message(msg) - except Exception as e: - print(f"[!] Task Stream Failure: {e}. 
Reconnecting in 5s...", flush=True) - time.sleep(5) - # Re-sync config in case permissions changed during downtime - try: self.sync_configuration() - except: pass - - def _process_server_message(self, msg): - kind = msg.WhichOneof('payload') - if DEBUG_GRPC: - print(f"[*] [DEBUG-gRPC] INBOUND: {kind} | {msg}", flush=True) - else: - print(f"[*] Inbound: {kind}", flush=True) - - if kind == 'task_request': - self._handle_task(msg.task_request) - - elif kind == 'task_cancel': - if self.skills.cancel(msg.task_cancel.task_id): - self._send_response(msg.task_cancel.task_id, None, agent_pb2.TaskResponse.CANCELLED) - - elif kind == 'work_pool_update': - # Claim logical idle tasks from global pool with slight randomized jitter - # to prevent thundering herd where every node claims the same task at the exact same ms. - if len(self.skills.get_active_ids()) < MAX_SKILL_WORKERS: - for tid in msg.work_pool_update.available_task_ids: - # Deterministic delay based on node_id to distribute claims - import random - time.sleep(random.uniform(0.1, 0.5)) - - self.task_queue.put(agent_pb2.ClientTaskMessage( - task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id) - )) - - elif kind == 'claim_status': - status = "GRANTED" if msg.claim_status.granted else "DENIED" - print(f" [📦] Claim {msg.claim_status.task_id}: {status} ({msg.claim_status.reason})", flush=True) - - elif kind == 'file_sync': - self._handle_file_sync(msg.file_sync) - - elif kind == 'policy_update': - print(f" [🔒] Live Sandbox Policy Update Received.") - self.sandbox.sync(msg.policy_update) - - def _on_sync_delta(self, session_id, file_payload): - """Callback from watcher to push local changes to server.""" - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - file_data=file_payload - ) - )) - - def _handle_file_sync(self, fs): - """Processes inbound file synchronization messages from the Orchestrator.""" - sid = fs.session_id - # LOGGING - 
type_str = fs.WhichOneof('payload') - print(f" [📁] Sync MSG: {type_str} | Session: {sid}") - - if fs.HasField("manifest"): - needs_update = self.sync_mgr.handle_manifest(sid, fs.manifest) - if needs_update: - print(f" [📁⚠️] Drift Detected for {sid}: {len(needs_update)} files need sync") - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=sid, - status=agent_pb2.SyncStatus( - code=agent_pb2.SyncStatus.RECONCILE_REQUIRED, - message=f"Drift detected in {len(needs_update)} files", - reconcile_paths=needs_update - ) - ) - )) - else: - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=sid, - status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message="Synchronized") - ) - )) - elif fs.HasField("file_data"): - success = self.sync_mgr.write_chunk(sid, fs.file_data) - if fs.file_data.is_final: - print(f" [📁] File Received: {fs.file_data.path} (Verified: {success})") - status = agent_pb2.SyncStatus.OK if success else agent_pb2.SyncStatus.ERROR - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=sid, - status=agent_pb2.SyncStatus(code=status, message=f"File {fs.file_data.path} synced") - ) - )) - elif fs.HasField("control"): - ctrl = fs.control - print(f" [📁] Control Action: {ctrl.action} (Path: {ctrl.path})") - if ctrl.action == agent_pb2.SyncControl.START_WATCHING: - # Path relative to sync dir or absolute - watch_path = ctrl.path if os.path.isabs(ctrl.path) else os.path.join(self.sync_mgr.get_session_dir(sid), ctrl.path) - print(f" [📁👁️] Starting Watcher on: {watch_path}") - self.watcher.start_watching(sid, watch_path) - elif ctrl.action == agent_pb2.SyncControl.STOP_WATCHING: - self.watcher.stop_watching(sid) - elif ctrl.action == agent_pb2.SyncControl.LOCK: - self.watcher.set_lock(sid, True) - elif ctrl.action == agent_pb2.SyncControl.UNLOCK: - self.watcher.set_lock(sid, False) - elif ctrl.action == 
agent_pb2.SyncControl.REFRESH_MANIFEST: - if ctrl.request_paths: - print(f" [📁📤] Pushing {len(ctrl.request_paths)} Requested Files for {sid}") - for path in ctrl.request_paths: - self._push_file(sid, path) - else: - # Node -> Server Manifest Push - self._push_full_manifest(sid, ctrl.path) - elif ctrl.action == agent_pb2.SyncControl.RESYNC: - self._push_full_manifest(sid, ctrl.path) - elif ctrl.action == agent_pb2.SyncControl.PURGE: - print(f" [📁🧹] Node instructed to purge session sync data: {sid}") - self.watcher.stop_watching(sid) # Stop watching before deleting - self.sync_mgr.purge(sid) - elif ctrl.action == agent_pb2.SyncControl.CLEANUP: - print(f" [📁🧹] Node proactively cleaning up defunct sessions. Active: {ctrl.request_paths}") - active_sessions = list(ctrl.request_paths) - self.sync_mgr.cleanup_unused_sessions(active_sessions) - - # --- M6: FS Explorer Handlers --- - elif ctrl.action == agent_pb2.SyncControl.LIST: - print(f" [📁📂] List Directory: {ctrl.path}") - self._push_full_manifest(sid, ctrl.path, task_id=fs.task_id, shallow=True) - elif ctrl.action == agent_pb2.SyncControl.READ: - print(f" [📁📄] Read File: {ctrl.path}") - self._push_file(sid, ctrl.path, task_id=fs.task_id) - elif ctrl.action == agent_pb2.SyncControl.WRITE: - print(f" [📁💾] Write File: {ctrl.path} (is_dir={ctrl.is_dir})") - self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir, task_id=fs.task_id) - elif ctrl.action == agent_pb2.SyncControl.DELETE: - print(f" [📁🗑️] Delete Fragment: {ctrl.path}") - self._handle_fs_delete(sid, ctrl.path, task_id=fs.task_id) - - def _get_base_dir(self, session_id, create=False): - """Helper to resolve the effective root for a session (Watcher > SyncDir).""" - if session_id == "__fs_explorer__": - root = FS_ROOT - print(f" [📁] Explorer Root: {root}") - return root - - # Priority 1: If we have an active watcher, use its root (e.g. 
Seed from Local) - watched = self.watcher.get_watch_path(session_id) - if watched: - print(f" [📁] Using Watched Path as Base: {watched}") - return watched - - # Priority 2: Standard session-scoped sync directory - fallback = self.sync_mgr.get_session_dir(session_id, create=create) - print(f" [📁] Falling back to SyncDir: {fallback}") - return fallback - - def _push_full_manifest(self, session_id, rel_path=".", task_id="", shallow=False): - """Pushes the current local manifest back to the server.""" - print(f" [📁📤] Pushing {'Shallow' if shallow else 'Full'} Manifest for {session_id}") - - base_dir = self._get_base_dir(session_id, create=True) - - watch_path = os.path.normpath(os.path.join(base_dir, rel_path)) - - if not os.path.exists(watch_path): - # If the specific sub-path doesn't exist, try to create it if it's within the session dir - if session_id != "__fs_explorer__": - os.makedirs(watch_path, exist_ok=True) - else: - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=f"Path {rel_path} not found") - ) - )) - return - - files = [] - try: - if shallow: - # Optimized for Explorer: immediate children only, no hashing - with os.scandir(watch_path) as it: - for entry in it: - is_dir = entry.is_dir() - # Use metadata only - try: - stats = entry.stat() - size = stats.st_size if not is_dir else 0 - except: size = 0 - - # Calculate path relative to the actual base_sync_dir / session_dir - # rel_path is the directory we are currently browsing. - # entry.name is the file within it. 
- item_rel_path = os.path.relpath(os.path.join(watch_path, entry.name), base_dir) - - files.append(agent_pb2.FileInfo(path=item_rel_path, size=size, hash="", is_dir=is_dir)) - else: - # Deep walk with full hashes for reconciliation - for root, dirs, filenames in os.walk(watch_path): - for filename in filenames: - abs_path = os.path.join(root, filename) - # r_path must be relative to base_dir so the server correctly joins it to the mirror root - r_path = os.path.relpath(abs_path, base_dir) - try: - with open(abs_path, "rb") as f: - h = hashlib.sha256(f.read()).hexdigest() - files.append(agent_pb2.FileInfo(path=r_path, size=os.path.getsize(abs_path), hash=h, is_dir=False)) - except Exception as e: - print(f" [⚠️] Failed to hash {abs_path}: {e}") - - for d in dirs: - abs_path = os.path.join(root, d) - # r_path must be relative to base_dir so the server correctly joins it to the mirror root - r_path = os.path.relpath(abs_path, base_dir) - files.append(agent_pb2.FileInfo(path=r_path, size=0, hash="", is_dir=True)) - except Exception as e: - print(f" [❌] Manifest generation failed for {rel_path}: {e}") - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e)) - ) - )) - return - - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - manifest=agent_pb2.DirectoryManifest(root_path=rel_path, files=files) - ) - )) - - def _handle_fs_write(self, session_id, rel_path, content, is_dir, task_id=""): - """Modular FS Write/Create.""" - try: - base_dir = self._get_base_dir(session_id, create=True) - target_path = os.path.normpath(os.path.join(base_dir, rel_path)) - print(f" [📁💾] target_path: {target_path} (base_dir: {base_dir})") - - if not target_path.startswith(base_dir): - raise Exception(f"Path traversal attempt blocked: {target_path} is outside 
{base_dir}") - - if is_dir: - os.makedirs(target_path, exist_ok=True) - else: - os.makedirs(os.path.dirname(target_path), exist_ok=True) - with open(target_path, "wb") as f: - f.write(content) - - # Send OK status - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"{'Directory' if is_dir else 'File'} written") - ) - )) - # Trigger manifest refresh so UI updates - self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True) - except Exception as e: - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e)) - ) - )) - - def _handle_fs_delete(self, session_id, rel_path, task_id=""): - """Modular FS Delete.""" - try: - base_dir = self._get_base_dir(session_id) - - target_path = os.path.normpath(os.path.join(base_dir, rel_path)) - if not target_path.startswith(base_dir): - raise Exception("Path traversal attempt blocked") - - if not os.path.exists(target_path): - raise Exception("File not found") - - import shutil - if os.path.isdir(target_path): - shutil.rmtree(target_path) - else: - os.remove(target_path) - - # Send OK status - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"Deleted {rel_path}") - ) - )) - # Trigger manifest refresh so UI updates - self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True) - except Exception as e: - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, 
message=str(e)) - ) - )) - - def _push_file(self, session_id, rel_path, task_id=""): - """Pushes a specific file from node to server.""" - watch_path = self._get_base_dir(session_id, create=False) - - abs_path = os.path.normpath(os.path.join(watch_path, rel_path)) - if not abs_path.startswith(watch_path): - print(f" [📁🚫] Blocked traversal attempt in _push_file: {rel_path}") - return - - if not os.path.exists(abs_path): - print(f" [📁❓] Requested file {rel_path} not found on node") - return - - with open(abs_path, "rb") as f: - full_data = f.read() - full_hash = hashlib.sha256(full_data).hexdigest() - f.seek(0) - - index = 0 - while True: - chunk = f.read(1024 * 1024) # 1MB chunks - is_final = len(chunk) < 1024 * 1024 - - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - file_data=agent_pb2.FilePayload( - path=rel_path, - chunk=chunk, - chunk_index=index, - is_final=is_final, - hash=full_hash if is_final else "" - ) - ) - )) - - if is_final or not chunk: - break - index += 1 - - def _handle_task(self, task): - print(f"[*] Task Launch: {task.task_id}", flush=True) - # 1. Cryptographic Signature Verification - if not verify_task_signature(task): - print(f"[!] Signature Validation Failed for {task.task_id}", flush=True) - # Report back to hub so the frontend gets a real error, not a silent timeout - self._send_response( - task.task_id, - agent_pb2.TaskResponse( - task_id=task.task_id, - status=agent_pb2.TaskResponse.ERROR, - stderr="[NODE] HMAC signature mismatch — check that AGENT_SECRET_KEY on the node matches the hub SECRET_KEY. Task rejected.", - ) - ) - return - - print(f"[✅] Validated task {task.task_id}", flush=True) - - # 2. Skill Manager Submission - success, reason = self.skills.submit(task, self.sandbox, self._on_finish, self._on_event) - if not success: - print(f"[!] 
Execution Rejected: {reason}", flush=True) - self._send_response( - task.task_id, - agent_pb2.TaskResponse( - task_id=task.task_id, - status=agent_pb2.TaskResponse.ERROR, - stderr=f"[NODE] Execution Rejected: {reason}", - ) - ) - - def _on_event(self, event): - """Live Event Tunneler: Routes browser/skill events into the main stream.""" - if isinstance(event, agent_pb2.ClientTaskMessage): - self.task_queue.put(event) - else: - # Legacy/Browser Skill fallback - self.task_queue.put(agent_pb2.ClientTaskMessage(browser_event=event)) - - def _on_finish(self, tid, res, trace): - """Final Completion Callback: Routes task results back to server.""" - print(f"[*] Completion: {tid}", flush=True) - # 0 is SUCCESS, 1 is ERROR in Protobuf - status = res.get('status', agent_pb2.TaskResponse.ERROR) - - tr = agent_pb2.TaskResponse( - task_id=tid, status=status, - stdout=res.get('stdout',''), - stderr=res.get('stderr',''), - trace_id=trace, - browser_result=res.get("browser_result") - ) - self._send_response(tid, tr) - - def _send_response(self, tid, tr=None, status=None): - """Utility for placing response messages into the gRPC outbound queue.""" - if tr: - self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr)) - else: - self.task_queue.put(agent_pb2.ClientTaskMessage( - task_response=agent_pb2.TaskResponse(task_id=tid, status=status) - )) - - def stop(self): - """Gracefully stops all background services and skills.""" - print(f"\n[🛑] Stopping Agent Node: {self.node_id}") - self.skills.shutdown() - # Optionally close gRPC channel if we want to be very clean - # self.channel.close() diff --git a/agent-node/agent_node/skills/__init__.py b/agent-node/agent_node/skills/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/agent-node/agent_node/skills/__init__.py +++ /dev/null diff --git a/agent-node/agent_node/skills/base.py b/agent-node/agent_node/skills/base.py deleted file mode 100644 index 33c88ec..0000000 --- a/agent-node/agent_node/skills/base.py +++ 
/dev/null @@ -1,13 +0,0 @@ -class BaseSkill: - """Abstract interface for all Node capabilities (Shell, Browser, etc.).""" - def execute(self, task, sandbox, on_complete, on_event=None): - """Processes the given task and notifies results via callbacks.""" - raise NotImplementedError - - def cancel(self, task_id: str) -> bool: - """Attempts to cancel the task and returns success status.""" - return False - - def shutdown(self): - """Cleanup resources on node exit.""" - pass diff --git a/agent-node/agent_node/skills/manager.py b/agent-node/agent_node/skills/manager.py deleted file mode 100644 index f565b0e..0000000 --- a/agent-node/agent_node/skills/manager.py +++ /dev/null @@ -1,139 +0,0 @@ -import threading -import os -import importlib.util -from concurrent import futures -from agent_node.skills.base import BaseSkill -from agent_node.config import MAX_SKILL_WORKERS - -class SkillManager: - """Orchestrates multiple modular skills and manages the task worker pool.""" - def __init__(self, max_workers=MAX_SKILL_WORKERS, sync_mgr=None): - self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker") - self.active_tasks = {} # task_id -> future - self.sync_mgr = sync_mgr - self.skills = self._discover_skills(sync_mgr) - self.max_workers = max_workers - self.lock = threading.Lock() - - def _discover_skills(self, sync_mgr): - """Scans the skills/ directory for logic.py and loads skill implementations.""" - # Find candidate locations for skills - # 1. Monorepo root (../../../skills from this file) - # 2. Agent-node local (../../skills from this file) - # 3. 
Docker standard (/app/skills) - candidates = [ - os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../skills")), - os.path.abspath(os.path.join(os.path.dirname(__file__), "../../skills")), - "/app/skills", - "/app/node_skills" - ] - - skills_dir = None - for cand in candidates: - if os.path.exists(cand) and os.path.isdir(cand): - # Ensure it's not a broken symlink and has actual content - try: - if any(os.path.isdir(os.path.join(cand, d)) for d in os.listdir(cand)): - skills_dir = cand - break - except OSError: - continue - - discovered = {} - if not skills_dir: - print(f" [🔧⚠️] Skills directory not found in candidate locations: {candidates}") - return discovered - - print(f" [🔧] Using skills directory: {skills_dir}") - for skill_dir in os.listdir(skills_dir): - item_path = os.path.join(skills_dir, skill_dir) - if os.path.isdir(item_path): - logic_py = os.path.join(item_path, "logic.py") - if os.path.exists(logic_py): - # Dynamic import - try: - spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - # Find the first class that inherits from BaseSkill - for attr_name in dir(module): - attr = getattr(module, attr_name) - if isinstance(attr, type) and issubclass(attr, BaseSkill) and attr is not BaseSkill: - # We map the internal skill name (e.g. 
mesh_terminal_control) - # if we can find it in the module or assume it based on folder name - # For backward compatibility with task_type routing, we map common ones - instance = attr(sync_mgr=sync_mgr) - discovered[skill_dir] = instance - # Also map legacy names for the routing engine below - if "terminal" in skill_dir or "shell" in skill_dir: - discovered["shell"] = instance - if "browser" in skill_dir: - discovered["browser"] = instance - if "file" in skill_dir: - discovered["file"] = instance - break - except Exception as e: - print(f" [🔧❌] Failed to load skill from {logic_py}: {e}") - - print(f" [🔧] Discovered skills: {list(discovered.keys())}") - return discovered - - def submit(self, task, sandbox, on_complete, on_event=None): - """Routes a task to the appropriate skill and submits it to the thread pool.""" - # --- 0. Transparent TTY Bypass (Gaming Performance) --- - # Keystrokes and Resizes should NEVER wait for a thread or be blocked by sandbox - if "shell" in self.skills and hasattr(self.skills["shell"], "handle_transparent_tty"): - if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event): - return True, "Accepted (Transparent)" - - with self.lock: - if len(self.active_tasks) >= self.max_workers: - return False, "Node Capacity Reached" - - # 1. Routing Engine - skill = None - if task.HasField("browser_action"): - skill = self.skills.get("browser") - elif task.task_type == "file": - skill = self.skills.get("file") - else: - # Default to the one that looks like a shell - skill = self.skills.get("shell") - - if not skill: - return False, f"Target skill not available for task type: {task.task_type}" - - # 2. 
Execution submission - future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event) - self.active_tasks[task.task_id] = future - - # Cleanup hook - future.add_done_callback(lambda f: self._cleanup(task.task_id)) - return True, "Accepted" - - def cancel(self, task_id): - """Attempts to cancel an active task through all registered skills.""" - with self.lock: - cancelled = any(s.cancel(task_id) for s in self.skills.values()) - return cancelled - - def get_active_ids(self): - """Returns the list of currently running task IDs.""" - with self.lock: - return list(self.active_tasks.keys()) - - def _cleanup(self, task_id): - """Internal callback to release capacity when a task finishes.""" - with self.lock: - self.active_tasks.pop(task_id, None) - - def shutdown(self): - """Triggers shutdown for all skills and the worker pool.""" - print("[🔧] Shutting down Skill Manager...") - with self.lock: - # Use set to avoid shutting down the same instance multiple times due to alias mapping - for skill in set(self.skills.values()): - skill.shutdown() - # Shutdown thread pool - self.executor.shutdown(wait=True) diff --git a/agent-node/agent_node/utils/__init__.py b/agent-node/agent_node/utils/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/agent-node/agent_node/utils/__init__.py +++ /dev/null diff --git a/agent-node/agent_node/utils/auth.py b/agent-node/agent_node/utils/auth.py deleted file mode 100644 index 202fd4c..0000000 --- a/agent-node/agent_node/utils/auth.py +++ /dev/null @@ -1,28 +0,0 @@ -import jwt -import datetime -import hmac -import hashlib -from protos import agent_pb2 -from agent_node.config import SECRET_KEY - -def create_auth_token(node_id: str) -> str: - """Creates a JWT for node authentication.""" - payload = { - "sub": node_id, - "iat": datetime.datetime.utcnow(), - "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10) - } - return jwt.encode(payload, SECRET_KEY, algorithm="HS256") - -def 
verify_task_signature(task, secret=SECRET_KEY) -> bool: - """Verifies HMAC signature for shell or browser tasks.""" - if task.HasField("browser_action"): - a = task.browser_action - # Aligned with orchestrator's sign_browser_action using the string Name - kind = agent_pb2.BrowserAction.ActionType.Name(a.action) - sign_base = f"{kind}:{a.url}:{a.session_id}" - else: - sign_base = task.payload_json - - expected_sig = hmac.new(secret.encode(), sign_base.encode(), hashlib.sha256).hexdigest() - return hmac.compare_digest(task.signature, expected_sig) diff --git a/agent-node/agent_node/utils/network.py b/agent-node/agent_node/utils/network.py deleted file mode 100644 index 04b97c3..0000000 --- a/agent-node/agent_node/utils/network.py +++ /dev/null @@ -1,38 +0,0 @@ -import grpc -import os -from protos import agent_pb2_grpc -from agent_node.config import SERVER_HOST_PORT, TLS_ENABLED, CERT_CA, CERT_CLIENT_CRT, CERT_CLIENT_KEY - -def get_secure_stub(): - """Initializes a gRPC channel (Secure or Insecure) and returns the orchestrator stub.""" - - options = [ - ('grpc.keepalive_time_ms', 30000), # Send keepalive ping every 30s - ('grpc.keepalive_timeout_ms', 10000), # Wait 10s for pong - ('grpc.keepalive_permit_without_calls', True), - ('grpc.http2.max_pings_without_data', 0), # Allow infinite pings - ('grpc.max_receive_message_length', 128 * 1024 * 1024), - ('grpc.max_send_message_length', 128 * 1024 * 1024), - ] - - if not TLS_ENABLED: - print(f"[!] TLS is disabled. 
Connecting via insecure channel to {SERVER_HOST_PORT}") - channel = grpc.insecure_channel(SERVER_HOST_PORT, options=options) - return agent_pb2_grpc.AgentOrchestratorStub(channel) - - print(f"[*] Connecting via secure (mTLS) channel to {SERVER_HOST_PORT}") - try: - with open(CERT_CLIENT_KEY, 'rb') as f: pkey = f.read() - with open(CERT_CLIENT_CRT, 'rb') as f: cert = f.read() - with open(CERT_CA, 'rb') as f: ca = f.read() - - creds = grpc.ssl_channel_credentials(ca, pkey, cert) - channel = grpc.secure_channel(SERVER_HOST_PORT, creds, options=options) - return agent_pb2_grpc.AgentOrchestratorStub(channel) - except FileNotFoundError as e: - print(f"[!] mTLS Certificate files not found: {e}. Falling back to standard TLS (Server Verify)...") - # Fallback to standard TLS (uses system CA roots by default) - creds = grpc.ssl_channel_credentials() - channel = grpc.secure_channel(SERVER_HOST_PORT, creds, options=options) - return agent_pb2_grpc.AgentOrchestratorStub(channel) - diff --git a/agent-node/bootstrap_installer.py b/agent-node/bootstrap_installer.py index 35832df..19c8676 100644 --- a/agent-node/bootstrap_installer.py +++ b/agent-node/bootstrap_installer.py @@ -148,7 +148,7 @@ _print(f"ERROR: install_service.py not found at {daemon_script}") sys.exit(1) - entry = os.path.join(install_dir, "agent_node", "main.py") + entry = os.path.join(install_dir, "src", "agent_node", "main.py") _print(f"Launching agent in foreground: {sys.executable} {entry}") sys.stdout.flush() sys.stderr.flush() diff --git a/agent-node/install_service.py b/agent-node/install_service.py index 0c1882e..ee246d2 100755 --- a/agent-node/install_service.py +++ b/agent-node/install_service.py @@ -18,7 +18,7 @@ return sys.executable def get_agent_main_path(): - return os.path.abspath(os.path.join(os.path.dirname(__file__), "agent_node", "main.py")) + return os.path.abspath(os.path.join(os.path.dirname(__file__), "src", "agent_node", "main.py")) def get_working_dir(): return 
os.path.abspath(os.path.dirname(__file__)) diff --git a/agent-node/protos/__init__.py b/agent-node/protos/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/agent-node/protos/__init__.py +++ /dev/null diff --git a/agent-node/protos/agent.proto b/agent-node/protos/agent.proto deleted file mode 100644 index 46b74fa..0000000 --- a/agent-node/protos/agent.proto +++ /dev/null @@ -1,280 +0,0 @@ -syntax = "proto3"; - -package agent; - -// The Cortex Server exposes this service -service AgentOrchestrator { - // 1. Control Channel: Sync policies and settings (Unary) - rpc SyncConfiguration(RegistrationRequest) returns (RegistrationResponse); - - // 2. Task Channel: Bidirectional work dispatch and reporting (Persistent) - rpc TaskStream(stream ClientTaskMessage) returns (stream ServerTaskMessage); - - // 3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) - rpc ReportHealth(stream Heartbeat) returns (stream HealthCheckResponse); -} - -// --- Channel 1: Registration & Policy --- -message RegistrationRequest { - string node_id = 1; - string version = 2; - string auth_token = 3; - string node_description = 4; // AI-readable description of this node's role - map capabilities = 5; // e.g. 
"gpu": "nvidia-3080", "os": "ubuntu-22.04" -} - -message SandboxPolicy { - enum Mode { - STRICT = 0; - PERMISSIVE = 1; - } - Mode mode = 1; - repeated string allowed_commands = 2; - repeated string denied_commands = 3; - repeated string sensitive_commands = 4; - string working_dir_jail = 5; -} - -message RegistrationResponse { - bool success = 1; - string error_message = 2; - string session_id = 3; - SandboxPolicy policy = 4; -} - -// --- Channel 2: Tasks & Collaboration --- -message ClientTaskMessage { - oneof payload { - TaskResponse task_response = 1; - TaskClaimRequest task_claim = 2; - BrowserEvent browser_event = 3; - NodeAnnounce announce = 4; // NEW: Identification on stream connect - FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync - SkillEvent skill_event = 6; // NEW: Persistent real-time skill data - } -} - -message SkillEvent { - string session_id = 1; - string task_id = 2; - oneof data { - string terminal_out = 3; // Raw stdout/stderr chunks - string prompt = 4; // Interactive prompt (like password) - bool keep_alive = 5; // Session preservation - } -} - -message NodeAnnounce { - string node_id = 1; -} - -message BrowserEvent { - string session_id = 1; - oneof event { - ConsoleMessage console_msg = 2; - NetworkRequest network_req = 3; - } -} - -message ServerTaskMessage { - oneof payload { - TaskRequest task_request = 1; - WorkPoolUpdate work_pool_update = 2; - TaskClaimResponse claim_status = 3; - TaskCancelRequest task_cancel = 4; - FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync - SandboxPolicy policy_update = 6; // NEW: Live Policy Update - } -} - -message TaskCancelRequest { - string task_id = 1; -} - -message TaskRequest { - string task_id = 1; - string task_type = 2; - oneof payload { - string payload_json = 3; // For legacy shell/fallback - BrowserAction browser_action = 7; // NEW: Structured Browser Skill - } - int32 timeout_ms = 4; - string trace_id = 5; - string signature = 6; - string session_id = 8; // NEW: Map execution 
to a sync workspace -} - -message BrowserAction { - enum ActionType { - NAVIGATE = 0; - CLICK = 1; - TYPE = 2; - SCREENSHOT = 3; - GET_DOM = 4; - HOVER = 5; - SCROLL = 6; - CLOSE = 7; - EVAL = 8; - GET_A11Y = 9; - } - ActionType action = 1; - string url = 2; - string selector = 3; - string text = 4; - string session_id = 5; - int32 x = 6; - int32 y = 7; -} - -message TaskResponse { - string task_id = 1; - enum Status { - SUCCESS = 0; - ERROR = 1; - TIMEOUT = 2; - CANCELLED = 3; - } - Status status = 2; - string stdout = 3; - string stderr = 4; - string trace_id = 5; - map artifacts = 6; - - // NEW: Structured Skill Results - oneof result { - BrowserResponse browser_result = 7; - } -} - -message BrowserResponse { - string url = 1; - string title = 2; - bytes snapshot = 3; - string dom_content = 4; - string a11y_tree = 5; - string eval_result = 6; - repeated ConsoleMessage console_history = 7; - repeated NetworkRequest network_history = 8; -} - -message ConsoleMessage { - string level = 1; - string text = 2; - int64 timestamp_ms = 3; -} - -message NetworkRequest { - string method = 1; - string url = 2; - int32 status = 3; - string resource_type = 4; - int64 latency_ms = 5; -} - -message WorkPoolUpdate { - repeated string available_task_ids = 1; -} - -message TaskClaimRequest { - string task_id = 1; - string node_id = 2; -} - -message TaskClaimResponse { - string task_id = 1; - bool granted = 2; - string reason = 3; -} - -// --- Channel 3: Health & Observation --- -message Heartbeat { - string node_id = 1; - float cpu_usage_percent = 2; - float memory_usage_percent = 3; - int32 active_worker_count = 4; - int32 max_worker_capacity = 5; - string status_message = 6; - repeated string running_task_ids = 7; - int32 cpu_count = 8; - float memory_used_gb = 9; - float memory_total_gb = 10; - - // Rich Metrics (M6) - repeated float cpu_usage_per_core = 11; - float cpu_freq_mhz = 12; - float memory_available_gb = 13; - repeated float load_avg = 14; // [1min, 5min, 15min] -} - - 
-message HealthCheckResponse { - int64 server_time_ms = 1; -} - -// --- Channel 4: Ghost Mirror File Sync --- -message FileSyncMessage { - string session_id = 1; - oneof payload { - DirectoryManifest manifest = 2; - FilePayload file_data = 3; - SyncStatus status = 4; - SyncControl control = 5; - } - string task_id = 6; // NEW: Correlation ID for FS operations -} - -message SyncControl { - enum Action { - START_WATCHING = 0; - STOP_WATCHING = 1; - LOCK = 2; // Server -> Node: Disable user-side edits - UNLOCK = 3; // Server -> Node: Enable user-side edits - REFRESH_MANIFEST = 4; // Server -> Node: Request a full manifest from node - RESYNC = 5; // Server -> Node: Force a hash-based reconciliation - - // FS Operations (Modular Explorer) - LIST = 6; // Server -> Node: List directory contents (returns manifest) - READ = 7; // Server -> Node: Read file content (returns file_data) - WRITE = 8; // Server -> Node: Write/Create file - DELETE = 9; // Server -> Node: Delete file or directory - PURGE = 10; // Server -> Node: Purge local sync directory entirely - CLEANUP = 11; // Server -> Node: Purge any session dirs not in request_paths - } - Action action = 1; - string path = 2; - repeated string request_paths = 3; // NEW: Specific files requested for pull - bytes content = 4; // NEW: For WRITE operation - bool is_dir = 5; // NEW: For TOUCH/WRITE operation -} - -message DirectoryManifest { - string root_path = 1; - repeated FileInfo files = 2; -} - -message FileInfo { - string path = 1; - int64 size = 2; - string hash = 3; // For drift detection - bool is_dir = 4; -} - -message FilePayload { - string path = 1; - bytes chunk = 2; - int32 chunk_index = 3; - bool is_final = 4; - string hash = 5; // Full file hash for verification on final chunk -} - -message SyncStatus { - enum Code { - OK = 0; - ERROR = 1; - RECONCILE_REQUIRED = 2; - IN_PROGRESS = 3; - } - Code code = 1; - string message = 2; - repeated string reconcile_paths = 3; // NEW: Files needing immediate re-sync -} diff 
--git a/agent-node/protos/agent_pb2.py b/agent-node/protos/agent_pb2.py deleted file mode 100644 index 3075f56..0000000 --- a/agent-node/protos/agent_pb2.py +++ /dev/null @@ -1,96 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: protos/agent.proto -# Protobuf Python Version: 4.25.1 -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 \x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 
\x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 \x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 
\x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 \x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 
\x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 
\x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protos.agent_pb2', _globals) -if _descriptor._USE_C_DESCRIPTORS == False: - DESCRIPTOR._options = None - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = 
None - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' - _globals['_REGISTRATIONREQUEST']._serialized_start=30 - _globals['_REGISTRATIONREQUEST']._serialized_end=252 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=201 - _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=252 - _globals['_SANDBOXPOLICY']._serialized_start=255 - _globals['_SANDBOXPOLICY']._serialized_end=452 - _globals['_SANDBOXPOLICY_MODE']._serialized_start=418 - _globals['_SANDBOXPOLICY_MODE']._serialized_end=452 - _globals['_REGISTRATIONRESPONSE']._serialized_start=454 - _globals['_REGISTRATIONRESPONSE']._serialized_end=574 - _globals['_CLIENTTASKMESSAGE']._serialized_start=577 - _globals['_CLIENTTASKMESSAGE']._serialized_end=874 - _globals['_SKILLEVENT']._serialized_start=876 - _globals['_SKILLEVENT']._serialized_end=997 - _globals['_NODEANNOUNCE']._serialized_start=999 - _globals['_NODEANNOUNCE']._serialized_end=1030 - _globals['_BROWSEREVENT']._serialized_start=1033 - _globals['_BROWSEREVENT']._serialized_end=1168 - _globals['_SERVERTASKMESSAGE']._serialized_start=1171 - _globals['_SERVERTASKMESSAGE']._serialized_end=1487 - _globals['_TASKCANCELREQUEST']._serialized_start=1489 - _globals['_TASKCANCELREQUEST']._serialized_end=1525 - _globals['_TASKREQUEST']._serialized_start=1528 - _globals['_TASKREQUEST']._serialized_end=1737 - _globals['_BROWSERACTION']._serialized_start=1740 - _globals['_BROWSERACTION']._serialized_end=2028 - _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1894 - _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2028 - _globals['_TASKRESPONSE']._serialized_start=2031 - _globals['_TASKRESPONSE']._serialized_end=2383 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2263 - _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2311 - _globals['_TASKRESPONSE_STATUS']._serialized_start=2313 - _globals['_TASKRESPONSE_STATUS']._serialized_end=2373 - 
_globals['_BROWSERRESPONSE']._serialized_start=2386 - _globals['_BROWSERRESPONSE']._serialized_end=2606 - _globals['_CONSOLEMESSAGE']._serialized_start=2608 - _globals['_CONSOLEMESSAGE']._serialized_end=2675 - _globals['_NETWORKREQUEST']._serialized_start=2677 - _globals['_NETWORKREQUEST']._serialized_end=2781 - _globals['_WORKPOOLUPDATE']._serialized_start=2783 - _globals['_WORKPOOLUPDATE']._serialized_end=2827 - _globals['_TASKCLAIMREQUEST']._serialized_start=2829 - _globals['_TASKCLAIMREQUEST']._serialized_end=2881 - _globals['_TASKCLAIMRESPONSE']._serialized_start=2883 - _globals['_TASKCLAIMRESPONSE']._serialized_end=2952 - _globals['_HEARTBEAT']._serialized_start=2955 - _globals['_HEARTBEAT']._serialized_end=3313 - _globals['_HEALTHCHECKRESPONSE']._serialized_start=3315 - _globals['_HEALTHCHECKRESPONSE']._serialized_end=3360 - _globals['_FILESYNCMESSAGE']._serialized_start=3363 - _globals['_FILESYNCMESSAGE']._serialized_end=3591 - _globals['_SYNCCONTROL']._serialized_start=3594 - _globals['_SYNCCONTROL']._serialized_end=3893 - _globals['_SYNCCONTROL_ACTION']._serialized_start=3723 - _globals['_SYNCCONTROL_ACTION']._serialized_end=3893 - _globals['_DIRECTORYMANIFEST']._serialized_start=3895 - _globals['_DIRECTORYMANIFEST']._serialized_end=3965 - _globals['_FILEINFO']._serialized_start=3967 - _globals['_FILEINFO']._serialized_end=4035 - _globals['_FILEPAYLOAD']._serialized_start=4037 - _globals['_FILEPAYLOAD']._serialized_end=4132 - _globals['_SYNCSTATUS']._serialized_start=4135 - _globals['_SYNCSTATUS']._serialized_end=4295 - _globals['_SYNCSTATUS_CODE']._serialized_start=4229 - _globals['_SYNCSTATUS_CODE']._serialized_end=4295 - _globals['_AGENTORCHESTRATOR']._serialized_start=4298 - _globals['_AGENTORCHESTRATOR']._serialized_end=4531 -# @@protoc_insertion_point(module_scope) diff --git a/agent-node/protos/agent_pb2_grpc.py b/agent-node/protos/agent_pb2_grpc.py deleted file mode 100644 index f551b0b..0000000 --- a/agent-node/protos/agent_pb2_grpc.py +++ 
/dev/null @@ -1,138 +0,0 @@ -# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! -"""Client and server classes corresponding to protobuf-defined services.""" -import grpc - -from protos import agent_pb2 as protos_dot_agent__pb2 - - -class AgentOrchestratorStub(object): - """The Cortex Server exposes this service - """ - - def __init__(self, channel): - """Constructor. - - Args: - channel: A grpc.Channel. - """ - self.SyncConfiguration = channel.unary_unary( - '/agent.AgentOrchestrator/SyncConfiguration', - request_serializer=protos_dot_agent__pb2.RegistrationRequest.SerializeToString, - response_deserializer=protos_dot_agent__pb2.RegistrationResponse.FromString, - ) - self.TaskStream = channel.stream_stream( - '/agent.AgentOrchestrator/TaskStream', - request_serializer=protos_dot_agent__pb2.ClientTaskMessage.SerializeToString, - response_deserializer=protos_dot_agent__pb2.ServerTaskMessage.FromString, - ) - self.ReportHealth = channel.stream_stream( - '/agent.AgentOrchestrator/ReportHealth', - request_serializer=protos_dot_agent__pb2.Heartbeat.SerializeToString, - response_deserializer=protos_dot_agent__pb2.HealthCheckResponse.FromString, - ) - - -class AgentOrchestratorServicer(object): - """The Cortex Server exposes this service - """ - - def SyncConfiguration(self, request, context): - """1. Control Channel: Sync policies and settings (Unary) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def TaskStream(self, request_iterator, context): - """2. Task Channel: Bidirectional work dispatch and reporting (Persistent) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ReportHealth(self, request_iterator, context): - """3. 
Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - -def add_AgentOrchestratorServicer_to_server(servicer, server): - rpc_method_handlers = { - 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( - servicer.SyncConfiguration, - request_deserializer=protos_dot_agent__pb2.RegistrationRequest.FromString, - response_serializer=protos_dot_agent__pb2.RegistrationResponse.SerializeToString, - ), - 'TaskStream': grpc.stream_stream_rpc_method_handler( - servicer.TaskStream, - request_deserializer=protos_dot_agent__pb2.ClientTaskMessage.FromString, - response_serializer=protos_dot_agent__pb2.ServerTaskMessage.SerializeToString, - ), - 'ReportHealth': grpc.stream_stream_rpc_method_handler( - servicer.ReportHealth, - request_deserializer=protos_dot_agent__pb2.Heartbeat.FromString, - response_serializer=protos_dot_agent__pb2.HealthCheckResponse.SerializeToString, - ), - } - generic_handler = grpc.method_handlers_generic_handler( - 'agent.AgentOrchestrator', rpc_method_handlers) - server.add_generic_rpc_handlers((generic_handler,)) - - - # This class is part of an EXPERIMENTAL API. 
-class AgentOrchestrator(object): - """The Cortex Server exposes this service - """ - - @staticmethod - def SyncConfiguration(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', - protos_dot_agent__pb2.RegistrationRequest.SerializeToString, - protos_dot_agent__pb2.RegistrationResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def TaskStream(request_iterator, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', - protos_dot_agent__pb2.ClientTaskMessage.SerializeToString, - protos_dot_agent__pb2.ServerTaskMessage.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) - - @staticmethod - def ReportHealth(request_iterator, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', - protos_dot_agent__pb2.Heartbeat.SerializeToString, - protos_dot_agent__pb2.HealthCheckResponse.FromString, - options, channel_credentials, - insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/agent-node/scripts/compile_protos.sh b/agent-node/scripts/compile_protos.sh index 1af7aa3..d78d1a1 100755 --- a/agent-node/scripts/compile_protos.sh +++ b/agent-node/scripts/compile_protos.sh @@ -1,3 +1,3 @@ #!/bin/bash -python -m 
grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/agent.proto +python -m grpc_tools.protoc -I./src/protos --python_out=./src --grpc_python_out=./src ./src/protos/agent.proto echo "Protobuf compiled successfully." diff --git a/agent-node/shared_core/__init__.py b/agent-node/shared_core/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/agent-node/shared_core/__init__.py +++ /dev/null diff --git a/agent-node/shared_core/ignore.py b/agent-node/shared_core/ignore.py deleted file mode 100644 index c3f0cb5..0000000 --- a/agent-node/shared_core/ignore.py +++ /dev/null @@ -1,38 +0,0 @@ -import os -import fnmatch - -class CortexIgnore: - """Handles .cortexignore (and .gitignore) pattern matching.""" - def __init__(self, root_path): - self.root_path = root_path - self.patterns = self._load_patterns() - - def _load_patterns(self): - patterns = [".git", "node_modules", ".cortex_sync", "__pycache__", "*.pyc"] # Default ignores - ignore_file = os.path.join(self.root_path, ".cortexignore") - if not os.path.exists(ignore_file): - ignore_file = os.path.join(self.root_path, ".gitignore") - - if os.path.exists(ignore_file): - with open(ignore_file, "r") as f: - for line in f: - line = line.strip() - if line and not line.startswith("#"): - patterns.append(line) - return patterns - - def is_ignored(self, rel_path): - """Returns True if the path matches any ignore pattern.""" - for pattern in self.patterns: - # Handle directory patterns - if pattern.endswith("/"): - if rel_path.startswith(pattern) or f"/{pattern}" in f"/{rel_path}": - return True - # Standard glob matching - if fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch(os.path.basename(rel_path), pattern): - return True - # Handle nested matches - for part in rel_path.split(os.sep): - if fnmatch.fnmatch(part, pattern): - return True - return False diff --git a/agent-node/src/agent_node/__init__.py b/agent-node/src/agent_node/__init__.py new file mode 100644 index 0000000..e69de29 --- 
/dev/null +++ b/agent-node/src/agent_node/__init__.py diff --git a/agent-node/src/agent_node/config.py b/agent-node/src/agent_node/config.py new file mode 100644 index 0000000..f321761 --- /dev/null +++ b/agent-node/src/agent_node/config.py @@ -0,0 +1,63 @@ +import os +import platform +import yaml + +# Path to the generated config file in the bundled distribution +CONFIG_PATH = "agent_config.yaml" + +# Default values +_defaults = { + "node_id": "agent-node-007", + "node_description": "Modular Stateful Node", + "hub_url": "https://ai.jerxie.com", + "grpc_endpoint": "localhost:50051", + "auth_token": os.getenv("AGENT_AUTH_TOKEN", "cortex-secret-shared-key"), + "sync_root": "/tmp/cortex-sync", + "tls": True, + "max_skill_workers": 10, + "health_report_interval": 10, + "auto_update": True, + "update_check_interval": 300, +} + +# 1. Load from YAML if present +_config = _defaults.copy() +if os.path.exists(CONFIG_PATH): + try: + with open(CONFIG_PATH, 'r') as f: + yaml_config = yaml.safe_load(f) or {} + _config.update(yaml_config) + print(f"[*] Loaded node configuration from {CONFIG_PATH}") + except Exception as e: + print(f"[!] Error loading {CONFIG_PATH}: {e}") + +# 2. Override with Environment Variables (12-Factor style) +NODE_ID = os.getenv("AGENT_NODE_ID", _config["node_id"]) +NODE_DESC = os.getenv("AGENT_NODE_DESC", _config["node_description"]) +SERVER_HOST_PORT = os.getenv("GRPC_ENDPOINT", _config["grpc_endpoint"]) # e.g. 
"ai.jerxie.com:50051" +AUTH_TOKEN = os.getenv("AGENT_AUTH_TOKEN", _config["auth_token"]) +SYNC_DIR = os.getenv("CORTEX_SYNC_DIR", _config["sync_root"]) +TLS_ENABLED = os.getenv("AGENT_TLS_ENABLED", str(_config["tls"])).lower() == 'true' + +HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", _config["health_report_interval"])) +MAX_SKILL_WORKERS = int(os.getenv("MAX_SKILL_WORKERS", _config["max_skill_workers"])) + +DEBUG_GRPC = os.getenv("DEBUG_GRPC", "false").lower() == "true" +SECRET_KEY = os.getenv("AGENT_SECRET_KEY", _config.get("secret_key", "dev-secret-key-1337")) + +# Auto-update settings +HUB_URL = os.getenv("AGENT_HUB_URL", _config.get("hub_url", "https://ai.jerxie.com")) +AUTO_UPDATE = os.getenv("AGENT_AUTO_UPDATE", str(_config.get("auto_update", True))).lower() == "true" +UPDATE_CHECK_INTERVAL = int(os.getenv("AGENT_UPDATE_CHECK_INTERVAL", _config.get("update_check_interval", 300))) + +# These are still available but likely replaced by AUTH_TOKEN / TLS_ENABLED logic +CERT_CA = os.getenv("CERT_CA", "certs/ca.crt") +CERT_CLIENT_CRT = os.getenv("CERT_CLIENT_CRT", "certs/client.crt") +CERT_CLIENT_KEY = os.getenv("CERT_CLIENT_KEY", "certs/client.key") + +# FS Explorer root - Priority: Env > YAML > HOME > / +FS_ROOT = os.getenv("CORTEX_FS_ROOT", _config.get("fs_root")) +if not FS_ROOT: + # Default to user home on unix systems to avoid showing root directory by default + FS_ROOT = os.path.expanduser("~") if platform.system() in ["Darwin", "Linux"] else "C:\\" + diff --git a/agent-node/src/agent_node/core/__init__.py b/agent-node/src/agent_node/core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/agent-node/src/agent_node/core/__init__.py diff --git a/agent-node/src/agent_node/core/sandbox.py b/agent-node/src/agent_node/core/sandbox.py new file mode 100644 index 0000000..8fcfed5 --- /dev/null +++ b/agent-node/src/agent_node/core/sandbox.py @@ -0,0 +1,32 @@ +from protos import agent_pb2 + +class SandboxEngine: + """Core 
Security Engine for Local Command Verification.""" + def __init__(self): + self.policy = None + + def sync(self, p): + """Syncs the latest policy from the Orchestrator.""" + self.policy = { + "MODE": "STRICT" if p.mode == agent_pb2.SandboxPolicy.STRICT else "PERMISSIVE", + "ALLOWED": list(p.allowed_commands), + "DENIED": list(p.denied_commands), + "SENSITIVE": list(p.sensitive_commands), + "WORKING_DIR_JAIL": p.working_dir_jail + } + + def verify(self, command_str): + """Verifies if a command string is allowed under the current policy.""" + if not self.policy: return False, "No Policy" + + parts = (command_str or "").strip().split() + if not parts: return False, "Empty" + + base_cmd = parts[0] + if base_cmd in self.policy["DENIED"]: + return False, f"Forbidden command: {base_cmd}" + + if self.policy["MODE"] == "STRICT" and base_cmd not in self.policy["ALLOWED"]: + return False, f"Command '{base_cmd}' not whitelisted" + + return True, "OK" diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py new file mode 100644 index 0000000..f130762 --- /dev/null +++ b/agent-node/src/agent_node/core/sync.py @@ -0,0 +1,119 @@ +import os +import hashlib +from agent_node.config import SYNC_DIR +from protos import agent_pb2 + +class NodeSyncManager: + """Handles local filesystem synchronization on the Agent Node.""" + def __init__(self, base_sync_dir=SYNC_DIR): + self.base_sync_dir = base_sync_dir + if not os.path.exists(self.base_sync_dir): + os.makedirs(self.base_sync_dir, exist_ok=True) + + def get_session_dir(self, session_id: str, create: bool = False) -> str: + """Returns the unique identifier directory for this session's sync.""" + path = os.path.join(self.base_sync_dir, session_id) + if create: + os.makedirs(path, exist_ok=True) + return path + + def purge(self, session_id: str): + """Completely removes a session's sync directory from the node.""" + path = os.path.join(self.base_sync_dir, session_id) + if os.path.exists(path): + import 
shutil + shutil.rmtree(path) + print(f" [📁🧹] Node sync directory deleted: {session_id}") + + def cleanup_unused_sessions(self, active_session_ids: list): + """Removes any session directories that are no longer active on the server.""" + if not os.path.exists(self.base_sync_dir): + return + + import shutil + active_set = set(active_session_ids) + for session_id in os.listdir(self.base_sync_dir): + if session_id.startswith("session-") and session_id not in active_set: + path = os.path.join(self.base_sync_dir, session_id) + if os.path.isdir(path): + shutil.rmtree(path) + print(f" [📁🧹] Proactively purged unused session directory: {session_id}") + + def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest) -> list: + """Compares local files with the server manifest and returns paths needing update.""" + session_dir = self.get_session_dir(session_id, create=True) + print(f"[📁] Reconciling Sync Directory: {session_dir}") + + from shared_core.ignore import CortexIgnore + ignore_filter = CortexIgnore(session_dir) + expected_paths = {f.path for f in manifest.files} + + # 1. 
Purge extraneous local files and directories (handles Deletions) + for root, dirs, files in os.walk(session_dir, topdown=False): + for name in files: + abs_path = os.path.join(root, name) + rel_path = os.path.relpath(abs_path, session_dir) + if rel_path in [".cortexignore", ".gitignore"]: continue + if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): + try: + os.remove(abs_path) + print(f" [📁🗑️] Deleted extraneous local file: {rel_path}") + except Exception as e: + print(f" [⚠️] Failed to delete file {rel_path}: {e}") + + for name in dirs: + abs_path = os.path.join(root, name) + rel_path = os.path.relpath(abs_path, session_dir) + if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): + try: + if not os.listdir(abs_path): + os.rmdir(abs_path) + except Exception: + pass + + needs_update = [] + for file_info in manifest.files: + target_path = os.path.join(session_dir, file_info.path) + + if file_info.is_dir: + os.makedirs(target_path, exist_ok=True) + continue + + # File Check + if not os.path.exists(target_path): + needs_update.append(file_info.path) + else: + # Hash comparison + with open(target_path, "rb") as f: + actual_hash = hashlib.sha256(f.read()).hexdigest() + if actual_hash != file_info.hash: + print(f" [⚠️] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})") + needs_update.append(file_info.path) + + return needs_update + + def write_chunk(self, session_id: str, payload: agent_pb2.FilePayload) -> bool: + """Writes a file chunk to the local session directory.""" + session_dir = self.get_session_dir(session_id, create=True) + target_path = os.path.normpath(os.path.join(session_dir, payload.path)) + + if not target_path.startswith(session_dir): + return False # Path traversal guard + + os.makedirs(os.path.dirname(target_path), exist_ok=True) + + mode = "ab" if payload.chunk_index > 0 else "wb" + with open(target_path, mode) as f: + f.write(payload.chunk) + + if 
payload.is_final and payload.hash: + return self._verify(target_path, payload.hash) + return True + + def _verify(self, path, expected_hash): + with open(path, "rb") as f: + actual = hashlib.sha256(f.read()).hexdigest() + if actual != expected_hash: + print(f"[⚠️] Sync Hash Mismatch for {path}") + return False + return True diff --git a/agent-node/src/agent_node/core/updater.py b/agent-node/src/agent_node/core/updater.py new file mode 100644 index 0000000..7b9562b --- /dev/null +++ b/agent-node/src/agent_node/core/updater.py @@ -0,0 +1,134 @@ +""" +Auto-Update Trigger for Cortex Agent Node. + +Detects when the running agent is behind the hub's version and +delegates to bootstrap_installer.py to perform the update — the same +program used for Day 0 installation. + +Both bootstrap and version bump follow the exact same code path: + bootstrap_installer.py → download → extract → install deps → launch + +Channel: Stable HTTP REST only. No gRPC/proto. This contract is frozen. +""" + +import os +import sys +import time +import json +import logging +import threading +import subprocess +import urllib.request + +logger = logging.getLogger(__name__) + +_HUB_HTTP_URL: str = "" +_AUTH_TOKEN: str = "" +_CHECK_INTERVAL_SECS: int = 300 + +# bootstrap_installer.py lives at the agent-node root (two levels up from here) +_AGENT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +_VERSION_FILE = os.path.join(_AGENT_ROOT, "VERSION") +_BOOTSTRAPPER = os.path.join(_AGENT_ROOT, "bootstrap_installer.py") + + +def _read_local_version() -> str: + try: + with open(_VERSION_FILE) as f: + return f.read().strip() + except FileNotFoundError: + return "0.0.0" + + +def _fetch_remote_version() -> str | None: + url = f"{_HUB_HTTP_URL}/api/v1/agent/version" + try: + req = urllib.request.Request(url, headers={"X-Agent-Token": _AUTH_TOKEN}) + with urllib.request.urlopen(req, timeout=10) as resp: + return json.loads(resp.read().decode()).get("version") + except Exception as e: + 
logger.warning(f"[Updater] Version check failed: {e}") + return None + + +def _version_tuple(v: str): + try: + return tuple(int(x) for x in v.split(".")) + except Exception: + return (0, 0, 0) + + +def _apply_update_via_bootstrapper(): + """ + Delegates to bootstrap_installer.py --update-only — the same code path + as Day 0 installation — then restarts this process. + Does not return on success. + """ + if not os.path.exists(_BOOTSTRAPPER): + logger.error(f"[Updater] bootstrap_installer.py not found at {_BOOTSTRAPPER}") + return False + + logger.info("[Updater] ⬇️ Delegating update to bootstrap_installer.py ...") + result = subprocess.run( + [sys.executable, _BOOTSTRAPPER, + "--hub", _HUB_HTTP_URL, + "--token", _AUTH_TOKEN, + "--update-only", + "--install-dir", _AGENT_ROOT], + cwd=_AGENT_ROOT + ) + + if result.returncode == 0: + logger.info("[Updater] ✅ Update applied. Restarting agent process...") + sys.stdout.flush() + sys.stderr.flush() + os.execv(sys.executable, [sys.executable] + sys.argv) # in-place restart, no return + else: + logger.error(f"[Updater] bootstrap_installer.py failed (exit {result.returncode}). Continuing with current version.") + return False + + +def check_and_update_once(): + """ + Single version check against the hub. If a newer version is available, + triggers bootstrap_installer.py and restarts (does not return if applied). 
+ """ + local = _read_local_version() + logger.info(f"[Updater] Local version: {local}") + + remote = _fetch_remote_version() + if remote is None: + logger.info("[Updater] Hub unreachable — skipping update check.") + return + + logger.info(f"[Updater] Remote version: {remote}") + + if _version_tuple(remote) <= _version_tuple(local): + logger.info("[Updater] ✅ Already up to date.") + return + + logger.info(f"[Updater] 🆕 Update available: {local} → {remote}") + _apply_update_via_bootstrapper() # does not return on success + + +def start_background_updater(): + """Starts a daemon thread that periodically checks for new versions.""" + def _loop(): + while True: + time.sleep(_CHECK_INTERVAL_SECS) + try: + check_and_update_once() + except Exception as e: + logger.error(f"[Updater] Background check error: {e}") + + t = threading.Thread(target=_loop, daemon=True, name="AutoUpdater") + t.start() + logger.info(f"[Updater] Background updater started (interval: {_CHECK_INTERVAL_SECS}s)") + + +def init(hub_http_url: str, auth_token: str, check_interval_secs: int = 300): + """Initialize with hub connection details. 
Call before any other function.""" + global _HUB_HTTP_URL, _AUTH_TOKEN, _CHECK_INTERVAL_SECS + _HUB_HTTP_URL = hub_http_url.rstrip("/") + _AUTH_TOKEN = auth_token + _CHECK_INTERVAL_SECS = check_interval_secs diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py new file mode 100644 index 0000000..4946f53 --- /dev/null +++ b/agent-node/src/agent_node/core/watcher.py @@ -0,0 +1,112 @@ + +import time +import os +import hashlib +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +from shared_core.ignore import CortexIgnore +from protos import agent_pb2 + +class SyncEventHandler(FileSystemEventHandler): + """Listens for FS events and triggers gRPC delta pushes.""" + def __init__(self, session_id, root_path, callback): + self.session_id = session_id + self.root_path = root_path + self.callback = callback + self.ignore_filter = CortexIgnore(root_path) + self.last_sync = {} # path -> last_hash + self.locked = False + + def on_modified(self, event): + if not event.is_directory: + self._process_change(event.src_path) + + def on_created(self, event): + if not event.is_directory: + self._process_change(event.src_path) + + def on_moved(self, event): + # Simplification: treat move as a delete and create, or just process the dest + self._process_change(event.dest_path) + + def _process_change(self, abs_path): + if self.locked: + return # Block all user edits when session is locked + + rel_path = os.path.normpath(os.path.relpath(abs_path, self.root_path)) + + # Phase 3: Dynamic reload if .cortexignore / .gitignore changed + if rel_path in [".cortexignore", ".gitignore"]: + print(f" [*] Reloading Ignore Filter for {self.session_id}") + self.ignore_filter = CortexIgnore(self.root_path) + + if self.ignore_filter.is_ignored(rel_path): + return + + try: + with open(abs_path, "rb") as f: + content = f.read() + file_hash = hashlib.sha256(content).hexdigest() + + if self.last_sync.get(rel_path) == 
file_hash: + return # No actual change + + self.last_sync[rel_path] = file_hash + print(f" [📁📤] Detected Change: {rel_path}") + + # Chunk and Send + chunk_size = 64 * 1024 + for i in range(0, len(content), chunk_size): + chunk = content[i:i + chunk_size] + is_final = i + chunk_size >= len(content) + payload = agent_pb2.FilePayload( + path=rel_path, + chunk=chunk, + chunk_index=i // chunk_size, + is_final=is_final, + hash=file_hash if is_final else "" + ) + self.callback(self.session_id, payload) + except Exception as e: + print(f" [!] Watcher Error for {rel_path}: {e}") + +class WorkspaceWatcher: + """Manages FS observers for active synchronization.""" + def __init__(self, callback): + self.callback = callback + self.observers = {} # session_id -> (observer, handler) + + def set_lock(self, session_id, locked=True): + if session_id in self.observers: + print(f"[*] Workspace LOCK for {session_id}: {locked}") + self.observers[session_id][1].locked = locked + + def start_watching(self, session_id, root_path): + if session_id in self.observers: + self.stop_watching(session_id) + + print(f"[*] Starting Watcher for Session {session_id} at {root_path}") + os.makedirs(root_path, exist_ok=True) + + handler = SyncEventHandler(session_id, root_path, self.callback) + observer = Observer() + observer.schedule(handler, root_path, recursive=True) + observer.start() + self.observers[session_id] = (observer, handler) + + def stop_watching(self, session_id): + if session_id in self.observers: + print(f"[*] Stopping Watcher for Session {session_id}") + obs, _ = self.observers[session_id] + obs.stop() + obs.join() + del self.observers[session_id] + + def get_watch_path(self, session_id): + if session_id in self.observers: + return self.observers[session_id][1].root_path + return None + + def shutdown(self): + for sid in list(self.observers.keys()): + self.stop_watching(sid) diff --git a/agent-node/src/agent_node/main.py b/agent-node/src/agent_node/main.py new file mode 100644 index 
0000000..02faf4d --- /dev/null +++ b/agent-node/src/agent_node/main.py @@ -0,0 +1,45 @@ +import sys +import os + +# Add root to path to find protos and other packages +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +import signal +from agent_node.node import AgentNode +from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL +from agent_node.core import updater + +def main(): + print(f"[*] Starting Agent Node: {NODE_ID}...") + + # 0. Auto-Update Check (before anything else — if we're behind, restart now) + if AUTO_UPDATE: + updater.init(hub_http_url=HUB_URL, auth_token=SECRET_KEY, check_interval_secs=UPDATE_CHECK_INTERVAL) + updater.check_and_update_once() # May restart process — does not return if update applied + + # 1. Initialization + node = AgentNode() + + # 2. Signal Handling for Graceful Shutdown + def handle_exit(sig, frame): + node.stop() + sys.exit(0) + + signal.signal(signal.SIGINT, handle_exit) + signal.signal(signal.SIGTERM, handle_exit) + + # Handshake: Sync configuration and Sandbox Policy + node.sync_configuration() + + # 3. Background: Start health reporting (Heartbeats) + node.start_health_reporting() + + # 4. Background: Periodic auto-update checks + if AUTO_UPDATE: + updater.start_background_updater() + + # 5. 
Foreground: Run Persistent Task Stream + node.run_task_stream() + +if __name__ == '__main__': + main() diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py new file mode 100644 index 0000000..4b6efac --- /dev/null +++ b/agent-node/src/agent_node/node.py @@ -0,0 +1,643 @@ +import threading +import queue +import time +import sys +import os +import hashlib +import logging +import psutil +from protos import agent_pb2, agent_pb2_grpc + +logger = logging.getLogger(__name__) +from agent_node.skills.manager import SkillManager +from agent_node.core.sandbox import SandboxEngine +from agent_node.core.sync import NodeSyncManager +from agent_node.core.watcher import WorkspaceWatcher +from agent_node.utils.auth import verify_task_signature +from agent_node.utils.network import get_secure_stub +from agent_node.config import NODE_ID, NODE_DESC, AUTH_TOKEN, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS, DEBUG_GRPC, FS_ROOT + + +class AgentNode: + """The 'Agent Core': Orchestrates Local Skills and Maintains gRPC Connection.""" + def __init__(self, node_id=NODE_ID): + self.node_id = node_id + self.sandbox = SandboxEngine() + self.sync_mgr = NodeSyncManager() + self.skills = SkillManager(max_workers=MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr) + self.watcher = WorkspaceWatcher(self._on_sync_delta) + self.task_queue = queue.Queue() + self.stub = get_secure_stub() + + def _collect_capabilities(self) -> dict: + """Collect hardware metadata to advertise at registration.""" + import platform + import subprocess + import socket + import os + + caps = { + "shell": "v1", + "browser": "playwright-sync-bridge", + "arch": platform.machine(), # e.g. 
x86_64, arm64, aarch64 + "os": platform.system().lower(), # linux, darwin, windows + "os_release": platform.release(), + } + + # Privilege Detection + # is_root: True if UID 0 (Linux/macOS) — no sudo needed at all + # has_sudo: True if sudo is installed AND available passwordlessly + try: + caps["is_root"] = (os.getuid() == 0) + except AttributeError: + # Windows — os.getuid() doesn't exist; approximate via admin check + try: + import ctypes + caps["is_root"] = bool(ctypes.windll.shell32.IsUserAnAdmin()) + except Exception: + caps["is_root"] = False + + if caps.get("is_root"): + caps["has_sudo"] = False # Root doesn't need sudo + else: + # Check if passwordless sudo is available + try: + r = subprocess.run( + ["sudo", "-n", "true"], + capture_output=True, timeout=3 + ) + caps["has_sudo"] = (r.returncode == 0) + except Exception: + caps["has_sudo"] = False + + # Local IP Detection (best effort) + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(0) + # Doesn't even have to be reachable + s.connect(('10.254.254.254', 1)) + caps["local_ip"] = s.getsockname()[0] + s.close() + except Exception: + caps["local_ip"] = "unknown" + + # GPU Detection — try nvidia-smi first, then check for Apple GPU + try: + result = subprocess.run( + ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + gpu_lines = result.stdout.strip().split("\n") + caps["gpu"] = gpu_lines[0].strip() # e.g. 
"NVIDIA GeForce RTX 3080, 10240" + caps["gpu_count"] = str(len(gpu_lines)) + else: + caps["gpu"] = "none" + except Exception: + # No nvidia-smi — check if Apple Silicon (arm64 + darwin) + if caps["os"] == "darwin" and "arm" in caps["arch"].lower(): + caps["gpu"] = "apple-silicon" + else: + caps["gpu"] = "none" + + return caps + + def sync_configuration(self): + """Initial handshake to retrieve policy and metadata.""" + print(f"[*] Handshake with Orchestrator: {self.node_id}") + caps = self._collect_capabilities() + print(f"[*] Capabilities: {caps}") + + # Protobuf capabilities is map — all values must be strings + caps_str = {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()} + + reg_req = agent_pb2.RegistrationRequest( + node_id=self.node_id, + auth_token=AUTH_TOKEN, + node_description=NODE_DESC, + capabilities=caps_str + ) + + + try: + res = self.stub.SyncConfiguration(reg_req) + if res.success: + self.sandbox.sync(res.policy) + print("[OK] Sandbox Policy Synced.") + else: + print(f"[!] Rejection: {res.error_message}") + sys.exit(1) + except Exception as e: + print(f"[!] 
Connection Fail: {e}") + sys.exit(1) + + def start_health_reporting(self): + """Streaming node metrics to the orchestrator for load balancing.""" + def _report(): + while True: + try: + def _gen(): + while True: + ids = self.skills.get_active_ids() + # Collection + cpu = psutil.cpu_percent(interval=1.0) + per_core = psutil.cpu_percent(percpu=True) + + vmem = psutil.virtual_memory() + mem_percent = vmem.percent + + # GB conversion + used_gb = vmem.used / (1024**3) + total_gb = vmem.total / (1024**3) + avail_gb = vmem.available / (1024**3) + + # Freq & Load + try: + freq = psutil.cpu_freq().current + except: + freq = 0 + + try: + load = list(os.getloadavg()) + except: + load = [0.0, 0.0, 0.0] + + yield agent_pb2.Heartbeat( + node_id=self.node_id, + cpu_usage_percent=cpu, + memory_usage_percent=mem_percent, + active_worker_count=len(ids), + max_worker_capacity=MAX_SKILL_WORKERS, + running_task_ids=ids, + cpu_count=psutil.cpu_count(), + memory_used_gb=used_gb, + memory_total_gb=total_gb, + # M6 Fields + cpu_usage_per_core=per_core, + cpu_freq_mhz=freq, + memory_available_gb=avail_gb, + load_avg=load + ) + time.sleep(max(0, HEALTH_REPORT_INTERVAL - 1.0)) + + # Consume the heartbeat stream to keep it alive + for response in self.stub.ReportHealth(_gen()): + # We don't strictly need the server time, but it confirms a round-trip + pass + except Exception as e: + print(f"[!] Health reporting interrupted: {e}. 
Retrying in 5s...") + time.sleep(5) + + # Non-blocking thread for health heartbeat + threading.Thread(target=_report, daemon=True, name=f"Health-{self.node_id}").start() + + def run_task_stream(self): + """Main Persistent Bi-directional Stream for Task Management with Reconnection.""" + while True: + try: + def _gen(): + # Initial announcement for routing identity + announce_msg = agent_pb2.ClientTaskMessage( + announce=agent_pb2.NodeAnnounce(node_id=self.node_id) + ) + if DEBUG_GRPC: + print(f"[*] [DEBUG-gRPC] OUTBOUND: announce | {announce_msg}", flush=True) + yield announce_msg + + while True: + out_msg = self.task_queue.get() + if DEBUG_GRPC: + kind = out_msg.WhichOneof('payload') + if kind == 'file_sync' and out_msg.file_sync.HasField('file_data'): + print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} (file_data, path={out_msg.file_sync.file_data.path}, size={len(out_msg.file_sync.file_data.chunk)})", flush=True) + elif kind == 'skill_event' and out_msg.skill_event.WhichOneof('data') == 'terminal_out': + print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} (terminal_out, size={len(out_msg.skill_event.terminal_out)})", flush=True) + else: + print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} | {out_msg}", flush=True) + yield out_msg + + responses = self.stub.TaskStream(_gen()) + print(f"[*] Task Stream Online: {self.node_id}", flush=True) + + for msg in responses: + self._process_server_message(msg) + except Exception as e: + print(f"[!] Task Stream Failure: {e}. 
Reconnecting in 5s...", flush=True) + time.sleep(5) + # Re-sync config in case permissions changed during downtime + try: self.sync_configuration() + except: pass + + def _process_server_message(self, msg): + kind = msg.WhichOneof('payload') + if DEBUG_GRPC: + print(f"[*] [DEBUG-gRPC] INBOUND: {kind} | {msg}", flush=True) + else: + print(f"[*] Inbound: {kind}", flush=True) + + if kind == 'task_request': + self._handle_task(msg.task_request) + + elif kind == 'task_cancel': + if self.skills.cancel(msg.task_cancel.task_id): + self._send_response(msg.task_cancel.task_id, None, agent_pb2.TaskResponse.CANCELLED) + + elif kind == 'work_pool_update': + # Claim logical idle tasks from global pool with slight randomized jitter + # to prevent thundering herd where every node claims the same task at the exact same ms. + if len(self.skills.get_active_ids()) < MAX_SKILL_WORKERS: + for tid in msg.work_pool_update.available_task_ids: + # Deterministic delay based on node_id to distribute claims + import random + time.sleep(random.uniform(0.1, 0.5)) + + self.task_queue.put(agent_pb2.ClientTaskMessage( + task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id) + )) + + elif kind == 'claim_status': + status = "GRANTED" if msg.claim_status.granted else "DENIED" + print(f" [📦] Claim {msg.claim_status.task_id}: {status} ({msg.claim_status.reason})", flush=True) + + elif kind == 'file_sync': + self._handle_file_sync(msg.file_sync) + + elif kind == 'policy_update': + print(f" [🔒] Live Sandbox Policy Update Received.") + self.sandbox.sync(msg.policy_update) + + def _on_sync_delta(self, session_id, file_payload): + """Callback from watcher to push local changes to server.""" + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=file_payload + ) + )) + + def _handle_file_sync(self, fs): + """Processes inbound file synchronization messages from the Orchestrator.""" + sid = fs.session_id + # LOGGING + 
type_str = fs.WhichOneof('payload') + print(f" [📁] Sync MSG: {type_str} | Session: {sid}") + + if fs.HasField("manifest"): + needs_update = self.sync_mgr.handle_manifest(sid, fs.manifest) + if needs_update: + print(f" [📁⚠️] Drift Detected for {sid}: {len(needs_update)} files need sync") + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=sid, + status=agent_pb2.SyncStatus( + code=agent_pb2.SyncStatus.RECONCILE_REQUIRED, + message=f"Drift detected in {len(needs_update)} files", + reconcile_paths=needs_update + ) + ) + )) + else: + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=sid, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message="Synchronized") + ) + )) + elif fs.HasField("file_data"): + success = self.sync_mgr.write_chunk(sid, fs.file_data) + if fs.file_data.is_final: + print(f" [📁] File Received: {fs.file_data.path} (Verified: {success})") + status = agent_pb2.SyncStatus.OK if success else agent_pb2.SyncStatus.ERROR + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=sid, + status=agent_pb2.SyncStatus(code=status, message=f"File {fs.file_data.path} synced") + ) + )) + elif fs.HasField("control"): + ctrl = fs.control + print(f" [📁] Control Action: {ctrl.action} (Path: {ctrl.path})") + if ctrl.action == agent_pb2.SyncControl.START_WATCHING: + # Path relative to sync dir or absolute + watch_path = ctrl.path if os.path.isabs(ctrl.path) else os.path.join(self.sync_mgr.get_session_dir(sid), ctrl.path) + print(f" [📁👁️] Starting Watcher on: {watch_path}") + self.watcher.start_watching(sid, watch_path) + elif ctrl.action == agent_pb2.SyncControl.STOP_WATCHING: + self.watcher.stop_watching(sid) + elif ctrl.action == agent_pb2.SyncControl.LOCK: + self.watcher.set_lock(sid, True) + elif ctrl.action == agent_pb2.SyncControl.UNLOCK: + self.watcher.set_lock(sid, False) + elif ctrl.action == 
agent_pb2.SyncControl.REFRESH_MANIFEST: + if ctrl.request_paths: + print(f" [📁📤] Pushing {len(ctrl.request_paths)} Requested Files for {sid}") + for path in ctrl.request_paths: + self._push_file(sid, path) + else: + # Node -> Server Manifest Push + self._push_full_manifest(sid, ctrl.path) + elif ctrl.action == agent_pb2.SyncControl.RESYNC: + self._push_full_manifest(sid, ctrl.path) + elif ctrl.action == agent_pb2.SyncControl.PURGE: + print(f" [📁🧹] Node instructed to purge session sync data: {sid}") + self.watcher.stop_watching(sid) # Stop watching before deleting + self.sync_mgr.purge(sid) + elif ctrl.action == agent_pb2.SyncControl.CLEANUP: + print(f" [📁🧹] Node proactively cleaning up defunct sessions. Active: {ctrl.request_paths}") + active_sessions = list(ctrl.request_paths) + self.sync_mgr.cleanup_unused_sessions(active_sessions) + + # --- M6: FS Explorer Handlers --- + elif ctrl.action == agent_pb2.SyncControl.LIST: + print(f" [📁📂] List Directory: {ctrl.path}") + self._push_full_manifest(sid, ctrl.path, task_id=fs.task_id, shallow=True) + elif ctrl.action == agent_pb2.SyncControl.READ: + print(f" [📁📄] Read File: {ctrl.path}") + self._push_file(sid, ctrl.path, task_id=fs.task_id) + elif ctrl.action == agent_pb2.SyncControl.WRITE: + print(f" [📁💾] Write File: {ctrl.path} (is_dir={ctrl.is_dir})") + self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir, task_id=fs.task_id) + elif ctrl.action == agent_pb2.SyncControl.DELETE: + print(f" [📁🗑️] Delete Fragment: {ctrl.path}") + self._handle_fs_delete(sid, ctrl.path, task_id=fs.task_id) + + def _get_base_dir(self, session_id, create=False): + """Helper to resolve the effective root for a session (Watcher > SyncDir).""" + if session_id == "__fs_explorer__": + root = FS_ROOT + print(f" [📁] Explorer Root: {root}") + return root + + # Priority 1: If we have an active watcher, use its root (e.g. 
Seed from Local) + watched = self.watcher.get_watch_path(session_id) + if watched: + print(f" [📁] Using Watched Path as Base: {watched}") + return watched + + # Priority 2: Standard session-scoped sync directory + fallback = self.sync_mgr.get_session_dir(session_id, create=create) + print(f" [📁] Falling back to SyncDir: {fallback}") + return fallback + + def _push_full_manifest(self, session_id, rel_path=".", task_id="", shallow=False): + """Pushes the current local manifest back to the server.""" + print(f" [📁📤] Pushing {'Shallow' if shallow else 'Full'} Manifest for {session_id}") + + base_dir = self._get_base_dir(session_id, create=True) + + watch_path = os.path.normpath(os.path.join(base_dir, rel_path)) + + if not os.path.exists(watch_path): + # If the specific sub-path doesn't exist, try to create it if it's within the session dir + if session_id != "__fs_explorer__": + os.makedirs(watch_path, exist_ok=True) + else: + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=f"Path {rel_path} not found") + ) + )) + return + + files = [] + try: + if shallow: + # Optimized for Explorer: immediate children only, no hashing + with os.scandir(watch_path) as it: + for entry in it: + is_dir = entry.is_dir() + # Use metadata only + try: + stats = entry.stat() + size = stats.st_size if not is_dir else 0 + except: size = 0 + + # Calculate path relative to the actual base_sync_dir / session_dir + # rel_path is the directory we are currently browsing. + # entry.name is the file within it. 
+ item_rel_path = os.path.relpath(os.path.join(watch_path, entry.name), base_dir) + + files.append(agent_pb2.FileInfo(path=item_rel_path, size=size, hash="", is_dir=is_dir)) + else: + # Deep walk with full hashes for reconciliation + for root, dirs, filenames in os.walk(watch_path): + for filename in filenames: + abs_path = os.path.join(root, filename) + # r_path must be relative to base_dir so the server correctly joins it to the mirror root + r_path = os.path.relpath(abs_path, base_dir) + try: + with open(abs_path, "rb") as f: + h = hashlib.sha256(f.read()).hexdigest() + files.append(agent_pb2.FileInfo(path=r_path, size=os.path.getsize(abs_path), hash=h, is_dir=False)) + except Exception as e: + print(f" [⚠️] Failed to hash {abs_path}: {e}") + + for d in dirs: + abs_path = os.path.join(root, d) + # r_path must be relative to base_dir so the server correctly joins it to the mirror root + r_path = os.path.relpath(abs_path, base_dir) + files.append(agent_pb2.FileInfo(path=r_path, size=0, hash="", is_dir=True)) + except Exception as e: + print(f" [❌] Manifest generation failed for {rel_path}: {e}") + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e)) + ) + )) + return + + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + manifest=agent_pb2.DirectoryManifest(root_path=rel_path, files=files) + ) + )) + + def _handle_fs_write(self, session_id, rel_path, content, is_dir, task_id=""): + """Modular FS Write/Create.""" + try: + base_dir = self._get_base_dir(session_id, create=True) + target_path = os.path.normpath(os.path.join(base_dir, rel_path)) + print(f" [📁💾] target_path: {target_path} (base_dir: {base_dir})") + + if not target_path.startswith(base_dir): + raise Exception(f"Path traversal attempt blocked: {target_path} is outside 
{base_dir}") + + if is_dir: + os.makedirs(target_path, exist_ok=True) + else: + os.makedirs(os.path.dirname(target_path), exist_ok=True) + with open(target_path, "wb") as f: + f.write(content) + + # Send OK status + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"{'Directory' if is_dir else 'File'} written") + ) + )) + # Trigger manifest refresh so UI updates + self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True) + except Exception as e: + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e)) + ) + )) + + def _handle_fs_delete(self, session_id, rel_path, task_id=""): + """Modular FS Delete.""" + try: + base_dir = self._get_base_dir(session_id) + + target_path = os.path.normpath(os.path.join(base_dir, rel_path)) + if not target_path.startswith(base_dir): + raise Exception("Path traversal attempt blocked") + + if not os.path.exists(target_path): + raise Exception("File not found") + + import shutil + if os.path.isdir(target_path): + shutil.rmtree(target_path) + else: + os.remove(target_path) + + # Send OK status + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"Deleted {rel_path}") + ) + )) + # Trigger manifest refresh so UI updates + self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True) + except Exception as e: + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, 
message=str(e)) + ) + )) + + def _push_file(self, session_id, rel_path, task_id=""): + """Pushes a specific file from node to server.""" + watch_path = self._get_base_dir(session_id, create=False) + + abs_path = os.path.normpath(os.path.join(watch_path, rel_path)) + if not abs_path.startswith(watch_path): + print(f" [📁🚫] Blocked traversal attempt in _push_file: {rel_path}") + return + + if not os.path.exists(abs_path): + print(f" [📁❓] Requested file {rel_path} not found on node") + return + + with open(abs_path, "rb") as f: + full_data = f.read() + full_hash = hashlib.sha256(full_data).hexdigest() + f.seek(0) + + index = 0 + while True: + chunk = f.read(1024 * 1024) # 1MB chunks + is_final = len(chunk) < 1024 * 1024 + + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + file_data=agent_pb2.FilePayload( + path=rel_path, + chunk=chunk, + chunk_index=index, + is_final=is_final, + hash=full_hash if is_final else "" + ) + ) + )) + + if is_final or not chunk: + break + index += 1 + + def _handle_task(self, task): + print(f"[*] Task Launch: {task.task_id}", flush=True) + # 1. Cryptographic Signature Verification + if not verify_task_signature(task): + print(f"[!] Signature Validation Failed for {task.task_id}", flush=True) + # Report back to hub so the frontend gets a real error, not a silent timeout + self._send_response( + task.task_id, + agent_pb2.TaskResponse( + task_id=task.task_id, + status=agent_pb2.TaskResponse.ERROR, + stderr="[NODE] HMAC signature mismatch — check that AGENT_SECRET_KEY on the node matches the hub SECRET_KEY. Task rejected.", + ) + ) + return + + print(f"[✅] Validated task {task.task_id}", flush=True) + + # 2. Skill Manager Submission + success, reason = self.skills.submit(task, self.sandbox, self._on_finish, self._on_event) + if not success: + print(f"[!] 
Execution Rejected: {reason}", flush=True) + self._send_response( + task.task_id, + agent_pb2.TaskResponse( + task_id=task.task_id, + status=agent_pb2.TaskResponse.ERROR, + stderr=f"[NODE] Execution Rejected: {reason}", + ) + ) + + def _on_event(self, event): + """Live Event Tunneler: Routes browser/skill events into the main stream.""" + if isinstance(event, agent_pb2.ClientTaskMessage): + self.task_queue.put(event) + else: + # Legacy/Browser Skill fallback + self.task_queue.put(agent_pb2.ClientTaskMessage(browser_event=event)) + + def _on_finish(self, tid, res, trace): + """Final Completion Callback: Routes task results back to server.""" + print(f"[*] Completion: {tid}", flush=True) + # 0 is SUCCESS, 1 is ERROR in Protobuf + status = res.get('status', agent_pb2.TaskResponse.ERROR) + + tr = agent_pb2.TaskResponse( + task_id=tid, status=status, + stdout=res.get('stdout',''), + stderr=res.get('stderr',''), + trace_id=trace, + browser_result=res.get("browser_result") + ) + self._send_response(tid, tr) + + def _send_response(self, tid, tr=None, status=None): + """Utility for placing response messages into the gRPC outbound queue.""" + if tr: + self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr)) + else: + self.task_queue.put(agent_pb2.ClientTaskMessage( + task_response=agent_pb2.TaskResponse(task_id=tid, status=status) + )) + + def stop(self): + """Gracefully stops all background services and skills.""" + print(f"\n[🛑] Stopping Agent Node: {self.node_id}") + self.skills.shutdown() + # Optionally close gRPC channel if we want to be very clean + # self.channel.close() diff --git a/agent-node/src/agent_node/skills/__init__.py b/agent-node/src/agent_node/skills/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/agent-node/src/agent_node/skills/__init__.py diff --git a/agent-node/src/agent_node/skills/base.py b/agent-node/src/agent_node/skills/base.py new file mode 100644 index 0000000..33c88ec --- /dev/null +++ 
class BaseSkill:
    """Abstract interface for all Node capabilities (Shell, Browser, etc.).

    Concrete skills must override execute(); cancel() and shutdown() have
    safe no-op defaults so minimal skills stay small.
    """

    def execute(self, task, sandbox, on_complete, on_event=None):
        """Run the task under the sandbox, reporting results via the callbacks."""
        raise NotImplementedError

    def cancel(self, task_id: str) -> bool:
        """Attempt to abort the given task; the base class cannot cancel anything."""
        return False

    def shutdown(self):
        """Release any held resources on node exit (no-op by default)."""
        pass
Docker standard (/app/skills) + candidates = [ + os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../skills")), + os.path.abspath(os.path.join(os.path.dirname(__file__), "../../skills")), + "/app/skills", + "/app/node_skills" + ] + + skills_dir = None + for cand in candidates: + if os.path.exists(cand) and os.path.isdir(cand): + # Ensure it's not a broken symlink and has actual content + try: + if any(os.path.isdir(os.path.join(cand, d)) for d in os.listdir(cand)): + skills_dir = cand + break + except OSError: + continue + + discovered = {} + if not skills_dir: + print(f" [🔧⚠️] Skills directory not found in candidate locations: {candidates}") + return discovered + + print(f" [🔧] Using skills directory: {skills_dir}") + for skill_dir in os.listdir(skills_dir): + item_path = os.path.join(skills_dir, skill_dir) + if os.path.isdir(item_path): + logic_py = os.path.join(item_path, "logic.py") + if os.path.exists(logic_py): + # Dynamic import + try: + spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Find the first class that inherits from BaseSkill + for attr_name in dir(module): + attr = getattr(module, attr_name) + if isinstance(attr, type) and issubclass(attr, BaseSkill) and attr is not BaseSkill: + # We map the internal skill name (e.g. 
mesh_terminal_control) + # if we can find it in the module or assume it based on folder name + # For backward compatibility with task_type routing, we map common ones + instance = attr(sync_mgr=sync_mgr) + discovered[skill_dir] = instance + # Also map legacy names for the routing engine below + if "terminal" in skill_dir or "shell" in skill_dir: + discovered["shell"] = instance + if "browser" in skill_dir: + discovered["browser"] = instance + if "file" in skill_dir: + discovered["file"] = instance + break + except Exception as e: + print(f" [🔧❌] Failed to load skill from {logic_py}: {e}") + + print(f" [🔧] Discovered skills: {list(discovered.keys())}") + return discovered + + def submit(self, task, sandbox, on_complete, on_event=None): + """Routes a task to the appropriate skill and submits it to the thread pool.""" + # --- 0. Transparent TTY Bypass (Gaming Performance) --- + # Keystrokes and Resizes should NEVER wait for a thread or be blocked by sandbox + if "shell" in self.skills and hasattr(self.skills["shell"], "handle_transparent_tty"): + if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event): + return True, "Accepted (Transparent)" + + with self.lock: + if len(self.active_tasks) >= self.max_workers: + return False, "Node Capacity Reached" + + # 1. Routing Engine + skill = None + if task.HasField("browser_action"): + skill = self.skills.get("browser") + elif task.task_type == "file": + skill = self.skills.get("file") + else: + # Default to the one that looks like a shell + skill = self.skills.get("shell") + + if not skill: + return False, f"Target skill not available for task type: {task.task_type}" + + # 2. 
Execution submission + future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event) + self.active_tasks[task.task_id] = future + + # Cleanup hook + future.add_done_callback(lambda f: self._cleanup(task.task_id)) + return True, "Accepted" + + def cancel(self, task_id): + """Attempts to cancel an active task through all registered skills.""" + with self.lock: + cancelled = any(s.cancel(task_id) for s in self.skills.values()) + return cancelled + + def get_active_ids(self): + """Returns the list of currently running task IDs.""" + with self.lock: + return list(self.active_tasks.keys()) + + def _cleanup(self, task_id): + """Internal callback to release capacity when a task finishes.""" + with self.lock: + self.active_tasks.pop(task_id, None) + + def shutdown(self): + """Triggers shutdown for all skills and the worker pool.""" + print("[🔧] Shutting down Skill Manager...") + with self.lock: + # Use set to avoid shutting down the same instance multiple times due to alias mapping + for skill in set(self.skills.values()): + skill.shutdown() + # Shutdown thread pool + self.executor.shutdown(wait=True) diff --git a/agent-node/src/agent_node/utils/__init__.py b/agent-node/src/agent_node/utils/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/agent-node/src/agent_node/utils/__init__.py diff --git a/agent-node/src/agent_node/utils/auth.py b/agent-node/src/agent_node/utils/auth.py new file mode 100644 index 0000000..202fd4c --- /dev/null +++ b/agent-node/src/agent_node/utils/auth.py @@ -0,0 +1,28 @@ +import jwt +import datetime +import hmac +import hashlib +from protos import agent_pb2 +from agent_node.config import SECRET_KEY + +def create_auth_token(node_id: str) -> str: + """Creates a JWT for node authentication.""" + payload = { + "sub": node_id, + "iat": datetime.datetime.utcnow(), + "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10) + } + return jwt.encode(payload, SECRET_KEY, algorithm="HS256") + +def 
def get_secure_stub():
    """Initializes a gRPC channel (Secure or Insecure) and returns the orchestrator stub."""

    # Aggressive keepalives so NAT/proxy idle timeouts do not kill the
    # long-lived streams; 128MB caps accommodate large file-sync payloads.
    channel_options = [
        ('grpc.keepalive_time_ms', 30000),          # Send keepalive ping every 30s
        ('grpc.keepalive_timeout_ms', 10000),       # Wait 10s for pong
        ('grpc.keepalive_permit_without_calls', True),
        ('grpc.http2.max_pings_without_data', 0),   # Allow infinite pings
        ('grpc.max_receive_message_length', 128 * 1024 * 1024),
        ('grpc.max_send_message_length', 128 * 1024 * 1024),
    ]

    def _stub(channel):
        # Single construction point for the orchestrator stub.
        return agent_pb2_grpc.AgentOrchestratorStub(channel)

    if not TLS_ENABLED:
        print(f"[!] TLS is disabled. Connecting via insecure channel to {SERVER_HOST_PORT}")
        return _stub(grpc.insecure_channel(SERVER_HOST_PORT, options=channel_options))

    print(f"[*] Connecting via secure (mTLS) channel to {SERVER_HOST_PORT}")
    try:
        with open(CERT_CLIENT_KEY, 'rb') as f:
            client_key = f.read()
        with open(CERT_CLIENT_CRT, 'rb') as f:
            client_cert = f.read()
        with open(CERT_CA, 'rb') as f:
            ca_bundle = f.read()

        creds = grpc.ssl_channel_credentials(ca_bundle, client_key, client_cert)
        return _stub(grpc.secure_channel(SERVER_HOST_PORT, creds, options=channel_options))
    except FileNotFoundError as e:
        print(f"[!] mTLS Certificate files not found: {e}. Falling back to standard TLS (Server Verify)...")
        # Fallback to standard TLS (uses system CA roots by default)
        creds = grpc.ssl_channel_credentials()
        return _stub(grpc.secure_channel(SERVER_HOST_PORT, creds, options=channel_options))
Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + rpc ReportHealth(stream Heartbeat) returns (stream HealthCheckResponse); +} + +// --- Channel 1: Registration & Policy --- +message RegistrationRequest { + string node_id = 1; + string version = 2; + string auth_token = 3; + string node_description = 4; // AI-readable description of this node's role + map<string, string> capabilities = 5; // e.g. "gpu": "nvidia-3080", "os": "ubuntu-22.04" +} + +message SandboxPolicy { + enum Mode { + STRICT = 0; + PERMISSIVE = 1; + } + Mode mode = 1; + repeated string allowed_commands = 2; + repeated string denied_commands = 3; + repeated string sensitive_commands = 4; + string working_dir_jail = 5; +} + +message RegistrationResponse { + bool success = 1; + string error_message = 2; + string session_id = 3; + SandboxPolicy policy = 4; +} + +// --- Channel 2: Tasks & Collaboration --- +message ClientTaskMessage { + oneof payload { + TaskResponse task_response = 1; + TaskClaimRequest task_claim = 2; + BrowserEvent browser_event = 3; + NodeAnnounce announce = 4; // NEW: Identification on stream connect + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync + SkillEvent skill_event = 6; // NEW: Persistent real-time skill data + } +} + +message SkillEvent { + string session_id = 1; + string task_id = 2; + oneof data { + string terminal_out = 3; // Raw stdout/stderr chunks + string prompt = 4; // Interactive prompt (like password) + bool keep_alive = 5; // Session preservation + } +} + +message NodeAnnounce { + string node_id = 1; +} + +message BrowserEvent { + string session_id = 1; + oneof event { + ConsoleMessage console_msg = 2; + NetworkRequest network_req = 3; + } +} + +message ServerTaskMessage { + oneof payload { + TaskRequest task_request = 1; + WorkPoolUpdate work_pool_update = 2; + TaskClaimResponse claim_status = 3; + TaskCancelRequest task_cancel = 4; + FileSyncMessage file_sync = 5; // NEW: Ghost Mirror Sync + SandboxPolicy policy_update = 6; // NEW: Live Policy Update + } 
+} + +message TaskCancelRequest { + string task_id = 1; +} + +message TaskRequest { + string task_id = 1; + string task_type = 2; + oneof payload { + string payload_json = 3; // For legacy shell/fallback + BrowserAction browser_action = 7; // NEW: Structured Browser Skill + } + int32 timeout_ms = 4; + string trace_id = 5; + string signature = 6; + string session_id = 8; // NEW: Map execution to a sync workspace +} + +message BrowserAction { + enum ActionType { + NAVIGATE = 0; + CLICK = 1; + TYPE = 2; + SCREENSHOT = 3; + GET_DOM = 4; + HOVER = 5; + SCROLL = 6; + CLOSE = 7; + EVAL = 8; + GET_A11Y = 9; + } + ActionType action = 1; + string url = 2; + string selector = 3; + string text = 4; + string session_id = 5; + int32 x = 6; + int32 y = 7; +} + +message TaskResponse { + string task_id = 1; + enum Status { + SUCCESS = 0; + ERROR = 1; + TIMEOUT = 2; + CANCELLED = 3; + } + Status status = 2; + string stdout = 3; + string stderr = 4; + string trace_id = 5; + map<string, bytes> artifacts = 6; + + // NEW: Structured Skill Results + oneof result { + BrowserResponse browser_result = 7; + } +} + +message BrowserResponse { + string url = 1; + string title = 2; + bytes snapshot = 3; + string dom_content = 4; + string a11y_tree = 5; + string eval_result = 6; + repeated ConsoleMessage console_history = 7; + repeated NetworkRequest network_history = 8; +} + +message ConsoleMessage { + string level = 1; + string text = 2; + int64 timestamp_ms = 3; +} + +message NetworkRequest { + string method = 1; + string url = 2; + int32 status = 3; + string resource_type = 4; + int64 latency_ms = 5; +} + +message WorkPoolUpdate { + repeated string available_task_ids = 1; +} + +message TaskClaimRequest { + string task_id = 1; + string node_id = 2; +} + +message TaskClaimResponse { + string task_id = 1; + bool granted = 2; + string reason = 3; +} + +// --- Channel 3: Health & Observation --- +message Heartbeat { + string node_id = 1; + float cpu_usage_percent = 2; + float memory_usage_percent = 3; + int32 
active_worker_count = 4; + int32 max_worker_capacity = 5; + string status_message = 6; + repeated string running_task_ids = 7; + int32 cpu_count = 8; + float memory_used_gb = 9; + float memory_total_gb = 10; + + // Rich Metrics (M6) + repeated float cpu_usage_per_core = 11; + float cpu_freq_mhz = 12; + float memory_available_gb = 13; + repeated float load_avg = 14; // [1min, 5min, 15min] +} + + +message HealthCheckResponse { + int64 server_time_ms = 1; +} + +// --- Channel 4: Ghost Mirror File Sync --- +message FileSyncMessage { + string session_id = 1; + oneof payload { + DirectoryManifest manifest = 2; + FilePayload file_data = 3; + SyncStatus status = 4; + SyncControl control = 5; + } + string task_id = 6; // NEW: Correlation ID for FS operations +} + +message SyncControl { + enum Action { + START_WATCHING = 0; + STOP_WATCHING = 1; + LOCK = 2; // Server -> Node: Disable user-side edits + UNLOCK = 3; // Server -> Node: Enable user-side edits + REFRESH_MANIFEST = 4; // Server -> Node: Request a full manifest from node + RESYNC = 5; // Server -> Node: Force a hash-based reconciliation + + // FS Operations (Modular Explorer) + LIST = 6; // Server -> Node: List directory contents (returns manifest) + READ = 7; // Server -> Node: Read file content (returns file_data) + WRITE = 8; // Server -> Node: Write/Create file + DELETE = 9; // Server -> Node: Delete file or directory + PURGE = 10; // Server -> Node: Purge local sync directory entirely + CLEANUP = 11; // Server -> Node: Purge any session dirs not in request_paths + } + Action action = 1; + string path = 2; + repeated string request_paths = 3; // NEW: Specific files requested for pull + bytes content = 4; // NEW: For WRITE operation + bool is_dir = 5; // NEW: For TOUCH/WRITE operation +} + +message DirectoryManifest { + string root_path = 1; + repeated FileInfo files = 2; +} + +message FileInfo { + string path = 1; + int64 size = 2; + string hash = 3; // For drift detection + bool is_dir = 4; +} + +message 
FilePayload { + string path = 1; + bytes chunk = 2; + int32 chunk_index = 3; + bool is_final = 4; + string hash = 5; // Full file hash for verification on final chunk +} + +message SyncStatus { + enum Code { + OK = 0; + ERROR = 1; + RECONCILE_REQUIRED = 2; + IN_PROGRESS = 3; + } + Code code = 1; + string message = 2; + repeated string reconcile_paths = 3; // NEW: Files needing immediate re-sync +} diff --git a/agent-node/src/protos/agent_pb2.py b/agent-node/src/protos/agent_pb2.py new file mode 100644 index 0000000..3075f56 --- /dev/null +++ b/agent-node/src/protos/agent_pb2.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: protos/agent.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12protos/agent.proto\x12\x05\x61gent\"\xde\x01\n\x13RegistrationRequest\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x0f\n\x07version\x18\x02 \x01(\t\x12\x12\n\nauth_token\x18\x03 \x01(\t\x12\x18\n\x10node_description\x18\x04 \x01(\t\x12\x42\n\x0c\x63\x61pabilities\x18\x05 \x03(\x0b\x32,.agent.RegistrationRequest.CapabilitiesEntry\x1a\x33\n\x11\x43\x61pabilitiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xc5\x01\n\rSandboxPolicy\x12\'\n\x04mode\x18\x01 \x01(\x0e\x32\x19.agent.SandboxPolicy.Mode\x12\x18\n\x10\x61llowed_commands\x18\x02 \x03(\t\x12\x17\n\x0f\x64\x65nied_commands\x18\x03 \x03(\t\x12\x1a\n\x12sensitive_commands\x18\x04 \x03(\t\x12\x18\n\x10working_dir_jail\x18\x05 
\x01(\t\"\"\n\x04Mode\x12\n\n\x06STRICT\x10\x00\x12\x0e\n\nPERMISSIVE\x10\x01\"x\n\x14RegistrationResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x15\n\rerror_message\x18\x02 \x01(\t\x12\x12\n\nsession_id\x18\x03 \x01(\t\x12$\n\x06policy\x18\x04 \x01(\x0b\x32\x14.agent.SandboxPolicy\"\xa9\x02\n\x11\x43lientTaskMessage\x12,\n\rtask_response\x18\x01 \x01(\x0b\x32\x13.agent.TaskResponseH\x00\x12-\n\ntask_claim\x18\x02 \x01(\x0b\x32\x17.agent.TaskClaimRequestH\x00\x12,\n\rbrowser_event\x18\x03 \x01(\x0b\x32\x13.agent.BrowserEventH\x00\x12\'\n\x08\x61nnounce\x18\x04 \x01(\x0b\x32\x13.agent.NodeAnnounceH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12(\n\x0bskill_event\x18\x06 \x01(\x0b\x32\x11.agent.SkillEventH\x00\x42\t\n\x07payload\"y\n\nSkillEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12\x0f\n\x07task_id\x18\x02 \x01(\t\x12\x16\n\x0cterminal_out\x18\x03 \x01(\tH\x00\x12\x10\n\x06prompt\x18\x04 \x01(\tH\x00\x12\x14\n\nkeep_alive\x18\x05 \x01(\x08H\x00\x42\x06\n\x04\x64\x61ta\"\x1f\n\x0cNodeAnnounce\x12\x0f\n\x07node_id\x18\x01 \x01(\t\"\x87\x01\n\x0c\x42rowserEvent\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x0b\x63onsole_msg\x18\x02 \x01(\x0b\x32\x15.agent.ConsoleMessageH\x00\x12,\n\x0bnetwork_req\x18\x03 \x01(\x0b\x32\x15.agent.NetworkRequestH\x00\x42\x07\n\x05\x65vent\"\xbc\x02\n\x11ServerTaskMessage\x12*\n\x0ctask_request\x18\x01 \x01(\x0b\x32\x12.agent.TaskRequestH\x00\x12\x31\n\x10work_pool_update\x18\x02 \x01(\x0b\x32\x15.agent.WorkPoolUpdateH\x00\x12\x30\n\x0c\x63laim_status\x18\x03 \x01(\x0b\x32\x18.agent.TaskClaimResponseH\x00\x12/\n\x0btask_cancel\x18\x04 \x01(\x0b\x32\x18.agent.TaskCancelRequestH\x00\x12+\n\tfile_sync\x18\x05 \x01(\x0b\x32\x16.agent.FileSyncMessageH\x00\x12-\n\rpolicy_update\x18\x06 \x01(\x0b\x32\x14.agent.SandboxPolicyH\x00\x42\t\n\x07payload\"$\n\x11TaskCancelRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\"\xd1\x01\n\x0bTaskRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x11\n\ttask_type\x18\x02 
\x01(\t\x12\x16\n\x0cpayload_json\x18\x03 \x01(\tH\x00\x12.\n\x0e\x62rowser_action\x18\x07 \x01(\x0b\x32\x14.agent.BrowserActionH\x00\x12\x12\n\ntimeout_ms\x18\x04 \x01(\x05\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x11\n\tsignature\x18\x06 \x01(\t\x12\x12\n\nsession_id\x18\x08 \x01(\tB\t\n\x07payload\"\xa0\x02\n\rBrowserAction\x12/\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x1f.agent.BrowserAction.ActionType\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x10\n\x08selector\x18\x03 \x01(\t\x12\x0c\n\x04text\x18\x04 \x01(\t\x12\x12\n\nsession_id\x18\x05 \x01(\t\x12\t\n\x01x\x18\x06 \x01(\x05\x12\t\n\x01y\x18\x07 \x01(\x05\"\x86\x01\n\nActionType\x12\x0c\n\x08NAVIGATE\x10\x00\x12\t\n\x05\x43LICK\x10\x01\x12\x08\n\x04TYPE\x10\x02\x12\x0e\n\nSCREENSHOT\x10\x03\x12\x0b\n\x07GET_DOM\x10\x04\x12\t\n\x05HOVER\x10\x05\x12\n\n\x06SCROLL\x10\x06\x12\t\n\x05\x43LOSE\x10\x07\x12\x08\n\x04\x45VAL\x10\x08\x12\x0c\n\x08GET_A11Y\x10\t\"\xe0\x02\n\x0cTaskResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12*\n\x06status\x18\x02 \x01(\x0e\x32\x1a.agent.TaskResponse.Status\x12\x0e\n\x06stdout\x18\x03 \x01(\t\x12\x0e\n\x06stderr\x18\x04 \x01(\t\x12\x10\n\x08trace_id\x18\x05 \x01(\t\x12\x35\n\tartifacts\x18\x06 \x03(\x0b\x32\".agent.TaskResponse.ArtifactsEntry\x12\x30\n\x0e\x62rowser_result\x18\x07 \x01(\x0b\x32\x16.agent.BrowserResponseH\x00\x1a\x30\n\x0e\x41rtifactsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x0c:\x02\x38\x01\"<\n\x06Status\x12\x0b\n\x07SUCCESS\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x0b\n\x07TIMEOUT\x10\x02\x12\r\n\tCANCELLED\x10\x03\x42\x08\n\x06result\"\xdc\x01\n\x0f\x42rowserResponse\x12\x0b\n\x03url\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x10\n\x08snapshot\x18\x03 \x01(\x0c\x12\x13\n\x0b\x64om_content\x18\x04 \x01(\t\x12\x11\n\ta11y_tree\x18\x05 \x01(\t\x12\x13\n\x0b\x65val_result\x18\x06 \x01(\t\x12.\n\x0f\x63onsole_history\x18\x07 \x03(\x0b\x32\x15.agent.ConsoleMessage\x12.\n\x0fnetwork_history\x18\x08 
\x03(\x0b\x32\x15.agent.NetworkRequest\"C\n\x0e\x43onsoleMessage\x12\r\n\x05level\x18\x01 \x01(\t\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x14\n\x0ctimestamp_ms\x18\x03 \x01(\x03\"h\n\x0eNetworkRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0b\n\x03url\x18\x02 \x01(\t\x12\x0e\n\x06status\x18\x03 \x01(\x05\x12\x15\n\rresource_type\x18\x04 \x01(\t\x12\x12\n\nlatency_ms\x18\x05 \x01(\x03\",\n\x0eWorkPoolUpdate\x12\x1a\n\x12\x61vailable_task_ids\x18\x01 \x03(\t\"4\n\x10TaskClaimRequest\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07node_id\x18\x02 \x01(\t\"E\n\x11TaskClaimResponse\x12\x0f\n\x07task_id\x18\x01 \x01(\t\x12\x0f\n\x07granted\x18\x02 \x01(\x08\x12\x0e\n\x06reason\x18\x03 \x01(\t\"\xe6\x02\n\tHeartbeat\x12\x0f\n\x07node_id\x18\x01 \x01(\t\x12\x19\n\x11\x63pu_usage_percent\x18\x02 \x01(\x02\x12\x1c\n\x14memory_usage_percent\x18\x03 \x01(\x02\x12\x1b\n\x13\x61\x63tive_worker_count\x18\x04 \x01(\x05\x12\x1b\n\x13max_worker_capacity\x18\x05 \x01(\x05\x12\x16\n\x0estatus_message\x18\x06 \x01(\t\x12\x18\n\x10running_task_ids\x18\x07 \x03(\t\x12\x11\n\tcpu_count\x18\x08 \x01(\x05\x12\x16\n\x0ememory_used_gb\x18\t \x01(\x02\x12\x17\n\x0fmemory_total_gb\x18\n \x01(\x02\x12\x1a\n\x12\x63pu_usage_per_core\x18\x0b \x03(\x02\x12\x14\n\x0c\x63pu_freq_mhz\x18\x0c \x01(\x02\x12\x1b\n\x13memory_available_gb\x18\r \x01(\x02\x12\x10\n\x08load_avg\x18\x0e \x03(\x02\"-\n\x13HealthCheckResponse\x12\x16\n\x0eserver_time_ms\x18\x01 \x01(\x03\"\xe4\x01\n\x0f\x46ileSyncMessage\x12\x12\n\nsession_id\x18\x01 \x01(\t\x12,\n\x08manifest\x18\x02 \x01(\x0b\x32\x18.agent.DirectoryManifestH\x00\x12\'\n\tfile_data\x18\x03 \x01(\x0b\x32\x12.agent.FilePayloadH\x00\x12#\n\x06status\x18\x04 \x01(\x0b\x32\x11.agent.SyncStatusH\x00\x12%\n\x07\x63ontrol\x18\x05 \x01(\x0b\x32\x12.agent.SyncControlH\x00\x12\x0f\n\x07task_id\x18\x06 \x01(\tB\t\n\x07payload\"\xab\x02\n\x0bSyncControl\x12)\n\x06\x61\x63tion\x18\x01 \x01(\x0e\x32\x19.agent.SyncControl.Action\x12\x0c\n\x04path\x18\x02 
\x01(\t\x12\x15\n\rrequest_paths\x18\x03 \x03(\t\x12\x0f\n\x07\x63ontent\x18\x04 \x01(\x0c\x12\x0e\n\x06is_dir\x18\x05 \x01(\x08\"\xaa\x01\n\x06\x41\x63tion\x12\x12\n\x0eSTART_WATCHING\x10\x00\x12\x11\n\rSTOP_WATCHING\x10\x01\x12\x08\n\x04LOCK\x10\x02\x12\n\n\x06UNLOCK\x10\x03\x12\x14\n\x10REFRESH_MANIFEST\x10\x04\x12\n\n\x06RESYNC\x10\x05\x12\x08\n\x04LIST\x10\x06\x12\x08\n\x04READ\x10\x07\x12\t\n\x05WRITE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\t\n\x05PURGE\x10\n\x12\x0b\n\x07\x43LEANUP\x10\x0b\"F\n\x11\x44irectoryManifest\x12\x11\n\troot_path\x18\x01 \x01(\t\x12\x1e\n\x05\x66iles\x18\x02 \x03(\x0b\x32\x0f.agent.FileInfo\"D\n\x08\x46ileInfo\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\x0c\n\x04size\x18\x02 \x01(\x03\x12\x0c\n\x04hash\x18\x03 \x01(\t\x12\x0e\n\x06is_dir\x18\x04 \x01(\x08\"_\n\x0b\x46ilePayload\x12\x0c\n\x04path\x18\x01 \x01(\t\x12\r\n\x05\x63hunk\x18\x02 \x01(\x0c\x12\x13\n\x0b\x63hunk_index\x18\x03 \x01(\x05\x12\x10\n\x08is_final\x18\x04 \x01(\x08\x12\x0c\n\x04hash\x18\x05 \x01(\t\"\xa0\x01\n\nSyncStatus\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.agent.SyncStatus.Code\x12\x0f\n\x07message\x18\x02 \x01(\t\x12\x17\n\x0freconcile_paths\x18\x03 \x03(\t\"B\n\x04\x43ode\x12\x06\n\x02OK\x10\x00\x12\t\n\x05\x45RROR\x10\x01\x12\x16\n\x12RECONCILE_REQUIRED\x10\x02\x12\x0f\n\x0bIN_PROGRESS\x10\x03\x32\xe9\x01\n\x11\x41gentOrchestrator\x12L\n\x11SyncConfiguration\x12\x1a.agent.RegistrationRequest\x1a\x1b.agent.RegistrationResponse\x12\x44\n\nTaskStream\x12\x18.agent.ClientTaskMessage\x1a\x18.agent.ServerTaskMessage(\x01\x30\x01\x12@\n\x0cReportHealth\x12\x10.agent.Heartbeat\x1a\x1a.agent.HealthCheckResponse(\x01\x30\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'protos.agent_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._options = None + 
_globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_options = b'8\001' + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._options = None + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_options = b'8\001' + _globals['_REGISTRATIONREQUEST']._serialized_start=30 + _globals['_REGISTRATIONREQUEST']._serialized_end=252 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_start=201 + _globals['_REGISTRATIONREQUEST_CAPABILITIESENTRY']._serialized_end=252 + _globals['_SANDBOXPOLICY']._serialized_start=255 + _globals['_SANDBOXPOLICY']._serialized_end=452 + _globals['_SANDBOXPOLICY_MODE']._serialized_start=418 + _globals['_SANDBOXPOLICY_MODE']._serialized_end=452 + _globals['_REGISTRATIONRESPONSE']._serialized_start=454 + _globals['_REGISTRATIONRESPONSE']._serialized_end=574 + _globals['_CLIENTTASKMESSAGE']._serialized_start=577 + _globals['_CLIENTTASKMESSAGE']._serialized_end=874 + _globals['_SKILLEVENT']._serialized_start=876 + _globals['_SKILLEVENT']._serialized_end=997 + _globals['_NODEANNOUNCE']._serialized_start=999 + _globals['_NODEANNOUNCE']._serialized_end=1030 + _globals['_BROWSEREVENT']._serialized_start=1033 + _globals['_BROWSEREVENT']._serialized_end=1168 + _globals['_SERVERTASKMESSAGE']._serialized_start=1171 + _globals['_SERVERTASKMESSAGE']._serialized_end=1487 + _globals['_TASKCANCELREQUEST']._serialized_start=1489 + _globals['_TASKCANCELREQUEST']._serialized_end=1525 + _globals['_TASKREQUEST']._serialized_start=1528 + _globals['_TASKREQUEST']._serialized_end=1737 + _globals['_BROWSERACTION']._serialized_start=1740 + _globals['_BROWSERACTION']._serialized_end=2028 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_start=1894 + _globals['_BROWSERACTION_ACTIONTYPE']._serialized_end=2028 + _globals['_TASKRESPONSE']._serialized_start=2031 + _globals['_TASKRESPONSE']._serialized_end=2383 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_start=2263 + _globals['_TASKRESPONSE_ARTIFACTSENTRY']._serialized_end=2311 + 
_globals['_TASKRESPONSE_STATUS']._serialized_start=2313 + _globals['_TASKRESPONSE_STATUS']._serialized_end=2373 + _globals['_BROWSERRESPONSE']._serialized_start=2386 + _globals['_BROWSERRESPONSE']._serialized_end=2606 + _globals['_CONSOLEMESSAGE']._serialized_start=2608 + _globals['_CONSOLEMESSAGE']._serialized_end=2675 + _globals['_NETWORKREQUEST']._serialized_start=2677 + _globals['_NETWORKREQUEST']._serialized_end=2781 + _globals['_WORKPOOLUPDATE']._serialized_start=2783 + _globals['_WORKPOOLUPDATE']._serialized_end=2827 + _globals['_TASKCLAIMREQUEST']._serialized_start=2829 + _globals['_TASKCLAIMREQUEST']._serialized_end=2881 + _globals['_TASKCLAIMRESPONSE']._serialized_start=2883 + _globals['_TASKCLAIMRESPONSE']._serialized_end=2952 + _globals['_HEARTBEAT']._serialized_start=2955 + _globals['_HEARTBEAT']._serialized_end=3313 + _globals['_HEALTHCHECKRESPONSE']._serialized_start=3315 + _globals['_HEALTHCHECKRESPONSE']._serialized_end=3360 + _globals['_FILESYNCMESSAGE']._serialized_start=3363 + _globals['_FILESYNCMESSAGE']._serialized_end=3591 + _globals['_SYNCCONTROL']._serialized_start=3594 + _globals['_SYNCCONTROL']._serialized_end=3893 + _globals['_SYNCCONTROL_ACTION']._serialized_start=3723 + _globals['_SYNCCONTROL_ACTION']._serialized_end=3893 + _globals['_DIRECTORYMANIFEST']._serialized_start=3895 + _globals['_DIRECTORYMANIFEST']._serialized_end=3965 + _globals['_FILEINFO']._serialized_start=3967 + _globals['_FILEINFO']._serialized_end=4035 + _globals['_FILEPAYLOAD']._serialized_start=4037 + _globals['_FILEPAYLOAD']._serialized_end=4132 + _globals['_SYNCSTATUS']._serialized_start=4135 + _globals['_SYNCSTATUS']._serialized_end=4295 + _globals['_SYNCSTATUS_CODE']._serialized_start=4229 + _globals['_SYNCSTATUS_CODE']._serialized_end=4295 + _globals['_AGENTORCHESTRATOR']._serialized_start=4298 + _globals['_AGENTORCHESTRATOR']._serialized_end=4531 +# @@protoc_insertion_point(module_scope) diff --git a/agent-node/src/protos/agent_pb2_grpc.py 
b/agent-node/src/protos/agent_pb2_grpc.py new file mode 100644 index 0000000..f551b0b --- /dev/null +++ b/agent-node/src/protos/agent_pb2_grpc.py @@ -0,0 +1,138 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from protos import agent_pb2 as protos_dot_agent__pb2 + + +class AgentOrchestratorStub(object): + """The Cortex Server exposes this service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SyncConfiguration = channel.unary_unary( + '/agent.AgentOrchestrator/SyncConfiguration', + request_serializer=protos_dot_agent__pb2.RegistrationRequest.SerializeToString, + response_deserializer=protos_dot_agent__pb2.RegistrationResponse.FromString, + ) + self.TaskStream = channel.stream_stream( + '/agent.AgentOrchestrator/TaskStream', + request_serializer=protos_dot_agent__pb2.ClientTaskMessage.SerializeToString, + response_deserializer=protos_dot_agent__pb2.ServerTaskMessage.FromString, + ) + self.ReportHealth = channel.stream_stream( + '/agent.AgentOrchestrator/ReportHealth', + request_serializer=protos_dot_agent__pb2.Heartbeat.SerializeToString, + response_deserializer=protos_dot_agent__pb2.HealthCheckResponse.FromString, + ) + + +class AgentOrchestratorServicer(object): + """The Cortex Server exposes this service + """ + + def SyncConfiguration(self, request, context): + """1. Control Channel: Sync policies and settings (Unary) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def TaskStream(self, request_iterator, context): + """2. 
Task Channel: Bidirectional work dispatch and reporting (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ReportHealth(self, request_iterator, context): + """3. Health Channel: Dedicated Ping-Pong / Heartbeat (Persistent) + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AgentOrchestratorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SyncConfiguration': grpc.unary_unary_rpc_method_handler( + servicer.SyncConfiguration, + request_deserializer=protos_dot_agent__pb2.RegistrationRequest.FromString, + response_serializer=protos_dot_agent__pb2.RegistrationResponse.SerializeToString, + ), + 'TaskStream': grpc.stream_stream_rpc_method_handler( + servicer.TaskStream, + request_deserializer=protos_dot_agent__pb2.ClientTaskMessage.FromString, + response_serializer=protos_dot_agent__pb2.ServerTaskMessage.SerializeToString, + ), + 'ReportHealth': grpc.stream_stream_rpc_method_handler( + servicer.ReportHealth, + request_deserializer=protos_dot_agent__pb2.Heartbeat.FromString, + response_serializer=protos_dot_agent__pb2.HealthCheckResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'agent.AgentOrchestrator', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. 
+class AgentOrchestrator(object): + """The Cortex Server exposes this service + """ + + @staticmethod + def SyncConfiguration(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agent.AgentOrchestrator/SyncConfiguration', + protos_dot_agent__pb2.RegistrationRequest.SerializeToString, + protos_dot_agent__pb2.RegistrationResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def TaskStream(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/TaskStream', + protos_dot_agent__pb2.ClientTaskMessage.SerializeToString, + protos_dot_agent__pb2.ServerTaskMessage.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def ReportHealth(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/agent.AgentOrchestrator/ReportHealth', + protos_dot_agent__pb2.Heartbeat.SerializeToString, + protos_dot_agent__pb2.HealthCheckResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/agent-node/src/shared_core/__init__.py b/agent-node/src/shared_core/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/agent-node/src/shared_core/__init__.py diff --git a/agent-node/src/shared_core/ignore.py 
b/agent-node/src/shared_core/ignore.py new file mode 100644 index 0000000..c3f0cb5 --- /dev/null +++ b/agent-node/src/shared_core/ignore.py @@ -0,0 +1,38 @@ +import os +import fnmatch + +class CortexIgnore: + """Handles .cortexignore (and .gitignore) pattern matching.""" + def __init__(self, root_path): + self.root_path = root_path + self.patterns = self._load_patterns() + + def _load_patterns(self): + patterns = [".git", "node_modules", ".cortex_sync", "__pycache__", "*.pyc"] # Default ignores + ignore_file = os.path.join(self.root_path, ".cortexignore") + if not os.path.exists(ignore_file): + ignore_file = os.path.join(self.root_path, ".gitignore") + + if os.path.exists(ignore_file): + with open(ignore_file, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#"): + patterns.append(line) + return patterns + + def is_ignored(self, rel_path): + """Returns True if the path matches any ignore pattern.""" + for pattern in self.patterns: + # Handle directory patterns + if pattern.endswith("/"): + if rel_path.startswith(pattern) or f"/{pattern}" in f"/{rel_path}": + return True + # Standard glob matching + if fnmatch.fnmatch(rel_path, pattern) or fnmatch.fnmatch(os.path.basename(rel_path), pattern): + return True + # Handle nested matches + for part in rel_path.split(os.sep): + if fnmatch.fnmatch(part, pattern): + return True + return False diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index 2893d02..9cb344c 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -434,26 +434,34 @@ README_CONTENT = """# Cortex Agent Node -This bundle contains the Cortex Agent Node, a modular software that connects to your Cortex Hub. +This bundle contains the Cortex Agent Node, a modular software that connects your physical computing resources to the Cortex Hub. 
+ +## Structure +- `bootstrap_installer.py`: The daemon & update installer +- `src/`: Core Python modules (`agent_node`, `protos`, `shared_core`) +- `run.sh` / `run.bat`: Simple execution wrappers ## Running the Node -### Linux & macOS -1. Open your terminal in this directory. -2. Make the runner script executable: `chmod +x run.sh` -3. Run: `./run.sh` +### Fast/Production (macOS & Linux) +To run the node cleanly in the background as a daemon (survives system restarts): +1. Open a terminal in this directory. +2. Run: `python3 bootstrap_installer.py --daemon` -> **Note:** Want to run this automatically in the background on startup? -> Run `python3 bootstrap_installer.py --daemon` instead. +That's it! You can safely close your terminal. + +### Debug / Foreground (macOS & Linux) +1. Open a terminal in this directory. +2. Make the runner executable: `chmod +x run.sh` +3. Run: `./run.sh` ### Windows 1. Double-click `run.bat`. -The scripts will automatically detect if the node is provided as a binary executable or source code, and handle the environment setup accordingly. +The scripts perfectly set up the python virtual environment. ## Configuration - -The `agent_config.yaml` file has been pre-configured with your node's identity and security tokens. Do not share this file. +The `agent_config.yaml` file natively holds your node's identity and secrets. Do not share it. """ RUN_SH_CONTENT = """#!/bin/bash @@ -475,7 +483,7 @@ fi # 2. Source Code Fallback -if [ -d "./agent_node" ]; then +if [ -d "./src/agent_node" ]; then echo "[*] Source code detected. Setting up Python environment..." if ! command -v python3 &> /dev/null; then echo "❌ Error: python3 not found. Please install Python 3.10+." @@ -497,9 +505,9 @@ echo "✅ Environment ready. Booting node..." 
echo "💡 Tip: To install as a persistent background service (survives reboots), run: python3 bootstrap_installer.py --daemon" - python3 -m agent_node.main + python3 src/agent_node/main.py else - echo "❌ Error: No executable ('agent-node') or source code ('agent_node/') found in this bundle." + echo "❌ Error: No executable ('agent-node') or source code ('src/agent_node/') found in this bundle." exit 1 fi """ @@ -515,7 +523,7 @@ exit /b %errorlevel% ) -if exist agent_node ( +if exist src\\agent_node ( echo [*] Source code detected. Checking environment... python --version >nul 2>&1 if %errorlevel% neq 0 ( @@ -534,9 +542,9 @@ pip install -r requirements.txt --quiet ) echo ✅ Environment ready. Booting node... - python -m agent_node.main + python src\\agent_node\\main.py ) else ( - echo ❌ Error: No executable ('agent-node.exe') or source code ('agent_node/') found. + echo ❌ Error: No executable ('agent-node.exe') or source code ('src\\agent_node\\') found. pause exit /b 1 ) diff --git a/docs/features/agent_node_mesh.md b/docs/features/agent_node_mesh.md index 56ae3e3..62d5a0d 100644 --- a/docs/features/agent_node_mesh.md +++ b/docs/features/agent_node_mesh.md @@ -131,7 +131,7 @@ # Hub cd /app/ai-hub/app/protos && python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. agent.proto # Agent - cd /app/agent-node && python3 -m grpc_tools.protoc -Iprotos --python_out=. --grpc_python_out=. protos/agent.proto + cd /app/agent-node && python3 -m grpc_tools.protoc -Isrc/protos --python_out=src --grpc_python_out=src src/protos/agent.proto ``` - **Step**: Commit the `.proto` changes AND the newly generated `_pb2.py` files. - **Action**: Push to production: