import sys
import os
# gRPC / macOS stability tuning. These values are assigned unconditionally,
# so they override anything already present in the inherited environment.
os.environ.update(
    {
        "GRPC_ENABLE_FORK_SUPPORT": "0",
        "GRPC_FORK_SUPPORT_ONLY_FOR_SIGCHLD": "1",
        "GRPC_POLL_STRATEGY": "poll",
    }
)

# RELIABILITY: make sys.argv[0] absolute. os.execv(...) restarts during
# auto-updates depend on it, and the agent may have been launched with a
# relative path (e.g. 'python3 src/main.py').
if sys.argv and sys.argv[0]:
    if not os.path.isabs(sys.argv[0]):
        sys.argv[0] = os.path.abspath(sys.argv[0])

# Give the project root and its 'protos' directory the HIGHEST import priority
# (protos first, then root) so they win over same-named installed packages.
_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path[:0] = [os.path.join(_root, "protos"), _root]
import signal
import time
from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL
from agent_node.core import updater
# Pre-flight check for core dependencies.
# A missing package is reported loudly but is not fatal here: execution
# continues so that main() can still start (its retry loop and background
# updater are designed to survive a broken dependency).
try:
    import grpc
    import google.protobuf
    import watchdog
except ImportError as e:
    # Map the failed import back to the library name and the apt package
    # that provides it.
    err_str = str(e).lower()
    if "grpc" in err_str:
        missing, pkg = "grpcio", "python3-grpcio"
    elif "google" in err_str or "protobuf" in err_str:
        missing, pkg = "protobuf", "python3-protobuf"
    else:
        missing, pkg = "watchdog", "python3-watchdog"
    print("\n" + "!" * 71)
    print(f" CRITICAL ERROR: '{missing}' library is not installed.")
    print(" If you are on Raspberry Pi / ARM, run:")
    print(f" sudo apt-get install {pkg}")
    print("!" * 71 + "\n")
def enforce_singleton():
    """
    Terminate any other process that is running this same script file.

    A process counts as a sibling when one of its command-line arguments
    contains 'main.py' and, after ``realpath`` resolution (relative arguments
    are joined to that process's working directory), equals this file's
    realpath. Each sibling is asked to exit with ``terminate()`` and is
    force-killed with ``kill()`` if it has not exited within 2 seconds.

    Failures are non-fatal. Processes that vanish or deny access are skipped.
    If enumerating processes fails altogether, a warning is printed and the
    function returns normally, so the agent can still boot in restricted
    environments.

    Raises:
        ImportError: if psutil is not installed (callers are expected to
            handle this).
    """
    import psutil

    current_pid = os.getpid()
    try:
        # Resolve symlinks so different routes to the same file compare equal.
        my_path = os.path.realpath(__file__)
    except Exception:
        my_path = os.path.abspath(__file__)

    cleaned = 0
    try:
        # Single pass over the process table; only pid/cmdline are prefetched.
        for proc in psutil.process_iter(['pid', 'cmdline']):
            try:
                pid = proc.info['pid']
                if pid == current_pid:
                    continue
                cmd = proc.info['cmdline']
                if not cmd or not isinstance(cmd, list):
                    continue

                is_sibling = False
                for arg in cmd:
                    if 'main.py' not in arg:
                        continue
                    try:
                        if os.path.isabs(arg):
                            check_path = os.path.realpath(arg)
                        else:
                            # Relative script path: resolve it against the
                            # other process's working directory.
                            try:
                                check_path = os.path.realpath(os.path.join(proc.cwd(), arg))
                            except (psutil.AccessDenied, psutil.NoSuchProcess):
                                # Cannot resolve it safely. Stay conservative and
                                # do not treat this process as a sibling.
                                check_path = None
                    except Exception:
                        # Unresolvable argument (e.g. a malformed path); skip it.
                        continue
                    if check_path and check_path == my_path:
                        is_sibling = True
                        break

                if not is_sibling:
                    continue

                print(f"[*] Cleaning up orphaned agent instance (PID {pid})...")
                try:
                    # Reuse the Process object from the scan instead of
                    # re-creating one from the bare PID, so the PID cannot
                    # have been recycled in between.
                    proc.terminate()
                    try:
                        proc.wait(timeout=2)
                    except psutil.TimeoutExpired:
                        proc.kill()
                    cleaned += 1
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
            except (psutil.NoSuchProcess, psutil.AccessDenied, KeyError):
                continue
    except Exception as e:
        # Non-fatal: if we can't iterate processes at all, just log and continue.
        # This prevents the agent from being a 'brick' in restricted environments.
        print(f"[!] Singleton check warning: {e}")

    if cleaned > 0:
        print(f"[*] Cleaned up {cleaned} orphaned instances.")
    else:
        print("[*] No conflicting agent instances detected.")
def main():
    """
    Boot the agent node and keep it alive.

    Steps:
      0. Configure root logging to stdout, then terminate sibling instances
         of this script. This is best effort and is skipped when psutil is
         missing.
      1. If AUTO_UPDATE is enabled, initialise the updater, run one update
         check (which may restart the process) and start the background
         updater. Failures here are printed and do not block the boot.
      2. Loop forever:
         - import and construct AgentNode;
         - install SIGINT/SIGTERM handlers;
         - sync configuration and start health reporting;
         - block in the task stream.
         Any ``Exception`` stops the current node, and the boot is retried
         after 10 seconds.

    The loop has no ``break``. The function is left only through an exception
    that is not an ``Exception`` subclass, such as the ``SystemExit`` raised
    by the signal handler.
    """
    import logging
    import traceback

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)]
    )

    # 0. Singleton enforcement: terminate sibling instances before booting.
    try:
        import psutil  # noqa: F401 -- availability probe; enforce_singleton() uses it
        enforce_singleton()
    except ImportError:
        print("[!] psutil not installed — skipping singleton enforcement. Beware of orphaned processes!")
    except Exception as e:
        print(f"[!] Singleton check failed: {e}")

    print(f"[*] Starting Agent Node: {NODE_ID}...")

    # 1. Auto-update check (before anything else — if we're behind, restart now).
    # This uses only standard libraries, making it extremely resilient.
    if AUTO_UPDATE:
        try:
            # NOTE(review): the updater is authenticated with SECRET_KEY, while
            # AUTH_TOKEN is imported at the top of this file but not used
            # here. Confirm which credential the hub's update endpoint expects.
            updater.init(hub_http_url=HUB_URL, auth_token=SECRET_KEY, check_interval_secs=UPDATE_CHECK_INTERVAL)
            updater.check_and_update_once()  # May restart process — does not return if update applied
            # Start background updater BEFORE initializing the node.
            # This ensures that even if AgentNode() crashes (e.g. missing grpcio),
            # the node can still self-repair if a fix is pushed to the hub.
            updater.start_background_updater()
        except Exception as e:
            print(f"[!] Updater initialization failed: {e}. Moving on to agent boot...")

    # The node that is currently live, or None once it has been stopped after
    # a crash. Both handlers below read it.
    node = None

    def handle_exit(sig, frame):
        # Stop the live node (if any), then exit. sys.exit() sits in `finally`
        # so that a failing stop() cannot turn a shutdown request into an
        # Exception that the retry loop below would catch and "recover" from.
        try:
            if node is not None:
                node.stop()
        finally:
            sys.exit(0)

    # 2. Initialization and main persistence loop.
    while True:
        try:
            # Deferred Import: We import the core AgentNode inside the loop.
            # This ensures that even if a future update breaks the code (e.g. missing dependency),
            # the process STILL stays alive and the Background Updater (started above)
            # can still pull a new fix when it becomes available on the hub.
            from agent_node.node import AgentNode
            node = AgentNode()

            # 2a. Signal handling for graceful shutdown.
            signal.signal(signal.SIGINT, handle_exit)
            signal.signal(signal.SIGTERM, handle_exit)

            # 2b. Handshake: sync configuration and sandbox policy.
            node.sync_configuration()
            # 2c. Background: start health reporting (heartbeats).
            node.start_health_reporting()
            # 2d. Foreground: run the persistent task stream (indefinite wait).
            # NOTE(review): if this returns normally, the loop immediately
            # builds a new node without stopping this one. Confirm that
            # run_task_stream() only returns by raising.
            node.run_task_stream()
        except Exception as e:
            print(f"[!] Main Agent process crashed: {e}. Retrying boot in 10s...", flush=True)
            traceback.print_exc()
            if node is not None:
                try:
                    node.stop()
                except Exception as stop_err:
                    print(f"[!] Failed to stop crashed node cleanly: {stop_err}", flush=True)
                # Forget the stopped node so that neither a later boot failure
                # nor the signal handler stops it a second time.
                node = None
            time.sleep(10)
if __name__ == "__main__":
    # Run the agent only when executed as a script, never on import.
    main()