import threading
import queue
import time
import sys
import os
import hashlib
import logging
import psutil
from protos import agent_pb2, agent_pb2_grpc
logger = logging.getLogger(__name__)
from agent_node.skills.manager import SkillManager
from agent_node.core.sandbox import SandboxEngine
from agent_node.core.sync import NodeSyncManager
from agent_node.core.watcher import WorkspaceWatcher
from agent_node.utils.auth import verify_task_signature
from agent_node.utils.network import get_secure_stub
from agent_node.config import NODE_ID, NODE_DESC, AUTH_TOKEN, HEALTH_REPORT_INTERVAL, MAX_SKILL_WORKERS
class AgentNode:
    """The 'Agent Core': Orchestrates Local Skills and Maintains gRPC Connection."""
    def __init__(self, node_id: str = NODE_ID):
        # Identity used in every message sent to the orchestrator.
        self.node_id = node_id
        # Policy-enforced execution environment for skills (policy arrives via sync_configuration).
        self.sandbox = SandboxEngine()
        # NOTE: construction order matters — sync_mgr must exist before
        # SkillManager (passed in) and before the watcher callback can fire.
        self.sync_mgr = NodeSyncManager()
        self.skills = SkillManager(max_workers=MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr)
        # Watcher reports local file deltas back to the server via _on_sync_delta.
        self.watcher = WorkspaceWatcher(self._on_sync_delta)
        # Outbound message queue, drained by the TaskStream generator in run_task_stream.
        self.task_queue = queue.Queue()
        # Secured gRPC stub to the orchestrator.
        self.stub = get_secure_stub()
def sync_configuration(self):
"""Initial handshake to retrieve policy and metadata."""
print(f"[*] Handshake with Orchestrator: {self.node_id}")
reg_req = agent_pb2.RegistrationRequest(
node_id=self.node_id,
auth_token=AUTH_TOKEN,
node_description=NODE_DESC,
capabilities={"shell": "v1", "browser": "playwright-sync-bridge"}
)
try:
res = self.stub.SyncConfiguration(reg_req)
if res.success:
self.sandbox.sync(res.policy)
print("[OK] Sandbox Policy Synced.")
else:
print(f"[!] Rejection: {res.error_message}")
sys.exit(1)
except Exception as e:
print(f"[!] Connection Fail: {e}")
sys.exit(1)
def start_health_reporting(self):
"""Streaming node metrics to the orchestrator for load balancing."""
def _report():
while True:
try:
def _gen():
while True:
ids = self.skills.get_active_ids()
# Collection
cpu = psutil.cpu_percent(interval=1.0)
per_core = psutil.cpu_percent(percpu=True)
vmem = psutil.virtual_memory()
mem_percent = vmem.percent
# GB conversion
used_gb = vmem.used / (1024**3)
total_gb = vmem.total / (1024**3)
avail_gb = vmem.available / (1024**3)
# Freq & Load
try:
freq = psutil.cpu_freq().current
except:
freq = 0
try:
load = list(os.getloadavg())
except:
load = [0.0, 0.0, 0.0]
yield agent_pb2.Heartbeat(
node_id=self.node_id,
cpu_usage_percent=cpu,
memory_usage_percent=mem_percent,
active_worker_count=len(ids),
max_worker_capacity=MAX_SKILL_WORKERS,
running_task_ids=ids,
cpu_count=psutil.cpu_count(),
memory_used_gb=used_gb,
memory_total_gb=total_gb,
# M6 Fields
cpu_usage_per_core=per_core,
cpu_freq_mhz=freq,
memory_available_gb=avail_gb,
load_avg=load
)
time.sleep(max(0, HEALTH_REPORT_INTERVAL - 1.0))
# Consume the heartbeat stream to keep it alive
for response in self.stub.ReportHealth(_gen()):
# We don't strictly need the server time, but it confirms a round-trip
pass
except Exception as e:
print(f"[!] Health reporting interrupted: {e}. Retrying in 5s...")
time.sleep(5)
# Non-blocking thread for health heartbeat
threading.Thread(target=_report, daemon=True, name=f"Health-{self.node_id}").start()
def run_task_stream(self):
"""Main Persistent Bi-directional Stream for Task Management with Reconnection."""
while True:
try:
def _gen():
# Initial announcement for routing identity
yield agent_pb2.ClientTaskMessage(
announce=agent_pb2.NodeAnnounce(node_id=self.node_id)
)
while True:
yield self.task_queue.get()
responses = self.stub.TaskStream(_gen())
print(f"[*] Task Stream Online: {self.node_id}", flush=True)
for msg in responses:
self._process_server_message(msg)
except Exception as e:
print(f"[!] Task Stream Failure: {e}. Reconnecting in 5s...", flush=True)
time.sleep(5)
# Re-sync config in case permissions changed during downtime
try: self.sync_configuration()
except: pass
def _process_server_message(self, msg):
kind = msg.WhichOneof('payload')
print(f"[*] Inbound: {kind}", flush=True)
if kind == 'task_request':
self._handle_task(msg.task_request)
elif kind == 'task_cancel':
if self.skills.cancel(msg.task_cancel.task_id):
self._send_response(msg.task_cancel.task_id, None, agent_pb2.TaskResponse.CANCELLED)
elif kind == 'work_pool_update':
# Claim logical idle tasks from global pool with slight randomized jitter
# to prevent thundering herd where every node claims the same task at the exact same ms.
if len(self.skills.get_active_ids()) < MAX_SKILL_WORKERS:
for tid in msg.work_pool_update.available_task_ids:
# Deterministic delay based on node_id to distribute claims
import random
time.sleep(random.uniform(0.1, 0.5))
self.task_queue.put(agent_pb2.ClientTaskMessage(
task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id)
))
elif kind == 'claim_status':
status = "GRANTED" if msg.claim_status.granted else "DENIED"
print(f" [📦] Claim {msg.claim_status.task_id}: {status} ({msg.claim_status.reason})", flush=True)
elif kind == 'file_sync':
self._handle_file_sync(msg.file_sync)
def _on_sync_delta(self, session_id, file_payload):
"""Callback from watcher to push local changes to server."""
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
file_data=file_payload
)
))
    def _handle_file_sync(self, fs):
        """Processes inbound file synchronization messages from the Orchestrator.

        `fs` carries exactly one payload: a directory manifest (drift check),
        a file_data chunk (server -> node transfer), or a control command
        (watching, locking, manifest refresh, and the M6 FS-explorer ops).
        """
        sid = fs.session_id
        if fs.HasField("manifest"):
            # handle_manifest returns the paths whose local state differs
            # from the server's manifest (falsy when fully in sync).
            needs_update = self.sync_mgr.handle_manifest(sid, fs.manifest)
            if needs_update:
                print(f" [📁⚠️] Drift Detected for {sid}: {len(needs_update)} files need sync")
                # Ask the server to re-send only the drifted paths.
                self.task_queue.put(agent_pb2.ClientTaskMessage(
                    file_sync=agent_pb2.FileSyncMessage(
                        session_id=sid,
                        status=agent_pb2.SyncStatus(
                            code=agent_pb2.SyncStatus.RECONCILE_REQUIRED,
                            message=f"Drift detected in {len(needs_update)} files",
                            reconcile_paths=needs_update
                        )
                    )
                ))
            else:
                # Already consistent: acknowledge with OK.
                self.task_queue.put(agent_pb2.ClientTaskMessage(
                    file_sync=agent_pb2.FileSyncMessage(
                        session_id=sid,
                        status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message="Synchronized")
                    )
                ))
        elif fs.HasField("file_data"):
            # Inbound chunk of a file transfer; write_chunk returns a success
            # flag (presumably hash-verified on the final chunk — see
            # NodeSyncManager for the actual contract).
            success = self.sync_mgr.write_chunk(sid, fs.file_data)
            if fs.file_data.is_final:
                # Only report status once the last chunk has landed.
                print(f" [📁] File Received: {fs.file_data.path} (Verified: {success})")
                status = agent_pb2.SyncStatus.OK if success else agent_pb2.SyncStatus.ERROR
                self.task_queue.put(agent_pb2.ClientTaskMessage(
                    file_sync=agent_pb2.FileSyncMessage(
                        session_id=sid,
                        status=agent_pb2.SyncStatus(code=status, message=f"File {fs.file_data.path} synced")
                    )
                ))
        elif fs.HasField("control"):
            ctrl = fs.control
            if ctrl.action == agent_pb2.SyncControl.START_WATCHING:
                # Path relative to sync dir or absolute
                watch_path = ctrl.path if os.path.isabs(ctrl.path) else os.path.join(self.sync_mgr.get_session_dir(sid), ctrl.path)
                self.watcher.start_watching(sid, watch_path)
            elif ctrl.action == agent_pb2.SyncControl.STOP_WATCHING:
                self.watcher.stop_watching(sid)
            elif ctrl.action == agent_pb2.SyncControl.LOCK:
                # Lock/unlock pause and resume the watcher's delta pushes for
                # this session (e.g. while the server is writing files).
                self.watcher.set_lock(sid, True)
            elif ctrl.action == agent_pb2.SyncControl.UNLOCK:
                self.watcher.set_lock(sid, False)
            elif ctrl.action == agent_pb2.SyncControl.REFRESH_MANIFEST:
                if ctrl.request_paths:
                    # Server asked for specific files rather than a manifest.
                    print(f" [📁📤] Pushing {len(ctrl.request_paths)} Requested Files for {sid}")
                    for path in ctrl.request_paths:
                        self._push_file(sid, path)
                else:
                    # Node -> Server Manifest Push
                    self._push_full_manifest(sid, ctrl.path)
            elif ctrl.action == agent_pb2.SyncControl.RESYNC:
                self._push_full_manifest(sid, ctrl.path)
            # --- M6: FS Explorer Handlers ---
            elif ctrl.action == agent_pb2.SyncControl.LIST:
                print(f" [📁📂] List Directory: {ctrl.path}")
                # Shallow manifest: immediate children only, no hashing.
                self._push_full_manifest(sid, ctrl.path, task_id=fs.task_id, shallow=True)
            elif ctrl.action == agent_pb2.SyncControl.READ:
                print(f" [📁📄] Read File: {ctrl.path}")
                self._push_file(sid, ctrl.path, task_id=fs.task_id)
            elif ctrl.action == agent_pb2.SyncControl.WRITE:
                print(f" [📁💾] Write File: {ctrl.path} (is_dir={ctrl.is_dir})")
                self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir, task_id=fs.task_id)
            elif ctrl.action == agent_pb2.SyncControl.DELETE:
                print(f" [📁🗑️] Delete Fragment: {ctrl.path}")
                self._handle_fs_delete(sid, ctrl.path, task_id=fs.task_id)
def _push_full_manifest(self, session_id, rel_path=".", task_id="", shallow=False):
"""Pushes the current local manifest back to the server."""
print(f" [📁📤] Pushing {'Shallow' if shallow else 'Full'} Manifest for {session_id}")
# M6: If __fs_explorer__, we are browsing the root filesystem (terminal-like), otherwise session-scoped
if session_id == "__fs_explorer__":
base_dir = os.getenv("CORTEX_FS_ROOT", "/")
else:
base_dir = self.sync_mgr.get_session_dir(session_id)
watch_path = rel_path if os.path.isabs(rel_path) else os.path.normpath(os.path.join(base_dir, rel_path))
if not os.path.exists(watch_path):
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=f"Path {rel_path} not found")
)
))
return
files = []
try:
if shallow:
# Optimized for Explorer: immediate children only, no hashing
with os.scandir(watch_path) as it:
for entry in it:
is_dir = entry.is_dir()
# Use metadata only
try:
stats = entry.stat()
size = stats.st_size if not is_dir else 0
except: size = 0
# Calculate path relative to the actual base_sync_dir / session_dir
# rel_path is the directory we are currently browsing.
# entry.name is the file within it.
item_rel_path = os.path.relpath(os.path.join(watch_path, entry.name), base_dir)
files.append(agent_pb2.FileInfo(path=item_rel_path, size=size, hash="", is_dir=is_dir))
else:
# Deep walk with full hashes for reconciliation
for root, dirs, filenames in os.walk(watch_path):
for filename in filenames:
abs_path = os.path.join(root, filename)
r_path = os.path.relpath(abs_path, watch_path)
try:
with open(abs_path, "rb") as f:
h = hashlib.sha256(f.read()).hexdigest()
files.append(agent_pb2.FileInfo(path=r_path, size=os.path.getsize(abs_path), hash=h, is_dir=False))
except Exception as e:
print(f" [⚠️] Failed to hash {abs_path}: {e}")
for d in dirs:
abs_path = os.path.join(root, d)
r_path = os.path.relpath(abs_path, watch_path)
files.append(agent_pb2.FileInfo(path=r_path, size=0, hash="", is_dir=True))
except Exception as e:
print(f" [❌] Manifest generation failed for {rel_path}: {e}")
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e))
)
))
return
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
manifest=agent_pb2.DirectoryManifest(root_path=rel_path, files=files)
)
))
def _handle_fs_write(self, session_id, rel_path, content, is_dir, task_id=""):
"""Modular FS Write/Create."""
try:
if session_id == "__fs_explorer__":
base_dir = os.getenv("CORTEX_FS_ROOT", "/")
else:
base_dir = self.sync_mgr.get_session_dir(session_id)
target_path = os.path.normpath(os.path.join(base_dir, rel_path))
if not target_path.startswith(base_dir):
raise Exception("Path traversal attempt blocked")
if is_dir:
os.makedirs(target_path, exist_ok=True)
else:
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "wb") as f:
f.write(content)
# Send OK status
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"{'Directory' if is_dir else 'File'} written")
)
))
# Trigger manifest refresh so UI updates
self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True)
except Exception as e:
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e))
)
))
def _handle_fs_delete(self, session_id, rel_path, task_id=""):
"""Modular FS Delete."""
try:
if session_id == "__fs_explorer__":
base_dir = os.getenv("CORTEX_FS_ROOT", "/")
else:
base_dir = self.sync_mgr.get_session_dir(session_id)
target_path = os.path.normpath(os.path.join(base_dir, rel_path))
if not target_path.startswith(base_dir):
raise Exception("Path traversal attempt blocked")
if not os.path.exists(target_path):
raise Exception("File not found")
import shutil
if os.path.isdir(target_path):
shutil.rmtree(target_path)
else:
os.remove(target_path)
# Send OK status
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"Deleted {rel_path}")
)
))
# Trigger manifest refresh so UI updates
self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True)
except Exception as e:
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e))
)
))
def _push_file(self, session_id, rel_path, task_id=""):
"""Pushes a specific file from node to server."""
if session_id == "__fs_explorer__":
watch_path = os.getenv("CORTEX_FS_ROOT", "/")
else:
watch_path = self.watcher.get_watch_path(session_id)
if not watch_path:
# Fallback to sync dir if watcher not started
watch_path = self.sync_mgr.get_session_dir(session_id)
abs_path = os.path.normpath(os.path.join(watch_path, rel_path))
if not abs_path.startswith(watch_path):
print(f" [📁🚫] Blocked traversal attempt in _push_file: {rel_path}")
return
if not os.path.exists(abs_path):
print(f" [📁❓] Requested file {rel_path} not found on node")
return
with open(abs_path, "rb") as f:
full_data = f.read()
full_hash = hashlib.sha256(full_data).hexdigest()
f.seek(0)
index = 0
while True:
chunk = f.read(1024 * 1024) # 1MB chunks
is_final = len(chunk) < 1024 * 1024
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
file_data=agent_pb2.FilePayload(
path=rel_path,
chunk=chunk,
chunk_index=index,
is_final=is_final,
hash=full_hash if is_final else ""
)
)
))
if is_final or not chunk:
break
index += 1
def _handle_task(self, task):
print(f"[*] Task Launch: {task.task_id}", flush=True)
# 1. Cryptographic Signature Verification
if not verify_task_signature(task):
print(f"[!] Signature Validation Failed for {task.task_id}", flush=True)
# Report back to hub so the frontend gets a real error, not a silent timeout
self._send_response(
task.task_id,
agent_pb2.TaskResponse(
task_id=task.task_id,
status=agent_pb2.TaskResponse.ERROR,
stderr="[NODE] HMAC signature mismatch — check that AGENT_SECRET_KEY on the node matches the hub SECRET_KEY. Task rejected.",
)
)
return
print(f"[✅] Validated task {task.task_id}", flush=True)
# 2. Skill Manager Submission
success, reason = self.skills.submit(task, self.sandbox, self._on_finish, self._on_event)
if not success:
print(f"[!] Execution Rejected: {reason}", flush=True)
def _on_event(self, event):
"""Live Event Tunneler: Routes browser/skill events into the main stream."""
if isinstance(event, agent_pb2.ClientTaskMessage):
self.task_queue.put(event)
else:
# Legacy/Browser Skill fallback
self.task_queue.put(agent_pb2.ClientTaskMessage(browser_event=event))
def _on_finish(self, tid, res, trace):
"""Final Completion Callback: Routes task results back to server."""
print(f"[*] Completion: {tid}", flush=True)
status = agent_pb2.TaskResponse.SUCCESS if res['status'] == 1 else agent_pb2.TaskResponse.ERROR
tr = agent_pb2.TaskResponse(
task_id=tid, status=status,
stdout=res.get('stdout',''),
stderr=res.get('stderr',''),
trace_id=trace,
browser_result=res.get("browser_result")
)
self._send_response(tid, tr)
def _send_response(self, tid, tr=None, status=None):
"""Utility for placing response messages into the gRPC outbound queue."""
if tr:
self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr))
else:
self.task_queue.put(agent_pb2.ClientTaskMessage(
task_response=agent_pb2.TaskResponse(task_id=tid, status=status)
))
def stop(self):
"""Gracefully stops all background services and skills."""
print(f"\n[🛑] Stopping Agent Node: {self.node_id}")
self.skills.shutdown()
# Optionally close gRPC channel if we want to be very clean
# self.channel.close()