diff --git a/agent-node/VERSION b/agent-node/VERSION index c787b21..c3ccccf 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 +1 @@ -1.0.22 +1.0.59 diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index 4946f53..d755d33 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -2,8 +2,16 @@ import time import os import hashlib -from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler +import zlib +try: + from watchdog.observers import Observer + from watchdog.events import FileSystemEventHandler + HAS_WATCHDOG = True +except ImportError: + # Optional dependency: Only required for live file sync/push-to-node features. + Observer = object + FileSystemEventHandler = object + HAS_WATCHDOG = False from shared_core.ignore import CortexIgnore from protos import agent_pb2 @@ -16,6 +24,8 @@ self.ignore_filter = CortexIgnore(root_path) self.last_sync = {} # path -> last_hash self.locked = False + self.suppressed_paths = set() # Paths currently being modified by the system + self.syncing_paths = set() # Paths currently being scanned/pushed def on_modified(self, event): if not event.is_directory: @@ -25,16 +35,34 @@ if not event.is_directory: self._process_change(event.src_path) - def on_moved(self, event): - # Simplification: treat move as a delete and create, or just process the dest - self._process_change(event.dest_path) + def on_closed(self, event): + # critical for large writes like 'dd' or 'cp' that trigger many modified events + if not event.is_directory: + self._process_change(event.src_path, force=True) - def _process_change(self, abs_path): + def on_deleted(self, event): + if not event.is_directory: + rel_path = os.path.normpath(os.path.relpath(event.src_path, self.root_path)) + if not self.ignore_filter.is_ignored(rel_path): + self.callback(self.session_id, agent_pb2.FileSyncMessage( + session_id=self.session_id, + control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=rel_path) + )) + + def on_moved(self, event): + # Treat as delete of src and create of dest + self.on_deleted(event) + self._process_change(event.dest_path, force=True) + + def _process_change(self, abs_path, force=False): if self.locked: return # Block all user edits when session is locked rel_path = os.path.normpath(os.path.relpath(abs_path, self.root_path)) + if rel_path in self.suppressed_paths: + return # Ignore changes coming from the sync manager + # Phase 3: Dynamic reload if .cortexignore / .gitignore changed if rel_path in [".cortexignore", ".gitignore"]: print(f" [*] Reloading Ignore Filter for {self.session_id}") @@ -43,32 +71,106 @@ if self.ignore_filter.is_ignored(rel_path): return + if rel_path in self.syncing_paths: + return + + self.syncing_paths.add(rel_path) + try: - with open(abs_path, "rb") as f: - content = f.read() - file_hash = hashlib.sha256(content).hexdigest() - - if self.last_sync.get(rel_path) == file_hash: - return # No actual change + # Step 0: Settle Check (Debounce) + if not force: + # Wait a moment to see if the file is still being written to. + # This is critical for tools like 'dd' or 'cp' that write in bursts. + try: + initial_mtime = os.path.getmtime(abs_path) + initial_size = os.path.getsize(abs_path) + time.sleep(1.0) + if not os.path.exists(abs_path): return # File deleted during wait + + current_mtime = os.path.getmtime(abs_path) + current_size = os.path.getsize(abs_path) + + if current_mtime != initial_mtime or current_size != initial_size: + # Still being modified. We'll skip this event and let the next + # 'modified' event trigger the actual sync. + return + except (OSError, FileNotFoundError): + return - self.last_sync[rel_path] = file_hash - print(f" [๐Ÿ“๐Ÿ“ค] Detected Change: {rel_path}") + if not os.path.exists(abs_path): + return + + file_size = os.path.getsize(abs_path) + chunk_size = 1024 * 1024 # 1MB buffer for hashing/stream + total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1 - # Chunk and Send - chunk_size = 64 * 1024 - for i in range(0, len(content), chunk_size): - chunk = content[i:i + chunk_size] - is_final = i + chunk_size >= len(content) - payload = agent_pb2.FilePayload( - path=rel_path, - chunk=chunk, - chunk_index=i // chunk_size, - is_final=is_final, - hash=file_hash if is_final else "" - ) - self.callback(self.session_id, payload) + # Memory-safe incremental hashing + hasher = hashlib.sha256() + + with open(abs_path, "rb") as f: + offset = 0 + index = 0 + while True: + chunk = f.read(chunk_size) + if not chunk: + break + + hasher.update(chunk) + + # Compress Chunk for transit + compressed_chunk = zlib.compress(chunk) + + payload_fields = { + "path": rel_path, + "chunk": compressed_chunk, + "chunk_index": index, + "is_final": False, + "offset": offset, + "compressed": True, + "hash": "", + } + if hasattr(agent_pb2.FilePayload, "total_chunks"): + payload_fields["total_chunks"] = total_chunks + payload_fields["total_size"] = file_size + + payload = agent_pb2.FilePayload(**payload_fields) + + # Callback pushes to gRPC queue + self.callback(self.session_id, payload) + + offset += len(chunk) + index += 1 + + # Update internal tracking with the final hash + file_hash = hasher.hexdigest() + + # Signal completion with the hash to the server + # (The server uses this to verify integrity and perform the atomic swap) + sentinel_fields = { + "path": rel_path, + "is_final": True, + "hash": file_hash, + "chunk_index": index, + "offset": offset, + } + if hasattr(agent_pb2.FilePayload, "total_chunks"): + sentinel_fields["total_chunks"] = total_chunks + sentinel_fields["total_size"] = file_size + + self.callback(self.session_id, agent_pb2.FilePayload(**sentinel_fields)) + + if self.last_sync.get(rel_path) == file_hash: + # Chunks were already sent, so we must send sentinel above, + # but we can skip the log message. + return + + self.last_sync[rel_path] = file_hash + print(f" [๐Ÿ“๐Ÿ“ค] Streaming Sync Complete: {rel_path} ({file_size} bytes)") + except Exception as e: print(f" [!] Watcher Error for {rel_path}: {e}") + finally: + self.syncing_paths.discard(rel_path) class WorkspaceWatcher: """Manages FS observers for active synchronization.""" @@ -84,7 +186,11 @@ def start_watching(self, session_id, root_path): if session_id in self.observers: self.stop_watching(session_id) - + + if not HAS_WATCHDOG: + print(f"[!] Warning: 'watchdog' not installed. File sync disabled for session {session_id}") + return + print(f"[*] Starting Watcher for Session {session_id} at {root_path}") os.makedirs(root_path, exist_ok=True) @@ -107,6 +213,25 @@ return self.observers[session_id][1].root_path return None + def acknowledge_remote_write(self, session_id, rel_path, file_hash): + """Updates the internal hash record to match a remote write, preventing an echo-back.""" + if session_id in self.observers: + _, handler = self.observers[session_id] + handler.last_sync[rel_path] = file_hash + + def suppress_path(self, session_id, rel_path): + """Tells the watcher to ignore events for a specific path (e.g. during sync).""" + if session_id in self.observers: + _, handler = self.observers[session_id] + handler.suppressed_paths.add(rel_path) + + def unsuppress_path(self, session_id, rel_path): + """Resumes watching a specific path.""" + if session_id in self.observers: + _, handler = self.observers[session_id] + # Use discard to avoid KeyError if it wasn't there + handler.suppressed_paths.discard(rel_path) + def shutdown(self): for sid in list(self.observers.keys()): self.stop_watching(sid) diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index f7ba8b7..3cd8855 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -5,7 +5,12 @@ import os import hashlib import logging -import psutil +import json +import zlib +try: + import psutil +except ImportError: + psutil = None from protos import agent_pb2, agent_pb2_grpc logger = logging.getLogger(__name__) @@ -27,8 +32,45 @@ self.sync_mgr = NodeSyncManager() self.skills = SkillManager(max_workers=config.MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr) self.watcher = WorkspaceWatcher(self._on_sync_delta) - self.task_queue = queue.Queue() - self.stub = get_secure_stub() + self.task_queue = queue.Queue(maxsize=100) # backpressure + self.stub = None + self.channel = None + self._stop_event = threading.Event() + self._refresh_stub() + + # M6: Parallel Disk I/O Workers + from concurrent.futures import ThreadPoolExecutor + self.io_executor = ThreadPoolExecutor(max_workers=10, thread_name_prefix="NodeIO") + # Backpressure for I/O: Prevent memory ballooning during heavy sync + self.io_semaphore = threading.Semaphore(50) + self.write_locks = {} # (sid, path) -> threading.Lock + self.lock_map_mutex = threading.Lock() + + def _refresh_stub(self): + """Force refreshes the gRPC channel and stub, ensuring old ones are closed.""" + if self.channel: + try: + self.channel.close() + except: + pass + self.stub, self.channel = get_secure_stub() + self._setup_connectivity_watcher() + + def _setup_connectivity_watcher(self): + """Monitor gRPC channel state and log only on actual transitions.""" + import grpc + self._last_grpc_state = None + + def _on_state_change(state): + if self._stop_event.is_set(): + return + if state != self._last_grpc_state: + print(f"[*] [gRPC-State] {state}", flush=True) + self._last_grpc_state = state + + # Persistent subscription โ€” only call ONCE per channel. + # Re-subscribing inside the callback causes an exponential callback leak. + self.channel.subscribe(_on_state_change, try_to_connect=True) def _collect_capabilities(self) -> dict: """Collect hardware metadata to advertise at registration.""" @@ -39,12 +81,17 @@ caps = { "shell": "v1", - "browser": "playwright-sync-bridge", "arch": platform.machine(), # e.g. x86_64, arm64, aarch64 "os": platform.system().lower(), # linux, darwin, windows "os_release": platform.release(), } + # Dynamic Browser Capability Detection + if hasattr(self, "skills"): + browser_skill = self.skills.skills.get("browser") + if browser_skill and browser_skill.is_available(): + caps["browser"] = "playwright-sync-bridge" + # Privilege Detection # is_root: True if UID 0 (Linux/macOS) โ€” no sudo needed at all # has_sudo: True if sudo is installed AND available passwordlessly @@ -101,6 +148,17 @@ else: caps["gpu"] = "none" + # Display Detection โ€” can this node show a real browser window? + # macOS and Windows always have a graphical display. + # Linux needs DISPLAY (X11) or WAYLAND_DISPLAY set. + _os = caps.get("os", "") + if _os in ("darwin", "windows"): + caps["has_display"] = True + else: + has_x11 = bool(os.environ.get("DISPLAY", "").strip()) + has_wayland = bool(os.environ.get("WAYLAND_DISPLAY", "").strip()) + caps["has_display"] = has_x11 or has_wayland + return caps def sync_configuration(self): @@ -110,6 +168,9 @@ config.reload() self.node_id = config.NODE_ID self.skills.max_workers = config.MAX_SKILL_WORKERS + # Ensure stub is fresh if we re-enter from a crash + if not self.stub: + self._refresh_stub() print(f"[*] Handshake with Orchestrator: {self.node_id}") caps = self._collect_capabilities() @@ -126,45 +187,78 @@ ) try: - res = self.stub.SyncConfiguration(reg_req) + print(f"[*] [gRPC-Handshake] Sending SyncConfiguration (timeout=10s)...", flush=True) + res = self.stub.SyncConfiguration(reg_req, timeout=10) if res.success: self.sandbox.sync(res.policy) - print("[OK] Sandbox Policy Synced.") + print("[OK] [gRPC-Handshake] Handshake successful. Sandbox Policy Synced.") + + # Apply initial skill config + if res.policy.skill_config_json: + try: + cfg = json.loads(res.policy.skill_config_json) + for name, skill in self.skills.skills.items(): + if hasattr(skill, "apply_config"): + skill.apply_config(cfg) + except Exception as e: + print(f"[!] Error applying initial skill config: {e}") break # Success, exit the retry loop else: print(f"[!] Rejection: {res.error_message}") print("[!] Retrying handshake in 5 seconds...") time.sleep(5) except Exception as e: - print(f"[!] Connection Fail: {e}") + err_desc = self._format_grpc_error(e) + print(f"[!] Connection Fail: {err_desc}") print("[!] Retrying handshake in 5 seconds...") time.sleep(5) + def _format_grpc_error(self, e) -> str: + """Helper to extract detailed info from gRPC exceptions.""" + try: + import grpc + if isinstance(e, grpc.RpcError): + return f"gRPC Error {e.code()} | {e.details()}" + except: + pass + return str(e) + def start_health_reporting(self): """Streaming node metrics to the orchestrator for load balancing.""" def _report(): - while True: + while not self._stop_event.is_set(): try: def _gen(): - while True: + while not self._stop_event.is_set(): ids = self.skills.get_active_ids() # Collection - cpu = psutil.cpu_percent(interval=1.0) - per_core = psutil.cpu_percent(percpu=True) - - vmem = psutil.virtual_memory() - mem_percent = vmem.percent - - # GB conversion - used_gb = vmem.used / (1024**3) - total_gb = vmem.total / (1024**3) - avail_gb = vmem.available / (1024**3) + if psutil: + # Optimization: Use non-blocking CPU check. + # interval=None (default) prevents blocking the gRPC thread. + cpu = psutil.cpu_percent(interval=None) + per_core = psutil.cpu_percent(percpu=True, interval=None) + vmem = psutil.virtual_memory() + mem_percent = vmem.percent + used_gb = vmem.used / (1024**3) + total_gb = vmem.total / (1024**3) + avail_gb = vmem.available / (1024**3) + cpu_count = psutil.cpu_count() + else: + cpu = 0.0 + per_core = [] + mem_percent = 0.0 + used_gb = 0.0 + total_gb = 0.0 + avail_gb = 0.0 + cpu_count = 0 # Freq & Load - try: - freq = psutil.cpu_freq().current - except: - freq = 0 + freq = 0 + if psutil: + try: + freq = psutil.cpu_freq().current + except: + pass try: load = list(os.getloadavg()) @@ -178,7 +272,7 @@ active_worker_count=len(ids), max_worker_capacity=config.MAX_SKILL_WORKERS, running_task_ids=ids, - cpu_count=psutil.cpu_count(), + cpu_count=cpu_count, memory_used_gb=used_gb, memory_total_gb=total_gb, # M6 Fields @@ -194,7 +288,8 @@ # We don't strictly need the server time, but it confirms a round-trip pass except Exception as e: - print(f"[!] Health reporting interrupted: {e}. Retrying in 5s...") + err_desc = self._format_grpc_error(e) + print(f"[!] Health reporting interrupted: {err_desc}. Retrying in 5s...") time.sleep(5) # Non-blocking thread for health heartbeat @@ -209,29 +304,28 @@ announce_msg = agent_pb2.ClientTaskMessage( announce=agent_pb2.NodeAnnounce(node_id=self.node_id) ) - if config.DEBUG_GRPC: - print(f"[*] [DEBUG-gRPC] OUTBOUND: announce | {announce_msg}", flush=True) yield announce_msg while True: out_msg = self.task_queue.get() - if config.DEBUG_GRPC: - kind = out_msg.WhichOneof('payload') - if kind == 'file_sync' and out_msg.file_sync.HasField('file_data'): - print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} (file_data, path={out_msg.file_sync.file_data.path}, size={len(out_msg.file_sync.file_data.chunk)})", flush=True) - elif kind == 'skill_event' and out_msg.skill_event.WhichOneof('data') == 'terminal_out': - print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} (terminal_out, size={len(out_msg.skill_event.terminal_out)})", flush=True) - else: - print(f"[*] [DEBUG-gRPC] OUTBOUND: {kind} | {out_msg}", flush=True) - yield out_msg + try: + yield out_msg + except Exception as ye: + print(f"[*] [gRPC-Stream] !!! Send Error: {ye}", flush=True) + raise ye responses = self.stub.TaskStream(_gen()) - print(f"[*] Task Stream Online: {self.node_id}", flush=True) + print(f"[*] [gRPC-Stream] Connected to Orchestrator ({self.node_id}).", flush=True) for msg in responses: self._process_server_message(msg) + + print(f"[*] [gRPC-Stream] Connection closed by server.", flush=True) except Exception as e: - print(f"[!] Task Stream Failure: {e}. Reconnecting in 5s...", flush=True) + err_desc = self._format_grpc_error(e) + print(f"[!] Task Stream Failure: {err_desc}. Reconnecting in 5s...", flush=True) + # Force refresh stub on reconnection, closing old channel + self._refresh_stub() time.sleep(5) # Re-sync config in case permissions changed during downtime try: self.sync_configuration() @@ -239,10 +333,11 @@ def _process_server_message(self, msg): kind = msg.WhichOneof('payload') - if config.DEBUG_GRPC: - print(f"[*] [DEBUG-gRPC] INBOUND: {kind} | {msg}", flush=True) - else: - print(f"[*] Inbound: {kind}", flush=True) + if config.DEBUG_GRPC or True: # Force logging for now to debug Mac + if kind == 'file_sync' and msg.file_sync.HasField('control'): + print(f"[*] Inbound: {kind} (control={msg.file_sync.control.action})", flush=True) + else: + print(f"[*] Inbound: {kind}", flush=True) if kind == 'task_request': self._handle_task(msg.task_request) @@ -274,15 +369,30 @@ elif kind == 'policy_update': print(f" [๐Ÿ”’] Live Sandbox Policy Update Received.") self.sandbox.sync(msg.policy_update) + + # Apply skill config updates + if msg.policy_update.skill_config_json: + try: + cfg = json.loads(msg.policy_update.skill_config_json) + for name, skill in self.skills.skills.items(): + if hasattr(skill, "apply_config"): + skill.apply_config(cfg) + except Exception as e: + print(f" [!] Error applying skill config update: {e}") - def _on_sync_delta(self, session_id, file_payload): + def _on_sync_delta(self, session_id, payload): """Callback from watcher to push local changes to server.""" - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - file_data=file_payload - ) - )) + if isinstance(payload, agent_pb2.FileSyncMessage): + # Already a full message (e.g. deletion control) + self.task_queue.put(agent_pb2.ClientTaskMessage(file_sync=payload)) + else: + # Legacy/Standard chunk update + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + file_data=payload + ) + )) def _handle_file_sync(self, fs): """Processes inbound file synchronization messages from the Orchestrator.""" @@ -313,16 +423,14 @@ ) )) elif fs.HasField("file_data"): - success = self.sync_mgr.write_chunk(sid, fs.file_data) - if fs.file_data.is_final: - print(f" [๐Ÿ“] File Received: {fs.file_data.path} (Verified: {success})") - status = agent_pb2.SyncStatus.OK if success else agent_pb2.SyncStatus.ERROR - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=sid, - status=agent_pb2.SyncStatus(code=status, message=f"File {fs.file_data.path} synced") - ) - )) + # M6: High-Concurrency Disk I/O Offloading with Backpressure + # We use a semaphore to limit the number of pending I/O tasks in the executor queue. + # This prevents memory ballooning if the network is faster than the disk. + self.io_semaphore.acquire() + try: + self.io_executor.submit(self._async_write_chunk, sid, fs.file_data) + except Exception: + self.io_semaphore.release() # Release if submission fails elif fs.HasField("control"): ctrl = fs.control print(f" [๐Ÿ“] Control Action: {ctrl.action} (Path: {ctrl.path})") @@ -339,9 +447,20 @@ self.watcher.set_lock(sid, False) elif ctrl.action == agent_pb2.SyncControl.REFRESH_MANIFEST: if ctrl.request_paths: - print(f" [๐Ÿ“๐Ÿ“ค] Pushing {len(ctrl.request_paths)} Requested Files for {sid}") - for path in ctrl.request_paths: - self._push_file(sid, path) + print(f" [๐Ÿ“๐Ÿ“ค] Turbo Pushing {len(ctrl.request_paths)} Requested Files for {sid} in parallel") + from concurrent.futures import ThreadPoolExecutor + requested = list(ctrl.request_paths) + # Increased worker count for high-concurrency sync + max_workers = min(100, len(requested)) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + for path in requested: + # Pre-check existence to avoid redundant executor tasks + watch_path = self._get_base_dir(sid, create=False) + abs_path = os.path.normpath(os.path.join(watch_path, path)) + if os.path.exists(abs_path): + executor.submit(self._push_file, sid, path) + else: + print(f" [๐Ÿ“โ“] Skipping push for non-existent file: {path}") else: # Node -> Server Manifest Push self._push_full_manifest(sid, ctrl.path) @@ -393,8 +512,9 @@ print(f" [๐Ÿ“๐Ÿ“ค] Pushing {'Shallow' if shallow else 'Full'} Manifest for {session_id}") base_dir = self._get_base_dir(session_id, create=True) - - watch_path = os.path.normpath(os.path.join(base_dir, rel_path)) + # Ensure rel_path is relative if it's within a session sync dir + safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path + watch_path = os.path.normpath(os.path.join(base_dir, safe_rel)) if not os.path.exists(watch_path): # If the specific sub-path doesn't exist, try to create it if it's within the session dir @@ -437,11 +557,21 @@ # r_path must be relative to base_dir so the server correctly joins it to the mirror root r_path = os.path.relpath(abs_path, base_dir) try: + # Memory-safe incremental hashing + h = hashlib.sha256() with open(abs_path, "rb") as f: - h = hashlib.sha256(f.read()).hexdigest() - files.append(agent_pb2.FileInfo(path=r_path, size=os.path.getsize(abs_path), hash=h, is_dir=False)) - except Exception as e: - print(f" [โš ๏ธ] Failed to hash {abs_path}: {e}") + while True: + chunk = f.read(1024 * 1024) # 1MB chunks + if not chunk: break + h.update(chunk) + file_hash = h.hexdigest() + files.append(agent_pb2.FileInfo( + path=r_path, + size=os.path.getsize(abs_path), + hash=file_hash, + is_dir=False + )) + except Exception: continue for d in dirs: abs_path = os.path.join(root, d) @@ -471,7 +601,9 @@ """Modular FS Write/Create.""" try: base_dir = os.path.normpath(self._get_base_dir(session_id, create=True)) - target_path = os.path.normpath(os.path.join(base_dir, rel_path)) + # Ensure rel_path is relative if it's within a session sync dir + safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path + target_path = os.path.normpath(os.path.join(base_dir, safe_rel)) print(f" [๐Ÿ“๐Ÿ’พ] target_path: {target_path} (base_dir: {base_dir})") # M6: Check if path is within session base_dir OR global config.FS_ROOT @@ -512,7 +644,9 @@ """Modular FS Delete.""" try: base_dir = os.path.normpath(self._get_base_dir(session_id)) - target_path = os.path.normpath(os.path.join(base_dir, rel_path)) + # Ensure rel_path is relative if it's within a session sync dir + safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path + target_path = os.path.normpath(os.path.join(base_dir, safe_rel)) allowed = target_path.startswith(base_dir) if not allowed and config.FS_ROOT: @@ -549,10 +683,56 @@ ) )) + def _async_write_chunk(self, sid, payload): + """Worker function for background parallelized I/O with out-of-order chunk support.""" + path = payload.path + + # M6: Path-Level Locking for Sequential Consistency + # While chunks can arrive out of order, we must process them sequentially + # to guarantee the final hash verify/swap is correctly ordered. + with self.lock_map_mutex: + lock_key = (sid, path) + if lock_key not in self.write_locks: + self.write_locks[lock_key] = threading.Lock() + lock = self.write_locks[lock_key] + + with lock: + try: + if payload.chunk_index == 0: + self.watcher.suppress_path(sid, path) + + success = self.sync_mgr.write_chunk(sid, payload) + + if payload.is_final: + if success and payload.hash: + self.watcher.acknowledge_remote_write(sid, path, payload.hash) + + self.watcher.unsuppress_path(sid, path) + print(f" [๐Ÿ“] Async File Sync Complete (Sequenced Parallel): {path} (Success: {success})") + + # M6: Clean up the lock entry after finalization + with self.lock_map_mutex: + if lock_key in self.write_locks: + del self.write_locks[lock_key] + + # Report status back to orchestrator + status = agent_pb2.SyncStatus.OK if success else agent_pb2.SyncStatus.ERROR + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=sid, + status=agent_pb2.SyncStatus(code=status, message=f"File {path} synced") + ) + )) + finally: + # Always release the semaphore to allow the next I/O task + self.io_semaphore.release() + def _push_file(self, session_id, rel_path, task_id=""): """Pushes a specific file from node to server.""" watch_path = os.path.normpath(self._get_base_dir(session_id, create=False)) - abs_path = os.path.normpath(os.path.join(watch_path, rel_path)) + # Ensure rel_path is relative if it's within a session sync dir + safe_rel = rel_path.lstrip("/") if session_id != "__fs_explorer__" else rel_path + abs_path = os.path.normpath(os.path.join(watch_path, safe_rel)) allowed = abs_path.startswith(watch_path) if not allowed and config.FS_ROOT: @@ -566,36 +746,53 @@ print(f" [๐Ÿ“โ“] Requested file {rel_path} not found on node") return - with open(abs_path, "rb") as f: - full_data = f.read() - full_hash = hashlib.sha256(full_data).hexdigest() - f.seek(0) - - index = 0 - while True: - chunk = f.read(1024 * 512) # 512KB chunks - is_final = len(chunk) < 1024 * 512 - - self.task_queue.put(agent_pb2.ClientTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id=session_id, - task_id=task_id, - file_data=agent_pb2.FilePayload( - path=rel_path, - chunk=chunk, - chunk_index=index, - is_final=is_final, - hash=full_hash if is_final else "" + # Optimization: 4MB Incremental Hashing + Zero Throttling + hasher = hashlib.sha256() + file_size = os.path.getsize(abs_path) + chunk_size = 4 * 1024 * 1024 + total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1 + + try: + with open(abs_path, "rb") as f: + index = 0 + while True: + chunk = f.read(chunk_size) + if not chunk: break + + hasher.update(chunk) + offset = f.tell() - len(chunk) + is_final = f.tell() >= file_size + + # Compress Chunk for transit + compressed_chunk = zlib.compress(chunk) + + # M6: Use dictionary unpack for safe assignment (robust against old proto versions) + payload_fields = { + "path": rel_path, + "chunk": compressed_chunk, + "chunk_index": index, + "is_final": is_final, + "hash": hasher.hexdigest() if is_final else "", + "offset": offset, + "compressed": True, + } + # Only add new fields if supported by the compiled proto + if hasattr(agent_pb2.FilePayload, "total_chunks"): + payload_fields["total_chunks"] = total_chunks + payload_fields["total_size"] = file_size + + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + file_data=agent_pb2.FilePayload(**payload_fields) ) - ) - )) - - import time - time.sleep(0.02) # Throttle to ~25MB/s, preserves gRPC bandwidth for health reporting - - if is_final or not chunk: - break - index += 1 + )) + + if is_final: break + index += 1 + except Exception as e: + print(f" [๐Ÿ“๐Ÿ“ค] Error pushing {rel_path}: {e}") def _handle_task(self, task): print(f"[*] Task Launch: {task.task_id}", flush=True) @@ -663,6 +860,20 @@ def stop(self): """Gracefully stops all background services and skills.""" print(f"\n[๐Ÿ›‘] Stopping Agent Node: {self.node_id}") + self._stop_event.set() + + # 1. Stop Skills self.skills.shutdown() - # Optionally close gRPC channel if we want to be very clean - # self.channel.close() + + # 2. Stop Watcher + self.watcher.shutdown() + + # 3. Shutdown IO Executor + self.io_executor.shutdown(wait=False) + + # 4. Close gRPC channel + if self.channel: + try: + self.channel.close() + except Exception as e: + print(f"[!] Error closing channel: {e}") diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index c452cf5..836f930 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -1,6 +1,9 @@ import os import shutil import hashlib +import time +import json +import zlib from typing import Dict, List from app.core.grpc.shared_core.ignore import CortexIgnore from app.protos import agent_pb2 @@ -46,100 +49,214 @@ raise ValueError(f"Malicious path detected: {file_payload.path}") os.makedirs(os.path.dirname(safe_path), exist_ok=True) + tmp_path = safe_path + ".cortex_tmp" + lock_path = safe_path + ".cortex_lock" - mode = "ab" if file_payload.chunk_index > 0 else "wb" - with open(safe_path, mode) as f: - f.write(file_payload.chunk) + # 1. Lock/Metadata initialization or update + if file_payload.chunk_index == 0: + try: + metadata = { + "ts": time.time(), + "owner": "hub", + "path": file_payload.path, + "total_chunks": file_payload.total_chunks, + "total_size": file_payload.total_size, + "received_chunks": [0] + } + with open(lock_path, "w") as lf: + lf.write(json.dumps(metadata)) + except: pass + elif file_payload.chunk_index % 20 == 0: + try: + if os.path.exists(lock_path): + with open(lock_path, "r+") as lf: + meta = json.loads(lf.read()) + if "received_chunks" not in meta: meta["received_chunks"] = [] + if file_payload.chunk_index not in meta["received_chunks"]: + meta["received_chunks"].append(file_payload.chunk_index) + lf.seek(0) + lf.write(json.dumps(meta)) + lf.truncate() + except: pass + + # 2. Decompress and Write Data + data = file_payload.chunk + if file_payload.compressed: + try: data = zlib.decompress(data) + except: pass + + if file_payload.chunk_index == 0: + with open(tmp_path, "wb") as f: + f.write(data) + else: + if not os.path.exists(tmp_path): + # Ensure file exists even if chunk 0 was somehow bypassed (unlikely but safe) + with open(tmp_path, "ab") as f: pass - if file_payload.is_final and file_payload.hash: - self._verify_hash(safe_path, file_payload.hash) + with open(tmp_path, "r+b") as f: + f.seek(file_payload.offset if file_payload.HasField("offset") else 0) + f.write(data) + + if file_payload.is_final: + success = True + if file_payload.hash: + success = self._verify_hash(tmp_path, file_payload.hash) + + if success: + try: + os.replace(tmp_path, safe_path) + print(f" [๐Ÿ“โœ…] Sync Complete: {file_payload.path} (Swapped and verified)") + except Exception as e: + print(f" [๐Ÿ“โŒ] Server atomic swap failed for {file_payload.path}: {e}") + success = False + + if os.path.exists(lock_path): + try: os.remove(lock_path) + except: pass + if os.path.exists(tmp_path) and not success: + try: os.remove(tmp_path) + except: pass + + def delete_file(self, session_id: str, rel_path: str): + """Deletes a file or directory from the local mirror.""" + workspace = self.get_workspace_path(session_id) + path_safe = rel_path.lstrip("/") + safe_path = os.path.normpath(os.path.join(workspace, path_safe)) + + if not safe_path.startswith(workspace): + raise ValueError(f"Malicious path detected: {rel_path}") + + if os.path.exists(safe_path): + if os.path.isdir(safe_path): + shutil.rmtree(safe_path) + else: + os.remove(safe_path) + print(f" [๐Ÿ“๐Ÿ—‘๏ธ] Mirror Sync: Deleted {rel_path}") def generate_manifest(self, session_id: str) -> agent_pb2.DirectoryManifest: - """Generates a manifest of the current local mirror state.""" + """Generates a manifest of the current local mirror state using parallel hashing.""" + from concurrent.futures import ThreadPoolExecutor workspace = self.get_workspace_path(session_id) ignore_filter = self.get_ignore_filter(session_id) - files = [] + + raw_files = [] for root, dirs, filenames in os.walk(workspace): - # Efficiently prune skipped directories dirs[:] = [d for d in dirs if not ignore_filter.is_ignored(os.path.relpath(os.path.join(root, d), workspace))] - for filename in filenames: abs_path = os.path.join(root, filename) rel_path = os.path.relpath(abs_path, workspace) - if ignore_filter.is_ignored(rel_path): continue - - with open(abs_path, "rb") as f: - file_hash = hashlib.sha256(f.read()).hexdigest() - - files.append(agent_pb2.FileInfo( - path=rel_path, - size=os.path.getsize(abs_path), - hash=file_hash, + raw_files.append((abs_path, rel_path)) + + def _hash_file(paths): + abs_p, rel_p = paths + try: + # Memory-safe incremental hashing + h = hashlib.sha256() + with open(abs_p, "rb") as f: + while True: + chunk = f.read(1024 * 1024) # 1MB chunks + if not chunk: break + h.update(chunk) + return agent_pb2.FileInfo( + path=rel_p, + size=os.path.getsize(abs_p), + hash=h.hexdigest(), is_dir=False - )) + ) + except Exception: + return None + + files = [] + with ThreadPoolExecutor(max_workers=min(32, len(raw_files) or 1)) as executor: + results = list(executor.map(_hash_file, raw_files)) + files = [r for r in results if r is not None] + return agent_pb2.DirectoryManifest(root_path=workspace, files=files) - def _verify_hash(self, path: str, expected_hash: str): - if not os.path.exists(path): return - content = open(path, "rb").read() - actual_hash = hashlib.sha256(content).hexdigest() - if actual_hash != expected_hash: - print(f"[โš ๏ธ] Hash Mismatch for {path}: expected {expected_hash}, got {actual_hash}") - - + def _verify_hash(self, path: str, expected_hash: str) -> bool: + if not os.path.exists(path): return False + try: + h = hashlib.sha256() + with open(path, "rb") as f: + while True: + chunk = f.read(1024 * 1024) + if not chunk: break + h.update(chunk) + actual_hash = h.hexdigest() + if actual_hash != expected_hash: + print(f"[โš ๏ธ] Hash Mismatch for {path}: expected {expected_hash}, got {actual_hash}") + return False + return True + except Exception as e: + print(f"[โš ๏ธ] Hash verification error: {e}") + return False def reconcile(self, session_id: str, remote_manifest: agent_pb2.DirectoryManifest) -> List[str]: - """Compares remote manifest with local mirror and returns list of paths missing/changed.""" + """Compares remote manifest with local mirror using parallel verification.""" + from concurrent.futures import ThreadPoolExecutor workspace = self.get_workspace_path(session_id) ignore_filter = self.get_ignore_filter(session_id) expected_paths = {f.path for f in remote_manifest.files} - # 1. Purge extraneous local files and directories (handles Deletions) + # 1. Purge extraneous local files (Legacy remains synchronous as it's metadata-only) manifest_root = remote_manifest.root_path or "." - for root, dirs, files in os.walk(workspace, topdown=False): - for name in files: + for root, dirs, files_in_dir in os.walk(workspace, topdown=False): + for name in files_in_dir: abs_path = os.path.join(root, name) rel_path = os.path.relpath(abs_path, workspace) - is_within_root = (manifest_root == "." or rel_path.startswith(manifest_root + os.sep) or rel_path == manifest_root) if not is_within_root: continue - - if rel_path in [".cortexignore", ".gitignore"]: continue + if rel_path in [".cortexignore", ".gitignore"] or ".cortex_browser" in rel_path: continue if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): - try: - os.remove(abs_path) - print(f" [๐Ÿ“๐Ÿ—‘๏ธ] Ghost Mirror deleted extraneous file: {rel_path}") + try: os.remove(abs_path) except Exception: pass for name in dirs: abs_path = os.path.join(root, name) rel_path = os.path.relpath(abs_path, workspace) - is_within_root = (manifest_root == "." or rel_path.startswith(manifest_root + os.sep) or rel_path == manifest_root) if not is_within_root: continue - - if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path): - try: - if not os.listdir(abs_path): - os.rmdir(abs_path) + if rel_path not in expected_paths and not ignore_filter.is_ignored(rel_path) and ".cortex_browser" not in rel_path: + try: + if not os.listdir(abs_path): os.rmdir(abs_path) except Exception: pass - needs_update = [] - for remote_file in remote_manifest.files: - if remote_file.is_dir: continue - + # 2. Reconcile with parallel hash checks + def _check_drift(remote_file): + if remote_file.is_dir: return None local_path = os.path.join(workspace, remote_file.path) - if not os.path.exists(local_path): - needs_update.append(remote_file.path) - continue - with open(local_path, "rb") as f: - local_hash = hashlib.sha256(f.read()).hexdigest() - if local_hash != remote_file.hash: - needs_update.append(remote_file.path) + if not os.path.exists(local_path): + return remote_file.path + + # Metadata pre-check (Fast path) + try: + if os.path.getsize(local_path) != remote_file.size: + return remote_file.path + except: return remote_file.path + + # Hash check (Slow path) - Incremental + try: + h = hashlib.sha256() + with open(local_path, "rb") as f: + while True: + chunk = f.read(1024 * 1024) + if not chunk: break + h.update(chunk) + if h.hexdigest() != remote_file.hash: + return remote_file.path + except: + return remote_file.path + return None + + needs_update = [] + with ThreadPoolExecutor(max_workers=32) as executor: + drift_results = list(executor.map(_check_drift, remote_manifest.files)) + needs_update = [d for d in drift_results if d is not None] + return needs_update def purge_orphaned(self, active_session_ids: List[str]): @@ -163,3 +280,42 @@ print(f" [๐Ÿ“๐Ÿ—‘๏ธ] Purged orphaned ghost mirror: {entry.name}") except Exception as e: print(f" [๐Ÿ“โš ๏ธ] Failed to purge orphaned mirror {entry.name}: {e}") + + def purge_stale_locks(self, session_id: str, ttl: int = 60): + """Scans a mirror for stale .cortex_lock files and removes them.""" + workspace = self.get_workspace_path(session_id) + now = time.time() + purged = 0 + + for root, _, files in os.walk(workspace): + for f in files: + if f.endswith(".cortex_lock"): + abs_path = os.path.join(root, f) + tmp_path = abs_path.replace(".cortex_lock", ".cortex_tmp") + try: + with open(abs_path, "r") as lf: + data = json.loads(lf.read()) + if now - data.get("ts", 0) > ttl: + if os.path.exists(abs_path): os.remove(abs_path) + if os.path.exists(tmp_path): os.remove(tmp_path) + purged += 1 + print(f" [๐Ÿ“๐Ÿ”“] Purged stale lock/tmp from {data.get('owner')}: {data.get('path')}") + except: + # If unreadable/corrupt, purge it anyway + try: + if os.path.exists(abs_path): os.remove(abs_path) + if os.path.exists(tmp_path): os.remove(tmp_path) + purged += 1 + except: pass + elif f.endswith(".cortex_tmp"): + # Orphaned tmp file check + abs_path = os.path.join(root, f) + lock_path = abs_path.replace(".cortex_tmp", ".cortex_lock") + if not os.path.exists(lock_path): + try: + if now - os.path.getmtime(abs_path) > ttl: + os.remove(abs_path) + purged += 1 + except: pass + if purged > 0: + print(f" [๐Ÿ“๐Ÿงน] Cleaned up {purged} stale locks/tmp files in {session_id}") diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index a61476f..d25b07b 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -6,10 +6,7 @@ import json import uuid -try: - import requests as _requests -except ImportError: - _requests = None +# removed requests import as we now validate tokens directly import grpc from app.protos import agent_pb2, agent_pb2_grpc @@ -22,11 +19,7 @@ logger = logging.getLogger(__name__) -# M4: Hub HTTP API for invite-token validation -# Calls POST /api/v1/nodes/validate-token before accepting any SyncConfiguration. -PATH_PREFIX = os.getenv("PATH_PREFIX", "/api/v1") -HUB_API_URL = os.getenv("HUB_API_URL", "http://localhost:8000") -HUB_API_PATH = f"{PATH_PREFIX}/nodes/validate-token" +# M4: Token validation is now handled directly via NodeRegistryService class AgentOrchestrator(agent_pb2_grpc.AgentOrchestratorServicer): """Integrated gRPC Servicer for Agent Orchestration within AI Hub.""" @@ -35,12 +28,17 @@ self.journal = TaskJournal() self.pool = GlobalWorkPool() self.mirror = GhostMirrorManager() + self.io_locks = {} # key -> threading.Lock + self.io_locks_lock = threading.Lock() self.assistant = TaskAssistant(self.registry, self.journal, self.pool, self.mirror) self.pool.on_new_work = self._broadcast_work # 4. Mesh Observation (Aggregated Health Dashboard) threading.Thread(target=self._monitor_mesh, daemon=True, name="MeshMonitor").start() + # 5. Mirror Cleanup Loop (Purge archived sessions) + threading.Thread(target=self._mirror_cleanup_loop, daemon=True, name="MirrorCleanup").start() + def _monitor_mesh(self): """Periodically prints status of all nodes in the mesh.""" while True: @@ -62,14 +60,44 @@ except Exception as e: logger.error(f"[MeshMonitor] Error: {e}") + def _mirror_cleanup_loop(self): + """Periodically purges ghost mirrors for archived sessions.""" + while True: + try: + # Run every 10 minutes to avoid high disk I/O + time.sleep(600) + + from app.db.session import get_db_session + from app.db.models import Session + + with get_db_session() as db: + # Fetch all workspace IDs for non-archived sessions + active_sessions = db.query(Session).filter( + Session.is_archived == False, + Session.sync_workspace_id.isnot(None) + ).all() + + active_ids = [s.sync_workspace_id for s in active_sessions] + + if self.mirror: + self.mirror.purge_orphaned(active_ids) + + # M6: Distributed Lock Scavenging + for sid in active_ids: + self.mirror.purge_stale_locks(sid, ttl=60) # 1-minute TTL + + except Exception as e: + print(f"[๐Ÿ“โš ๏ธ] Mirror Cleanup Thread Error: {e}") + def _broadcast_work(self, _): """Pushes work notifications to all active nodes.""" live_nodes = self.registry.list_nodes() for node in live_nodes: logger.debug(f"[๐Ÿ“ข] Broadcasting availability to {node.node_id}") - node.queue.put(agent_pb2.ServerTaskMessage( + # Priority 1: Interactive/Work Dispatch + node.send_message(agent_pb2.ServerTaskMessage( work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) - )) + ), priority=1) def _build_sandbox_policy(self, skill_cfg): """Builds a SandboxPolicy protobuf from raw skill configuration.""" @@ -104,7 +132,8 @@ allowed_commands=allowed, denied_commands=denied, sensitive_commands=sensitive, - working_dir_jail=jail + working_dir_jail=jail, + skill_config_json=json.dumps(skill_cfg) ) def push_policy(self, node_id, skill_config): @@ -124,31 +153,26 @@ def SyncConfiguration(self, request, context): """M4 Authenticated Handshake: Validate invite_token via Hub DB, then send policy.""" node_id = request.node_id - logger.info(f"[๐Ÿ”‘] SyncConfiguration REQUEST from {node_id}") invite_token = request.auth_token + logger.info(f"[๐Ÿ”‘] SyncConfiguration REQUEST from {node_id} (token prefix: {invite_token[:4]}...)") - # --- M4: Token validation via Hub API --- + # --- M4: Token validation via Hub API (M6: switched to direct call to avoid deadlock) --- user_id = "default" - skill_cfg = {} + skill_cfg = { } - if HUB_API_URL and _requests: - try: - resp = _requests.post( - f"{HUB_API_URL}{HUB_API_PATH}", - params={"node_id": node_id, "token": invite_token}, - timeout=5, - ) - payload = resp.json() or {} - if not payload.get("valid"): - reason = payload.get("reason", "Token rejected") or "Token rejected" - logger.warning(f"[๐Ÿ”’] SyncConfiguration REJECTED {node_id}: {reason}") - return agent_pb2.RegistrationResponse(success=False, error_message=reason) - - user_id = payload.get("user_id", "default") - skill_cfg = payload.get("skill_config", {}) - logger.info(f"[๐Ÿ”‘] Token validated for {node_id} (owner: {user_id})") - except Exception as e: - logger.error(f"[โš ๏ธ] Hub token validation unavailable ({e}); proceeding as 'default' user.") + try: + payload = self.registry.validate_invite_token(node_id, invite_token) + if not payload.get("valid"): + reason = payload.get("reason", "Token rejected") or "Token rejected" + logger.warning(f"[๐Ÿ”’] SyncConfiguration REJECTED {node_id}: {reason}") + return agent_pb2.RegistrationResponse(success=False, error_message=reason) + + user_id = payload.get("user_id", "default") + skill_cfg = payload.get("skill_config", {}) + logger.info(f"[๐Ÿ”‘] Token validated for {node_id} (owner: {user_id})") + except Exception as e: + logger.error(f"[๐Ÿ”’] SyncConfiguration FAILED internal validation for {node_id}: {e}") + return agent_pb2.RegistrationResponse(success=False, error_message=f"Internal Hub Error: {e}") policy = self._build_sandbox_policy(skill_cfg) logger.info(f"[๐Ÿ”‘] Handshake successful for {node_id} (owner: {user_id})") @@ -165,25 +189,43 @@ ) def TaskStream(self, request_iterator, context): - """Persistent Bi-directional Stream for Command & Control.""" + """Persistent bi-directional stream for dispatching work and collecting results.""" + # 1. Identification Handshake node_id = "unknown" node = None try: - # 1. Blocking wait for Node Identity first_msg = next(request_iterator) - if first_msg.WhichOneof('payload') != 'announce': - logger.error("[!] Stream rejected: No NodeAnnounce") + if not first_msg.HasField("announce"): + logger.warning("[!] Initial TaskStream message MUST be 'announce'") return node_id = first_msg.announce.node_id - node = self.registry.get_node(node_id) - if not node: - logger.error(f"[!] Stream rejected: Node {node_id} not registered") - node_id = "unknown" # Don't deregister if we didn't find the node - return + logger.info(f"[*] Node {node_id} Attempting to establish TaskStream...") + except StopIteration: + logger.warning("[!] Empty TaskStream received.") + return + except Exception as e: + logger.error(f"[!] TaskStream handshake error: {e}") + return - logger.info(f"[๐Ÿ“ถ] gRPC Stream Online: {node_id}") - self.assistant.reconcile_node(node_id) + node = self.registry.get_node(node_id) + if not node: + logger.warning(f"[!] TaskStream rejected: Node {node_id} not registered via SyncConfiguration.") + return + + logger.info(f"[*] Node {node_id} Online (TaskStream established)") + + try: + self.registry.update_stats(node_id, {"status": "streaming"}) + + # M6: Offload reconciliation to a background thread to prevent blocking the stream initialization + # especially when the database (on NFS) is experiencing latency. + threading.Thread( + target=self.assistant.reconcile_node, + args=(node_id,), + daemon=True, + name=f"Reconcile-{node_id}" + ).start() def _read_results(): try: @@ -198,24 +240,30 @@ last_keepalive = 0 while context.is_active(): try: - msg = node.queue.get(timeout=1.0) + priority_item = node.queue.get(timeout=1.0) + msg = priority_item[2] # Unpack (priority, ts, msg) + if os.getenv("DEBUG_GRPC"): + kind = msg.WhichOneof("payload") + logger.info(f"[DEBUG-gRPC] OUTBOUND to {node_id}: {kind}") yield msg except queue.Empty: now = time.time() - if (now - last_keepalive) > 15.0: + if (now - last_keepalive) > 10.0: last_keepalive = now - if self.pool.available: - yield agent_pb2.ServerTaskMessage( - work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) - ) + # Yield a NOP ping (work pool update) to keep proxy connections hot + yield agent_pb2.ServerTaskMessage( + work_pool_update=agent_pb2.WorkPoolUpdate(available_task_ids=self.pool.list_available()) + ) continue - except StopIteration: pass + except StopIteration: + logger.info(f"[๐Ÿ“ถ] gRPC Stream StopIteration for {node_id}") except Exception as e: logger.error(f"[!] TaskStream Error for {node_id}: {e}") finally: if node_id != "unknown": + logger.warning(f"[๐Ÿ“ถ] gRPC Stream TERMINATED for {node_id}. Cleaning up.") # Fulfill any pending tasks in journal with error immediately - self.journal.fail_node_tasks(node_id, "Edge node disconnected") + self.journal.fail_node_tasks(node_id, f"Node {node_id} gRPC stream closed.") self.registry.deregister(node_id, record=node) def _handle_client_message(self, msg, node_id, node): @@ -233,46 +281,72 @@ task_id = msg.task_claim.task_id success, payload = self.pool.claim(task_id, node_id) - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( claim_status=agent_pb2.TaskClaimResponse( task_id=task_id, granted=success, reason="Task successfully claimed" if success else "Task already claimed by another node" ) - )) + ), priority=1) # M6: Notify UI that a node is claiming a global task self.registry.emit(node_id, "task_claim", {"task_id": task_id, "granted": success}) if success: sig = sign_payload(payload) - node.queue.put(agent_pb2.ServerTaskMessage( + node.send_message(agent_pb2.ServerTaskMessage( task_request=agent_pb2.TaskRequest( task_id=task_id, payload_json=payload, signature=sig ) - )) + ), priority=1) elif kind == 'task_response': tr = msg.task_response res_obj = {"stdout": tr.stdout, "stderr": tr.stderr, "status": tr.status} if tr.HasField("browser_result"): br = tr.browser_result + + # M6: Support for transparently reloading offloaded data from the sync mirror + a11y_tree = br.a11y_tree + dom_content = br.dom_content + snapshot = br.snapshot + + if br.offloaded: + node_record = self.registry.get_node(node_id) + if node_record: + workspace = self.mirror.get_workspace_path(node_record.session_id) + offload_dir = os.path.join(workspace, ".cortex_browser") + try: + a11y_path = os.path.join(offload_dir, "last_a11y.txt") + dom_path = os.path.join(offload_dir, "last_dom.html") + snap_path = os.path.join(offload_dir, "last_screenshot.png") + + if os.path.exists(a11y_path): + with open(a11y_path, "r") as f: a11y_tree = f.read() + if os.path.exists(dom_path): + with open(dom_path, "r") as f: dom_content = f.read() + if os.path.exists(snap_path): + with open(snap_path, "rb") as f: snapshot = f.read() + logger.info(f"[๐ŸŒ๐Ÿ“] Reloaded offloaded browser state for {node_id} from mirror.") + except Exception as e: + logger.error(f"[๐ŸŒโŒ] Failed to reload offloaded browser state for {node_id}: {e}") + res_obj["browser"] = { "url": br.url, "title": br.title, - "has_snapshot": len(br.snapshot) > 0, + "has_snapshot": len(snapshot) > 0, "eval": br.eval_result, # OpenClaw-inspired: include the full aria role tree for AI reasoning - "aria_snapshot": br.a11y_tree if br.a11y_tree else None, + "aria_snapshot": a11y_tree if a11y_tree else None, } # Flatten key fields to stdout for easy AI consumption - if br.a11y_tree: + if a11y_tree: res_obj["stdout"] = ( f"[Browser] URL: {br.url}\n" f"[Browser] Title: {br.title}\n" f"[Browser] Page Snapshot ({br.eval_result}):\n\n" - f"{br.a11y_tree}" + f"{a11y_tree}" ) elif br.eval_result: res_obj["stdout"] = f"[Browser] {br.url} | {br.title}\nResult: {br.eval_result}" @@ -319,17 +393,22 @@ task_id = fs.task_id if fs.HasField("file_data"): - # M6: Handle interactive 'cat' result correlation - if task_id and task_id.startswith("fs-cat-"): - # For small files in explorer, we fulfill on final chunk - if fs.file_data.is_final: - try: - content = fs.file_data.chunk.decode('utf-8', errors='ignore') - self.journal.fulfill(task_id, {"content": content}) - except Exception as e: - self.journal.fulfill(task_id, {"error": str(e)}) - - self.mirror.write_file_chunk(fs.session_id, fs.file_data) + # Keyed Lock per file to prevent write/hash/swap race + lock_key = f"{fs.session_id}:{fs.file_data.path}" + with self.io_locks_lock: + if lock_key not in self.io_locks: + self.io_locks[lock_key] = threading.Lock() + lock = self.io_locks[lock_key] + + with lock: + self.mirror.write_file_chunk(fs.session_id, fs.file_data) + + # M6: Clean up the lock entry after finalization to prevent memory leak + if fs.file_data.is_final: + with self.io_locks_lock: + if lock_key in self.io_locks: + del self.io_locks[lock_key] + self.assistant.broadcast_file_chunk(fs.session_id, node_id, fs.file_data) # M6: Emit sync progress (rarely to avoid flood, but good for large pushes) if fs.file_data.chunk_index % 10 == 0: @@ -352,7 +431,8 @@ if drifts: print(f" [๐Ÿ“๐Ÿƒ] Drift Detected (Node -> Server): Requesting {len(drifts)} files") # Request node to push these specific files - node.queue.put(agent_pb2.ServerTaskMessage( + # Priority 1: Drift Reconciliation Request + node.send_message(agent_pb2.ServerTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=fs.session_id, control=agent_pb2.SyncControl( @@ -361,7 +441,7 @@ request_paths=drifts ) ) - )) + ), priority=1) else: self.registry.emit(node_id, "sync_status", {"message": "Synchronized (Node -> Server)", "code": 0}) @@ -387,6 +467,13 @@ }) for path in fs.status.reconcile_paths: self.assistant.push_file(node_id, fs.session_id, path) + + elif fs.HasField("control"): + ctrl = fs.control + if ctrl.action == agent_pb2.SyncControl.DELETE: + print(f" [๐Ÿ“๐Ÿ—‘๏ธ] Node requested DELETE on mirror: {ctrl.path}") + self.mirror.delete_file(fs.session_id, ctrl.path) + # Broadcast to other nodes (optional M7) def ReportHealth(self, request_iterator, context): """Collect Health Metrics and Feed Policy Updates.""" @@ -423,7 +510,7 @@ options = [ ('grpc.max_receive_message_length', 128 * 1024 * 1024), ('grpc.max_send_message_length', 128 * 1024 * 1024), - ('grpc.keepalive_time_ms', 30000), # Send keepalive ping every 30s + ('grpc.keepalive_time_ms', 10000), # Send keepalive ping every 10s ('grpc.keepalive_timeout_ms', 10000), # Wait 10s for pong ('grpc.keepalive_permit_without_calls', True), ] diff --git a/ai-hub/app/core/orchestration/architect.py b/ai-hub/app/core/orchestration/architect.py index c125b1b..6ccf48a 100644 --- a/ai-hub/app/core/orchestration/architect.py +++ b/ai-hub/app/core/orchestration/architect.py @@ -92,6 +92,7 @@ accumulated_content = "" accumulated_reasoning = "" tool_calls_map = {} + finish_reason = None chunk_count = 0 async for chunk in prediction: @@ -101,6 +102,7 @@ if not chunk.choices: continue delta = chunk.choices[0].delta + finish_reason = getattr(chunk.choices[0], "finish_reason", None) or chunk.choices[0].get("finish_reason") # Native reasoning (O-series) r = getattr(delta, "reasoning_content", None) or delta.get("reasoning_content") @@ -145,7 +147,13 @@ # Branch: Tools or Exit? if not tool_calls_map: - # Final Turn: Yield the accumulated content if it was empty + # 1. Truncation Guard: If the model hit a length limit, we MUST continue + if finish_reason == "length": + yield {"type": "reasoning", "content": "\n> **โš ๏ธ System Note:** Response was truncated by output limits. Prompting for continuation...\n"} + messages.append({"role": "user", "content": "You were cut off mid-sentence/action. Please continue immediately from where you left off. If you were trying to write a large file, use a terminal command instead of mesh_file_explorer."}) + continue + + # 2. Final Turn: Yield the accumulated content if it was empty if not accumulated_content.strip(): import re fallback = "I've completed the requested task." diff --git a/ai-hub/app/core/orchestration/profiles.py b/ai-hub/app/core/orchestration/profiles.py index 487e22f..caef949 100644 --- a/ai-hub/app/core/orchestration/profiles.py +++ b/ai-hub/app/core/orchestration/profiles.py @@ -6,22 +6,23 @@ DEFAULT_PROMPT_TEMPLATE = """You are the Cortex AI Assistant, the **Master-Architect** of a decentralized agent mesh. ## ๐Ÿ—๏ธ Orchestration Strategy (The Master-Worker Pattern): +- **Action-First**: You are an action-oriented agent. If you possess a tool to perform a task, you MUST invoke it in the current turn. Do not wait for a second confirmation. +- **Large Data Rule**: For files larger than 100KB, DO NOT use `mesh_file_explorer`'s `write` action. Instead, use `mesh_terminal_control` with native commands like `dd`, `head`, `base64`, or `cat <` tags. Use standard function calling. ## ๐Ÿš€ Execution Mandate: - **Perpetual Pursuit**: DO NOT stop until the user's objective is achieved. -- **No Idle Turns**: If a sub-goal is reached, immediately pivot to the next atomic task. - **Direct Terminal Answer**: If you possess the information to answer a user's question directly without tools (e.g., questions about your identity or known capabilities), provide the answer and **TERMINATE** the orchestration loop by omitting any further tool calls. - **NO SILENT ACTIONS**: You are **FORBIDDEN** from calling a tool without first providing at least one sentence of **plain text** analysis/strategy. ## โœ๏ธ Interaction Format (MANDATORY PROTOCOL): 1. **TITLE (MANDATORY)**: Your turn **MUST** begin with exactly one line: `Title: Your Specific Objective`. - **CRITICAL**: This line must appear **BEFORE** any `` tags and before any other text. - - **WHY**: This is required for UI synchronization. 2. **BRIDGE ANALYSIS**: Provide 1-2 sentences of auditable analysis. 3. **ACT**: Call the single atomic tool required for your plan.