# cortex-hub / agent-node / src / agent_node / main.py
import sys
import os
# gRPC / macOS stability tuning: these variables are written into the process
# environment before the grpc import further down in this file.
os.environ.update({
    "GRPC_ENABLE_FORK_SUPPORT": "0",
    "GRPC_FORK_SUPPORT_ONLY_FOR_SIGCHLD": "1",
    "GRPC_POLL_STRATEGY": "poll",
})

# RELIABILITY: keep sys.argv[0] absolute so that an os.execv(...) restart
# during an auto-update still finds the script when the agent was launched
# with a relative path (e.g. 'python3 src/main.py').
if sys.argv and sys.argv[0]:
    if not os.path.isabs(sys.argv[0]):
        sys.argv[0] = os.path.abspath(sys.argv[0])

# Put the package root and its "protos" directory at the very front of the
# import path (protos first, root second) so they win over any installed
# packages with the same names.
_root = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
sys.path[:0] = [os.path.join(_root, "protos"), _root]

import signal
import time
from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL
from agent_node.core import updater

# Pre-flight check for core dependencies.
# Tries to import the three third-party libraries and, if any import fails,
# prints an install hint. The handler only prints; it does not exit, so module
# loading continues after the banner.
try:
    import grpc
    import google.protobuf
    import watchdog
except ImportError as e:
    # Guess the missing distribution from the text of the import error.
    err_str = str(e).lower()
    if "grpc" in err_str:
        missing, pkg = "grpcio", "python3-grpcio"
    elif "google" in err_str or "protobuf" in err_str:
        missing, pkg = "protobuf", "python3-protobuf"
    else:
        # NOTE(review): every ImportError that mentions neither grpc nor
        # google/protobuf is attributed to watchdog, including a failing
        # transitive import — confirm this fallback is acceptable.
        missing, pkg = "watchdog", "python3-watchdog"

    print("\n" + "!" * 71)
    print(f" CRITICAL ERROR: '{missing}' library is not installed.")
    print(" If you are on Raspberry Pi / ARM, run:")
    print(f"     sudo apt-get install {pkg}")
    print("!" * 71 + "\n")

def _collect_ancestor_pids(pid):
    """Return the set of PIDs of every ancestor (parent, grandparent, ...) of *pid*."""
    import psutil

    ancestors = set()
    try:
        parent = psutil.Process(pid).parent()
        # The membership test guards against a cyclic parent chain.
        while parent is not None and parent.pid not in ancestors:
            ancestors.add(parent.pid)
            parent = parent.parent()
    except psutil.Error:
        # Best effort: whatever was collected before the failure is still valid.
        pass
    return ancestors


def _cmdline_targets_script(proc, cmdline, script_path):
    """Return True if a 'main.py' argument of *cmdline* resolves to *script_path*."""
    import psutil

    for arg in cmdline:
        if 'main.py' not in arg:
            continue
        try:
            if os.path.isabs(arg):
                # 1. Absolute argument: resolve symlinks and compare directly.
                candidate = os.path.realpath(arg)
            else:
                # 2. Relative argument: resolve against the other process's
                #    current working directory.
                try:
                    candidate = os.path.realpath(os.path.join(proc.cwd(), arg))
                except (psutil.AccessDenied, psutil.NoSuchProcess):
                    # Without the CWD we cannot prove it is our script; stay
                    # conservative and do not treat it as a match.
                    candidate = None
        except Exception:
            continue

        if candidate and candidate == script_path:
            return True
    return False


def _terminate_process_tree(proc):
    """Kill the descendants of *proc*, then stop *proc*; return True if it was handled."""
    import psutil

    try:
        # Children first (e.g. spawned PTY shells) so they are not left behind.
        try:
            for child in proc.children(recursive=True):
                try:
                    child.kill()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

        # Then ask the process itself to stop, escalating to kill after 2s.
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except psutil.TimeoutExpired:
            proc.kill()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return False
    return True


def enforce_singleton():
    """
    Ensure that only one instance of the agent runs from this script file.

    Every other process whose command line contains a 'main.py' argument that
    resolves (symlinks included) to this very file is treated as a stale
    sibling: its child processes are killed, then it is terminated (and killed
    if it does not exit within 2 seconds).

    The current process and all of its ancestors are never touched. A wrapper
    such as ``sudo python3 /path/main.py`` carries the script path in its own
    command line; killing it "with all children" would kill this process too.

    Failures while scanning are reported and swallowed so that the agent still
    boots in restricted environments. Requires ``psutil``; an ImportError from
    the local import propagates to the caller.
    """
    import psutil

    current_pid = os.getpid()
    try:
        # Use realpath to resolve any symlinks for accurate comparison.
        my_path = os.path.realpath(__file__)
    except Exception:
        my_path = os.path.abspath(__file__)

    # PIDs that must never be reaped: ourselves and everything above us.
    protected = _collect_ancestor_pids(current_pid)
    protected.add(current_pid)

    cleaned = 0
    try:
        # Single pass over all processes.
        for proc in psutil.process_iter(['pid', 'cmdline']):
            try:
                pid = proc.info['pid']
                if pid in protected:
                    continue

                cmd = proc.info['cmdline']
                if not cmd or not isinstance(cmd, list):
                    continue

                if not _cmdline_targets_script(proc, cmd, my_path):
                    continue

                print(f"[*] Cleaning up orphaned agent instance (PID {pid})...")
                # Reuse the iterated Process object rather than building a new
                # one from the bare PID, which could meanwhile have been
                # recycled by an unrelated process.
                if _terminate_process_tree(proc):
                    cleaned += 1
            except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError):
                continue
    except Exception as e:
        # Non-fatal: if we can't iterate processes at all, just log and continue.
        # This prevents the agent from being a 'brick' in restricted environments.
        print(f"[!] Singleton check warning: {e}")

    # Exactly one summary line (the original printed two on success).
    if cleaned > 0:
        print(f"[*] Cleaned up {cleaned} orphaned instances.")
    else:
        print("[*] No conflicting agent instances detected.")

def main():
    """
    Boot the agent node and keep it running.

    Steps, in order:
      1. Configure logging to stdout.
      2. Run the singleton check (skipped with a warning if psutil is missing).
      3. If AUTO_UPDATE is set, initialise the updater, run one update check
         and start the background updater.
      4. Loop forever: build an AgentNode, install SIGINT/SIGTERM handlers,
         sync configuration, start health reporting and run the task stream.
         Any exception stops the node, prints the traceback and retries the
         boot after 10 seconds.

    The function only exits through ``sys.exit(0)`` in the signal handler or
    through an exception that is not an ``Exception`` subclass.
    """
    import logging
    import traceback

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)]
    )

    # 0. Singleton enforcement: remove stale sibling instances before booting.
    # enforce_singleton() imports psutil itself, so a missing psutil surfaces
    # here as ImportError.
    try:
        enforce_singleton()
    except ImportError:
        print("[!] psutil not installed — skipping singleton enforcement. Beware of orphaned processes!")
    except Exception as e:
        print(f"[!] Singleton check failed: {e}")

    print(f"[*] Starting Agent Node: {NODE_ID}...")

    # 1. Auto-update check (before anything else — if we're behind, restart now).
    if AUTO_UPDATE:
        try:
            # NOTE(review): SECRET_KEY (not the imported AUTH_TOKEN) is passed
            # as auth_token — confirm this is intended.
            updater.init(hub_http_url=HUB_URL, auth_token=SECRET_KEY, check_interval_secs=UPDATE_CHECK_INTERVAL)
            updater.check_and_update_once()  # May restart process — does not return if update applied

            # Start the background updater BEFORE initializing the node, so
            # that even if AgentNode() crashes (e.g. missing grpcio) the node
            # can still self-repair once a fix is pushed to the hub.
            updater.start_background_updater()
        except Exception as e:
            print(f"[!] Updater initialization failed: {e}. Moving on to agent boot...")

    # 2. Initialization and main persistence loop.
    while True:
        # Reset on every attempt so that a failed boot never re-stops the node
        # left over from a previous iteration.
        node = None
        try:
            # Deferred import: done inside the loop so that a broken update
            # (e.g. a missing dependency) raises here, is caught below, and the
            # process stays alive for the background updater to pull a fix.
            from agent_node.node import AgentNode

            node = AgentNode()

            # 3. Signal handling for graceful shutdown.
            def handle_exit(sig, frame):
                # `node` may be None if the signal arrives while a later boot
                # attempt is still constructing its AgentNode.
                if node is not None:
                    node.stop()
                sys.exit(0)

            signal.signal(signal.SIGINT, handle_exit)
            signal.signal(signal.SIGTERM, handle_exit)

            # Handshake: sync configuration and sandbox policy.
            node.sync_configuration()

            # 4. Background: start health reporting (heartbeats).
            node.start_health_reporting()

            # 5. Foreground: run the persistent task stream (indefinite wait).
            node.run_task_stream()

        except Exception as e:
            print(f"[!] Main Agent process crashed: {e}. Retrying boot in 10s...", flush=True)
            if node is not None:
                try:
                    node.stop()
                except Exception as stop_err:
                    # Best-effort cleanup: report it, but never abort the retry loop.
                    print(f"[!] Failed to stop crashed node cleanly: {stop_err}", flush=True)
            traceback.print_exc()
            time.sleep(10)

# Script entry point: start the agent only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()