diff --git a/agent-node/bootstrap_windows.ps1 b/agent-node/bootstrap_windows.ps1 index 1b3afc2..09dec90 100644 --- a/agent-node/bootstrap_windows.ps1 +++ b/agent-node/bootstrap_windows.ps1 @@ -3,6 +3,7 @@ [string]$AuthToken = "", [string]$HubUrl = "", [string]$GrpcUrl = "", + [bool]$Tls = $false, [switch]$RegisterService = $false, [switch]$ForceFirewall = $false, [switch]$AutoRun = $true @@ -131,7 +132,7 @@ AGENT_AUTH_TOKEN=$AuthToken AGENT_HUB_URL=$HubUrl GRPC_ENDPOINT=$GrpcUrl -AGENT_TLS_ENABLED=false +AGENT_TLS_ENABLED=$($Tls.ToString().ToLower()) PYTHONUTF8=1 "@ $envFile | Out-File -FilePath ".env" -Encoding ascii diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index bfebe68..12cff71 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -4,10 +4,13 @@ import hashlib import zlib try: + import platform + if platform.system() == "Windows": + raise ImportError("Watchdog disabled on Windows for stability") from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler HAS_WATCHDOG = True -except ImportError: +except Exception: # Optional dependency: Only required for live file sync/push-to-node features. 
Observer = object FileSystemEventHandler = object diff --git a/agent-node/src/agent_node/main.py b/agent-node/src/agent_node/main.py index 4cba41c..529d1cf 100644 --- a/agent-node/src/agent_node/main.py +++ b/agent-node/src/agent_node/main.py @@ -13,11 +13,7 @@ if sys.argv and sys.argv[0] and not os.path.isabs(sys.argv[0]): sys.argv[0] = os.path.abspath(sys.argv[0]) -# M6: Force UTF-8 for stdout/stderr on Windows to prevent GBK crashes with emojis -if os.name == 'nt': - import io - sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') - sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') +# M6: UTF-8 re-wrap removed for stability on Windows # Add root and protos to path _root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) @@ -74,99 +70,102 @@ except ImportError: pass +_windows_mutex_handle = None # Global to prevent GC + def enforce_singleton(): """ Ensures that only one instance of the agent is running from this directory. - If siblings are found, they are terminated to prevent resource/port collisions. - This version is robust across Linux and Darwin and avoids unnecessary OS bails. + - Windows: Uses a Named Mutex via ctypes. + - POSIX: Iterates psutil to find and kill siblings. """ - import psutil import os + import sys + import hashlib - current_pid = os.getpid() + # 1. Identify ourselves based on the installation path try: - # Use realpath to resolve any symlinks for accurate comparison. - # normcase is essential for Windows (case-insensitivity and backslashes). 
my_path = os.path.normcase(os.path.realpath(__file__)) except: my_path = os.path.normcase(os.path.abspath(__file__)) - - cleaned = 0 - try: - # iterate over all processes once - for proc in psutil.process_iter(['pid', 'cmdline']): - try: - pid = proc.info['pid'] - if pid == current_pid: - continue - - cmd = proc.info['cmdline'] - if not cmd or not isinstance(cmd, list): - continue - - # We identify a sibling if it's running 'main.py' and resolves to our same directory. - is_sibling = False - for arg in cmd: - if 'main.py' in arg: - try: - # 1. Try absolute path resolution - if os.path.isabs(arg): - check_path = os.path.normcase(os.path.realpath(arg)) - else: - # 2. Try relative resolution based on the sibling's current working directory - try: - cwd = proc.cwd() - check_path = os.path.normcase(os.path.realpath(os.path.join(cwd, arg))) - except (psutil.AccessDenied, psutil.NoSuchProcess): - # If we can't get the CWD of the other process, we rely on a direct name match - # but stay conservative to avoid killing unrelated processes. - check_path = None - - if check_path and check_path == my_path: - is_sibling = True - break - except: - continue - - if is_sibling: - print(f"[*] Cleaning up orphaned agent instance (PID {pid})...") - try: - p = psutil.Process(pid) - - # First, mercilessly slaughter all child processes (e.g. spawned PTY shells) - try: - children = p.children(recursive=True) - for child in children: - try: - child.kill() - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - - # Then terminate the parent - p.terminate() - try: - p.wait(timeout=2) - except psutil.TimeoutExpired: - p.kill() - cleaned += 1 - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError): - continue - except Exception as e: - # Non-fatal: if we can't iterate processes at all, just log and continue. 
- # This prevents the agent from being a 'brick' in restricted environments. - print(f"[!] Singleton check warning: {e}") + + if os.name == 'nt': + # --- WINDOWS NAMED MUTEX --- + import ctypes + global _windows_mutex_handle + + # Create a unique mutex name based on the installation path + path_hash = hashlib.md5(my_path.encode()).hexdigest() + mutex_name = f"Global\\CortexAgent_{path_hash}" + + # CreateMutexW returns a handle even if it already exists; use_last_error=True makes ctypes snapshot GetLastError immediately (windll.kernel32.GetLastError() is unreliable) + _windows_mutex_handle = ctypes.WinDLL("kernel32", use_last_error=True).CreateMutexW(None, False, mutex_name) + err = ctypes.get_last_error() + ERROR_ALREADY_EXISTS = 183 + + if err == ERROR_ALREADY_EXISTS: + print(f"[!] FATAL: Multiple instances detected. Another agent is already running in this directory.") + print(f" (Conflict on Windows Mutex: {mutex_name})") + sys.exit(1) - if cleaned > 0: - print(f"[*] Successfully reaped {cleaned} orphaned instances.") - - if cleaned > 0: - print(f"[*] Cleaned up {cleaned} orphaned instances.") + print(f"[*] Windows Singleton Check Passed. 
Mutex held: {mutex_name}") else: - print("[*] No conflicting agent instances detected.") + # --- POSIX PSUTIL SEARCH-AND-DESTROY --- + import psutil + current_pid = os.getpid() + cleaned = 0 + try: + for proc in psutil.process_iter(['pid', 'cmdline']): + try: + pid = proc.info['pid'] + if pid == current_pid: + continue + cmd = proc.info['cmdline'] + if not cmd or not isinstance(cmd, list): + continue + + is_sibling = False + for arg in cmd: + if 'main.py' in arg: + try: + if os.path.isabs(arg): + check_path = os.path.normcase(os.path.realpath(arg)) + else: + try: + cwd = proc.cwd() + check_path = os.path.normcase(os.path.realpath(os.path.join(cwd, arg))) + except (psutil.AccessDenied, psutil.NoSuchProcess): + check_path = None + + if check_path and check_path == my_path: + is_sibling = True + break + except: + continue + + if is_sibling: + print(f"[*] Cleaning up orphaned agent instance (PID {pid})...") + try: + p = psutil.Process(pid) + try: + children = p.children(recursive=True) + for child in children: + try: child.kill() + except (psutil.NoSuchProcess, psutil.AccessDenied): pass + except (psutil.NoSuchProcess, psutil.AccessDenied): pass + p.terminate() + try: p.wait(timeout=2) + except psutil.TimeoutExpired: p.kill() + cleaned += 1 + except (psutil.NoSuchProcess, psutil.AccessDenied): pass + except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError): + continue + except Exception as e: + print(f"[!] Singleton check warning: {e}") + + if cleaned > 0: + print(f"[*] Successfully reaped {cleaned} orphaned instances.") + else: + print("[*] No conflicting POSIX agent instances detected.") def main(): import logging @@ -180,10 +179,8 @@ # If the main loop hangs, this will force-reboot the agent. watchdog.start() - # 0. Singleton Enforcement: Murder siblings before booting + # 0. Singleton Enforcement: Ensure only one agent per directory try: - import psutil - print("[*] Running singleton check...") enforce_singleton() except Exception as e: print(f"[!] 
Singleton check failed: {e}") @@ -237,7 +234,6 @@ node.stop() except: pass - import traceback traceback.print_exc() time.sleep(10) diff --git a/ai-hub/app/config.py b/ai-hub/app/config.py index 28bb186..de09763 100644 --- a/ai-hub/app/config.py +++ b/ai-hub/app/config.py @@ -151,9 +151,9 @@ get_from_yaml(["swarm", "grpc_endpoint"]) or \ config_from_pydantic.swarm.grpc_endpoint - # Infer TLS from endpoint + # Infer TLS from endpoint or port protocol = self.GRPC_EXTERNAL_ENDPOINT.split("://")[0] if self.GRPC_EXTERNAL_ENDPOINT and "://" in self.GRPC_EXTERNAL_ENDPOINT else "http" - self.GRPC_TLS_ENABLED: bool = (protocol == "https") + self.GRPC_TLS_ENABLED: bool = (protocol == "https") or (self.GRPC_TARGET_ORIGIN and ":443" in self.GRPC_TARGET_ORIGIN) # Legacy paths (no longer in UI, but kept for env var parity if needed) self.GRPC_CERT_PATH: Optional[str] = os.getenv("GRPC_CERT_PATH") or get_from_yaml(["swarm", "cert_path"]) diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index f448f5f..ba38a7c 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -258,6 +258,20 @@ logger.warning(f"[!] TaskStream rejected: Node {node_id} not registered via SyncConfiguration.") return + # M6: Eject 'Zombie' connections. If this node already has a TaskStream context, + # it might be a stale connection from a previous incarnation of the agent. + # We abort the old one to ensure the new one takes over the LiveNodeRecord. + if node.stream: + logger.warning(f"[*] Node {node_id} has a STALE TaskStream. Ejecting Zombie connection...") + try: + node.stream.abort(grpc.StatusCode.ABORTED, "New connection established.") + # Give the old finally block a tiny slice of time to run deregister if needed, + # though we are about to overwrite it anyway. + time.sleep(0.1) + except Exception as ae: + logger.error(f"[!] 
Failed to abort stale stream for {node_id}: {ae}") + + node.stream = context # Track the active stream logger.info(f"[*] Node {node_id} Online (TaskStream established)") try: @@ -317,7 +331,12 @@ except Exception as e: logger.error(f"[!] TaskStream Error for {node_id}: {e}") finally: - if node_id != "unknown": + if node_id != "unknown" and node: + # 0. Only nullify the record's stream if THIS context is the one owning it. + # This prevents a new successful stream from being cleared by a late-arriving cleanup of a zombie. + if node.stream is context: + node.stream = None + logger.warning(f"[📶] gRPC Stream TERMINATED for {node_id}. Cleaning up.") # M6: Clean up orphaned I/O locks for this node's sessions to prevent memory leak diff --git a/ai-hub/app/core/services/mesh.py b/ai-hub/app/core/services/mesh.py index 99f86a3..6a0e904 100644 --- a/ai-hub/app/core/services/mesh.py +++ b/ai-hub/app/core/services/mesh.py @@ -9,6 +9,7 @@ import jinja2 from app.db import models +from app.config import settings from app.api import schemas from app.api.dependencies import ServiceContainer from app.core.grpc.utils.crypto import sign_payload @@ -269,7 +270,10 @@ return self._render_provision_template("provision.sh.j2", node, config_yaml, base_url) def generate_provisioning_ps1(self, node: models.AgentNode, config_yaml: str, base_url: str, grpc_url: str = "") -> str: - params = {"grpc_url": grpc_url or base_url.replace("http://", "").replace("https://", "")} + params = { + "grpc_url": grpc_url or base_url.replace("http://", "").replace("https://", ""), + "tls": settings.GRPC_TLS_ENABLED + } return self._render_provision_template("provision.ps1.j2", node, config_yaml, base_url, **params) def _render_provision_template(self, template_name: str, node: models.AgentNode, config_yaml: str, base_url: str, **kwargs) -> str: diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index 1b25c42..6e867ba 100644 --- 
a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -79,6 +79,7 @@ self.session_id: str = str(uuid.uuid4()) self.terminal_history: collections.deque = collections.deque(maxlen=150) # Recent PTY lines for AI reading self._registry_executor = None # Set by registry + self.stream: Optional[Any] = None # NEW: Tracks the active gRPC TaskStream context def send_message(self, msg: Any, priority: int = 2): """ diff --git a/ai-hub/app/core/services/preference.py b/ai-hub/app/core/services/preference.py index 6f7fbdb..317d224 100644 --- a/ai-hub/app/core/services/preference.py +++ b/ai-hub/app/core/services/preference.py @@ -448,6 +448,31 @@ except Exception as e: raise ValueError(f"Failed to initialize LLM provider '{litellm_provider}' with model '{resolved_model}': {e}") + def get_user_llm_providers(self, user_id: str, db) -> List[Any]: + """ + Helper method to retrieve all effectively configured LLM providers for a specific user. + Used as a last-resort fallback in resolve_llm_provider. 
+ """ + from app.db.models import User + user = db.query(User).filter(User.id == user_id).first() + if not user: + return [] + + # Use existing merge logic to get effective config + config = self.merge_user_config(user, db) + llm_providers = config.effective.get("llm", {}).get("providers", {}) + + results = [] + for p_name, p_data in llm_providers.items(): + # Create a simple attribute-accessible object for the caller + class ProviderInfo: + def __init__(self, name: str, model: str): + self.name = name + self.model = model + results.append(ProviderInfo(p_name, p_data.get("model", ""))) + + return results + async def get_provider_models(self, provider_name: str, section: str = "llm") -> List[Dict[str, Any]]: """Fetches supported models for a specific provider and section using LiteLLM.""" import litellm diff --git a/ai-hub/app/core/templates/provisioning/provision.ps1.j2 b/ai-hub/app/core/templates/provisioning/provision.ps1.j2 index 64362c4..d121dc2 100644 --- a/ai-hub/app/core/templates/provisioning/provision.ps1.j2 +++ b/ai-hub/app/core/templates/provisioning/provision.ps1.j2 @@ -30,4 +30,4 @@ # 3. Run bootstrap_windows.ps1 Write-Host "[*] Bootstrapping agent..." -ForegroundColor Cyan -powershell.exe -ExecutionPolicy Bypass -File $installerPath -HubUrl "{{ base_url }}" -GrpcUrl "{{ grpc_url }}" -AuthToken "{{ invite_token }}" -NodeId "{{ node_id }}" +powershell.exe -ExecutionPolicy Bypass -File $installerPath -HubUrl "{{ base_url }}" -GrpcUrl "{{ grpc_url }}" -AuthToken "{{ invite_token }}" -NodeId "{{ node_id }}" -Tls {% if tls %}$true{% else %}$false{% endif %}