# cortex-hub / agent-node / src / agent_node / main.py
import os
import sys
import traceback

# Consolidate gRPC/Mac Stability Tuning
# These variables are set here, before grpc is imported further down in this
# file, presumably so the gRPC core sees them when it initialises.
# NOTE(review): the value "1" below ENABLES gRPC fork support, but the intent
# recorded here was: "On macOS, gRPC + Fork (pty.fork) is stable ONLY if fork
# support is disabled or carefully managed. We disable it to be safe."
# Code and stated intent disagree — confirm which is wanted before changing it.
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "1" 
os.environ["GRPC_FORK_SUPPORT_ONLY_FOR_SIGCHLD"] = "1"
# Select gRPC's "poll" polling strategy.
os.environ["GRPC_POLL_STRATEGY"] = "poll"

# RELIABILITY: Ensure sys.argv[0] is an absolute path.
# This is critical for os.execv(...) restarts during auto-updates: a relative
# argv[0] would be resolved against whatever the working directory is then.
if sys.argv and sys.argv[0] and not os.path.isabs(sys.argv[0]):
    sys.argv[0] = os.path.abspath(sys.argv[0])

# Force UTF-8 encoding for stdout/stderr so emoji/non-ASCII output does not
# fail on Windows consoles using legacy code pages (e.g. 'gbk').
if sys.platform == 'win32':
    import io
    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name, None)
        try:
            if hasattr(_stream, 'reconfigure'):
                # Change the encoding in place. This keeps the stream's existing
                # line-buffering mode, so console output is not delayed.
                _stream.reconfigure(encoding='utf-8', errors='replace')
            elif hasattr(_stream, 'buffer'):
                # Fallback for non-standard streams: re-wrap the raw buffer, keeping
                # line buffering so print() output still appears promptly.
                setattr(sys, _stream_name, io.TextIOWrapper(
                    _stream.buffer, encoding='utf-8', errors='replace',
                    line_buffering=True))
        except Exception:
            # Best effort per stream: keep the default encoding if this one can't
            # be switched, and still try the other stream.
            pass
    # UTF-8 mode is decided at interpreter start-up, so this only affects
    # child Python processes launched from here.
    os.environ['PYTHONUTF8'] = '1'

# Make the package root (the parent of this module's directory) and its
# "protos" directory importable, ahead of existing sys.path entries.
_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, _root)
sys.path.insert(0, os.path.join(_root, "protos"))

import signal
import time
# Load the version from the VERSION file, falling back to 1.1.2.
def get_version(path=None, default="1.1.2"):
    """Return the agent version string.

    Args:
        path: VERSION file to read. Defaults to ``../../VERSION`` relative to
            this module's directory.
        default: Value returned when the file is missing, unreadable, not valid
            UTF-8, or contains only whitespace.

    Returns:
        The stripped file contents, or *default*.
    """
    if path is None:
        path = os.path.join(os.path.dirname(__file__), "..", "..", "VERSION")
    try:
        # utf-8-sig drops a leading BOM (e.g. from Windows editors), which
        # str.strip() would otherwise leave in the version string.
        with open(path, "r", encoding="utf-8-sig") as f:
            version = f.read().strip()
    except (OSError, UnicodeDecodeError):
        return default
    # An empty file would otherwise yield "" (e.g. "Cortex Agent " on --version).
    return version or default

VERSION = get_version()
import threading
from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL
from agent_node.core import updater
from agent_node.utils.watchdog import watchdog

def handle_sigquit(sig, frame):
    """Print a stack dump of every live thread to stdout (SIGQUIT handler).

    Used to debug deadlocks: send SIGQUIT to the process and each thread's
    current stack is printed, labelled with its thread name and ident.

    Args:
        sig: Signal number (unused).
        frame: Interrupted frame (unused; all threads are dumped).
    """
    # Resolve thread names once instead of rescanning threading.enumerate()
    # for every frame.
    names = {t.ident: t.name for t in threading.enumerate()}
    print("\n" + "="*80)
    print(" [SIGQUIT] THREAD DUMP - ANALYZING AGENT STATE")
    print("="*80)
    for thread_id, stack in sys._current_frames().items():
        print(f"\nThread: {names.get(thread_id, 'Unknown')} (ID: {thread_id})")
        # print_stack writes to stderr by default; send it to stdout so each
        # header and its stack end up in the same stream.
        traceback.print_stack(stack, file=sys.stdout)
    print("="*80 + "\n", flush=True)

# Register early so hangs later in start-up can still be diagnosed
# (SIGQUIT does not exist on Windows).
if hasattr(signal, 'SIGQUIT'):
    signal.signal(signal.SIGQUIT, handle_sigquit)

# Pre-flight check for core dependencies. Each import is checked separately so
# every missing package is reported (not just the first), and the install hint
# is chosen by which import failed rather than by parsing the error message.
# NOTE(review): start-up continues after these warnings; later imports of the
# same packages (e.g. psutil inside enforce_singleton) will fail again —
# confirm that a non-fatal warning is intended.
_missing_deps = []
try:
    import grpc
except ImportError:
    _missing_deps.append(("grpcio", "python3-grpcio"))
try:
    import google.protobuf
except ImportError:
    _missing_deps.append(("protobuf", "python3-protobuf"))
try:
    import psutil
except ImportError:
    _missing_deps.append(("psutil", "python3-psutil"))

for _missing, _pkg in _missing_deps:
    print("\n" + "!"*71)
    print(f" CRITICAL ERROR: '{_missing}' library is not installed.")
    print(f" If you are on Raspberry Pi / ARM, run:")
    print(f"     sudo apt-get install {_pkg}")
    print("!"*71 + "\n")

_windows_mutex_handle = None  # Module-global so the mutex handle stays open for the process lifetime


def _enforce_windows_singleton(my_path):
    """Open a per-installation named mutex; exit(1) if another agent already has it.

    Args:
        my_path: Normalized path of this file; hashed into the mutex name so
            agents installed in different directories do not conflict.

    Raises:
        SystemExit: with code 1 when another instance already holds the mutex.
    """
    import ctypes
    import hashlib
    from ctypes import wintypes
    global _windows_mutex_handle

    ERROR_ACCESS_DENIED = 5
    ERROR_ALREADY_EXISTS = 183

    # Create a unique mutex name based on the installation path
    path_hash = hashlib.md5(my_path.encode()).hexdigest()
    mutex_name = f"Global\\CortexAgent_{path_hash}"

    # Private kernel32 binding with use_last_error=True: ctypes saves the
    # thread's last-error right after the foreign call, so get_last_error()
    # reflects CreateMutexW. A separate kernel32.GetLastError() call can see a
    # value overwritten by the interpreter in between.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    kernel32.CreateMutexW.argtypes = (wintypes.LPVOID, wintypes.BOOL, wintypes.LPCWSTR)
    # HANDLE is pointer-sized; the default c_int restype would truncate it.
    kernel32.CreateMutexW.restype = wintypes.HANDLE

    # CreateMutexW returns a handle even if it already exists (not owned: bInitialOwner=False).
    _windows_mutex_handle = kernel32.CreateMutexW(None, False, mutex_name)
    err = ctypes.get_last_error()

    # A NULL handle with ERROR_ACCESS_DENIED means the mutex exists but was
    # created under a security context we cannot open — still another instance.
    if err == ERROR_ALREADY_EXISTS or (not _windows_mutex_handle and err == ERROR_ACCESS_DENIED):
        print(f"[!] FATAL: Multiple instances detected. Another agent is already running in this directory.")
        print(f"    (Conflict on Windows Mutex: {mutex_name})")
        sys.exit(1)

    if not _windows_mutex_handle:
        # Creation failed for some other reason: warn and continue unguarded
        # instead of claiming the check passed.
        print(f"[!] Singleton check warning: CreateMutexW failed (Win32 error {err}); continuing without a mutex.")
        return

    print(f"[*] Windows Singleton Check Passed. Mutex held: {mutex_name}")


def _reap_posix_siblings(my_path):
    """Terminate other processes running this same main.py, children first.

    A process counts as a sibling when one of its command-line arguments
    contains 'main.py' and resolves (against that process's cwd when relative)
    to *my_path*. Scan failures are printed as warnings, never raised.
    NOTE(review): agents started as ``python -m agent_node.main`` have no
    'main.py' argument and are not detected.

    Returns:
        Number of sibling processes that were terminated.
    """
    import psutil

    def runs_this_file(proc, cmdline):
        """True if any 'main.py' argument in *cmdline* resolves to my_path."""
        for arg in cmdline:
            if 'main.py' not in arg:
                continue
            try:
                if os.path.isabs(arg):
                    check_path = os.path.normcase(os.path.realpath(arg))
                else:
                    try:
                        cwd = proc.cwd()
                    except (psutil.AccessDenied, psutil.NoSuchProcess):
                        continue  # can't resolve a relative path without cwd
                    check_path = os.path.normcase(os.path.realpath(os.path.join(cwd, arg)))
            except (OSError, ValueError, psutil.Error):
                continue
            if check_path == my_path:
                return True
        return False

    def kill_tree(pid):
        """Kill *pid*'s descendants, then terminate it (SIGKILL if still alive after 2s)."""
        p = psutil.Process(pid)
        try:
            for child in p.children(recursive=True):
                try:
                    child.kill()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
        p.terminate()
        try:
            p.wait(timeout=2)
        except psutil.TimeoutExpired:
            p.kill()

    current_pid = os.getpid()
    cleaned = 0
    try:
        for proc in psutil.process_iter(['pid', 'cmdline']):
            try:
                pid = proc.info['pid']
                if pid == current_pid:
                    continue
                cmd = proc.info['cmdline']  # None when access to it was denied
                if not cmd or not isinstance(cmd, list):
                    continue
                if not runs_this_file(proc, cmd):
                    continue
                print(f"[*] Cleaning up orphaned agent instance (PID {pid})...")
                try:
                    kill_tree(pid)
                    cleaned += 1
                except psutil.NoSuchProcess:
                    pass  # exited on its own meanwhile
                except psutil.AccessDenied:
                    print(f"[!] Could not terminate agent instance (PID {pid}): access denied.")
            except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError):
                continue
    except Exception as e:
        print(f"[!] Singleton check warning: {e}")
    return cleaned


def enforce_singleton():
    """
    Ensures that only one instance of the agent is running from this directory.
    - Windows: Uses a Named Mutex via ctypes; exits with code 1 on conflict.
    - POSIX: Iterates psutil to find and kill siblings running this main.py.
    """
    # 1. Identify ourselves based on the installation path
    try:
        my_path = os.path.normcase(os.path.realpath(__file__))
    except (OSError, ValueError):
        my_path = os.path.normcase(os.path.abspath(__file__))

    if os.name == 'nt':
        _enforce_windows_singleton(my_path)
        return

    cleaned = _reap_posix_siblings(my_path)
    if cleaned > 0:
        print(f"[*] Successfully reaped {cleaned} orphaned instances.")
    else:
        print("[*] No conflicting POSIX agent instances detected.")

def main():
    """Boot the agent and keep it running indefinitely.

    Order: logging, self-watchdog, singleton enforcement, optional
    auto-updater, then a persistence loop that (re)creates ``AgentNode`` and
    runs its task stream. A crash, or the task stream returning, stops that
    iteration's node and retries after a back-off. SIGINT/SIGTERM stop the
    current node and exit with status 0.
    """
    import logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)]
    )

    retry_delay_secs = 10

    # RELIABILITY: Start the self-watchdog to prevent silent deadlocks
    # If the main loop hangs, this will force-reboot the agent.
    watchdog.start()

    # 0. Singleton Enforcement: Ensure only one agent per directory
    try:
        enforce_singleton()
    except Exception as e:
        print(f"[!] Singleton check failed: {e}")
        traceback.print_exc()

    print(f"[*] Starting Agent Node: {NODE_ID}...")

    # 1. Auto-Update Check
    if AUTO_UPDATE:
        try:
            print("[*] Initializing auto-updater...")
            # NOTE(review): SECRET_KEY is passed as the updater's auth_token even
            # though AUTH_TOKEN is also imported here — confirm this is intended.
            updater.init(hub_http_url=HUB_URL, auth_token=SECRET_KEY, check_interval_secs=UPDATE_CHECK_INTERVAL)
            updater.check_and_update_once()  # May restart process
            updater.start_background_updater()
        except Exception as e:
            print(f"[!] Updater initialization failed: {e}. Moving on...")
            traceback.print_exc()

    node = None  # The AgentNode of the current loop iteration (read by handle_exit)

    def handle_exit(sig, frame):
        """Graceful shutdown: stop the current node, if any, then exit(0)."""
        if node is not None:
            node.stop()
        sys.exit(0)

    # 2. Initialization and Main Persistence Loop
    while True:
        node = None  # never act on the previous iteration's (already stopped) node
        try:
            # Deferred Import: We import the core AgentNode inside the loop.
            # This ensures that even if a future update breaks the code (e.g. missing dependency),
            # the process STILL stays alive and the Background Updater (started above)
            # can still pull a new fix when it becomes available on the hub.
            from agent_node.node import AgentNode

            node = AgentNode()

            # 3. Signal Handling for Graceful Shutdown (installed after construction,
            # as before, so these handlers take precedence).
            signal.signal(signal.SIGINT, handle_exit)
            signal.signal(signal.SIGTERM, handle_exit)

            # Handshake: Sync configuration and Sandbox Policy
            node.sync_configuration()

            # 4. Background: Start health reporting (Heartbeats)
            node.start_health_reporting()

            # 5. Foreground: Run Persistent Task Stream (Indefinite wait)
            node.run_task_stream()

            # Returning is unexpected; treat it like a crash so the node is stopped
            # and the restart is rate-limited instead of looping immediately.
            print(f"[!] Task stream ended unexpectedly. Restarting in {retry_delay_secs}s...", flush=True)
        except Exception as e:
            print(f"[!] Main Agent process crashed: {e}. Retrying boot in {retry_delay_secs}s...", flush=True)
            traceback.print_exc()

        # Stop this iteration's node before building a new one so its background
        # work (e.g. health reporting) does not accumulate across restarts.
        if node is not None:
            try:
                node.stop()
            except Exception as stop_err:
                print(f"[!] Failed to stop agent node cleanly: {stop_err}", flush=True)
        time.sleep(retry_delay_secs)

if __name__ == '__main__':
    # `--version` (as the first argument only) short-circuits agent start-up.
    if sys.argv[1:2] != ['--version']:
        main()
    else:
        print(f"Cortex Agent {VERSION}")
        sys.exit(0)