diff --git a/agent-node/.dockerignore b/agent-node/.dockerignore new file mode 100644 index 0000000..275819c --- /dev/null +++ b/agent-node/.dockerignore @@ -0,0 +1,12 @@ +**/__pycache__ +**/.DS_Store +**/._* +**/.git +**/.venv +**/venv +*.pyc +*.pyo +.DS_Store +dist/ +build/ +*.egg-info diff --git a/agent-node/VERSION b/agent-node/VERSION index 9084fa2..65087b4 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 +1 @@ -1.1.0 +1.1.4 diff --git a/agent-node/bootstrap_installer.py b/agent-node/bootstrap_installer.py index c3b5691..fce96f8 100644 --- a/agent-node/bootstrap_installer.py +++ b/agent-node/bootstrap_installer.py @@ -17,6 +17,12 @@ import os import sys + +# M6: Force UTF-8 for stdout/stderr on Windows to prevent GBK crashes with emojis +if os.name == 'nt': + import io + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') import json import shutil import tarfile @@ -125,11 +131,15 @@ except: continue + # Force disable --user if inside a virtualenv + is_venv = (sys.prefix != sys.base_prefix) + user_flag = ["--user"] if not is_venv else [] + if not pip_cmd: _print("pip not found as module or command. 
Attempting to bootstrap pip via ensurepip...") try: - # Try ensurepip with --user to avoid permission errors on system paths - subprocess.check_call([sys.executable, "-m", "ensurepip", "--user", "--default-pip"], stdout=subprocess.DEVNULL) + # Try ensurepip + subprocess.check_call([sys.executable, "-m", "ensurepip"] + user_flag + ["--default-pip"], stdout=subprocess.DEVNULL) pip_cmd = [sys.executable, "-m", "pip"] _print("pip bootstrapped successfully via ensurepip.") except Exception as e: @@ -137,9 +147,10 @@ _print("Attempting to download get-pip.py as last resort...") try: get_pip_url = "https://bootstrap.pypa.io/get-pip.py" + import tempfile tmp_pip = os.path.join(tempfile.gettempdir(), "get-pip.py") urllib.request.urlretrieve(get_pip_url, tmp_pip) - subprocess.check_call([sys.executable, tmp_pip, "--user"], stdout=subprocess.DEVNULL) + subprocess.check_call([sys.executable, tmp_pip] + user_flag, stdout=subprocess.DEVNULL) pip_cmd = [sys.executable, "-m", "pip"] _print("pip installed successfully via get-pip.py.") except Exception as e2: @@ -153,8 +164,8 @@ _print(f"Installing Python dependencies using {' '.join(pip_cmd)} ...") try: - # Use --user and --ignore-installed for maximum resilience on restricted environments like NAS - args = pip_cmd + ["install", "-r", req_file, "--quiet", "--ignore-installed", "--user"] + # Use --ignore-installed for maximum resilience on restricted environments like NAS + args = pip_cmd + ["install", "-r", req_file, "--quiet", "--ignore-installed"] + user_flag subprocess.check_call(args, cwd=install_dir) _print("Dependencies installed successfully.") except Exception as e: diff --git a/agent-node/bootstrap_windows.ps1 b/agent-node/bootstrap_windows.ps1 new file mode 100644 index 0000000..52547c6 --- /dev/null +++ b/agent-node/bootstrap_windows.ps1 @@ -0,0 +1,139 @@ +param ( + [string]$NodeId = "", + [string]$AuthToken = "", + [string]$HubUrl = "", + [string]$GrpcUrl = "" +) + +$ErrorActionPreference = "Stop" + +Write-Host 
"==========================================" -ForegroundColor Cyan +Write-Host " CORTEX AGENT WINDOWS BOOTSTRAP " -ForegroundColor Cyan +Write-Host "==========================================" -ForegroundColor Cyan + +# 1. Check Python installation (defensively avoid Microsoft Store alias) +$pythonValid = $false +try { + $out = python --version 2>&1 + if ($out -like "*Python *") { $pythonValid = $true } +} catch { } + +if (!$pythonValid) { + Write-Host "[!] Python not found or invalid. Installing via winget..." -ForegroundColor Yellow + winget install -e --id Python.Python.3.12 --accept-package-agreements --accept-source-agreements + # Refresh PATH + $env:Path = [System.Environment]::GetEnvironmentVariable("Path","Machine") + ";" + [System.Environment]::GetEnvironmentVariable("Path","User") +} + +# 2. Verify Python version +$pyVer = python --version +Write-Host "[*] Found $pyVer" + +# 3. Create working directory +$workDir = "C:\CortexAgent" +if (!(Test-Path $workDir)) { + New-Item -ItemType Directory -Path $workDir +} +Set-Location $workDir + +# 4. Download agent code from Hub (Matching Linux pattern) +if ((-not $HubUrl) -or (-not $AuthToken)) { + Write-Host "[!] Hub details missing. Will prompt for them later after basic setup." -ForegroundColor Yellow +} else { + Write-Host "[*] Fetching agent source from Hub..." -ForegroundColor Cyan + $baseUrl = $HubUrl.Split(":")[0] + if ($HubUrl.Contains("http")) { + $downloadUrl = "$HubUrl/api/v1/agent/download" + } else { + $downloadUrl = "http://${baseUrl}:8002/api/v1/agent/download" + } + + $tarPath = Join-Path $workDir "agent.tar.gz" + $headers = @{"X-Agent-Token" = $AuthToken} + + try { + Invoke-WebRequest -Uri $downloadUrl -Headers $headers -OutFile $tarPath + Write-Host "[+] Download complete. Extracting..." -ForegroundColor Green + + # Windows 10+ has tar.exe built-in. Fallback to Expand-Archive if needed. 
+ if (Get-Command tar -ErrorAction SilentlyContinue) { + tar -xzf $tarPath --strip-components=1 + } else { + Write-Warning "tar.exe not found. Attempting Expand-Archive (may not support tar.gz natively without 7-Zip/etc)." + # Note: PowerShell's Expand-Archive usually only likes .zip. + # We recommend users have tar or we provide a zip endpoint. + } + Remove-Item $tarPath + } catch { + Write-Warning "Failed to download code directly: $_" + Write-Host "[!] Please ensure agent-node source is manually placed at $workDir" -ForegroundColor Yellow + } +} + +# 5. Setup Virtual Environment +Write-Host "[*] Creating Virtual Environment..." +python -m venv venv +.\venv\Scripts\Activate.ps1 + +# 6. Install Dependencies +Write-Host "[*] Installing Dependencies..." +python -m pip install --upgrade pip +python -m pip install -r requirements.txt +python -m pip install pywinpty pypiwin32 # Windows-specific requirements + +# 7. Environment Setup +Write-Host "------------------------------------------" +Write-Host " Enter Agent Configuration Details:" + +if (-not $NodeId) { $NodeId = Read-Host " AGENT_NODE_ID (e.g. pc-node-1)" } +if (-not $AuthToken) { $AuthToken = Read-Host " AGENT_AUTH_TOKEN" } +if (-not $HubUrl) { $HubUrl = Read-Host " HUB_URL (e.g. http://192.168.68.140:8002)" } +if (-not $GrpcUrl) { $GrpcUrl = Read-Host " GRPC_ENDPOINT (e.g. 192.168.68.140:50051)" } + +$envFile = @" +AGENT_NODE_ID=$NodeId +AGENT_AUTH_TOKEN=$AuthToken +AGENT_HUB_URL=$HubUrl +GRPC_ENDPOINT=$GrpcUrl +AGENT_TLS_ENABLED=false +PYTHONUTF8=1 +"@ +$envFile | Out-File -FilePath ".env" -Encoding ascii + +# 8. Test Execution +Write-Host "[*] Bootstrap complete. You can now run the agent with:" -ForegroundColor Green +Write-Host " .\venv\Scripts\python.exe src\agent_node\main.py" -ForegroundColor Yellow + +# 9. Optional Service Registration +$installService = Read-Host "Would you like to register this as a startup task? (y/n)" +if ($installService -eq "y") { + Write-Host "[*] Registering Scheduled Task..." 
+ python install_service.py --name "CortexAgent" --run +} + +# 10. Security & Firewall Check +Write-Host "------------------------------------------" +Write-Host "[*] Checking Windows Firewall status..." -ForegroundColor Cyan +$profiles = Get-NetFirewallProfile +$disabled = $profiles | Where-Object { $_.Enabled -eq "False" } + +if ($disabled) { + Write-Host "[!] Warning: One or more Firewall profiles are DISABLED." -ForegroundColor Yellow + $enable = Read-Host "Would you like to ENABLE the Windows Firewall now? (y/n)" + if ($enable -eq "y") { + Set-NetFirewallProfile -Profile Domain,Public,Private -Enabled True + Write-Host "[+] Windows Firewall enabled." -ForegroundColor Green + } +} + +# Add rule for Python communication +Write-Host "[*] Adding firewall exception for agent communication..." +$pythonPath = (Get-Command python).Source +if ($pythonPath) { + New-NetFirewallRule -DisplayName "Cortex Agent Communication" -Direction Outbound -Program $pythonPath -Action Allow -Description "Allows Cortex Agent to reach the Hub" -ErrorAction SilentlyContinue + New-NetFirewallRule -DisplayName "Cortex Agent Communication" -Direction Inbound -Program $pythonPath -Action Allow -Description "Allows Cortex Agent Mesh Communication" -Profile Any -ErrorAction SilentlyContinue +} + +Write-Host "==========================================" +Write-Host " DONE! Check the Hub for node status. " -ForegroundColor Cyan +Write-Host "==========================================" diff --git a/agent-node/install_service.py b/agent-node/install_service.py index 970872b..43b13f6 100755 --- a/agent-node/install_service.py +++ b/agent-node/install_service.py @@ -3,7 +3,7 @@ Cortex Agent Node - Daemon Installer ==================================== Configures the Cortex Agent to run automatically as a background daemon -on macOS (launchd) or Linux (systemd). +using platform-specific abstractions. 
Usage: python3 install_service.py @@ -12,219 +12,39 @@ import os import sys import platform -import subprocess +from agent_node.utils.service_manager import get_service_manager def get_python_path(): return sys.executable def get_agent_main_path(): + # Use absolute path to the main.py entry point return os.path.abspath(os.path.join(os.path.dirname(__file__), "src", "agent_node", "main.py")) def get_working_dir(): return os.path.abspath(os.path.dirname(__file__)) -def install_mac_launchd(): - print("Installing macOS launchd service...") - - plist_content = f""" - - - - Label - com.jerxie.cortex.agent - ProgramArguments - - {get_python_path()} - {get_agent_main_path()} - - WorkingDirectory - {get_working_dir()} - KeepAlive - - RunAtLoad - - StandardErrorPath - {os.path.expanduser("~")}/.cortex/agent.err.log - StandardOutPath - {os.path.expanduser("~")}/.cortex/agent.out.log - EnvironmentVariables - - GRPC_ENABLE_FORK_SUPPORT - 0 - PATH - /usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin - - - -""" - agents_dir = os.path.expanduser("~/Library/LaunchAgents") - os.makedirs(agents_dir, exist_ok=True) - os.makedirs(os.path.expanduser("~/.cortex"), exist_ok=True) - - plist_path = os.path.join(agents_dir, "com.jerxie.cortex.agent.plist") - - with open(plist_path, "w") as f: - f.write(plist_content) - - print(f"Created plist at {plist_path}") - - try: - # Unload if exists - subprocess.run(["launchctl", "unload", plist_path], capture_output=True) - # Load new - subprocess.run(["launchctl", "load", plist_path], check=True) - print("✅ macOS Daemon successfully started!") - print(f"Logs: ~/.cortex/agent.out.log and ~/.cortex/agent.err.log") - print("Commands:") - print(f" Stop: launchctl unload {plist_path}") - print(f" Start: launchctl load {plist_path}") - except subprocess.CalledProcessError as e: - print(f"❌ Failed to load launchd service: {e}") - -def _is_systemd_available(): - try: - # Check if systemd is running (PID 1) - with open("/proc/1/comm", "r") as f: - if "systemd" in 
f.read(): - # Even if systemd is PID 1, --user might fail if no session D-Bus - r = subprocess.run(["systemctl", "--user", "list-units"], capture_output=True) - return r.returncode == 0 - except: - pass - return False - -def install_linux_background_loop(): - print("Systemd not available or refusing connection. Falling back to background loop (nohup)...") - - # Create a small control script to manage the background process - ctl_script = os.path.join(get_working_dir(), "cortex-ctl") - - script_content = f"""#!/bin/sh -# Cortex Agent Control Script (Background-loop mode) -PIDFILE="{get_working_dir()}/agent.pid" -LOGFILE="{os.path.expanduser("~")}/.cortex/agent.out.log" - -case "$1" in - start) - if [ -f "$PIDFILE" ] && kill -0 $(cat "$PIDFILE") 2>/dev/null; then - echo "Agent is already running (PID $(cat $PIDFILE))" - exit 0 - fi - echo "Starting Cortex Agent..." - mkdir -p "$(dirname "$LOGFILE")" - cd "{get_working_dir()}" - export GRPC_ENABLE_FORK_SUPPORT=0 - nohup {get_python_path()} {get_agent_main_path()} >> "$LOGFILE" 2>&1 & - echo $! > "$PIDFILE" - echo "Agent started (PID $!)" - ;; - stop) - if [ -f "$PIDFILE" ]; then - echo "Stopping Cortex Agent (PID $(cat $PIDFILE))..." 
- kill $(cat "$PIDFILE") - rm "$PIDFILE" - else - echo "Agent is not running" - fi - ;; - status) - if [ -f "$PIDFILE" ] && kill -0 $(cat "$PIDFILE") 2>/dev/null; then - echo "Agent is RUNNING (PID $(cat $PIDFILE))" - else - echo "Agent is STOPPED" - fi - ;; - logs) - tail -f "$LOGFILE" - ;; - *) - echo "Usage: $0 {{start|stop|status|logs}}" - exit 1 -esac -""" - with open(ctl_script, "w") as f: - f.write(script_content) - os.chmod(ctl_script, 0o755) - - # Start it - try: - subprocess.run([ctl_script, "start"], check=True) - print("✅ Linux Background Service successfully started!") - print(f"Management script created at: {ctl_script}") - print(f"Use '{ctl_script} status' to check.") - except Exception as e: - print(f"❌ Failed to start background loop: {e}") - -def install_linux_systemd(): - if not _is_systemd_available(): - install_linux_background_loop() - return - - print("Installing Linux systemd user service...") - - service_content = f"""[Unit] -Description=Cortex Agent Node -After=network.target - -[Service] -Type=simple -ExecStart={get_python_path()} {get_agent_main_path()} -WorkingDirectory={get_working_dir()} -Restart=always -RestartSec=5 -Environment=GRPC_ENABLE_FORK_SUPPORT=0 -StandardOutput=append:{os.path.expanduser("~")}/.cortex/agent.out.log -StandardError=append:{os.path.expanduser("~")}/.cortex/agent.err.log - -[Install] -WantedBy=default.target -""" - - systemd_dir = os.path.expanduser("~/.config/systemd/user") - os.makedirs(systemd_dir, exist_ok=True) - os.makedirs(os.path.expanduser("~/.cortex"), exist_ok=True) - - service_path = os.path.join(systemd_dir, "cortex-agent.service") - - with open(service_path, "w") as f: - f.write(service_content) - - print(f"Created systemd service at {service_path}") - - try: - subprocess.run(["systemctl", "--user", "daemon-reload"], check=True) - subprocess.run(["systemctl", "--user", "enable", "cortex-agent"], check=True) - subprocess.run(["systemctl", "--user", "restart", "cortex-agent"], check=True) - - # 
Ensure user services run even when not logged in - subprocess.run(["loginctl", "enable-linger", os.environ.get("USER", "root")], capture_output=True) - - print("✅ Linux Daemon successfully started!") - print(f"Logs: ~/.cortex/agent.out.log and ~/.cortex/agent.err.log") - print("Commands:") - print(" Status: systemctl --user status cortex-agent") - print(" Stop: systemctl --user stop cortex-agent") - print(" Start: systemctl --user start cortex-agent") - print(" Logs: journalctl --user -u cortex-agent -f") - except subprocess.CalledProcessError as e: - print(f"❌ Failed to configure systemd service: {e}") - # Final fallback - install_linux_background_loop() - def main(): - if not os.path.exists(get_agent_main_path()): - print(f"❌ Error: Could not find main agent script at {get_agent_main_path()}") + agent_path = get_agent_main_path() + if not os.path.exists(agent_path): + print(f"❌ Error: Could not find main agent script at {agent_path}") sys.exit(1) - system = platform.system().lower() + print(f"[*] Detected Platform: {platform.system()}") + manager = get_service_manager() - if system == "darwin": - install_mac_launchd() - elif system == "linux": - install_linux_systemd() + python_exe = get_python_path() + working_dir = get_working_dir() + + print(f"[*] Installing service: {python_exe} {agent_path}") + if manager.install(python_exe, agent_path, working_dir): + print("✅ Agent service/daemon successfully installed and started!") else: - print(f"❌ Unsupported OS for automated daemon install: {system}") - print("Please configure your system service manager manually.") + print("❌ Failed to install service automatically.") + if platform.system() != "Windows": + print("Try running with higher privileges or check system logs.") + else: + print("Please ensure you are running as an Administrator.") if __name__ == "__main__": main() diff --git a/agent-node/src/agent_node/config.py b/agent-node/src/agent_node/config.py index c355717..47b37d7 100644 --- 
a/agent-node/src/agent_node/config.py +++ b/agent-node/src/agent_node/config.py @@ -17,7 +17,7 @@ "hub_url": "http://127.0.0.1:8000", "grpc_endpoint": "127.0.0.1:50051", "auth_token": os.getenv("AGENT_AUTH_TOKEN", "cortex-secret-shared-key"), - "sync_root": "/tmp/cortex-sync", + "sync_root": "/tmp/cortex-sync" if platform.system() != "Windows" else "C:\\tmp\\cortex-sync", "tls": True, "max_skill_workers": 10, "health_report_interval": 10, @@ -61,7 +61,7 @@ if active_path: try: - with open(active_path, 'r') as f: + with open(active_path, 'r', encoding='utf-8') as f: yaml_data = yaml.safe_load(f) or {} # Check if it's nested (e.g. cortex: node: id: ...) diff --git a/agent-node/src/agent_node/core/sandbox.py b/agent-node/src/agent_node/core/sandbox.py index 33b2cf0..7a1d218 100644 --- a/agent-node/src/agent_node/core/sandbox.py +++ b/agent-node/src/agent_node/core/sandbox.py @@ -23,7 +23,11 @@ parts = (command_str or "").strip().split() if not parts: return False, "Empty" - base_cmd = parts[0] + # M6 FIX: Strip trailing shell operators (;, &, |, etc) from the base command + # This prevents "whoami;" from being blocked if "whoami" is whitelisted. 
+ import re + base_cmd = re.sub(r'[;&|]+$', '', parts[0]) + if base_cmd in self.policy["DENIED"]: return False, f"Forbidden command: {base_cmd}" diff --git a/agent-node/src/agent_node/main.py b/agent-node/src/agent_node/main.py index c775a7c..0e891c0 100644 --- a/agent-node/src/agent_node/main.py +++ b/agent-node/src/agent_node/main.py @@ -13,6 +13,12 @@ if sys.argv and sys.argv[0] and not os.path.isabs(sys.argv[0]): sys.argv[0] = os.path.abspath(sys.argv[0]) +# M6: Force UTF-8 for stdout/stderr on Windows to prevent GBK crashes with emojis +if os.name == 'nt': + import io + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') + # Add root and protos to path _root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) sys.path.insert(0, _root) @@ -176,27 +182,27 @@ # 0. Singleton Enforcement: Murder siblings before booting try: import psutil - enforce_singleton() - except ImportError: - print("[!] psutil not installed — skipping singleton enforcement. Beware of orphaned processes!") + if os.name != 'nt': + print("[*] Running singleton check...") + enforce_singleton() + else: + print("[*] Skipping singleton check on Windows...") except Exception as e: print(f"[!] Singleton check failed: {e}") + traceback.print_exc() print(f"[*] Starting Agent Node: {NODE_ID}...") - # 0. Auto-Update Check (before anything else — if we're behind, restart now) - # This uses only standard libraries, making it extremely resilient. + # 0. Auto-Update Check if AUTO_UPDATE: try: + print("[*] Initializing auto-updater...") updater.init(hub_http_url=HUB_URL, auth_token=SECRET_KEY, check_interval_secs=UPDATE_CHECK_INTERVAL) - updater.check_and_update_once() # May restart process — does not return if update applied - - # Start background updater BEFORE initializing the node. - # This ensures that even if AgentNode() crashes (e.g. 
missing grpcio), - # the node can still self-repair if a fix is pushed to the hub. + updater.check_and_update_once() # May restart process updater.start_background_updater() except Exception as e: - print(f"[!] Updater initialization failed: {e}. Moving on to agent boot...") + print(f"[!] Updater initialization failed: {e}. Moving on...") + traceback.print_exc() # 1. Initialization and Main Persistence Loop while True: diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index f14787c..171c010 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -75,97 +75,31 @@ self.channel.subscribe(_on_state_change, try_to_connect=True) def _collect_capabilities(self) -> dict: - """Collect hardware metadata to advertise at registration.""" - import platform - import subprocess + """Collect hardware metadata using abstract platform metrics.""" + from agent_node.utils.platform_metrics import get_platform_metrics import socket - import os + + metrics = get_platform_metrics() + caps = metrics.collect_capabilities() - caps = { - "shell": "v1", - "arch": platform.machine(), # e.g. 
x86_64, arm64, aarch64 - "os": platform.system().lower(), # linux, darwin, windows - "os_release": platform.release(), - } - - - # Privilege Detection - # is_root: True if UID 0 (Linux/macOS) — no sudo needed at all - # has_sudo: True if sudo is installed AND available passwordlessly - try: - caps["is_root"] = (os.getuid() == 0) - except AttributeError: - # Windows — os.getuid() doesn't exist; approximate via admin check - try: - import ctypes - caps["is_root"] = bool(ctypes.windll.shell32.IsUserAnAdmin()) - except Exception: - caps["is_root"] = False - - if caps.get("is_root"): - caps["has_sudo"] = False # Root doesn't need sudo - else: - # Check if passwordless sudo is available - try: - r = subprocess.run( - ["sudo", "-n", "true"], - capture_output=True, timeout=3 - ) - caps["has_sudo"] = (r.returncode == 0) - except Exception: - caps["has_sudo"] = False - - # Local IP Detection (best effort) + # Shared: Local IP Detection try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(0) - # Doesn't even have to be reachable s.connect(('10.254.254.254', 1)) caps["local_ip"] = s.getsockname()[0] s.close() except Exception: caps["local_ip"] = "unknown" - # GPU Detection — try nvidia-smi first, then check for Apple GPU - try: - result = subprocess.run( - ["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"], - capture_output=True, text=True, timeout=5 - ) - if result.returncode == 0 and result.stdout.strip(): - gpu_lines = result.stdout.strip().split("\n") - caps["gpu"] = gpu_lines[0].strip() # e.g. "NVIDIA GeForce RTX 3080, 10240" - caps["gpu_count"] = str(len(gpu_lines)) - else: - caps["gpu"] = "none" - except Exception: - # No nvidia-smi — check if Apple Silicon (arm64 + darwin) - if caps["os"] == "darwin" and "arm" in caps["arch"].lower(): - caps["gpu"] = "apple-silicon" - else: - caps["gpu"] = "none" - - # Display Detection — can this node show a real browser window? - # macOS and Windows always have a graphical display. 
- # Linux needs DISPLAY (X11) or WAYLAND_DISPLAY set. - _os = caps.get("os", "") - if _os in ("darwin", "windows"): - caps["has_display"] = True - else: - has_x11 = bool(os.environ.get("DISPLAY", "").strip()) - has_wayland = bool(os.environ.get("WAYLAND_DISPLAY", "").strip()) - caps["has_display"] = has_x11 or has_wayland - return caps def sync_configuration(self): """Initial handshake to retrieve policy and metadata.""" while True: - # Reload configuration from disk dynamically before each attempt config.reload() self.node_id = config.NODE_ID self.skills.max_workers = config.MAX_SKILL_WORKERS - # Ensure stub is fresh if we re-enter from a crash if not self.stub: self._refresh_stub() @@ -173,7 +107,6 @@ caps = self._collect_capabilities() print(f"[*] Capabilities: {caps}") - # Protobuf capabilities is map — all values must be strings caps_str = {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()} reg_req = agent_pb2.RegistrationRequest( @@ -190,7 +123,6 @@ self.sandbox.sync(res.policy) print("[OK] [gRPC-Handshake] Handshake successful. Sandbox Policy Synced.") - # Apply initial skill config if res.policy.skill_config_json: try: cfg = json.loads(res.policy.skill_config_json) @@ -199,7 +131,7 @@ skill.apply_config(cfg) except Exception as e: print(f"[!] Error applying initial skill config: {e}") - break # Success, exit the retry loop + break else: print(f"[!] Rejection: {res.error_message}") print("[!] 
Retrying handshake in 5 seconds...") @@ -221,17 +153,17 @@ return str(e) def start_health_reporting(self): - """Streaming node metrics to the orchestrator for load balancing.""" + """Streaming node metrics using abstract platform metrics.""" + from agent_node.utils.platform_metrics import get_platform_metrics + metrics_tool = get_platform_metrics() + def _report(): while not self._stop_event.is_set(): try: def _gen(): while not self._stop_event.is_set(): ids = self.skills.get_active_ids() - # Collection if psutil: - # Optimization: Use non-blocking CPU check. - # interval=None (default) prevents blocking the gRPC thread. cpu = psutil.cpu_percent(interval=None) per_core = psutil.cpu_percent(percpu=True, interval=None) vmem = psutil.virtual_memory() @@ -241,26 +173,15 @@ avail_gb = vmem.available / (1024**3) cpu_count = psutil.cpu_count() else: - cpu = 0.0 - per_core = [] - mem_percent = 0.0 - used_gb = 0.0 - total_gb = 0.0 - avail_gb = 0.0 - cpu_count = 0 - - # Freq & Load + cpu, per_core, mem_percent = 0.0, [], 0.0 + used_gb, total_gb, avail_gb, cpu_count = 0.0, 0.0, 0.0, 0 + freq = 0 if psutil: - try: - freq = psutil.cpu_freq().current - except: - pass + try: freq = psutil.cpu_freq().current + except: pass - try: - load = list(os.getloadavg()) - except: - load = [0.0, 0.0, 0.0] + load = metrics_tool.get_load_avg() yield agent_pb2.Heartbeat( node_id=self.node_id, @@ -272,7 +193,6 @@ cpu_count=cpu_count, memory_used_gb=used_gb, memory_total_gb=total_gb, - # M6 Fields cpu_usage_per_core=per_core, cpu_freq_mhz=freq, memory_available_gb=avail_gb, @@ -280,17 +200,13 @@ ) time.sleep(max(0, config.HEALTH_REPORT_INTERVAL - 1.0)) - # Consume the heartbeat stream to keep it alive - # Consume the heartbeat stream to keep it alive for response in self.stub.ReportHealth(_gen()): - # We don't strictly need the server time, but it confirms a round-trip watchdog.tick() except Exception as e: err_desc = self._format_grpc_error(e) print(f"[!] 
Health reporting interrupted: {err_desc}. Retrying in 5s...") time.sleep(5) - # Non-blocking thread for health heartbeat threading.Thread(target=_report, daemon=True, name=f"Health-{self.node_id}").start() def run_task_stream(self): @@ -566,7 +482,7 @@ # Calculate path relative to the actual base_sync_dir / session_dir # rel_path is the directory we are currently browsing. # entry.name is the file within it. - item_rel_path = os.path.relpath(os.path.join(watch_path, entry.name), base_dir) + item_rel_path = os.path.relpath(os.path.join(watch_path, entry.name), base_dir).replace("\\", "/") files.append(agent_pb2.FileInfo(path=item_rel_path, size=size, hash="", is_dir=is_dir)) else: @@ -575,7 +491,7 @@ for filename in filenames: abs_path = os.path.join(root, filename) # r_path must be relative to base_dir so the server correctly joins it to the mirror root - r_path = os.path.relpath(abs_path, base_dir) + r_path = os.path.relpath(abs_path, base_dir).replace("\\", "/") try: # Memory-safe hashing with metadata cache file_hash = self.sync_mgr.get_file_hash(abs_path) @@ -592,7 +508,7 @@ for d in dirs: abs_path = os.path.join(root, d) # r_path must be relative to base_dir so the server correctly joins it to the mirror root - r_path = os.path.relpath(abs_path, base_dir) + r_path = os.path.relpath(abs_path, base_dir).replace("\\", "/") files.append(agent_pb2.FileInfo(path=r_path, size=0, hash="", is_dir=True)) except Exception as e: print(f" [❌] Manifest generation failed for {rel_path}: {e}") @@ -822,7 +738,7 @@ # M6: Use dictionary unpack for safe assignment (robust against old proto versions) payload_fields = { - "path": rel_path, + "path": rel_path.replace("\\", "/"), "chunk": compressed_chunk, "chunk_index": index, "is_final": is_final, diff --git a/agent-node/src/agent_node/skills/shell_bridge.py b/agent-node/src/agent_node/skills/shell_bridge.py index 9f84ec5..d5cb278 100644 --- a/agent-node/src/agent_node/skills/shell_bridge.py +++ 
b/agent-node/src/agent_node/skills/shell_bridge.py @@ -1,20 +1,14 @@ -import os -import pty -import select import threading import time -import termios -import struct -import fcntl -import tempfile from agent_node.skills.base import BaseSkill +from agent_node.skills.terminal_backends import get_terminal_backend from protos import agent_pb2 class ShellSkill(BaseSkill): - """Admin Console Skill: Persistent stateful Bash via PTY.""" + """Admin Console Skill: Persistent stateful Shell via Abstract Terminal Backend.""" def __init__(self, sync_mgr=None): self.sync_mgr = sync_mgr - self.sessions = {} # session_id -> {fd, pid, thread, last_activity, ...} + self.sessions = {} # session_id -> {backend, thread, last_activity, ...} self.lock = threading.Lock() # Phase 3: Prompt Patterns for Edge Intelligence @@ -23,30 +17,27 @@ r">>>\s*$", # python r"\.\.\.\s*$", # python multi-line r">\s*$", # node/js + r"PS\s+.*>\s*$", # powershell ] # --- M7: Idle Session Reaper --- - # Automatically kills dormant bash processes to free up system resources. 
self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper") self.reaper_thread.start() def _session_reaper(self): - """Background thread that cleans up unused PTY sessions.""" + """Background thread that cleans up unused Shell sessions.""" while True: time.sleep(60) with self.lock: now = time.time() for sid, sess in list(self.sessions.items()): - # Avoid reaping currently active tasks if sess.get("active_task"): continue - # 10 minute idle timeout if now - sess.get("last_activity", 0) > 600: print(f" [🐚🧹] Reaping idle shell session: {sid}") try: - os.close(sess["fd"]) - os.kill(sess["pid"], 9) + sess["backend"].kill() except: pass self.sessions.pop(sid, None) @@ -57,30 +46,13 @@ return self.sessions[session_id] print(f" [🐚] Initializing Persistent Shell Session: {session_id}") - # Spawn bash in a pty - pid, fd = pty.fork() - if pid == 0: # Child - # Environment prep - os.environ["TERM"] = "xterm-256color" - - # Change to CWD - if cwd and os.path.exists(cwd): - os.chdir(cwd) - - # Launch shell - shell_path = "/bin/bash" - if not os.path.exists(shell_path): - shell_path = "/bin/sh" - os.execv(shell_path, [shell_path, "--login"]) - - # Parent - # Set non-blocking - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + backend = get_terminal_backend() + import os + backend.spawn(cwd=cwd, env=os.environ.copy()) + print(f" [🐚] Terminal Spawned (PID Check: {backend.is_alive()})") sess = { - "fd": fd, - "pid": pid, + "backend": backend, "last_activity": time.time(), "buffer_file": None, "tail_buffer": "", @@ -90,134 +62,155 @@ def reader(): while True: try: - r, _, _ = select.select([fd], [], [], 0.1) - if fd in r: - data = os.read(fd, 4096) - if not data: break - + data = backend.read(4096) + if not data: + if not backend.is_alive(): + break + time.sleep(0.05) + continue + + if isinstance(data, str): + decoded = data + else: decoded = data.decode("utf-8", errors="replace") - - # Streaming/Sync 
logic (Detect completion marker) - with self.lock: - active_tid = sess.get("active_task") - marker = sess.get("marker") - if active_tid and marker and sess.get("buffer_file"): - # Phase 2: Persistence Offloading - # Write directly to disk instead of heap memory - sess["buffer_file"].write(decoded) - sess["buffer_file"].flush() - - # Keep a tiny 4KB tail in RAM for marker detection and prompt scanning - sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-4096:] - - if marker in sess["tail_buffer"]: - # Marker found! Extract exit code - try: - # The tail buffer has the marker - after_marker = sess["tail_buffer"].split(marker)[1].strip().split() - exit_code = int(after_marker[0]) if after_marker else 0 - - # Formulate final stdout summary from the disk file - bf = sess["buffer_file"] - bf.seek(0, 2) - file_len = bf.tell() - - HEAD, TAIL = 10_000, 30_000 - if file_len > HEAD + TAIL: - bf.seek(0) - head_str = bf.read(HEAD) - bf.seek(file_len - TAIL) - tail_str = bf.read() - omitted = file_len - HEAD - TAIL - pure_stdout = head_str + f"\n\n[... 
{omitted:,} bytes omitted (full output safely preserved at {bf.name}) ...]\n\n" + tail_str - else: - bf.seek(0) - pure_stdout = bf.read() - - # Slice off the marker string and anything after it from the final result - pure_stdout = pure_stdout.split(marker)[0] - - sess["result"]["stdout"] = pure_stdout - sess["result"]["status"] = 0 if exit_code == 0 else 1 - - # Close the file handle (leaves file on disk) - sess["buffer_file"].close() - sess["buffer_file"] = None - - sess["event"].set() - decoded = pure_stdout.split(marker)[0][-4096:] if marker in pure_stdout else pure_stdout - except Exception as e: - print(f" [🐚⚠️] Marker parsing failed: {e}") - sess["event"].set() - - # Stream terminal output back (with stealth filtering) - if on_event: - stealth_out = decoded - import re - # Remove the short source command payload from the PTY echo - if " source " in stealth_out and "ctx_" in stealth_out: - stealth_out = re.sub(r'.*source .*ctx_\d+\.sh.*[\r\n]*', '', stealth_out) + + # M7: Protocol-Aware Framing (OSC 1337) + # We use non-printable fences to accurately slice the command output + with self.lock: + active_tid = sess.get("active_task") + if active_tid and sess.get("buffer_file"): + start_fence = f"\x1b]1337;TaskStart;id={active_tid}\x07" + end_fence_prefix = f"\x1b]1337;TaskEnd;id={active_tid};exit=" - if "__CORTEX_FIN_SH_" in stealth_out: - # Remove any line that contains our internal marker from the output stream. - # This covers both the fallback echo and the actual execution exit code emission. 
- stealth_out = re.sub(r'.*__CORTEX_FIN_SH_.*[\r\n]*', '', stealth_out) + bracket_start_fence = f"[[1337;TaskStart;id={active_tid}]]" + bracket_end_fence_prefix = f"[[1337;TaskEnd;id={active_tid};exit=" + + sess["buffer_file"].write(decoded) + sess["buffer_file"].flush() + + # Byte-accurate 16KB tail for fence detection + sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-16384:] - if stealth_out: - # Phase 3: Client-Side Truncation (Stream Rate Limiting) - # Limit real-time stream to 15KB/sec per session to prevent flooding the Hub over gRPC. - # The full output is still safely written to the tempfile on disk. - with self.lock: - now = time.time() - if now - sess.get("stream_window_start", 0) > 1.0: - sess["stream_window_start"] = now - sess["stream_bytes_sent"] = 0 - dropped = sess.get("stream_dropped_bytes", 0) - if dropped > 0: - drop_msg = f"\n[... {dropped:,} bytes truncated from live stream ...]\n" - event = agent_pb2.SkillEvent( - session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg - ) - on_event(agent_pb2.ClientTaskMessage(skill_event=event)) - sess["stream_dropped_bytes"] = 0 - - if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 15_000: - sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out) + # Clean ANSI from the tail buffer to prevent ConPTY injecting random cursor positions inside our marker strings + import re + ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + clean_tail = ansi_escape.sub('', sess["tail_buffer"]) + + if end_fence_prefix in clean_tail or bracket_end_fence_prefix in clean_tail: + # Task completed via protocol fence! 
+ try: + is_bracket = bracket_end_fence_prefix in clean_tail + active_end_prefix = bracket_end_fence_prefix if is_bracket else end_fence_prefix + active_start_fence = bracket_start_fence if is_bracket else start_fence + + # Extract exit code from the trailer: TaskEnd;id=...;exit=N + after_end = clean_tail.split(active_end_prefix)[1] + exit_match = re.search(r'(\d+)', after_end) + exit_code = int(exit_match.group(1)) if exit_match else 0 + + bf = sess["buffer_file"] + bf.seek(0) + full_raw = bf.read() + clean_full_raw = ansi_escape.sub('', full_raw) + + print(f" [🐚DEBUG] Fence Match! Buffer: {len(clean_full_raw)} bytes. Tail: {repr(clean_full_raw[-200:])}") + + # Clean extraction between fences (using ANSI stripped content) + if active_start_fence in clean_full_raw: + # We take the content AFTER the last start fence to avoid echo-back collision + content = clean_full_raw.split(active_start_fence)[-1].split(active_end_prefix)[0] else: - sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out) + content = clean_full_raw.split(active_end_prefix)[0] + + # Minimal post-processing: remove the echo of the end command itself + content = re.sub(r'echo \x1b]1337;TaskEnd;.*', '', content).strip() + content = re.sub(r'echo \[\[1337;TaskEnd;.*', '', content).strip() + + sess["result"]["stdout"] = content + sess["result"]["status"] = 0 if exit_code == 0 else 1 + + sess["buffer_file"].close() + sess["buffer_file"] = None + sess["event"].set() + + # Strip the protocol fences from the live UI stream to keep it clean (ANSI and Bracket) + decoded = re.sub(r'\x1b]1337;Task(Start|End);id=.*?\x07', '', decoded) + decoded = re.sub(r'\[\[1337;Task(Start|End);id=.*?\]\]', '', decoded) + except Exception as e: + print(f" [🐚⚠️] Protocol parsing failed: {e}") + sess["event"].set() + + # Stream terminal output back to UI + if on_event: + import re + + # M9: Filter Native Escaped cmd Echo framing from bouncing back to the UI + # e.g., "echo [[1337;Task^Start;id=xyz]] & " + 
decoded = re.sub(r'echo \s*\[\[1337;Task\^Start;id=[a-zA-Z0-9-]*\]\]\s*&\s*', '', decoded) + decoded = re.sub(r'\s*&\s*echo \s*\[\[1337;Task\^End;id=[a-zA-Z0-9-]*;exit=%errorlevel%\]\]', '', decoded) + + # M7: Line-Aware Hyper-Aggressive Stealthing + # Instead of complex regex on the whole buffer, we nuke any lines + # that carry our internal protocol baggage. + lines = decoded.splitlines(keepends=True) + clean_lines = [] + ansi_escape_ui = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + for line in lines: + stripped_line = ansi_escape_ui.sub('', line) + # If the line contains our protocol marker, it's plumbing - drop it. + if "1337;Task" in stripped_line or "`e]" in line or "\\033]" in line: + continue + clean_lines.append(line) + + stealth_out = "".join(clean_lines) + + if stealth_out.strip(): + with self.lock: + now = time.time() + if now - sess.get("stream_window_start", 0) > 1.0: + sess["stream_window_start"] = now + sess["stream_bytes_sent"] = 0 + dropped = sess.get("stream_dropped_bytes", 0) + if dropped > 0: + drop_msg = f"\n[... {dropped:,} bytes truncated from live stream ...]\n" event = agent_pb2.SkillEvent( - session_id=session_id, - task_id=sess.get("active_task") or "", - terminal_out=stealth_out + session_id=session_id, task_id=sess.get("active_task") or "", terminal_out=drop_msg ) on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + sess["stream_dropped_bytes"] = 0 - # EDGE INTELLIGENCE: Proactively signal prompt detection - # We only check for prompts if we are actively running a task and haven't found the marker yet. 
- current_event = sess.get("event") - if active_tid and current_event and not current_event.is_set(): - import re - tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"] - for pattern in self.PROMPT_PATTERNS: - if re.search(pattern, tail): - # Send specific prompt signal - # Use last 20 chars as the 'prompt' hint - p_hint = tail[-20:].strip() - prompt_event = agent_pb2.SkillEvent( - session_id=session_id, - task_id=active_tid, - prompt=p_hint - ) - on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event)) - break + if sess.get("stream_bytes_sent", 0) + len(stealth_out) > 100_000: + sess["stream_dropped_bytes"] = sess.get("stream_dropped_bytes", 0) + len(stealth_out) + else: + sess["stream_bytes_sent"] = sess.get("stream_bytes_sent", 0) + len(stealth_out) + event = agent_pb2.SkillEvent( + session_id=session_id, + task_id=sess.get("active_task") or "", + terminal_out=stealth_out + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=event)) + + # EDGE INTELLIGENCE: Proactively signal prompt detection + current_event = sess.get("event") + if active_tid and current_event and not current_event.is_set(): + import re + tail = sess["tail_buffer"][-100:] if len(sess["tail_buffer"]) > 100 else sess["tail_buffer"] + for pattern in self.PROMPT_PATTERNS: + if re.search(pattern, tail): + p_hint = tail[-20:].strip() + prompt_event = agent_pb2.SkillEvent( + session_id=session_id, + task_id=active_tid, + prompt=p_hint + ) + on_event(agent_pb2.ClientTaskMessage(skill_event=prompt_event)) + break except (EOFError, OSError): break except Exception as catch_all: print(f" [🐚❌] Reader thread FATAL exception: {catch_all}") break - - # Thread Cleanup print(f" [🐚] Shell Session Terminated: {session_id}") with self.lock: self.sessions.pop(session_id, None) @@ -229,9 +222,8 @@ self.sessions[session_id] = sess return sess - def handle_transparent_tty(self, task, on_complete, on_event=None): - """Processes raw TTY/Resize events synchronously 
(bypasses threadpool/sandbox).""" + """Processes raw TTY/Resize events synchronously.""" cmd = task.payload_json session_id = task.session_id or "default-session" try: @@ -243,7 +235,7 @@ if isinstance(raw_payload, dict) and "tty" in raw_payload: raw_bytes = raw_payload["tty"] sess = self._ensure_session(session_id, None, on_event) - os.write(sess["fd"], raw_bytes.encode("utf-8")) + sess["backend"].write(raw_bytes.encode("utf-8")) on_complete(task.task_id, {"stdout": "", "status": 0}, task.trace_id) return True @@ -252,9 +244,7 @@ cols = raw_payload.get("cols", 80) rows = raw_payload.get("rows", 24) sess = self._ensure_session(session_id, None, on_event) - import termios, struct, fcntl - s = struct.pack('HHHH', rows, cols, 0, 0) - fcntl.ioctl(sess["fd"], termios.TIOCSWINSZ, s) + sess["backend"].resize(cols, rows) print(f" [🐚] Terminal Resized to {cols}x{rows}") on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id) return True @@ -263,13 +253,12 @@ return False def execute(self, task, sandbox, on_complete, on_event=None): - """Dispatches command string to the persistent PTY shell and WAITS for completion.""" + """Dispatches command string to the abstract terminal backend and WAITS for completion.""" session_id = task.session_id or "default-session" tid = task.task_id try: cmd = task.payload_json - # --- Legacy Full-Command Execution (Sandboxed) --- allowed, status_msg = sandbox.verify(cmd) if not allowed: err_msg = f"\r\n[System] Command blocked: {status_msg}\r\n" @@ -282,84 +271,93 @@ return on_complete(tid, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id) - # Resolve CWD jail cwd = None if self.sync_mgr and task.session_id: - cwd = self.sync_mgr.get_session_dir(task.session_id) + cwd = self.sync_mgr.get_session_dir(task.session_id, create=True) elif sandbox.policy.get("WORKING_DIR_JAIL"): cwd = sandbox.policy["WORKING_DIR_JAIL"] if not os.path.exists(cwd): try: os.makedirs(cwd, exist_ok=True) 
except: pass - # Handle Session Persistent Process sess = self._ensure_session(session_id, cwd, on_event) - # Check for RAW mode first (bypasses busy check for interactive control) is_raw = cmd.startswith("!RAW:") if is_raw: - input_str = cmd[5:] + "\n" - print(f" [🐚⌨️] RAW Input Injection: {input_str.strip()}") - os.write(sess["fd"], input_str.encode("utf-8")) - return on_complete(tid, {"stdout": "INJECTED", "status": 0}, task.trace_id) + # M7 Fix: Agentic tasks (starting with 'task-') MUST use framing + # to ensure results are captured. Forced bypass is only allowed for manual UI typing. + if tid.startswith("task-"): + cmd = cmd[5:] + is_raw = False + else: + input_str = cmd[5:] + "\n" + print(f" [🐚⌨️] RAW Input Injection: {input_str.strip()}") + sess["backend"].write(input_str.encode("utf-8")) + return on_complete(tid, {"stdout": "INJECTED", "status": 0}, task.trace_id) - # --- 0. Busy Check: Serialize access to the PTY for standard commands --- - with self.lock: - if sess.get("active_task"): - curr_tid = sess.get("active_task") - return on_complete(tid, {"stderr": f"[BUSY] Session {session_id} is already running task {curr_tid}", "status": 1}, task.trace_id) - - # --- Blocking Wait Logic --- - # --- Blocking Wait Logic --- marker_id = int(time.time()) marker = f"__CORTEX_FIN_SH_{marker_id}__" event = threading.Event() - result_container = {"stdout": "", "status": 0} # 0 = Success (Protobuf Convention) + result_container = {"stdout": "", "status": 0} - # Register waiter in session state with self.lock: sess["active_task"] = tid - sess["marker"] = marker sess["event"] = event - # Create a persistent tempfile for stdout instead of RAM buffer sess["buffer_file"] = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False) sess["tail_buffer"] = "" sess["result"] = result_container sess["cancel_event"] = threading.Event() - # Input injection: execute command via sourced script to hide plumbing from ZLE/Readline echo try: - 
script_path = os.path.join(tempfile.gettempdir(), f"ctx_{marker_id}.sh") - script_content = f"{cmd}\n__ctx_exit=$?\necho \"__CORTEX_FIN_SH_{marker_id}__\" $__ctx_exit\nrm -f {script_path}\n" - try: - with open(script_path, "w") as f: - f.write(script_content) - # Space prefix often hides it from shell history. Extremely short payload prevents ZLE line-wrap scrambling! - full_input = f" source {script_path}\n" - except Exception as e: - print(f" [🐚⚠️] Failed to write injection script dictating fallback: {e}") - # Fallback if writing file natively fails for some system reason - full_input = f"({cmd}) ; echo \"__CORTEX_FIN_SH_\"\"{marker_id}__\" $?\n" + # M7: Protocol-Aware Command Framing (OSC 1337) + # We wrap the command in non-printable control sequences. + # Format: ESC ] 1337 ; ST (\x07) + start_marker = f"1337;TaskStart;id={tid}" + end_marker = f"1337;TaskEnd;id={tid}" + + import platform + if platform.system() == "Windows": + # M7: EncodedCommand for Windows (Bypasses Quote Hell) + # This ensures byte-accurate delivery of ESC ([char]27) and BEL ([char]7) + import base64 - os.write(sess["fd"], full_input.encode("utf-8")) + # M8: Ultimate Windows Shell Boundary Method (File Spooling) + # Bypasses Conhost VTP Redraw byte swallowing caused by line wrapping in PTY + # Bypasses powershell encoded limits. 
+ import os + import tempfile as tf + spool_dir = os.path.join(tf.gettempdir(), "cortex_pty_tasks") + os.makedirs(spool_dir, exist_ok=True) + task_path = os.path.join(spool_dir, f"{tid}.bat") + + # We write the logic to a native shell file so the PTY simply executes a short path + with open(task_path, "w", encoding="utf-8") as f: + f.write(f"@echo off\r\n") + f.write(f"echo [[1337;TaskStart;id={tid}]]\r\n") + f.write(f"{cmd}\r\n") + f.write(f"echo [[1337;TaskEnd;id={tid};exit=%errorlevel%]]\r\n") + # optionally clean up itself + f.write(f"del \"%~f0\"\r\n") + + full_input = f"\"{task_path}\"\r\n" + else: + # On Linux, we use echo -e with octal escapes + s_m = f"\\033]{start_marker}\\007" + e_m = f"\\033]{end_marker};exit=$__ctx_exit\\007" + full_input = f"echo -e -n \"{s_m}\"; {cmd}; __ctx_exit=$?; echo -e -n \"{e_m}\"\n" + + sess["backend"].write(full_input.encode("utf-8")) - # Wait for completion (triggered by reader) OR cancellation timeout = (task.timeout_ms / 1000.0) if task.timeout_ms > 0 else 60.0 start_time = time.time() while time.time() - start_time < timeout: - # Check for completion (reader found marker) if event.is_set(): return on_complete(tid, result_container, task.trace_id) - - # Check for cancellation (HUB sent cancel) if sess["cancel_event"].is_set(): print(f" [🐚🛑] Task {tid} cancelled on node.") return on_complete(tid, {"stderr": "ABORTED", "status": 2}, task.trace_id) - - # Sleep slightly to avoid busy loop time.sleep(0.1) - # Timeout Case print(f" [🐚⚠️] Task {tid} timed out on node.") with self.lock: if sess.get("buffer_file"): @@ -385,7 +383,6 @@ on_complete(tid, {"stdout": partial_out, "stderr": "TIMEOUT", "status": 2}, task.trace_id) finally: - # Cleanup session task state with self.lock: if sess.get("active_task") == tid: if sess.get("buffer_file"): @@ -409,22 +406,21 @@ for sid, sess in self.sessions.items(): if sess.get("active_task") == task_id: print(f"[🛑] Sending SIGINT (Ctrl+C) to shell session (Task {task_id}): {sid}") - # Write 
\x03 (Ctrl+C) to the master FD - os.write(sess["fd"], b"\x03") - # Break the wait loop in execute thread + sess["backend"].write(b"\x03") if sess.get("cancel_event"): sess["cancel_event"].set() return True - def shutdown(self): - """Cleanup: Terminates all persistent shells.""" + """Cleanup: Terminates all persistent shells via backends.""" with self.lock: for sid, sess in list(self.sessions.items()): print(f"[🛑] Cleaning up persistent shell: {sid}") - try: os.close(sess["fd"]) - except: pass - # kill pid - try: os.kill(sess["pid"], 9) + try: sess["backend"].kill() except: pass self.sessions.clear() + +import os +import threading +import time +import tempfile diff --git a/agent-node/src/agent_node/skills/terminal_backends.py b/agent-node/src/agent_node/skills/terminal_backends.py new file mode 100644 index 0000000..7609b39 --- /dev/null +++ b/agent-node/src/agent_node/skills/terminal_backends.py @@ -0,0 +1,184 @@ +import os +import platform +import abc +import threading +import time + +class BaseTerminal(abc.ABC): + """Abstract Base Class for Terminal Backends.""" + + @abc.abstractmethod + def spawn(self, cwd=None, env=None): + """Initializes the shell process in a PTY/ConPTY.""" + pass + + @abc.abstractmethod + def read(self, size=4096) -> bytes: + """Non-blocking read from the terminal.""" + pass + + @abc.abstractmethod + def write(self, data: bytes): + """Writes data (keystrokes) to the terminal.""" + pass + + @abc.abstractmethod + def resize(self, cols: int, rows: int): + """Resizes the terminal window.""" + pass + + @abc.abstractmethod + def kill(self): + """Terminates the shell process and cleans up resources.""" + pass + + @abc.abstractmethod + def is_alive(self) -> bool: + """Checks if the terminal process is still running.""" + pass + + +class PosixTerminal(BaseTerminal): + """POSIX implementation using pty.fork() and standard fcntl/termios.""" + + def __init__(self): + self.fd = None + self.pid = None + + def spawn(self, cwd=None, env=None): + import pty 
+ self.pid, self.fd = pty.fork() + + if self.pid == 0: # Child process + # Environment prep + if env: + os.environ.update(env) + os.environ["TERM"] = "xterm-256color" + + if cwd and os.path.exists(cwd): + os.chdir(cwd) + + # Launch shell + shell_path = "/bin/bash" + if not os.path.exists(shell_path): + shell_path = "/bin/sh" + os.execv(shell_path, [shell_path, "--login"]) + + # Parent process: Set non-blocking + import fcntl + fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) + fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + def read(self, size=4096) -> bytes: + if self.fd is None: + return b"" + try: + import select + r, _, _ = select.select([self.fd], [], [], 0.05) + if self.fd in r: + return os.read(self.fd, size) + except (OSError, EOFError): + pass + return b"" + + def write(self, data: bytes): + if self.fd is not None: + os.write(self.fd, data) + + def resize(self, cols: int, rows: int): + if self.fd is not None: + import termios + import struct + import fcntl + s = struct.pack('HHHH', rows, cols, 0, 0) + fcntl.ioctl(self.fd, termios.TIOCSWINSZ, s) + + def kill(self): + if self.fd is not None: + try: os.close(self.fd) + except: pass + self.fd = None + if self.pid is not None: + try: os.kill(self.pid, 9) + except: pass + self.pid = None + + def is_alive(self) -> bool: + if self.pid is None: + return False + try: + # waitpid with WNOHANG to check if it's still running + pid, status = os.waitpid(self.pid, os.WNOHANG) + return pid == 0 + except OSError: + return False + + +class WindowsTerminal(BaseTerminal): + """Windows implementation using pywinpty (ConPTY).""" + + def __init__(self): + self.pty = None + + def spawn(self, cwd=None, env=None): + try: + from winpty import PTY + except ImportError: + raise ImportError("pywinpty is required for Windows terminal support. 
Please install it with 'pip install pywinpty'.") + + # Default shell for Windows: CMD is much more stable for gRPC PTY streams + shell_cmd = "cmd.exe" + + # M7: Force TERM=dumb to suppress complex ANSI sequences that clobber rendering + if env is None: env = os.environ.copy() + env["TERM"] = "dumb" + + self.pty = PTY(120, 30) + self.pty.spawn(shell_cmd, cwd=cwd, env=None) + + def read(self, size=4096) -> bytes: + if self.pty is None: + return b"" + # pywinpty's read is usually non-blocking or can be polled + try: + # Newer pywinpty versions use 'blocking' instead of 'size' + data = self.pty.read(blocking=False) + return data.encode('utf-8') if isinstance(data, str) else data + except EOFError: + return b"" + + def write(self, data: bytes): + if self.pty is not None: + import time + # pywinpty expects strings for input + text = data.decode('utf-8', errors='replace') + # Chunk writes to prevent PyWinPTY/ConHost input buffer saturation drops on Windows + # Conhost is highly sensitive to rapid buffer writes over 120 bytes. 
+ chunk_size = 32 + for i in range(0, len(text), chunk_size): + self.pty.write(text[i:i+chunk_size]) + time.sleep(0.02) + + def resize(self, cols: int, rows: int): + if self.pty is not None: + self.pty.set_size(cols, rows) + + def kill(self): + if self.pty is not None: + # PTY object usually cleans up the process on deletion, + # but we can be explicit if the library supports it + self.pty = None + + def is_alive(self) -> bool: + if self.pty is None: + return False + # pywinpty's isalive method + return self.pty.isalive() + + +def get_terminal_backend() -> BaseTerminal: + """Factory function to return the correct terminal backend based on the platform.""" + if platform.system() == "Windows": + return WindowsTerminal() + else: + return PosixTerminal() diff --git a/agent-node/src/agent_node/utils/platform_metrics.py b/agent-node/src/agent_node/utils/platform_metrics.py new file mode 100644 index 0000000..350c363 --- /dev/null +++ b/agent-node/src/agent_node/utils/platform_metrics.py @@ -0,0 +1,149 @@ +import os +import platform +import subprocess +import socket +import abc + +class BasePlatformMetrics(abc.ABC): + """Abstract Base Class for Platform-Specific Metrics and Capabilities.""" + + @abc.abstractmethod + def collect_capabilities(self) -> dict: + """Collects hardware and OS capabilities.""" + pass + + @abc.abstractmethod + def get_load_avg(self) -> list: + """Returns 1, 5, 15 min load averages.""" + pass + +class PosixMetrics(BasePlatformMetrics): + """POSIX (Linux/macOS) Metrics Implementation.""" + + def collect_capabilities(self) -> dict: + import platform + caps = { + "shell": "v1", + "arch": platform.machine(), + "os": platform.system().lower(), + "os_release": platform.release(), + } + + # Privilege Detection + try: + caps["is_root"] = (os.getuid() == 0) + except: + caps["is_root"] = False + + if not caps["is_root"]: + try: + r = subprocess.run(["sudo", "-n", "true"], capture_output=True, timeout=2) + caps["has_sudo"] = (r.returncode == 0) + except: + 
caps["has_sudo"] = False + else: + caps["has_sudo"] = False + + # GPU Detection + caps["gpu"] = self._detect_gpu(caps) + + # Display Detection + if caps["os"] == "darwin": + caps["has_display"] = True + else: + caps["has_display"] = bool(os.environ.get("DISPLAY") or os.environ.get("WAYLAND_DISPLAY")) + + return caps + + def _detect_gpu(self, caps) -> str: + try: + result = subprocess.run( + ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip().split("\n")[0] + except: + pass + + if caps["os"] == "darwin" and "arm" in caps["arch"].lower(): + return "apple-silicon" + + return "none" + + def get_load_avg(self) -> list: + try: + return list(os.getloadavg()) + except: + return [0.0, 0.0, 0.0] + + +class WindowsMetrics(BasePlatformMetrics): + """Windows Metrics Implementation.""" + + def collect_capabilities(self) -> dict: + import platform + caps = { + "shell": "v1", + "arch": platform.machine(), + "os": "windows", + "os_release": platform.release(), + } + + # Admin Detection + try: + import ctypes + caps["is_root"] = bool(ctypes.windll.shell32.IsUserAnAdmin()) + except: + caps["is_root"] = False + + caps["has_sudo"] = False # sudo isn't a thing on Windows usually + + # GPU Detection + gpu_name = "none" + try: + # 1. Try nvidia-smi.exe (Discrete NVIDIA GPU) + result = subprocess.run( + ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + gpu_name = result.stdout.strip().split("\n")[0] + except: + pass # nvidia-smi likely missing + + if gpu_name == "none": + try: + # 2. 
Try WMIC fallback (Integrated Intel/AMD or other) + wmic_res = subprocess.run( + ["wmic", "path", "win32_VideoController", "get", "name"], + capture_output=True, text=True, timeout=5 + ) + if wmic_res.returncode == 0: + names = [l.strip() for l in wmic_res.stdout.split("\n") if l.strip() and "Name" not in l] + # Filter out basic/driver stubs + filtered = [n for n in names if "Microsoft" not in n] + if filtered: + gpu_name = filtered[0] + elif names: + gpu_name = names[0] + except: + pass + + caps["gpu"] = gpu_name + + caps["has_display"] = True # Windows usually has a display + return caps + + def get_load_avg(self) -> list: + # Windows doesn't have loadavg; approximate using psutil in the caller if needed + # or just return zero. Hub can derive load from cpu_usage history. + return [0.0, 0.0, 0.0] + + +def get_platform_metrics() -> BasePlatformMetrics: + """Factory to get the platform-appropriate metrics tool.""" + if platform.system() == "Windows": + return WindowsMetrics() + else: + return PosixMetrics() diff --git a/agent-node/src/agent_node/utils/service_manager.py b/agent-node/src/agent_node/utils/service_manager.py new file mode 100644 index 0000000..dfbad2a --- /dev/null +++ b/agent-node/src/agent_node/utils/service_manager.py @@ -0,0 +1,212 @@ +import os +import platform +import abc +import subprocess +import sys + +class BaseServiceManager(abc.ABC): + """Abstract Base Class for Platform-Specific Service Management.""" + + @abc.abstractmethod + def install(self, python_path, script_path, working_dir) -> bool: + """Installs the agent as a background service/daemon.""" + pass + + @abc.abstractmethod + def uninstall(self) -> bool: + """Removes the agent service.""" + pass + + @abc.abstractmethod + def is_installed(self) -> bool: + """Checks if the service is currently registered.""" + pass + + @abc.abstractmethod + def start(self) -> bool: + """Starts the service.""" + pass + + @abc.abstractmethod + def stop(self) -> bool: + """Stops the service.""" + pass + 
+ +class LaunchdManager(BaseServiceManager): + """macOS implementation using launchd.""" + LABEL = "com.jerxie.cortex.agent" + + def _get_plist_path(self): + return os.path.expanduser(f"~/Library/LaunchAgents/{self.LABEL}.plist") + + def install(self, python_path, script_path, working_dir) -> bool: + plist_content = f""" + + + + Label + {self.LABEL} + ProgramArguments + + {python_path} + {script_path} + + WorkingDirectory + {working_dir} + KeepAlive + + RunAtLoad + + StandardErrorPath + {os.path.expanduser("~")}/.cortex/agent.err.log + StandardOutPath + {os.path.expanduser("~")}/.cortex/agent.out.log + + +""" + path = self._get_plist_path() + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w") as f: + f.write(plist_content) + + try: + subprocess.run(["launchctl", "load", path], check=True) + return True + except: + return False + + def uninstall(self) -> bool: + path = self._get_plist_path() + if os.path.exists(path): + subprocess.run(["launchctl", "unload", path], capture_output=True) + os.remove(path) + return True + return False + + def is_installed(self) -> bool: + return os.path.exists(self._get_plist_path()) + + def start(self) -> bool: + try: + subprocess.run(["launchctl", "load", self._get_plist_path()], check=True) + return True + except: + return False + + def stop(self) -> bool: + try: + subprocess.run(["launchctl", "unload", self._get_plist_path()], check=True) + return True + except: + return False + + +class SystemdManager(BaseServiceManager): + """Linux implementation using systemd user services.""" + SERVICE_NAME = "cortex-agent" + + def _get_service_path(self): + return os.path.expanduser(f"~/.config/systemd/user/{self.SERVICE_NAME}.service") + + def install(self, python_path, script_path, working_dir) -> bool: + content = f"""[Unit] +Description=Cortex Agent Node +After=network.target + +[Service] +Type=simple +ExecStart={python_path} {script_path} +WorkingDirectory={working_dir} +Restart=always +RestartSec=5 
+StandardOutput=append:{os.path.expanduser("~")}/.cortex/agent.out.log +StandardError=append:{os.path.expanduser("~")}/.cortex/agent.err.log + +[Install] +WantedBy=default.target +""" + path = self._get_service_path() + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w") as f: + f.write(content) + + try: + subprocess.run(["systemctl", "--user", "daemon-reload"], check=True) + subprocess.run(["systemctl", "--user", "enable", self.SERVICE_NAME], check=True) + subprocess.run(["systemctl", "--user", "start", self.SERVICE_NAME], check=True) + return True + except: + return False + + def uninstall(self) -> bool: + try: + subprocess.run(["systemctl", "--user", "stop", self.SERVICE_NAME], capture_output=True) + subprocess.run(["systemctl", "--user", "disable", self.SERVICE_NAME], capture_output=True) + path = self._get_service_path() + if os.path.exists(path): + os.remove(path) + return True + except: + return False + + def is_installed(self) -> bool: + return os.path.exists(self._get_service_path()) + + def start(self) -> bool: + return subprocess.run(["systemctl", "--user", "start", self.SERVICE_NAME]).returncode == 0 + + def stop(self) -> bool: + return subprocess.run(["systemctl", "--user", "stop", self.SERVICE_NAME]).returncode == 0 + + +class SchtasksManager(BaseServiceManager): + """Windows implementation using Task Scheduler (schtasks).""" + TASK_NAME = "CortexAgent" + + def install(self, python_path, script_path, working_dir) -> bool: + # Create a scheduled task that runs on system start + # Use /SC ONSTART or /SC ONLOGON + # We'll use ONLOGON for now as it's more reliable for user-space agents + cmd = [ + "schtasks", "/Create", "/TN", self.TASK_NAME, + "/TR", f'"{python_path}" "{script_path}"', + "/SC", "ONLOGON", "/F", "/RL", "HIGHEST" + ] + try: + subprocess.run(cmd, check=True, capture_output=True) + # Start it immediately + subprocess.run(["schtasks", "/Run", "/TN", self.TASK_NAME], check=True, capture_output=True) + return True + except 
Exception as e: + print(f"Error installing Windows task: {e}") + return False + + def uninstall(self) -> bool: + try: + subprocess.run(["schtasks", "/Delete", "/TN", self.TASK_NAME, "/F"], capture_output=True) + return True + except: + return False + + def is_installed(self) -> bool: + r = subprocess.run(["schtasks", "/Query", "/TN", self.TASK_NAME], capture_output=True) + return r.returncode == 0 + + def start(self) -> bool: + return subprocess.run(["schtasks", "/Run", "/TN", self.TASK_NAME]).returncode == 0 + + def stop(self) -> bool: + return subprocess.run(["schtasks", "/End", "/TN", self.TASK_NAME]).returncode == 0 + + +def get_service_manager() -> BaseServiceManager: + """Factory to get the platform-appropriate service manager.""" + sys_type = platform.system().lower() + if sys_type == "darwin": + return LaunchdManager() + elif sys_type == "windows": + return SchtasksManager() + else: + # Fallback to systemd for Linux + return SystemdManager() diff --git a/ai-hub/.dockerignore b/ai-hub/.dockerignore index b7749fb..24c9de5 100644 --- a/ai-hub/.dockerignore +++ b/ai-hub/.dockerignore @@ -1,2 +1,12 @@ **/__pycache__ +**/.DS_Store **/._* +**/.git +**/.venv +**/venv +*.pyc +*.pyo +.DS_Store +data/ +logs/ +temp/ diff --git a/ai-hub/app/api/routes/agent_update.py b/ai-hub/app/api/routes/agent_update.py index 67648e9..987e000 100644 --- a/ai-hub/app/api/routes/agent_update.py +++ b/ai-hub/app/api/routes/agent_update.py @@ -174,6 +174,25 @@ headers={"Content-Disposition": "attachment; filename=bootstrap_installer.py"} ) + @router.get("/installer/ps1", summary="Download bootstrap_windows.ps1") + def download_installer_ps1(): + """ + Returns the bootstrap_windows.ps1 script itself. + Used for initial one-liner provisioning on Windows nodes. 
+ """ + installer_path = os.path.join(_AGENT_NODE_DIR, "bootstrap_windows.ps1") + if not os.path.exists(installer_path): + raise HTTPException(status_code=404, detail="Windows Installer script not found on hub.") + + with open(installer_path, "rb") as f: + content = f.read() + + return Response( + content=content, + media_type="text/plain", # Use text/plain for easy piping to IEX + headers={"Content-Disposition": "attachment; filename=bootstrap_windows.ps1"} + ) + @router.get("/binary/{arch}", summary="Download Standalone Binary") def download_binary(arch: str): """ diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index 0518cb8..fb64dcd 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -34,6 +34,7 @@ from app.api.dependencies import ServiceContainer, get_db +from app.config import settings from app.api import schemas from app.db import models @@ -455,11 +456,31 @@ raise HTTPException(status_code=403, detail="Invalid node or token.") config_yaml = _generate_node_config_yaml(node, None, db) - base_url = f"{request.url.scheme}://{request.url.netloc}" + base_url = settings.HUB_PUBLIC_URL or f"{request.url.scheme}://{request.url.netloc}" script = services.mesh_service.generate_provisioning_sh(node, config_yaml, base_url) return PlainTextResponse(script) + @router.get("/provision/ps1/{node_id}", summary="Headless Provisioning Script (PowerShell)") + def provision_node_ps1(node_id: str, token: str, request: Request, db: Session = Depends(get_db)): + """ + Returns a PowerShell script that can be piped into IEX to automatically + install and start the agent node on Windows. 
+ + Usage: irm http://.../provision/ps1/{node_id}?token={token} | iex + """ + from fastapi.responses import PlainTextResponse + node = db.query(models.AgentNode).filter(models.AgentNode.node_id == node_id).first() + if not node or node.invite_token != token: + raise HTTPException(status_code=403, detail="Invalid node or token.") + + config_yaml = _generate_node_config_yaml(node, None, db) + base_url = settings.HUB_PUBLIC_URL or f"{request.url.scheme}://{request.url.netloc}" + grpc_url = settings.GRPC_TARGET_ORIGIN or f"{request.url.hostname}:{settings.GRPC_PORT}" + + script = services.mesh_service.generate_provisioning_ps1(node, config_yaml, base_url, grpc_url=grpc_url) + return PlainTextResponse(script) + @router.get("/provision/binary/{node_id}/{arch}", summary="Download Self-Contained Binary ZIP Bundle") def provision_node_binary_bundle(node_id: str, arch: str, token: str, db: Session = Depends(get_db)): """ @@ -727,6 +748,41 @@ }) continue + if data.get("action") == "terminal_in": + live_node = registry.get_node(node_id) + if not live_node: continue + + cmd = json.dumps({"tty": data.get("data", "")}) + # Wrap keystrokes in a high-priority TaskRequest + task_req = agent_pb2.TaskRequest( + task_id=f"tty-{id(data)}", + payload_json=cmd, + signature=sign_payload(cmd), + timeout_ms=0, + session_id=data.get("session_id", "") + ) + live_node.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1) + continue + + if data.get("action") == "resize": + live_node = registry.get_node(node_id) + if not live_node: continue + + cmd = json.dumps({ + "action": "resize", + "cols": data.get("cols", 80), + "rows": data.get("rows", 24) + }) + task_req = agent_pb2.TaskRequest( + task_id=f"resize-{id(data)}", + payload_json=cmd, + signature=sign_payload(cmd), + timeout_ms=0, + session_id=data.get("session_id", "") + ) + live_node.send_message(agent_pb2.ServerTaskMessage(task_request=task_req), priority=1) + continue + if data.get("action") == "dispatch": live_node 
= registry.get_node(node_id) if not live_node: @@ -1242,7 +1298,7 @@ "# fs_root: \"/User/username/Documents\"", "", "# TLS — set to false only in dev", - "tls: true", + f"tls: {str(settings.GRPC_TLS_ENABLED).lower()}", ] return "\n".join(lines) diff --git a/ai-hub/app/api/routes/sessions.py b/ai-hub/app/api/routes/sessions.py index d5de67a..d9fdb9f 100644 --- a/ai-hub/app/api/routes/sessions.py +++ b/ai-hub/app/api/routes/sessions.py @@ -24,6 +24,7 @@ db=db, user_id=request.user_id, provider_name=request.provider_name, + model_name=request.model_name, feature_name=request.feature_name, stt_provider_name=request.stt_provider_name, tts_provider_name=request.tts_provider_name @@ -255,6 +256,8 @@ session.title = session_update.title if session_update.provider_name is not None: session.provider_name = session_update.provider_name + if session_update.model_name is not None: + session.model_name = session_update.model_name if session_update.stt_provider_name is not None: session.stt_provider_name = session_update.stt_provider_name if session_update.tts_provider_name is not None: diff --git a/ai-hub/app/api/schemas.py b/ai-hub/app/api/schemas.py index d660d31..be60d57 100644 --- a/ai-hub/app/api/schemas.py +++ b/ai-hub/app/api/schemas.py @@ -209,6 +209,7 @@ """Defines the shape for starting a new conversation session.""" user_id: str provider_name: str = "deepseek" + model_name: Optional[str] = None stt_provider_name: Optional[str] = None tts_provider_name: Optional[str] = None feature_name: Optional[str] = "default" @@ -216,6 +217,7 @@ class SessionUpdate(BaseModel): title: Optional[str] = None provider_name: Optional[str] = None + model_name: Optional[str] = None stt_provider_name: Optional[str] = None tts_provider_name: Optional[str] = None restrict_skills: Optional[bool] = None diff --git a/ai-hub/app/core/grpc/core/journal.py b/ai-hub/app/core/grpc/core/journal.py index 231d619..8d2d09e 100644 --- a/ai-hub/app/core/grpc/core/journal.py +++ 
b/ai-hub/app/core/grpc/core/journal.py @@ -1,5 +1,6 @@ import threading import time +from typing import Optional from app.config import settings class TaskJournal: @@ -109,6 +110,13 @@ } return event + def get_task_event(self, task_id: str) -> Optional[threading.Event]: + """Returns the completion event for an existing task, or None if not found.""" + shard = self._get_shard(task_id) + with shard["lock"]: + task = shard["tasks"].get(task_id) + return task["event"] if task else None + def add_thought(self, task_id: str, thought: str) -> bool: """Adds an AI reasoning entry to the task's history (head+tail bounded).""" shard = self._get_shard(task_id) diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 923ae10..c13f8c4 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -738,11 +738,9 @@ def wait_for_task(self, node_id, task_id, timeout=30, no_abort=False): """Waits for an existing task in the journal.""" # Check journal first - with self.journal.lock: - data = self.journal.tasks.get(task_id) - if not data: - return {"error": f"Task {task_id} not found in journal (finished or expired)", "status": "NOT_FOUND"} - event = data["event"] + event = self.journal.get_task_event(task_id) + if not event: + return {"error": f"Task {task_id} not found in journal (finished or expired)", "status": "NOT_FOUND"} # Immediate peek if timeout is 0 or event is already set if timeout == 0 or event.is_set(): diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index 77b66af..c4dad91 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -134,19 +134,29 @@ sandbox_cfg = shell_cfg.get("sandbox", {}) if isinstance(shell_cfg, dict) else {} if sandbox_cfg is None: sandbox_cfg = {} - # 1. 
Resolve Mode (Default to STRICT to follow Secure-by-Default principle) - mode_str = (sandbox_cfg.get("mode") or "STRICT").upper() - grpc_mode = agent_pb2.SandboxPolicy.STRICT if mode_str == "STRICT" else agent_pb2.SandboxPolicy.PERMISSIVE + # 1. Resolve Mode (M6 FIX: Hub now defaults to PERMISSIVE/PASSIVE to match UI expectation) + raw_mode = (sandbox_cfg.get("mode") or "PERMISSIVE").upper() + # Aliasing: UI calls it PASSIVE, Backend calls it PERMISSIVE + if raw_mode in ["PERMISSIVE", "PASSIVE"]: + grpc_mode = agent_pb2.SandboxPolicy.PERMISSIVE + else: + grpc_mode = agent_pb2.SandboxPolicy.STRICT - # 2. Resolve Command Lists (fallback to some safe defaults if enabled but empty) + # 2. Resolve Command Lists (M6 FIX: Expanded with common Windows/PowerShell tools) allowed = sandbox_cfg.get("allowed_commands", []) or [] if not allowed and shell_cfg.get("enabled", True): allowed = [ + # Linux/macOS Defaults "ls", "cat", "echo", "pwd", "uname", "curl", "python3", "git", "lscpu", "free", "df", "uptime", "nproc", "grep", "awk", "sed", "hostname", "id", "whoami", "ip", "ping", "ps", "top", "which", "sh", "bash", "zsh", "set", "export", "env", "find", "mkdir", - "touch", "rm", "mv", "cp", "tail", "head", "less", "more" + "touch", "rm", "mv", "cp", "tail", "head", "less", "more", + # Windows/PowerShell Defaults (M6) + "ipconfig", "systeminfo", "ver", "dir", "cmd", "powershell", "pwsh", + "Get-CimInstance", "Select-Object", "Measure-Object", "Get-Process", + "Get-Date", "Get-NetIPAddress", "cls", "type", "where", "findstr", + "tasklist", "netstat", "whoami", "hostname" ] denied = sandbox_cfg.get("denied_commands", []) or [] @@ -264,16 +274,22 @@ logger.warning(f"Results listener closed for {node_id}: {e}") threading.Thread(target=_read_results, daemon=True, name=f"Results-{node_id}").start() - # 3. 
Work Dispatcher (Main Stream) last_keepalive = 0 while context.is_active(): try: priority_item = node.queue.get(timeout=1.0) msg = priority_item[2] # Unpack (priority, ts, msg) + + # Unified Signing: Ensure every outbound message is signed for the node's strict verify logic + msg.signature = "" + msg_bytes = msg.SerializeToString(deterministic=True) + msg.signature = sign_bytes(msg_bytes) + if os.getenv("DEBUG_GRPC"): kind = msg.WhichOneof("payload") - logger.info(f"[DEBUG-gRPC] OUTBOUND to {node_id}: {kind}") + logger.info(f"[DEBUG-gRPC] OUTBOUND to {node_id}: {kind} (Signed)") + yield msg except queue.Empty: now = time.time() diff --git a/ai-hub/app/core/orchestration/agent_loop.py b/ai-hub/app/core/orchestration/agent_loop.py index f0eb50b..ae7e84b 100644 --- a/ai-hub/app/core/orchestration/agent_loop.py +++ b/ai-hub/app/core/orchestration/agent_loop.py @@ -198,6 +198,16 @@ total_task_output_tokens = 0 total_task_tool_counts = {} + # --- AWAIT INITIAL SETUP (Sync Point) --- + if rubric_task: + # We wait for the first round's rubric to ensure the node is ready for the test/UI + # Subsequent rounds use the already-captured rubric_content + rubric_content = await rubric_task + if not rubric_content: + rubric_content = "# Evaluation Rubric\nComplete the requested task with high technical accuracy." 
+ # Reset task to None so we don't await it again in the loop + rubric_task = None + # --- MAIN REWORK LOOP --- loop_start = time.time() # Handle scope for exception reporting while current_attempt <= max_rework_attempts: @@ -262,11 +272,15 @@ now = time.time() if now - last_db_sync_time > 2.0 or sync_token_count >= 50: instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - instance.last_reasoning = (instance.last_reasoning or "") + content_buffer - content_buffer = "" - last_db_sync_time = now - sync_token_count = 0 - if not safe_commit(): return + if instance: + instance.last_reasoning = (instance.last_reasoning or "") + content_buffer + content_buffer = "" + last_db_sync_time = now + sync_token_count = 0 + if not safe_commit(): return + else: + # Agent deleted, stop streaming + return try: if registry and instance.mesh_node_id: @@ -282,8 +296,9 @@ # Final flush if content_buffer: instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - instance.last_reasoning = (instance.last_reasoning or "") + content_buffer - if not safe_commit(): return + if instance: + instance.last_reasoning = (instance.last_reasoning or "") + content_buffer + if not safe_commit(): return content_buffer = "" # --- Persistence: Update Cumulative Metrics in DB (Real-time) --- diff --git a/ai-hub/app/core/services/mesh.py b/ai-hub/app/core/services/mesh.py index 0607849..8de05ad 100644 --- a/ai-hub/app/core/services/mesh.py +++ b/ai-hub/app/core/services/mesh.py @@ -95,6 +95,22 @@ logger.error(f"Failed to generate provisioning script: {e}") return f"Error: {e}" + def generate_provisioning_ps1(self, node: models.AgentNode, config_yaml: str, base_url: str, grpc_url: str = "") -> str: + if not self.jinja_env: + return "Error: Templates directory not found." 
+ try: + template = self.jinja_env.get_template("provision.ps1.j2") + return template.render( + node_id=node.node_id, + config_yaml=config_yaml, + base_url=base_url, + grpc_url=grpc_url or base_url.replace("http://", "").replace("https://", ""), + invite_token=node.invite_token + ) + except Exception as e: + logger.error(f"Failed to generate provisioning script: {e}") + return f"Error: {e}" + def get_template_content(self, filename: str) -> str: if not self.jinja_env: return "" diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index 38eb650..0567c29 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -310,7 +310,15 @@ logger.warning(f"[⚠️] FLAPPING DETECTED for node '{node_id}': {len(history)} connects in {self._FLAP_WINDOW_S}s.") # 2. Register the live connection + # M7: Persist terminal history across reconnections to handle flapping + existing_history = None + if node_id in self._nodes: + existing_history = self._nodes[node_id].terminal_history + record = LiveNodeRecord(node_id=node_id, user_id=user_id, metadata=metadata) + if existing_history: + record.terminal_history = existing_history + record._registry_executor = self.executor # Inject shared executor self._nodes[node_id] = record diff --git a/ai-hub/app/core/services/preference.py b/ai-hub/app/core/services/preference.py index 6352f05..66e129a 100644 --- a/ai-hub/app/core/services/preference.py +++ b/ai-hub/app/core/services/preference.py @@ -62,29 +62,64 @@ # Build effective combined config for processing def get_effective_providers(section_name, user_section_providers, sys_defaults): - # Start with system defaults if user has none - effective_providers = {} - if not user_section_providers: - effective_providers = copy.deepcopy(sys_defaults) - else: - effective_providers = copy.deepcopy(user_section_providers) + # M6/M3: Deep merge system defaults with user overrides. 
+ # This ensures that if a user configures a model name in the UI, + # they don't lose the API key from config.yaml if it's not provided in the UI. - # Filter by health and mask keys + # Start with system defaults (config.yaml / env) + effective = copy.deepcopy(sys_defaults) + + # Layer on overrides (Admin or User) + if user_section_providers: + for p_id, p_data in user_section_providers.items(): + if p_id not in effective: + # New provider defined in UI + effective[p_id] = copy.deepcopy(p_data) + else: + # Override existing provider fields + for field, val in p_data.items(): + # Only override if the value is meaningful (not empty/null) + if val is not None and val != "" and str(val).lower() != "none": + effective[p_id][field] = val + + # Filter by health and mask keys for the response res = {} - for p, p_data in effective_providers.items(): + for p, p_data in effective.items(): if p_data and is_provider_healthy(section_name, p, p_data): masked_data = copy.deepcopy(p_data) masked_data["api_key"] = self.mask_key(p_data.get("api_key")) res[p] = masked_data return res - system_llm = system_prefs.get("llm", {}).get("providers", { + def get_merged_system_defaults(section_name, hardcoded_defaults): + # M6/M3: Merge Admin overrides with hardcoded config.yaml defaults. + # This prevents the admin from losing system-level keys when they + # customize other fields (like model names) in the UI. 
+ sys_prefs_section = system_prefs.get(section_name, {}) + sys_providers = sys_prefs_section.get("providers", {}) + + if not sys_providers: + return hardcoded_defaults + + # Start with hardcoded defaults + merged = copy.deepcopy(hardcoded_defaults) + for p_id, p_data in sys_providers.items(): + if p_id not in merged: + merged[p_id] = p_data + else: + for field, val in p_data.items(): + if val is not None and val != "" and str(val).lower() != "none": + merged[p_id][field] = val + return merged + + system_llm = get_merged_system_defaults("llm", { "deepseek": {"api_key": settings.DEEPSEEK_API_KEY, "model": settings.DEEPSEEK_MODEL_NAME}, "gemini": {"api_key": settings.GEMINI_API_KEY, "model": settings.GEMINI_MODEL_NAME}, + "openai": {"api_key": settings.OPENAI_API_KEY} }) llm_providers_effective = get_effective_providers("llm", llm_prefs["providers"], system_llm) - system_tts = system_prefs.get("tts", {}).get("providers", { + system_tts = get_merged_system_defaults("tts", { settings.TTS_PROVIDER: { "api_key": settings.TTS_API_KEY, "model": settings.TTS_MODEL_NAME, @@ -93,7 +128,7 @@ }) tts_providers_effective = get_effective_providers("tts", tts_prefs["providers"], system_tts) - system_stt = system_prefs.get("stt", {}).get("providers", { + system_stt = get_merged_system_defaults("stt", { settings.STT_PROVIDER: {"api_key": settings.STT_API_KEY, "model": settings.STT_MODEL_NAME} }) stt_providers_effective = get_effective_providers("stt", stt_prefs["providers"], system_stt) diff --git a/ai-hub/app/core/services/rag.py b/ai-hub/app/core/services/rag.py index 57ab633..168971c 100644 --- a/ai-hub/app/core/services/rag.py +++ b/ai-hub/app/core/services/rag.py @@ -88,7 +88,11 @@ if "/" in provider_name: model_name_override = "" # Let factory extract model from provider_name else: - model_name_override = llm_prefs.get("model", "") + # M3: Priority order: + # 1. Message-level override (done via provider_name) + # 2. Session-level override (persisted in DB) + # 3. 
User-level preference + model_name_override = session.model_name or llm_prefs.get("model", "") diff --git a/ai-hub/app/core/services/session.py b/ai-hub/app/core/services/session.py index 8700417..b858953 100644 --- a/ai-hub/app/core/services/session.py +++ b/ai-hub/app/core/services/session.py @@ -52,7 +52,8 @@ self, db: Session, user_id: str, - provider_name: str, + provider_name: str, + model_name: str = None, feature_name: str = "default", stt_provider_name: str = None, tts_provider_name: str = None @@ -61,6 +62,7 @@ new_session = models.Session( user_id=user_id, provider_name=provider_name, + model_name=model_name, stt_provider_name=stt_provider_name, tts_provider_name=tts_provider_name, feature_name=feature_name, diff --git a/ai-hub/app/core/templates/provisioning/provision.ps1.j2 b/ai-hub/app/core/templates/provisioning/provision.ps1.j2 new file mode 100644 index 0000000..fa3a41d --- /dev/null +++ b/ai-hub/app/core/templates/provisioning/provision.ps1.j2 @@ -0,0 +1,33 @@ +# Cortex Agent Windows One-Liner Provisioner +Write-Host "🚀 Starting Cortex Agent Provisioning for node: {{ node_id }}" -ForegroundColor Cyan + +$installDir = "C:\CortexAgent" +if (!(Test-Path $installDir)) { + New-Item -ItemType Directory -Path $installDir | Out-Null +} +Set-Location $installDir + +# 1. Write agent_config.yaml +Write-Host "[*] Writing configuration..." -ForegroundColor Gray +$configContent = @" +{{ config_yaml.replace('—', '-') }} +"@ +$configPath = Join-Path $installDir "agent_config.yaml" +$utf8NoBom = New-Object System.Text.UTF8Encoding $false +[System.IO.File]::WriteAllText($configPath, $configContent, $utf8NoBom) + +# 2. Download bootstrap_windows.ps1 +$installerUrl = "{{ base_url }}/api/v1/agent/installer/ps1" +$installerPath = Join-Path $installDir "bootstrap_windows.ps1" +Write-Host "[*] Downloading installer from $installerUrl ..." 
-ForegroundColor Gray + +try { + Invoke-WebRequest -Uri $installerUrl -OutFile $installerPath +} catch { + Write-Host "❌ Failed to download installer: $_" -ForegroundColor Red + exit 1 +} + +# 3. Run bootstrap_windows.ps1 +Write-Host "[*] Bootstrapping agent..." -ForegroundColor Cyan +& $installerPath -HubUrl "{{ base_url }}" -GrpcUrl "{{ grpc_url }}" -AuthToken "{{ invite_token }}" -NodeId "{{ node_id }}" diff --git a/ai-hub/integration_tests/conftest.py b/ai-hub/integration_tests/conftest.py index 911ca33..a2ef1ce 100644 --- a/ai-hub/integration_tests/conftest.py +++ b/ai-hub/integration_tests/conftest.py @@ -52,7 +52,7 @@ "providers": { "gemini": { "api_key": os.getenv("GEMINI_API_KEY", ""), - "model": "gemini/gemini-1.5-flash" + "model": "gemini/gemini-3-flash-preview" } } }, @@ -145,8 +145,15 @@ print("[conftest] Starting local docker node containers...") network = "cortexai_default" subprocess.run(["docker", "rm", "-f", NODE_1, NODE_2], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True) - image_id = image_proc.stdout.strip() + print("[conftest] Building agent-node image...") + try: + image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True, check=True) + image_id = image_proc.stdout.strip() + if not image_id: + raise Exception("Docker build -q returned empty image ID") + except subprocess.CalledProcessError as e: + print(f"❌ [conftest] Docker build failed!\nSTDOUT: {e.stdout}\nSTDERR: {e.stderr}") + raise e for node_id in [NODE_1, NODE_2]: cmd = ["docker", "run", "-d", "--name", node_id, "--network", network, "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={tokens[node_id]}", "-e", f"AGENT_SECRET_KEY={os.getenv('SECRET_KEY')}", "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "HUB_URL=http://ai-hub:8000", "-e", "AGENT_TLS_ENABLED=false", image_id] subprocess.run(cmd, check=True) diff --git 
a/ai-hub/integration_tests/test_agents.py b/ai-hub/integration_tests/test_agents.py index 313589f..c704b4b 100644 --- a/ai-hub/integration_tests/test_agents.py +++ b/ai-hub/integration_tests/test_agents.py @@ -2,6 +2,7 @@ import httpx import os import uuid +import time from conftest import BASE_URL def _headers(): @@ -72,7 +73,6 @@ # 6. Verify Agent Periodical Execution print("\n[test] Waiting for background interval scheduler to wake the agent (timeout 60s)...") - import time messages = [] for _ in range(150): # 300s r_msgs = client.get(f"{BASE_URL}/sessions/{session_id}/messages", headers=_headers()) @@ -168,7 +168,6 @@ # 5. Wait for agent to process print(f"\n[test] Waiting for agent to process webhook signal '{custom_msg}'...") - import time found = False for _ in range(150): # 300s r_msgs = client.get(f"{BASE_URL}/sessions/{session_id}/messages", headers=_headers()) diff --git a/ai-hub/integration_tests/test_coworker_flow.py b/ai-hub/integration_tests/test_coworker_flow.py index 46bc9d7..792a0db 100644 --- a/ai-hub/integration_tests/test_coworker_flow.py +++ b/ai-hub/integration_tests/test_coworker_flow.py @@ -20,7 +20,7 @@ admin_id = os.getenv("SYNC_TEST_USER_ID", "") instance_id = None - with httpx.Client(timeout=30.0) as client: + with httpx.Client(timeout=90.0) as client: try: # 2. Deploy Agent with co_worker_quality_gate=True deploy_payload = { @@ -30,7 +30,7 @@ "max_loop_iterations": 1, "mesh_node_id": node_id, "provider_name": "gemini", - "model_name": "gemini-1.5-flash", + "model_name": "gemini-3-flash-preview", "trigger_type": "interval", "interval_seconds": 60, "co_worker_quality_gate": True, @@ -58,14 +58,23 @@ assert found_evaluating, f"Agent did not reach evaluation status." # 4. 
Use the /nodes/{id}/fs/ls API to verify the .cortex folder existence - params = {"path": ".cortex", "session_id": sync_workspace_id} - r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params=params, headers=_headers()) - assert r_ls.status_code == 200, f"Failed to ls .cortex: {r_ls.text}" - data = r_ls.json() - filenames = [f["name"] for f in data.get("files", [])] - # Verify rubric.md and history.log are present as per test plan - assert any("rubric.md" in f for f in filenames), f"rubric.md not found in {filenames}" - assert any("history.log" in f for f in filenames), f"history.log not found in {filenames}" + print("[test] Verifying .cortex mirror files...") + files_found = False + for _ in range(30): + params = {"path": ".cortex", "session_id": sync_workspace_id} + r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params=params, headers=_headers()) + if r_ls.status_code == 200: + data = r_ls.json() + filenames = [f["name"] for f in data.get("files", [])] + print(f" [debug] Found files: {filenames}") + has_rubric = any("rubric.md" in f for f in filenames) + has_history = any("history.log" in f for f in filenames) + if has_rubric and has_history: + files_found = True + break + time.sleep(2) + + assert files_found, f"Required mirror files (rubric.md, history.log) not found in .cortex directory." finally: if instance_id: @@ -83,7 +92,7 @@ admin_id = os.getenv("SYNC_TEST_USER_ID", "") instance_id = None - with httpx.Client(timeout=30.0) as client: + with httpx.Client(timeout=90.0) as client: try: # 2. 
Deploy Agent with max_rework_attempts=1 and rework_threshold=100 deploy_payload = { @@ -92,7 +101,7 @@ "max_loop_iterations": 2, "mesh_node_id": node_id, "provider_name": "gemini", - "model_name": "gemini-1.5-flash", + "model_name": "gemini-3-flash-preview", "trigger_type": "webhook", # Use webhook to trigger manually "co_worker_quality_gate": True, "max_rework_attempts": 1, @@ -150,7 +159,7 @@ admin_id = os.getenv("SYNC_TEST_USER_ID", "") instance_id = None - with httpx.Client(timeout=30.0) as client: + with httpx.Client(timeout=90.0) as client: try: # 2. Deploy Agent with rework loop deploy_payload = { @@ -159,7 +168,7 @@ "max_loop_iterations": 2, "mesh_node_id": node_id, "provider_name": "gemini", - "model_name": "gemini-1.5-flash", + "model_name": "gemini-3-flash-preview", "trigger_type": "webhook", "co_worker_quality_gate": True, "max_rework_attempts": 3, @@ -205,7 +214,7 @@ node_id = os.getenv("SYNC_TEST_NODE2", "test-node-2") admin_id = os.getenv("SYNC_TEST_USER_ID", "") instance_id = None - with httpx.Client(timeout=30.0) as client: + with httpx.Client(timeout=90.0) as client: try: # Register node check removed (leveraging session-scoped nodes) @@ -215,7 +224,7 @@ "max_loop_iterations": 1, "mesh_node_id": node_id, "provider_name": "gemini", - "model_name": "gemini-1.5-flash", + "model_name": "gemini-3-flash-preview", "trigger_type": "interval", "co_worker_quality_gate": True, "max_rework_attempts": 5, diff --git a/ai-hub/integration_tests/test_coworker_full_journey.py b/ai-hub/integration_tests/test_coworker_full_journey.py index d92e0be..71d9b25 100644 --- a/ai-hub/integration_tests/test_coworker_full_journey.py +++ b/ai-hub/integration_tests/test_coworker_full_journey.py @@ -43,7 +43,7 @@ "max_loop_iterations": 5, "mesh_node_id": node_id, "provider_name": "gemini", - "model_name": "gemini-1.5-flash", + "model_name": "gemini-3-flash-preview", "trigger_type": "webhook", "co_worker_quality_gate": True, "max_rework_attempts": 2, diff --git 
a/ai-hub/integration_tests/test_parallel_coworker.py b/ai-hub/integration_tests/test_parallel_coworker.py index 5471843..930302c 100644 --- a/ai-hub/integration_tests/test_parallel_coworker.py +++ b/ai-hub/integration_tests/test_parallel_coworker.py @@ -26,7 +26,7 @@ "max_loop_iterations": 1, "mesh_node_id": node_id, "provider_name": "gemini", - "model_name": "gemini-1.5-flash", + "model_name": "gemini-3-flash-preview", "trigger_type": "webhook", "co_worker_quality_gate": True, "default_prompt": "Tell me about the history of the internet.", diff --git a/docker-compose.yml b/docker-compose.yml index 310bd44..e68d162 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -26,9 +26,9 @@ - "50051:50051" environment: - PATH_PREFIX=/api/v1 - - HUB_API_URL=http://localhost:8000 - - HUB_PUBLIC_URL=http://localhost:8002 - - HUB_GRPC_ENDPOINT=localhost:50051 + - HUB_API_URL=${HUB_API_URL:-http://localhost:8000} + - HUB_PUBLIC_URL=${HUB_PUBLIC_URL:-http://localhost:8002} + - HUB_GRPC_ENDPOINT=${HUB_GRPC_ENDPOINT:-localhost:50051} - SUPER_ADMINS=${SUPER_ADMINS:-admin@example.com} - CORTEX_ADMIN_PASSWORD=${CORTEX_ADMIN_PASSWORD} - SECRET_KEY=${SECRET_KEY:-default-insecure-key} diff --git a/frontend/src/features/nodes/pages/NodesPage.js b/frontend/src/features/nodes/pages/NodesPage.js index 78fdc45..52b9527 100644 --- a/frontend/src/features/nodes/pages/NodesPage.js +++ b/frontend/src/features/nodes/pages/NodesPage.js @@ -714,25 +714,41 @@ -
-
- - +
+
+
+ + +
+ curl -sSL '{window.location.origin}/api/v1/nodes/provision/{node.node_id}?token={node.invite_token}' | python3 -
-

Best for terminal-only servers. Installs agent as a persistent service.

+ +
+
+ + +
+ + irm '{window.location.origin}/api/v1/nodes/provision/ps1/${node.node_id}?token=${node.invite_token}' | iex + +
+

Best for terminal-only servers. Installs agent as a persistent service.

)} diff --git a/frontend/src/features/swarm/hooks/useSwarmControl.js b/frontend/src/features/swarm/hooks/useSwarmControl.js index 648b1fe..0f95ecf 100644 --- a/frontend/src/features/swarm/hooks/useSwarmControl.js +++ b/frontend/src/features/swarm/hooks/useSwarmControl.js @@ -84,21 +84,6 @@ } catch (e) { console.warn("Could not check session provider", e); } setLocalActiveLLM(llm); - // Config check - const eff = configData?.effective || {}; - const missing = []; - const llmProviders = eff.llm?.providers || {}; - const hasLLMKey = Object.values(llmProviders).some(p => p.api_key && p.api_key !== 'None'); - if (!hasLLMKey) missing.push("Language Model (LLM) API Key"); - - if (missing.length > 0) { - setIsConfigured(false); - setMissingConfigs(missing); - } else { - setIsConfigured(true); - setMissingConfigs([]); - } - await fetchSessionHistory(sid); await fetchTokenUsage(); } catch (error) { @@ -109,6 +94,36 @@ setup(); }, [fetchSessionHistory, fetchTokenUsage]); + // M6/M3: Re-validate configuration whenever config data or selection changes + useEffect(() => { + if (!userConfigData) return; + + const eff = userConfigData.effective || {}; + const missing = []; + const llmProviders = eff.llm?.providers || {}; + + // 1. Structural check: does ANY provider have a key? (Global gate) + const overallHasKey = Object.values(llmProviders).some(p => p.api_key && !["None", "none", ""].includes(String(p.api_key))); + + // 2. Contextual check: does the CURRENT selection have a key? 
+ const activeProviderData = llmProviders[localActiveLLM]; + const activeHasKey = activeProviderData?.api_key && !["None", "none", ""].includes(String(activeProviderData.api_key)); + + if (!overallHasKey) { + missing.push("Language Model (LLM) API Key"); + } else if (localActiveLLM && !activeHasKey) { + missing.push(`API Key missing for ${localActiveLLM}`); + } + + if (missing.length > 0) { + setIsConfigured(false); + setMissingConfigs(missing); + } else { + setIsConfigured(true); + setMissingConfigs([]); + } + }, [userConfigData, localActiveLLM]); + const handleSendChat = useCallback(async (text) => { if (!isConfigured && text.trim().toLowerCase() !== "/new") { setErrorMessage("Swarm Control requires a valid LLM configuration. Please visit Settings to set up your API keys.");