Newer
Older
cortex-hub / agent-node / src / agent_node / core / updater.py
"""
Auto-Update Trigger for Cortex Agent Node.

Detects when the running agent is behind the hub's version and
delegates to bootstrap_installer.py to perform the update — the same
program used for Day 0 installation.

Both bootstrap and version bump follow the exact same code path:
  bootstrap_installer.py  →  download → extract → install deps → launch

Channel: Stable HTTP REST only. No gRPC/proto. This contract is frozen.
"""

import os
import sys
import time
import json
import logging
import threading
import subprocess
import urllib.request
from typing import Optional, Union, Dict, List

logger = logging.getLogger(__name__)

_HUB_HTTP_URL: str = ""
_AUTH_TOKEN: str = ""
_CHECK_INTERVAL_SECS: int = 300

# bootstrap_installer.py lives at the agent-node root (three levels up from here)
_AGENT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
_VERSION_FILE = os.path.join(_AGENT_ROOT, "VERSION")
_BOOTSTRAPPER = os.path.join(_AGENT_ROOT, "bootstrap_installer.py")


def _read_local_version() -> str:
    try:
        with open(_VERSION_FILE) as f:
            return f.read().strip()
    except FileNotFoundError:
        return "0.0.0"



def _fetch_remote_version() -> Optional[str]:
    url = f"{_HUB_HTTP_URL}/api/v1/agent/version"
    try:
        req = urllib.request.Request(url, headers={"X-Agent-Token": _AUTH_TOKEN})
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read().decode()).get("version")
    except Exception as e:
        logger.warning(f"[Updater] Version check failed: {e}")
        return None


def _version_tuple(v: str):
    try:
        return tuple(int(x) for x in v.split("."))
    except Exception:
        return (0, 0, 0)


def _apply_update_via_bootstrapper():
    """
    Delegates to bootstrap_installer.py --update-only — the same code path
    as Day 0 installation — then restarts this process.
    Does not return on success.
    """
    if not os.path.exists(_BOOTSTRAPPER):
        logger.error(f"[Updater] bootstrap_installer.py not found at {_BOOTSTRAPPER}")
        return False

    logger.info("[Updater] ⬇️  Delegating update to bootstrap_installer.py ...")
    result = subprocess.run(
        [sys.executable, _BOOTSTRAPPER,
         "--hub", _HUB_HTTP_URL,
         "--token", _AUTH_TOKEN,
         "--update-only",
         "--install-dir", _AGENT_ROOT],
        cwd=_AGENT_ROOT
    )

    if result.returncode == 0:
        logger.info("[Updater] ✅ Update applied. Restarting agent process...")
        sys.stdout.flush()
        sys.stderr.flush()
        os.execv(sys.executable, [sys.executable] + sys.argv)  # in-place restart, no return
    else:
        logger.error(f"[Updater] bootstrap_installer.py failed (exit {result.returncode}). Continuing with current version.")
        return False


def check_and_update_once():
    """
    Single version check against the hub. If a newer version is available,
    triggers bootstrap_installer.py and restarts (does not return if applied).
    """
    local = _read_local_version()
    logger.info(f"[Updater] Local version: {local}")

    remote = _fetch_remote_version()
    if remote is None:
        logger.info("[Updater] Hub unreachable — skipping update check.")
        return

    logger.info(f"[Updater] Remote version: {remote}")

    if _version_tuple(remote) <= _version_tuple(local):
        logger.info("[Updater] ✅ Already up to date.")
        return

    logger.info(f"[Updater] 🆕 Update available: {local} → {remote}")
    _apply_update_via_bootstrapper()  # does not return on success


def start_background_updater():
    """Starts a daemon thread that periodically checks for new versions."""
    def _loop():
        while True:
            time.sleep(_CHECK_INTERVAL_SECS)
            try:
                check_and_update_once()
            except Exception as e:
                logger.error(f"[Updater] Background check error: {e}")

    t = threading.Thread(target=_loop, daemon=True, name="AutoUpdater")
    t.start()
    logger.info(f"[Updater] Background updater started (interval: {_CHECK_INTERVAL_SECS}s)")


def init(hub_http_url: str, auth_token: str, check_interval_secs: int = 300):
    """Initialize with hub connection details. Call before any other function."""
    global _HUB_HTTP_URL, _AUTH_TOKEN, _CHECK_INTERVAL_SECS
    _HUB_HTTP_URL = hub_http_url.rstrip("/")
    _AUTH_TOKEN = auth_token
    _CHECK_INTERVAL_SECS = check_interval_secs