import threading
import queue
import time
import sys
import os
import hashlib
import logging
import json
import zlib
try:
import psutil
except ImportError:
psutil = None
from protos import agent_pb2, agent_pb2_grpc
logger = logging.getLogger(__name__)
from agent_node.skills.manager import SkillManager
from agent_node.core.sandbox import SandboxEngine
from agent_node.core.sync import NodeSyncManager
from agent_node.core.watcher import WorkspaceWatcher
from agent_node.utils.auth import verify_task_signature
from agent_node.utils.network import get_secure_stub
import agent_node.config as config
class AgentNode:
"""The 'Agent Core': Orchestrates Local Skills and Maintains gRPC Connection."""
    def __init__(self):
        """Construct the agent node: core subsystems, outbound queue, and I/O pools.

        Side effect: immediately opens a gRPC channel via _refresh_stub().
        """
        # Dynamically read config instead of caching static defaults
        self.node_id = config.NODE_ID
        self.sandbox = SandboxEngine()
        self.sync_mgr = NodeSyncManager()
        self.skills = SkillManager(max_workers=config.MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr)
        # Watcher reports local filesystem deltas through _on_sync_delta.
        self.watcher = WorkspaceWatcher(self._on_sync_delta)
        # Bounded outbound message queue — applies backpressure to producers.
        self.task_queue = queue.Queue(maxsize=100) # backpressure
        self.stub = None
        self.channel = None
        self._stop_event = threading.Event()
        self._refresh_stub()
        # M6: Parallel Disk I/O Workers
        from concurrent.futures import ThreadPoolExecutor
        self.io_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="NodeIO")
        # Backpressure for I/O: Prevent memory ballooning during heavy sync
        self.io_semaphore = threading.Semaphore(50)
        # Per-file locks so out-of-order chunks are applied sequentially
        # (see _async_write_chunk).
        self.write_locks = {} # (sid, path) -> threading.Lock
        self.lock_map_mutex = threading.Lock()  # guards write_locks itself
def _refresh_stub(self):
"""Force refreshes the gRPC channel and stub, ensuring old ones are closed."""
if self.channel:
try:
self.channel.close()
except:
pass
self.stub, self.channel = get_secure_stub()
self._setup_connectivity_watcher()
def _setup_connectivity_watcher(self):
"""Monitor gRPC channel state and log only on actual transitions."""
import grpc
self._last_grpc_state = None
def _on_state_change(state):
if self._stop_event.is_set():
return
if state != self._last_grpc_state:
print(f"[*] [gRPC-State] {state}", flush=True)
self._last_grpc_state = state
# Persistent subscription — only call ONCE per channel.
# Re-subscribing inside the callback causes an exponential callback leak.
self.channel.subscribe(_on_state_change, try_to_connect=True)
def _collect_capabilities(self) -> dict:
"""Collect hardware metadata to advertise at registration."""
import platform
import subprocess
import socket
import os
caps = {
"shell": "v1",
"arch": platform.machine(), # e.g. x86_64, arm64, aarch64
"os": platform.system().lower(), # linux, darwin, windows
"os_release": platform.release(),
}
# Dynamic Browser Capability Detection
if hasattr(self, "skills"):
browser_skill = self.skills.skills.get("browser")
if browser_skill and browser_skill.is_available():
caps["browser"] = "playwright-sync-bridge"
# Privilege Detection
# is_root: True if UID 0 (Linux/macOS) — no sudo needed at all
# has_sudo: True if sudo is installed AND available passwordlessly
try:
caps["is_root"] = (os.getuid() == 0)
except AttributeError:
# Windows — os.getuid() doesn't exist; approximate via admin check
try:
import ctypes
caps["is_root"] = bool(ctypes.windll.shell32.IsUserAnAdmin())
except Exception:
caps["is_root"] = False
if caps.get("is_root"):
caps["has_sudo"] = False # Root doesn't need sudo
else:
# Check if passwordless sudo is available
try:
r = subprocess.run(
["sudo", "-n", "true"],
capture_output=True, timeout=3
)
caps["has_sudo"] = (r.returncode == 0)
except Exception:
caps["has_sudo"] = False
# Local IP Detection (best effort)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0)
# Doesn't even have to be reachable
s.connect(('10.254.254.254', 1))
caps["local_ip"] = s.getsockname()[0]
s.close()
except Exception:
caps["local_ip"] = "unknown"
# GPU Detection — try nvidia-smi first, then check for Apple GPU
try:
result = subprocess.run(
["nvidia-smi", "--query-gpu=name,memory.total", "--format=csv,noheader,nounits"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0 and result.stdout.strip():
gpu_lines = result.stdout.strip().split("\n")
caps["gpu"] = gpu_lines[0].strip() # e.g. "NVIDIA GeForce RTX 3080, 10240"
caps["gpu_count"] = str(len(gpu_lines))
else:
caps["gpu"] = "none"
except Exception:
# No nvidia-smi — check if Apple Silicon (arm64 + darwin)
if caps["os"] == "darwin" and "arm" in caps["arch"].lower():
caps["gpu"] = "apple-silicon"
else:
caps["gpu"] = "none"
# Display Detection — can this node show a real browser window?
# macOS and Windows always have a graphical display.
# Linux needs DISPLAY (X11) or WAYLAND_DISPLAY set.
_os = caps.get("os", "")
if _os in ("darwin", "windows"):
caps["has_display"] = True
else:
has_x11 = bool(os.environ.get("DISPLAY", "").strip())
has_wayland = bool(os.environ.get("WAYLAND_DISPLAY", "").strip())
caps["has_display"] = has_x11 or has_wayland
return caps
    def sync_configuration(self):
        """Initial handshake to retrieve policy and metadata.

        Blocks until the orchestrator accepts the registration; retries every
        5 seconds on rejection or connection failure. Reloads config from disk
        before each attempt so edits made while retrying are picked up live.
        """
        while True:
            # Reload configuration from disk dynamically before each attempt
            config.reload()
            self.node_id = config.NODE_ID
            self.skills.max_workers = config.MAX_SKILL_WORKERS
            # Ensure stub is fresh if we re-enter from a crash
            if not self.stub:
                self._refresh_stub()
            print(f"[*] Handshake with Orchestrator: {self.node_id}")
            caps = self._collect_capabilities()
            print(f"[*] Capabilities: {caps}")
            # Protobuf capabilities is map<string, string> — all values must be strings
            # (booleans lowercased, e.g. True -> "true").
            caps_str = {k: str(v).lower() if isinstance(v, bool) else str(v) for k, v in caps.items()}
            reg_req = agent_pb2.RegistrationRequest(
                node_id=self.node_id,
                auth_token=config.AUTH_TOKEN,
                node_description=config.NODE_DESC,
                capabilities=caps_str
            )
            try:
                print(f"[*] [gRPC-Handshake] Sending SyncConfiguration (timeout=10s)...", flush=True)
                res = self.stub.SyncConfiguration(reg_req, timeout=10)
                if res.success:
                    self.sandbox.sync(res.policy)
                    print("[OK] [gRPC-Handshake] Handshake successful. Sandbox Policy Synced.")
                    # Apply initial skill config
                    if res.policy.skill_config_json:
                        try:
                            cfg = json.loads(res.policy.skill_config_json)
                            # Fan the parsed config out to every skill that opts in.
                            for name, skill in self.skills.skills.items():
                                if hasattr(skill, "apply_config"):
                                    skill.apply_config(cfg)
                        except Exception as e:
                            # Bad config must not abort a successful handshake.
                            print(f"[!] Error applying initial skill config: {e}")
                    break # Success, exit the retry loop
                else:
                    print(f"[!] Rejection: {res.error_message}")
                    print("[!] Retrying handshake in 5 seconds...")
                    time.sleep(5)
            except Exception as e:
                err_desc = self._format_grpc_error(e)
                print(f"[!] Connection Fail: {err_desc}")
                print("[!] Retrying handshake in 5 seconds...")
                time.sleep(5)
def _format_grpc_error(self, e) -> str:
"""Helper to extract detailed info from gRPC exceptions."""
try:
import grpc
if isinstance(e, grpc.RpcError):
return f"gRPC Error {e.code()} | {e.details()}"
except:
pass
return str(e)
    def start_health_reporting(self):
        """Start a daemon thread streaming node metrics to the orchestrator.

        The thread opens a client-streaming ReportHealth RPC fed by a
        generator yielding one Heartbeat per reporting interval. On any stream
        failure it backs off 5 seconds and reconnects. Returns immediately.
        """
        def _report():
            while not self._stop_event.is_set():
                try:
                    def _gen():
                        # Heartbeat generator consumed by the gRPC client stream.
                        while not self._stop_event.is_set():
                            ids = self.skills.get_active_ids()
                            # Collection
                            if psutil:
                                # Optimization: Use non-blocking CPU check.
                                # interval=None (default) prevents blocking the gRPC thread.
                                cpu = psutil.cpu_percent(interval=None)
                                per_core = psutil.cpu_percent(percpu=True, interval=None)
                                vmem = psutil.virtual_memory()
                                mem_percent = vmem.percent
                                used_gb = vmem.used / (1024**3)
                                total_gb = vmem.total / (1024**3)
                                avail_gb = vmem.available / (1024**3)
                                cpu_count = psutil.cpu_count()
                            else:
                                # psutil not installed — report zeroed metrics.
                                cpu = 0.0
                                per_core = []
                                mem_percent = 0.0
                                used_gb = 0.0
                                total_gb = 0.0
                                avail_gb = 0.0
                                cpu_count = 0
                            # Freq & Load — best effort; not available on all platforms.
                            freq = 0
                            if psutil:
                                try:
                                    freq = psutil.cpu_freq().current
                                except:
                                    pass
                            try:
                                load = list(os.getloadavg())
                            except:
                                # os.getloadavg() raises on Windows.
                                load = [0.0, 0.0, 0.0]
                            yield agent_pb2.Heartbeat(
                                node_id=self.node_id,
                                cpu_usage_percent=cpu,
                                memory_usage_percent=mem_percent,
                                active_worker_count=len(ids),
                                max_worker_capacity=config.MAX_SKILL_WORKERS,
                                running_task_ids=ids,
                                cpu_count=cpu_count,
                                memory_used_gb=used_gb,
                                memory_total_gb=total_gb,
                                # M6 Fields
                                cpu_usage_per_core=per_core,
                                cpu_freq_mhz=freq,
                                memory_available_gb=avail_gb,
                                load_avg=load
                            )
                            # Sleep until the next report; the -1.0 presumably
                            # offsets collection/RPC overhead — TODO confirm.
                            time.sleep(max(0, config.HEALTH_REPORT_INTERVAL - 1.0))
                    # Consume the heartbeat stream to keep it alive
                    for response in self.stub.ReportHealth(_gen()):
                        # We don't strictly need the server time, but it confirms a round-trip
                        pass
                except Exception as e:
                    err_desc = self._format_grpc_error(e)
                    print(f"[!] Health reporting interrupted: {err_desc}. Retrying in 5s...")
                    time.sleep(5)
        # Non-blocking thread for health heartbeat
        threading.Thread(target=_report, daemon=True, name=f"Health-{self.node_id}").start()
def run_task_stream(self):
"""Main Persistent Bi-directional Stream for Task Management with Reconnection."""
while True:
try:
def _gen():
# Initial announcement for routing identity
announce_msg = agent_pb2.ClientTaskMessage(
announce=agent_pb2.NodeAnnounce(node_id=self.node_id)
)
yield announce_msg
while True:
out_msg = self.task_queue.get()
try:
yield out_msg
except Exception as ye:
print(f"[*] [gRPC-Stream] !!! Send Error: {ye}", flush=True)
raise ye
responses = self.stub.TaskStream(_gen())
print(f"[*] [gRPC-Stream] Connected to Orchestrator ({self.node_id}).", flush=True)
for msg in responses:
self._process_server_message(msg)
print(f"[*] [gRPC-Stream] Connection closed by server.", flush=True)
except Exception as e:
err_desc = self._format_grpc_error(e)
print(f"[!] Task Stream Failure: {err_desc}. Reconnecting in 5s...", flush=True)
# Force refresh stub on reconnection, closing old channel
self._refresh_stub()
time.sleep(5)
# Re-sync config in case permissions changed during downtime
try: self.sync_configuration()
except: pass
def _process_server_message(self, msg):
kind = msg.WhichOneof('payload')
if config.DEBUG_GRPC or True: # Force logging for now to debug Mac
if kind == 'file_sync' and msg.file_sync.HasField('control'):
print(f"[*] Inbound: {kind} (control={msg.file_sync.control.action})", flush=True)
else:
print(f"[*] Inbound: {kind}", flush=True)
if kind == 'task_request':
self._handle_task(msg.task_request)
elif kind == 'task_cancel':
if self.skills.cancel(msg.task_cancel.task_id):
self._send_response(msg.task_cancel.task_id, None, agent_pb2.TaskResponse.CANCELLED)
elif kind == 'work_pool_update':
# Claim logical idle tasks from global pool with slight randomized jitter
# to prevent thundering herd where every node claims the same task at the exact same ms.
if len(self.skills.get_active_ids()) < config.MAX_SKILL_WORKERS:
for tid in msg.work_pool_update.available_task_ids:
# Deterministic delay based on node_id to distribute claims
import random
time.sleep(random.uniform(0.1, 0.5))
self.task_queue.put(agent_pb2.ClientTaskMessage(
task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id)
))
elif kind == 'claim_status':
status = "GRANTED" if msg.claim_status.granted else "DENIED"
print(f" [📦] Claim {msg.claim_status.task_id}: {status} ({msg.claim_status.reason})", flush=True)
elif kind == 'file_sync':
self._handle_file_sync(msg.file_sync)
elif kind == 'policy_update':
print(f" [🔒] Live Sandbox Policy Update Received.")
self.sandbox.sync(msg.policy_update)
# Apply skill config updates
if msg.policy_update.skill_config_json:
try:
cfg = json.loads(msg.policy_update.skill_config_json)
for name, skill in self.skills.skills.items():
if hasattr(skill, "apply_config"):
skill.apply_config(cfg)
except Exception as e:
print(f" [!] Error applying skill config update: {e}")
def _on_sync_delta(self, session_id, payload):
"""Callback from watcher to push local changes to server."""
if isinstance(payload, agent_pb2.FileSyncMessage):
# Already a full message (e.g. deletion control)
self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=payload))
else:
# Legacy/Standard chunk update
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
file_data=payload
)
))
    def _handle_file_sync(self, fs):
        """Processes inbound file synchronization messages from the Orchestrator.

        Dispatches on the message's oneof payload:
          - manifest:  reconcile against local state, reply OK or RECONCILE_REQUIRED
          - file_data: offload the chunk write to the background I/O pool
          - control:   watcher / lock / manifest / FS-explorer actions
        """
        sid = fs.session_id
        # LOGGING
        type_str = fs.WhichOneof('payload')
        print(f" [📁] Sync MSG: {type_str} | Session: {sid}")
        if fs.HasField("manifest"):
            # Server pushed its manifest: diff against local files, report drift.
            needs_update = self.sync_mgr.handle_manifest(sid, fs.manifest)
            if needs_update:
                print(f" [📁⚠️] Drift Detected for {sid}: {len(needs_update)} files need sync")
                self.task_queue.put(agent_pb2.ClientTaskMessage(
                    file_sync=agent_pb2.FileSyncMessage(
                        session_id=sid,
                        status=agent_pb2.SyncStatus(
                            code=agent_pb2.SyncStatus.RECONCILE_REQUIRED,
                            message=f"Drift detected in {len(needs_update)} files",
                            reconcile_paths=needs_update
                        )
                    )
                ))
            else:
                self.task_queue.put(agent_pb2.ClientTaskMessage(
                    file_sync=agent_pb2.FileSyncMessage(
                        session_id=sid,
                        status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message="Synchronized")
                    )
                ))
        elif fs.HasField("file_data"):
            # M6: High-Concurrency Disk I/O Offloading with Backpressure
            # We use a semaphore to limit the number of pending I/O tasks in the executor queue.
            # This prevents memory ballooning if the network is faster than the disk.
            # The semaphore is released in _async_write_chunk's finally block.
            self.io_semaphore.acquire()
            try:
                self.io_executor.submit(self._async_write_chunk, sid, fs.file_data)
            except Exception:
                self.io_semaphore.release() # Release if submission fails
        elif fs.HasField("control"):
            ctrl = fs.control
            print(f" [📁] Control Action: {ctrl.action} (Path: {ctrl.path})")
            if ctrl.action == agent_pb2.SyncControl.START_WATCHING:
                # Path relative to sync dir or absolute
                watch_path = ctrl.path if os.path.isabs(ctrl.path) else os.path.join(self.sync_mgr.get_session_dir(sid), ctrl.path)
                print(f" [📁👁️] Starting Watcher on: {watch_path}")
                self.watcher.start_watching(sid, watch_path)
            elif ctrl.action == agent_pb2.SyncControl.STOP_WATCHING:
                self.watcher.stop_watching(sid)
            elif ctrl.action == agent_pb2.SyncControl.LOCK:
                # Pause pushing local deltas for this session.
                self.watcher.set_lock(sid, True)
            elif ctrl.action == agent_pb2.SyncControl.UNLOCK:
                self.watcher.set_lock(sid, False)
            elif ctrl.action == agent_pb2.SyncControl.REFRESH_MANIFEST:
                if ctrl.request_paths:
                    # Server asked for specific files — push them concurrently.
                    print(f" [📁📤] Turbo Pushing {len(ctrl.request_paths)} Requested Files for {sid} in parallel")
                    from concurrent.futures import ThreadPoolExecutor
                    requested = list(ctrl.request_paths)
                    # Increased worker count for high-concurrency sync
                    max_workers = min(100, len(requested))
                    with ThreadPoolExecutor(max_workers=max_workers) as executor:
                        for path in requested:
                            # Pre-check existence to avoid redundant executor tasks
                            watch_path = self._get_base_dir(sid, create=False)
                            abs_path = os.path.normpath(os.path.join(watch_path, path))
                            if os.path.exists(abs_path):
                                executor.submit(self._push_file, sid, path)
                            else:
                                print(f" [📁❓] Skipping push for non-existent file: {path}")
                else:
                    # Node -> Server Manifest Push
                    self._push_full_manifest(sid, ctrl.path)
            elif ctrl.action == agent_pb2.SyncControl.RESYNC:
                self._push_full_manifest(sid, ctrl.path)
            elif ctrl.action == agent_pb2.SyncControl.PURGE:
                print(f" [📁🧹] Node instructed to purge session sync data: {sid}")
                self.watcher.stop_watching(sid) # Stop watching before deleting
                self.sync_mgr.purge(sid)
            elif ctrl.action == agent_pb2.SyncControl.CLEANUP:
                print(f" [📁🧹] Node proactively cleaning up defunct sessions. Active: {ctrl.request_paths}")
                active_sessions = list(ctrl.request_paths)
                self.sync_mgr.cleanup_unused_sessions(active_sessions)
            # --- M6: FS Explorer Handlers ---
            elif ctrl.action == agent_pb2.SyncControl.LIST:
                print(f" [📁📂] List Directory: {ctrl.path}")
                self._push_full_manifest(sid, ctrl.path, task_id=fs.task_id, shallow=True)
            elif ctrl.action == agent_pb2.SyncControl.READ:
                print(f" [📁📄] Read File: {ctrl.path}")
                self._push_file(sid, ctrl.path, task_id=fs.task_id)
            elif ctrl.action == agent_pb2.SyncControl.WRITE:
                print(f" [📁💾] Write File: {ctrl.path} (is_dir={ctrl.is_dir})")
                self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir, task_id=fs.task_id)
            elif ctrl.action == agent_pb2.SyncControl.DELETE:
                print(f" [📁🗑️] Delete Fragment: {ctrl.path}")
                self._handle_fs_delete(sid, ctrl.path, task_id=fs.task_id)
def _get_base_dir(self, session_id, create=False):
"""Helper to resolve the effective root for a session (Watcher > SyncDir)."""
if session_id == "__fs_explorer__":
root = config.FS_ROOT
print(f" [📁] Explorer Root: {root}")
return root
# Priority 1: If we have an active watcher, use its root (e.g. Seed from Local)
watched = self.watcher.get_watch_path(session_id)
if watched:
print(f" [📁] Using Watched Path as Base: {watched}")
return watched
# Priority 2: Standard session-scoped sync directory
fallback = self.sync_mgr.get_session_dir(session_id, create=create)
print(f" [📁] Falling back to SyncDir: {fallback}")
return fallback
    def _push_full_manifest(self, session_id, rel_path=".", task_id="", shallow=False):
        """Pushes the current local manifest back to the server.

        shallow=True lists only immediate children with sizes and no hashing
        (explorer view); shallow=False walks recursively and SHA-256-hashes
        every file for reconciliation. On failure an ERROR SyncStatus is sent
        instead of a manifest.
        """
        print(f" [📁📤] Pushing {'Shallow' if shallow else 'Full'} Manifest for {session_id}")
        base_dir = self._get_base_dir(session_id, create=True)
        # Ensure rel_path is relative if it's within a session sync dir
        safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path
        watch_path = os.path.normpath(os.path.join(base_dir, safe_rel))
        if not os.path.exists(watch_path):
            # If the specific sub-path doesn't exist, try to create it if it's within the session dir
            if session_id != "__fs_explorer__":
                os.makedirs(watch_path, exist_ok=True)
            else:
                # Explorer mode: report the missing path instead of creating it.
                self.task_queue.put(agent_pb2.ClientTaskMessage(
                    file_sync=agent_pb2.FileSyncMessage(
                        session_id=session_id,
                        task_id=task_id,
                        status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=f"Path {rel_path} not found")
                    )
                ))
                return
        files = []
        try:
            if shallow:
                # Optimized for Explorer: immediate children only, no hashing
                with os.scandir(watch_path) as it:
                    for entry in it:
                        is_dir = entry.is_dir()
                        # Use metadata only
                        try:
                            stats = entry.stat()
                            size = stats.st_size if not is_dir else 0
                        except: size = 0
                        # Calculate path relative to the actual base_sync_dir / session_dir
                        # rel_path is the directory we are currently browsing.
                        # entry.name is the file within it.
                        item_rel_path = os.path.relpath(os.path.join(watch_path, entry.name), base_dir)
                        files.append(agent_pb2.FileInfo(path=item_rel_path, size=size, hash="", is_dir=is_dir))
            else:
                # Deep walk with full hashes for reconciliation
                for root, dirs, filenames in os.walk(watch_path):
                    for filename in filenames:
                        abs_path = os.path.join(root, filename)
                        # r_path must be relative to base_dir so the server correctly joins it to the mirror root
                        r_path = os.path.relpath(abs_path, base_dir)
                        try:
                            # Memory-safe incremental hashing
                            h = hashlib.sha256()
                            with open(abs_path, "rb") as f:
                                while True:
                                    chunk = f.read(1024 * 1024) # 1MB chunks
                                    if not chunk: break
                                    h.update(chunk)
                            file_hash = h.hexdigest()
                            files.append(agent_pb2.FileInfo(
                                path=r_path,
                                size=os.path.getsize(abs_path),
                                hash=file_hash,
                                is_dir=False
                            ))
                        # Unreadable/vanished file — skip it rather than abort the walk.
                        except Exception: continue
                    for d in dirs:
                        abs_path = os.path.join(root, d)
                        # r_path must be relative to base_dir so the server correctly joins it to the mirror root
                        r_path = os.path.relpath(abs_path, base_dir)
                        files.append(agent_pb2.FileInfo(path=r_path, size=0, hash="", is_dir=True))
        except Exception as e:
            print(f" [❌] Manifest generation failed for {rel_path}: {e}")
            self.task_queue.put(agent_pb2.ClientTaskMessage(
                file_sync=agent_pb2.FileSyncMessage(
                    session_id=session_id,
                    task_id=task_id,
                    status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e))
                )
            ))
            return
        self.task_queue.put(agent_pb2.ClientTaskMessage(
            file_sync=agent_pb2.FileSyncMessage(
                session_id=session_id,
                task_id=task_id,
                manifest=agent_pb2.DirectoryManifest(root_path=rel_path, files=files)
            )
        ))
def _handle_fs_write(self, session_id, rel_path, content, is_dir, task_id=""):
"""Modular FS Write/Create."""
try:
base_dir = os.path.normpath(self._get_base_dir(session_id, create=True))
# Ensure rel_path is relative if it's within a session sync dir
safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path
target_path = os.path.normpath(os.path.join(base_dir, safe_rel))
print(f" [📁💾] target_path: {target_path} (base_dir: {base_dir})")
# M6: Check if path is within session base_dir OR global config.FS_ROOT
allowed = target_path.startswith(base_dir)
if not allowed and config.FS_ROOT:
allowed = target_path.startswith(os.path.normpath(config.FS_ROOT))
if not allowed:
raise Exception(f"Path traversal attempt blocked: {target_path} is outside {base_dir} (config.FS_ROOT: {config.FS_ROOT})")
if is_dir:
os.makedirs(target_path, exist_ok=True)
else:
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "wb") as f:
f.write(content)
# Send OK status
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"{'Directory' if is_dir else 'File'} written")
)
))
# Trigger manifest refresh so UI updates
self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True)
except Exception as e:
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e))
)
))
def _handle_fs_delete(self, session_id, rel_path, task_id=""):
"""Modular FS Delete."""
try:
base_dir = os.path.normpath(self._get_base_dir(session_id))
# Ensure rel_path is relative if it's within a session sync dir
safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path
target_path = os.path.normpath(os.path.join(base_dir, safe_rel))
allowed = target_path.startswith(base_dir)
if not allowed and config.FS_ROOT:
allowed = target_path.startswith(os.path.normpath(config.FS_ROOT))
if not allowed:
raise Exception(f"Path traversal attempt blocked: {target_path} is outside {base_dir} (config.FS_ROOT: {config.FS_ROOT})")
if not os.path.exists(target_path):
raise Exception("File not found")
import shutil
if os.path.isdir(target_path):
shutil.rmtree(target_path)
else:
os.remove(target_path)
# Send OK status
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"Deleted {rel_path}")
)
))
# Trigger manifest refresh so UI updates
self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True)
except Exception as e:
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e))
)
))
    def _async_write_chunk(self, sid, payload):
        """Worker function for background parallelized I/O with out-of-order chunk support.

        Runs on the io_executor. A per-(session, path) lock serializes chunks
        for the same file so the final hash verify/swap is correctly ordered.
        Always releases io_semaphore (acquired by _handle_file_sync) when done.
        """
        path = payload.path
        # M6: Path-Level Locking for Sequential Consistency
        # While chunks can arrive out of order, we must process them sequentially
        # to guarantee the final hash verify/swap is correctly ordered.
        with self.lock_map_mutex:
            lock_key = (sid, path)
            if lock_key not in self.write_locks:
                self.write_locks[lock_key] = threading.Lock()
            lock = self.write_locks[lock_key]
        with lock:
            try:
                if payload.chunk_index == 0:
                    # First chunk: mute the watcher so our own write isn't echoed back.
                    self.watcher.suppress_path(sid, path)
                success = self.sync_mgr.write_chunk(sid, payload)
                if payload.is_final:
                    if success and payload.hash:
                        self.watcher.acknowledge_remote_write(sid, path, payload.hash)
                    self.watcher.unsuppress_path(sid, path)
                    print(f" [📁] Async File Sync Complete (Sequenced Parallel): {path} (Success: {success})")
                    # M6: Clean up the lock entry after finalization
                    with self.lock_map_mutex:
                        if lock_key in self.write_locks:
                            del self.write_locks[lock_key]
                    # Report status back to orchestrator
                    status = agent_pb2.SyncStatus.OK if success else agent_pb2.SyncStatus.ERROR
                    self.task_queue.put(agent_pb2.ClientTaskMessage(
                        file_sync=agent_pb2.FileSyncMessage(
                            session_id=sid,
                            status=agent_pb2.SyncStatus(code=status, message=f"File {path} synced")
                        )
                    ))
            finally:
                # Always release the semaphore to allow the next I/O task
                self.io_semaphore.release()
def _push_file(self, session_id, rel_path, task_id=""):
"""Pushes a specific file from node to server."""
watch_path = os.path.normpath(self._get_base_dir(session_id, create=False))
# Ensure rel_path is relative if it's within a session sync dir
safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path
abs_path = os.path.normpath(os.path.join(watch_path, safe_rel))
allowed = abs_path.startswith(watch_path)
if not allowed and config.FS_ROOT:
allowed = abs_path.startswith(os.path.normpath(config.FS_ROOT))
if not allowed:
print(f" [📁🚫] Blocked traversal attempt in _push_file: {rel_path} (Valid Roots: {watch_path}, config.FS_ROOT: {config.FS_ROOT})")
return
if not os.path.exists(abs_path):
print(f" [📁❓] Requested file {rel_path} not found on node")
return
# Optimization: 4MB Incremental Hashing + Zero Throttling
hasher = hashlib.sha256()
file_size = os.path.getsize(abs_path)
chunk_size = 4 * 1024 * 1024
total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1
try:
with open(abs_path, "rb") as f:
index = 0
while True:
chunk = f.read(chunk_size)
if not chunk: break
hasher.update(chunk)
offset = f.tell() - len(chunk)
is_final = f.tell() >= file_size
# Compress Chunk for transit
compressed_chunk = zlib.compress(chunk)
# M6: Use dictionary unpack for safe assignment (robust against old proto versions)
payload_fields = {
"path": rel_path,
"chunk": compressed_chunk,
"chunk_index": index,
"is_final": is_final,
"hash": hasher.hexdigest() if is_final else "",
"offset": offset,
"compressed": True,
}
# Only add new fields if supported by the compiled proto
if hasattr(agent_pb2.FilePayload, "total_chunks"):
payload_fields["total_chunks"] = total_chunks
payload_fields["total_size"] = file_size
self.task_queue.put(agent_pb2.ClientTaskMessage(
file_sync=agent_pb2.FileSyncMessage(
session_id=session_id,
task_id=task_id,
file_data=agent_pb2.FilePayload(**payload_fields)
)
))
if is_final: break
index += 1
except Exception as e:
print(f" [📁📤] Error pushing {rel_path}: {e}")
def _handle_task(self, task):
print(f"[*] Task Launch: {task.task_id}", flush=True)
# 1. Cryptographic Signature Verification
if not verify_task_signature(task):
print(f"[!] Signature Validation Failed for {task.task_id}", flush=True)
# Report back to hub so the frontend gets a real error, not a silent timeout
self._send_response(
task.task_id,
agent_pb2.TaskResponse(
task_id=task.task_id,
status=agent_pb2.TaskResponse.ERROR,
stderr="[NODE] HMAC signature mismatch — check that AGENT_SECRET_KEY on the node matches the hub SECRET_KEY. Task rejected.",
)
)
return
print(f"[✅] Validated task {task.task_id}", flush=True)
# 2. Skill Manager Submission
success, reason = self.skills.submit(task, self.sandbox, self._on_finish, self._on_event)
if not success:
print(f"[!] Execution Rejected: {reason}", flush=True)
self._send_response(
task.task_id,
agent_pb2.TaskResponse(
task_id=task.task_id,
status=agent_pb2.TaskResponse.ERROR,
stderr=f"[NODE] Execution Rejected: {reason}",
)
)
def _on_event(self, event):
"""Live Event Tunneler: Routes browser/skill events into the main stream."""
if isinstance(event, agent_pb2.ClientTaskMessage):
self.task_queue.put(event)
else:
# Legacy/Browser Skill fallback
self.task_queue.put(agent_pb2.ClientTaskMessage(browser_event=event))
def _on_finish(self, tid, res, trace):
"""Final Completion Callback: Routes task results back to server."""
print(f"[*] Completion: {tid}", flush=True)
# 0 is SUCCESS, 1 is ERROR in Protobuf
status = res.get('status', agent_pb2.TaskResponse.ERROR)
tr = agent_pb2.TaskResponse(
task_id=tid, status=status,
stdout=res.get('stdout',''),
stderr=res.get('stderr',''),
trace_id=trace,
browser_result=res.get("browser_result")
)
self._send_response(tid, tr)
def _send_response(self, tid, tr=None, status=None):
"""Utility for placing response messages into the gRPC outbound queue."""
if tr:
self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr))
else:
self.task_queue.put(agent_pb2.ClientTaskMessage(
task_response=agent_pb2.TaskResponse(task_id=tid, status=status)
))
def stop(self):
"""Gracefully stops all background services and skills."""
print(f"\n[🛑] Stopping Agent Node: {self.node_id}")
self._stop_event.set()
# 1. Stop Skills
self.skills.shutdown()
# 2. Stop Watcher
self.watcher.shutdown()
# 3. Shutdown IO Executor
self.io_executor.shutdown(wait=False)
# 4. Close gRPC channel
if self.channel:
try:
self.channel.close()
except Exception as e:
print(f"[!] Error closing channel: {e}")