diff --git a/agent-node/VERSION b/agent-node/VERSION index b112f91..c787b21 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 +1 @@ -1.0.21 +1.0.22 diff --git a/agent-node/src/agent_node/config.py b/agent-node/src/agent_node/config.py index 59e1da1..7233881 100644 --- a/agent-node/src/agent_node/config.py +++ b/agent-node/src/agent_node/config.py @@ -6,8 +6,8 @@ # It sits next to the 'src' directory (two levels up from src/agent_node/config.py) _current_dir = os.path.dirname(os.path.abspath(__file__)) CONFIG_PATH = os.path.abspath(os.path.join(_current_dir, "..", "..", "agent_config.yaml")) +ALT_CONFIG_PATH = os.path.expanduser("~/.cortex/agent.yaml") -# Default values _defaults = { "node_id": "agent-node-007", "node_description": "Modular Stateful Node", @@ -22,43 +22,81 @@ "update_check_interval": 300, } -# 1. Load from YAML if present -_config = _defaults.copy() -if os.path.exists(CONFIG_PATH): - try: - with open(CONFIG_PATH, 'r') as f: - yaml_config = yaml.safe_load(f) or {} - _config.update(yaml_config) - print(f"[*] Loaded node configuration from {CONFIG_PATH}") - except Exception as e: - print(f"[!] Error loading {CONFIG_PATH}: {e}") +# Define Globals +NODE_ID = _defaults["node_id"] +NODE_DESC = _defaults["node_description"] +SERVER_HOST_PORT = _defaults["grpc_endpoint"] +AUTH_TOKEN = _defaults["auth_token"] +SYNC_DIR = _defaults["sync_root"] +TLS_ENABLED = _defaults["tls"] +HEALTH_REPORT_INTERVAL = _defaults["health_report_interval"] +MAX_SKILL_WORKERS = _defaults["max_skill_workers"] +DEBUG_GRPC = False +SECRET_KEY = "dev-secret-key-1337" +HUB_URL = _defaults["hub_url"] +AUTO_UPDATE = _defaults["auto_update"] +UPDATE_CHECK_INTERVAL = _defaults["update_check_interval"] +CERT_CA = "certs/ca.crt" +CERT_CLIENT_CRT = "certs/client.crt" +CERT_CLIENT_KEY = "certs/client.key" +FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\" -# 2. Override with Environment Variables (12-Factor style) -NODE_ID = os.getenv("AGENT_NODE_ID", _config["node_id"]) -NODE_DESC = os.getenv("AGENT_NODE_DESC", _config["node_description"]) -SERVER_HOST_PORT = os.getenv("GRPC_ENDPOINT", _config["grpc_endpoint"]) # e.g. "ai.jerxie.com:50051" -AUTH_TOKEN = os.getenv("AGENT_AUTH_TOKEN", _config["auth_token"]) -SYNC_DIR = os.getenv("CORTEX_SYNC_DIR", _config["sync_root"]) -TLS_ENABLED = os.getenv("AGENT_TLS_ENABLED", str(_config["tls"])).lower() == 'true' +def reload(): + global NODE_ID, NODE_DESC, SERVER_HOST_PORT, AUTH_TOKEN, SYNC_DIR, TLS_ENABLED + global HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS, DEBUG_GRPC, SECRET_KEY + global HUB_URL, AUTO_UPDATE, UPDATE_CHECK_INTERVAL, FS_ROOT + + _config = _defaults.copy() + + # Try reading from known locations + active_path = None + if os.path.exists(CONFIG_PATH): + active_path = CONFIG_PATH + elif os.path.exists(ALT_CONFIG_PATH): + active_path = ALT_CONFIG_PATH + + if active_path: + try: + with open(active_path, 'r') as f: + yaml_data = yaml.safe_load(f) or {} + + # Check if it's nested (e.g. cortex: node: id: ...) + if "cortex" in yaml_data: + ctx = yaml_data["cortex"] + if "node" in ctx: + if "id" in ctx["node"]: _config["node_id"] = ctx["node"]["id"] + if "desc" in ctx["node"]: _config["node_description"] = ctx["node"]["desc"] + if "hub" in ctx: + if "url" in ctx["hub"]: _config["hub_url"] = ctx["hub"]["url"] + if "grpc" in ctx["hub"]: _config["grpc_endpoint"] = ctx["hub"]["grpc"] + if "auth" in ctx: + if "token" in ctx["auth"]: _config["auth_token"] = ctx["auth"]["token"] + else: + # Flat config + _config.update(yaml_data) + except Exception as e: + print(f"[!] Error loading {active_path}: {e}") -HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", _config["health_report_interval"])) -MAX_SKILL_WORKERS = int(os.getenv("MAX_SKILL_WORKERS", _config["max_skill_workers"])) + NODE_ID = os.getenv("AGENT_NODE_ID", _config.get("node_id", _defaults["node_id"])) + NODE_DESC = os.getenv("AGENT_NODE_DESC", _config.get("node_description", _defaults["node_description"])) + SERVER_HOST_PORT = os.getenv("GRPC_ENDPOINT", _config.get("grpc_endpoint", _defaults["grpc_endpoint"])) + AUTH_TOKEN = os.getenv("AGENT_AUTH_TOKEN", _config.get("auth_token", _defaults["auth_token"])) + SYNC_DIR = os.getenv("CORTEX_SYNC_DIR", _config.get("sync_root", _defaults["sync_root"])) + TLS_ENABLED = os.getenv("AGENT_TLS_ENABLED", str(_config.get("tls", _defaults["tls"]))).lower() == 'true' -DEBUG_GRPC = os.getenv("DEBUG_GRPC", "false").lower() == "true" -SECRET_KEY = os.getenv("AGENT_SECRET_KEY", _config.get("secret_key", "dev-secret-key-1337")) + HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", _config.get("health_report_interval", _defaults["health_report_interval"]))) + MAX_SKILL_WORKERS = int(os.getenv("MAX_SKILL_WORKERS", _config.get("max_skill_workers", _defaults["max_skill_workers"]))) -# Auto-update settings -HUB_URL = os.getenv("AGENT_HUB_URL", _config.get("hub_url", "https://ai.jerxie.com")) -AUTO_UPDATE = os.getenv("AGENT_AUTO_UPDATE", str(_config.get("auto_update", True))).lower() == "true" -UPDATE_CHECK_INTERVAL = int(os.getenv("AGENT_UPDATE_CHECK_INTERVAL", _config.get("update_check_interval", 300))) + DEBUG_GRPC = os.getenv("DEBUG_GRPC", "false").lower() == "true" + SECRET_KEY = os.getenv("AGENT_SECRET_KEY", _config.get("secret_key", "dev-secret-key-1337")) -# These are still available but likely replaced by AUTH_TOKEN / TLS_ENABLED logic -CERT_CA = os.getenv("CERT_CA", "certs/ca.crt") -CERT_CLIENT_CRT = os.getenv("CERT_CLIENT_CRT", "certs/client.crt") -CERT_CLIENT_KEY = os.getenv("CERT_CLIENT_KEY", "certs/client.key") + HUB_URL = os.getenv("AGENT_HUB_URL", _config.get("hub_url", _defaults["hub_url"])) + AUTO_UPDATE = os.getenv("AGENT_AUTO_UPDATE", str(_config.get("auto_update", _defaults["auto_update"]))).lower() == "true" + UPDATE_CHECK_INTERVAL = int(os.getenv("AGENT_UPDATE_CHECK_INTERVAL", _config.get("update_check_interval", _defaults["update_check_interval"]))) -# FS Explorer root - Priority: Env > YAML > / -FS_ROOT = os.getenv("CORTEX_FS_ROOT", _config.get("fs_root")) -if not FS_ROOT: - FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\" + FS_ROOT = os.getenv("CORTEX_FS_ROOT", _config.get("fs_root")) + if not FS_ROOT: + FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\" +# Initial load +reload() diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index 66852ee..f7ba8b7 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -15,16 +15,17 @@ from agent_node.core.watcher import WorkspaceWatcher from agent_node.utils.auth import verify_task_signature from agent_node.utils.network import get_secure_stub -from agent_node.config import NODE_ID, NODE_DESC, AUTH_TOKEN, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS, DEBUG_GRPC, FS_ROOT +import agent_node.config as config class AgentNode: """The 'Agent Core': Orchestrates Local Skills and Maintains gRPC Connection.""" - def __init__(self, node_id=NODE_ID): - self.node_id = node_id + def __init__(self): + # Dynamically read config instead of caching static defaults + self.node_id = config.NODE_ID self.sandbox = SandboxEngine() self.sync_mgr = NodeSyncManager() - self.skills = SkillManager(max_workers=MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr) + self.skills = SkillManager(max_workers=config.MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr) self.watcher = WorkspaceWatcher(self._on_sync_delta) self.task_queue = queue.Queue() self.stub = get_secure_stub() @@ -104,21 +105,26 @@ def sync_configuration(self): """Initial handshake to retrieve policy and metadata.""" - print(f"[*] Handshake with Orchestrator: {self.node_id}") - caps = self._collect_capabilities() - print(f"[*] Capabilities: {caps}") - - # Protobuf capabilities is map — all values must be strings - caps_str = {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()} - - reg_req = agent_pb2.RegistrationRequest( - node_id=self.node_id, - auth_token=AUTH_TOKEN, - node_description=NODE_DESC, - capabilities=caps_str - ) - while True: + # Reload configuration from disk dynamically before each attempt + config.reload() + self.node_id = config.NODE_ID + self.skills.max_workers = config.MAX_SKILL_WORKERS + + print(f"[*] Handshake with Orchestrator: {self.node_id}") + caps = self._collect_capabilities() + print(f"[*] Capabilities: {caps}") + + # Protobuf capabilities is map — all values must be strings + caps_str = {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()} + + reg_req = agent_pb2.RegistrationRequest( + node_id=self.node_id, + auth_token=config.AUTH_TOKEN, + node_description=config.NODE_DESC, + capabilities=caps_str + ) + try: res = self.stub.SyncConfiguration(reg_req) if res.success: @@ -170,7 +176,7 @@ cpu_usage_percent=cpu, memory_usage_percent=mem_percent, active_worker_count=len(ids), - max_worker_capacity=MAX_SKILL_WORKERS, + max_worker_capacity=config.MAX_SKILL_WORKERS, running_task_ids=ids, cpu_count=psutil.cpu_count(), memory_used_gb=used_gb, @@ -181,7 +187,7 @@ memory_available_gb=avail_gb, load_avg=load ) - time.sleep(max(0, HEALTH_REPORT_INTERVAL - 1.0)) + time.sleep(max(0, config.HEALTH_REPORT_INTERVAL - 1.0)) # Consume the heartbeat stream to keep it alive for response in self.stub.ReportHealth(_gen()): @@ -203,13 +209,13 @@ announce_msg = agent_pb2.ClientTaskMessage( announce=agent_pb2.NodeAnnounce(node_id=self.node_id) ) - if DEBUG_GRPC: + if config.DEBUG_GRPC: print(f"[*] [DEBUG-gRPC] OUTBOUND: announce | {announce_msg}", flush=True) yield announce_msg while True: out_msg = self.task_queue.get() - if DEBUG_GRPC: + if config.DEBUG_GRPC: kind = out_msg.WhichOneof('payload') if kind == 'file_sync' and out_msg.file_sync.HasField('file_data'): print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} (file_data, path={out_msg.file_sync.file_data.path}, size={len(out_msg.file_sync.file_data.chunk)})", flush=True) @@ -233,7 +239,7 @@ def _process_server_message(self, msg): kind = msg.WhichOneof('payload') - if DEBUG_GRPC: + if config.DEBUG_GRPC: print(f"[*] [DEBUG-gRPC] INBOUND: {kind} | {msg}", flush=True) else: print(f"[*] Inbound: {kind}", flush=True) @@ -248,7 +254,7 @@ elif kind == 'work_pool_update': # Claim logical idle tasks from global pool with slight randomized jitter # to prevent thundering herd where every node claims the same task at the exact same ms. - if len(self.skills.get_active_ids()) < MAX_SKILL_WORKERS: + if len(self.skills.get_active_ids()) < config.MAX_SKILL_WORKERS: for tid in msg.work_pool_update.available_task_ids: # Deterministic delay based on node_id to distribute claims import random @@ -367,7 +373,7 @@ def _get_base_dir(self, session_id, create=False): """Helper to resolve the effective root for a session (Watcher > SyncDir).""" if session_id == "__fs_explorer__": - root = FS_ROOT + root = config.FS_ROOT print(f" [📁] Explorer Root: {root}") return root @@ -468,13 +474,13 @@ target_path = os.path.normpath(os.path.join(base_dir, rel_path)) print(f" [📁💾] target_path: {target_path} (base_dir: {base_dir})") - # M6: Check if path is within session base_dir OR global FS_ROOT + # M6: Check if path is within session base_dir OR global config.FS_ROOT allowed = target_path.startswith(base_dir) - if not allowed and FS_ROOT: - allowed = target_path.startswith(os.path.normpath(FS_ROOT)) + if not allowed and config.FS_ROOT: + allowed = target_path.startswith(os.path.normpath(config.FS_ROOT)) if not allowed: - raise Exception(f"Path traversal attempt blocked: {target_path} is outside {base_dir} (FS_ROOT: {FS_ROOT})") + raise Exception(f"Path traversal attempt blocked: {target_path} is outside {base_dir} (config.FS_ROOT: {config.FS_ROOT})") if is_dir: os.makedirs(target_path, exist_ok=True) @@ -509,11 +515,11 @@ target_path = os.path.normpath(os.path.join(base_dir, rel_path)) allowed = target_path.startswith(base_dir) - if not allowed and FS_ROOT: - allowed = target_path.startswith(os.path.normpath(FS_ROOT)) + if not allowed and config.FS_ROOT: + allowed = target_path.startswith(os.path.normpath(config.FS_ROOT)) if not allowed: - raise Exception(f"Path traversal attempt blocked: {target_path} is outside {base_dir} (FS_ROOT: {FS_ROOT})") + raise Exception(f"Path traversal attempt blocked: {target_path} is outside {base_dir} (config.FS_ROOT: {config.FS_ROOT})") if not os.path.exists(target_path): raise Exception("File not found") @@ -549,11 +555,11 @@ abs_path = os.path.normpath(os.path.join(watch_path, rel_path)) allowed = abs_path.startswith(watch_path) - if not allowed and FS_ROOT: - allowed = abs_path.startswith(os.path.normpath(FS_ROOT)) + if not allowed and config.FS_ROOT: + allowed = abs_path.startswith(os.path.normpath(config.FS_ROOT)) if not allowed: - print(f" [📁🚫] Blocked traversal attempt in _push_file: {rel_path} (Valid Roots: {watch_path}, FS_ROOT: {FS_ROOT})") + print(f" [📁🚫] Blocked traversal attempt in _push_file: {rel_path} (Valid Roots: {watch_path}, config.FS_ROOT: {config.FS_ROOT})") return if not os.path.exists(abs_path):