import os
import platform
try:
import yaml
except ImportError:
yaml = None
# Location of the generated config file shipped with the bundled distribution.
# It is resolved two directories above the one holding this module, i.e. next
# to the 'src' directory when this file lives at src/agent_node/config.py.
_current_dir = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.abspath(
    os.path.join(_current_dir, os.pardir, os.pardir, "agent_config.yaml")
)
# Per-user fallback location, consulted when CONFIG_PATH does not exist.
ALT_CONFIG_PATH = os.path.expanduser("~/.cortex/agent.yaml")

# Built-in settings used when neither a config file nor the environment
# provides a value. Keys match the flat config-file layout.
_defaults = dict(
    node_id="agent-node-007",
    node_description="Modular Stateful Node",
    hub_url="http://127.0.0.1:8000",
    grpc_endpoint="127.0.0.1:50051",
    # NOTE(review): the fallback token is a hard-coded placeholder; set
    # AGENT_AUTH_TOKEN (or the config file) for any real deployment.
    auth_token=os.getenv("AGENT_AUTH_TOKEN", "cortex-secret-shared-key"),
    sync_root="/tmp/cortex-sync",
    tls=True,
    max_skill_workers=10,
    health_report_interval=10,
    auto_update=True,
    update_check_interval=300,
)

# ---------------------------------------------------------------------------
# Module-level settings. These are only the initial values: reload() rebinds
# most of them from the config file and environment.
# ---------------------------------------------------------------------------

# Node identity
NODE_ID = _defaults["node_id"]
NODE_DESC = _defaults["node_description"]

# Hub connectivity and credentials
HUB_URL = _defaults["hub_url"]
SERVER_HOST_PORT = _defaults["grpc_endpoint"]
AUTH_TOKEN = _defaults["auth_token"]
TLS_ENABLED = _defaults["tls"]
# NOTE(review): development placeholder secret; override it in production.
SECRET_KEY = "dev-secret-key-1337"
DEBUG_GRPC = True

# TLS material (relative paths)
CERT_CA = "certs/ca.crt"
CERT_CLIENT_CRT = "certs/client.crt"
CERT_CLIENT_KEY = "certs/client.key"

# Runtime tuning
SYNC_DIR = _defaults["sync_root"]
HEALTH_REPORT_INTERVAL = _defaults["health_report_interval"]
MAX_SKILL_WORKERS = _defaults["max_skill_workers"]
AUTO_UPDATE = _defaults["auto_update"]
UPDATE_CHECK_INTERVAL = _defaults["update_check_interval"]

# Filesystem root: "/" on macOS and Linux, the C: drive on anything else.
if platform.system() in ("Darwin", "Linux"):
    FS_ROOT = "/"
else:
    FS_ROOT = "C:\\"

# Runtime-togglable: False = headed mode (requires has_display)
BROWSER_HEADLESS = True
def _read_config_overrides(path):
    """Return the settings found in the YAML config file at *path*.

    The result is a flat dict keyed like ``_defaults``. Two file layouts are
    understood: a nested one under a top-level ``cortex`` key
    (``cortex.node.id``, ``cortex.node.desc``, ``cortex.hub.url``,
    ``cortex.hub.grpc``, ``cortex.auth.token``) and a flat one whose keys are
    used as-is. Null values are dropped so they never mask a default.

    Raises:
        RuntimeError: PyYAML is not installed.
        TypeError: the document's top-level value is not a mapping.
        Exception: anything ``open`` or ``yaml.safe_load`` raises.
    """
    if yaml is None:
        raise RuntimeError("PyYAML is not installed; cannot parse config file")

    # YAML documents are UTF-8 by convention; do not rely on the locale.
    with open(path, "r", encoding="utf-8") as f:
        document = yaml.safe_load(f) or {}
    if not isinstance(document, dict):
        raise TypeError(
            f"top-level YAML value must be a mapping, got {type(document).__name__}"
        )

    if "cortex" not in document:
        # Flat config
        flat = document
    else:
        # Nested config: (section, field) -> flat key.
        nested_keys = (
            ("node", "id", "node_id"),
            ("node", "desc", "node_description"),
            ("hub", "url", "hub_url"),
            ("hub", "grpc", "grpc_endpoint"),
            ("auth", "token", "auth_token"),
        )
        cortex = document["cortex"]
        flat = {}
        for section, field, target in nested_keys:
            # An empty or malformed section is skipped instead of aborting
            # the remaining sections.
            group = cortex.get(section) if isinstance(cortex, dict) else None
            if isinstance(group, dict) and field in group:
                flat[target] = group[field]

    return {key: value for key, value in flat.items() if value is not None}


def _int_setting(env_name, configured, default):
    """Resolve an integer setting without letting a bad value crash the load.

    The environment variable *env_name* is tried first, then *configured*
    (the config-file / defaults value). A candidate that cannot be converted
    with ``int`` is reported and skipped; *default* is returned if none works.
    """
    for candidate in (os.getenv(env_name), configured):
        if candidate is None:
            continue
        try:
            return int(candidate)
        except (TypeError, ValueError):
            print(f"[!] Invalid integer for {env_name}: {candidate!r}; ignoring")
    return default


def reload():
    """Re-resolve the module-level settings.

    Precedence for each setting is: environment variable, then the config
    file (``CONFIG_PATH`` if it exists, otherwise ``ALT_CONFIG_PATH``), then
    ``_defaults``. ``DEBUG_GRPC`` is read from the environment only.

    Loading is best-effort: a missing PyYAML, an unreadable or malformed
    config file, or a non-numeric integer value is reported on stdout and the
    affected settings fall back to the next source instead of raising.
    """
    global NODE_ID, NODE_DESC, SERVER_HOST_PORT, AUTH_TOKEN, SYNC_DIR, TLS_ENABLED
    global HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS, DEBUG_GRPC, SECRET_KEY
    global HUB_URL, AUTO_UPDATE, UPDATE_CHECK_INTERVAL, FS_ROOT

    _config = _defaults.copy()

    # Try reading from known locations
    active_path = None
    if os.path.exists(CONFIG_PATH):
        active_path = CONFIG_PATH
    elif os.path.exists(ALT_CONFIG_PATH):
        active_path = ALT_CONFIG_PATH

    if active_path:
        try:
            _config.update(_read_config_overrides(active_path))
        except Exception as e:
            # Deliberately broad: a broken config file must not prevent the
            # agent from starting with defaults / environment values.
            print(f"[!] Error loading {active_path}: {e}")

    NODE_ID = os.getenv("AGENT_NODE_ID", _config.get("node_id", _defaults["node_id"]))
    NODE_DESC = os.getenv(
        "AGENT_NODE_DESC", _config.get("node_description", _defaults["node_description"])
    )
    SERVER_HOST_PORT = os.getenv(
        "GRPC_ENDPOINT", _config.get("grpc_endpoint", _defaults["grpc_endpoint"])
    )
    AUTH_TOKEN = os.getenv("AGENT_AUTH_TOKEN", _config.get("auth_token", _defaults["auth_token"]))
    SYNC_DIR = os.getenv("CORTEX_SYNC_DIR", _config.get("sync_root", _defaults["sync_root"]))
    HUB_URL = os.getenv("AGENT_HUB_URL", _config.get("hub_url", _defaults["hub_url"]))
    # NOTE(review): the fallback below is a development placeholder secret;
    # production deployments should always set AGENT_SECRET_KEY.
    SECRET_KEY = os.getenv("AGENT_SECRET_KEY", _config.get("secret_key", "dev-secret-key-1337"))

    # Boolean flags are enabled only by the literal string "true" (any case).
    TLS_ENABLED = (
        os.getenv("AGENT_TLS_ENABLED", str(_config.get("tls", _defaults["tls"]))).lower()
        == "true"
    )
    AUTO_UPDATE = (
        os.getenv(
            "AGENT_AUTO_UPDATE", str(_config.get("auto_update", _defaults["auto_update"]))
        ).lower()
        == "true"
    )
    DEBUG_GRPC = os.getenv("DEBUG_GRPC", "false").lower() == "true"

    HEALTH_REPORT_INTERVAL = _int_setting(
        "HEALTH_REPORT_INTERVAL",
        _config.get("health_report_interval"),
        _defaults["health_report_interval"],
    )
    MAX_SKILL_WORKERS = _int_setting(
        "MAX_SKILL_WORKERS",
        _config.get("max_skill_workers"),
        _defaults["max_skill_workers"],
    )
    UPDATE_CHECK_INTERVAL = _int_setting(
        "AGENT_UPDATE_CHECK_INTERVAL",
        _config.get("update_check_interval"),
        _defaults["update_check_interval"],
    )

    FS_ROOT = os.getenv("CORTEX_FS_ROOT", _config.get("fs_root"))
    if not FS_ROOT:
        # No explicit root configured: use the platform's filesystem root.
        FS_ROOT = "/" if platform.system() in ["Darwin", "Linux"] else "C:\\"
def set_browser_headless(headless: bool):
    """Switch the browser between headless and headed mode while running.

    Stores *headless* in the module-level ``BROWSER_HEADLESS`` flag and prints
    the mode that was selected; no restart is needed for the flag to change.
    """
    global BROWSER_HEADLESS
    BROWSER_HEADLESS = headless

    if headless:
        mode_label = "headless"
    else:
        mode_label = "headed (UI visible)"
    print(f"[🌐] Browser mode updated: {mode_label}")
# Initial load: runs once when this module is imported, so the module-level
# settings reflect the config file and environment instead of the bare
# defaults assigned above.
reload()