diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py
index da7c652..609e489 100644
--- a/agent-node/src/agent_node/node.py
+++ b/agent-node/src/agent_node/node.py
@@ -203,24 +203,58 @@
         except:
             self.io_semaphore.release()
     def _on_sync_control(self, sid, ctrl, task_id):
-        """Handles sync control actions like watching, locking, or directory listing."""
-        action = ctrl.action
-        if action == agent_pb2.SyncControl.START_WATCHING:
-            self.watcher.start_watching(sid, ctrl.path if os.path.isabs(ctrl.path) else os.path.join(self.sync_mgr.get_session_dir(sid), ctrl.path))
-        elif action == agent_pb2.SyncControl.STOP_WATCHING: self.watcher.stop_watching(sid)
-        elif action == agent_pb2.SyncControl.LOCK: self.watcher.set_lock(sid, True)
-        elif action == agent_pb2.SyncControl.UNLOCK: self.watcher.set_lock(sid, False)
-        elif action in (agent_pb2.SyncControl.REFRESH_MANIFEST, agent_pb2.SyncControl.RESYNC):
-            if ctrl.request_paths:
-                for p in ctrl.request_paths: self.io_executor.submit(self._push_file, sid, p)
-            else: self._push_full_manifest(sid, ctrl.path)
-        elif action == agent_pb2.SyncControl.PURGE:
-            self.watcher.stop_watching(sid)
-            self.sync_mgr.purge(sid)
-        elif action == agent_pb2.SyncControl.LIST: self._push_full_manifest(sid, ctrl.path, task_id=task_id, shallow=True)
-        elif action == agent_pb2.SyncControl.READ: self._push_file(sid, ctrl.path, task_id=task_id)
-        elif action == agent_pb2.SyncControl.WRITE: self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir, task_id=task_id)
-        elif action == agent_pb2.SyncControl.DELETE: self._handle_fs_delete(sid, ctrl.path, task_id=task_id)
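+        """Handles sync control actions like watching, locking, or directory listing."""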
+        router = {
+            agent_pb2.SyncControl.START_WATCHING: self._ctrl_start_watching,
+            agent_pb2.SyncControl.STOP_WATCHING: self._ctrl_stop_watching,
+            agent_pb2.SyncControl.LOCK: self._ctrl_lock,
+            agent_pb2.SyncControl.UNLOCK: self._ctrl_unlock,
+            agent_pb2.SyncControl.REFRESH_MANIFEST: self._ctrl_refresh_manifest,
+            agent_pb2.SyncControl.RESYNC: self._ctrl_refresh_manifest,
+            agent_pb2.SyncControl.PURGE: self._ctrl_purge,
+            agent_pb2.SyncControl.LIST: self._ctrl_list,
+            agent_pb2.SyncControl.READ: self._ctrl_read,
+            agent_pb2.SyncControl.WRITE: self._ctrl_write,
+            agent_pb2.SyncControl.DELETE: self._ctrl_delete
+        }
+
+        handler = router.get(ctrl.action)
+        if handler:
+            handler(sid, ctrl, task_id)
+
+    def _ctrl_start_watching(self, sid, ctrl, task_id):
+        self.watcher.start_watching(sid, ctrl.path if os.path.isabs(ctrl.path) else os.path.join(self.sync_mgr.get_session_dir(sid), ctrl.path))
+
+    def _ctrl_stop_watching(self, sid, ctrl, task_id):
+        self.watcher.stop_watching(sid)
+
+    def _ctrl_lock(self, sid, ctrl, task_id):
+        self.watcher.set_lock(sid, True)
+
+    def _ctrl_unlock(self, sid, ctrl, task_id):
+        self.watcher.set_lock(sid, False)
+
+    def _ctrl_refresh_manifest(self, sid, ctrl, task_id):
+        if ctrl.request_paths:
+            for p in ctrl.request_paths:
+                self.io_executor.submit(self._push_file, sid, p)
+        else:
+            self._push_full_manifest(sid, ctrl.path)
+
+    def _ctrl_purge(self, sid, ctrl, task_id):
+        self.watcher.stop_watching(sid)
+        self.sync_mgr.purge(sid)
+
+    def _ctrl_list(self, sid, ctrl, task_id):
+        self._push_full_manifest(sid, ctrl.path, task_id=task_id, shallow=True)
+
+    def _ctrl_read(self, sid, ctrl, task_id):
+        self._push_file(sid, ctrl.path, task_id=task_id)
+
+    def _ctrl_write(self, sid, ctrl, task_id):
+        self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir, task_id=task_id)
+
+    def _ctrl_delete(self, sid, ctrl, task_id):
+        self._handle_fs_delete(sid, ctrl.path, task_id=task_id)

     def _get_base_dir(self, session_id, create=False):
         """Resolves the session's effective root directory."""
diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py
index 24fbb23..8e079e8 100644
--- a/ai-hub/app/core/grpc/services/grpc_server.py
+++ b/ai-hub/app/core/grpc/services/grpc_server.py
@@ -372,18 +372,11 @@
         to_purge = [k for k in self.io_locks.keys() if k.startswith(f"session:")]  # More precise check needed if we have session ids
         # Currently io_locks are keyed as "{session_id}:{path}"
         # We can't easily map back to node_id here without more state,
-        # but we can rely on the 10-minute periodic worker for an absolute safety net.
-        pass
-
-        # Hand off failure handling to the Assistant's Recovery Policy (Ownership Shift)
-        self.assistant.recover_node_tasks(node_id)
         self.registry.deregister(node_id, record=node)
-        self.mesh_core.deregister_node(node_id)

     def _handle_client_message(self, msg, node_id, node):
         kind = msg.WhichOneof('payload')
         if os.getenv("DEBUG_GRPC"):
-            # Exclude large blobs like terminal_out or file_data from full log unless needed
             if kind == 'skill_event' and msg.skill_event.WhichOneof('data') == 'terminal_out':
                 logger.info(f"[DEBUG-gRPC] INBOUND from {node_id}: {kind} (terminal_out, size={len(msg.skill_event.terminal_out)})")
             elif kind == 'file_sync' and msg.file_sync.HasField('file_data'):
@@ -391,178 +384,180 @@
             else:
                 logger.info(f"[DEBUG-gRPC] INBOUND from {node_id}: {kind} | {msg}")

-        if kind == 'task_claim':
-            task_id = msg.task_claim.task_id
-            success, payload = self.pool.claim(task_id, node_id)
-
-            node.send_message(agent_pb2.ServerTaskMessage(
-                claim_status=agent_pb2.TaskClaimResponse(
+        router = {
+            'task_claim': self._handle_task_claim,
+            'task_response': self._handle_task_response,
+            'skill_event': self._handle_skill_event,
+            'file_sync': self._handle_file_sync
+        }
+
+        handler = router.get(kind)
+        if handler:
+            handler(msg, node_id, node)
+        else:
+            logger.warning(f"[gRPC] Unhandled client message kind: {kind}")
+
+    def _handle_task_claim(self, msg, node_id, node):
+        task_id = msg.task_claim.task_id
+        success, payload = self.pool.claim(task_id, node_id)
+
+        node.send_message(agent_pb2.ServerTaskMessage(
+            claim_status=agent_pb2.TaskClaimResponse(
+                task_id=task_id,
+                granted=success,
+                reason="Task successfully claimed" if success else "Task already claimed by another node"
+            )
+        ), priority=1)
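+        # M6: Notify UI that a node is claiming a global task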
+        self.registry.emit(node_id, "task_claim", {"task_id": task_id, "granted": success})
+
+        if success:
+            sig = sign_payload(payload)
+            self.mesh_core.dispatch(node_id, agent_pb2.ServerTaskMessage(
+                task_request=agent_pb2.TaskRequest(
                     task_id=task_id,
-                    granted=success,
-                    reason="Task successfully claimed" if success else "Task already claimed by another node"
+                    payload_json=payload,
+                    signature=sig
                 )
             ), priority=1)
-            # M6: Notify UI that a node is claiming a global task
-            self.registry.emit(node_id, "task_claim", {"task_id": task_id, "granted": success})
-            if success:
-                sig = sign_payload(payload)
-                self.mesh_core.dispatch(node_id, agent_pb2.ServerTaskMessage(
-                    task_request=agent_pb2.TaskRequest(
-                        task_id=task_id,
-                        payload_json=payload,
-                        signature=sig
-                    )
-                ), priority=1)
+
+    def _handle_task_response(self, msg, node_id, node):
+        tr = msg.task_response
+        res_obj = {"stdout": tr.stdout, "stderr": tr.stderr, "status": tr.status}
+        self.journal.fulfill(tr.task_id, res_obj)
-        elif kind == 'task_response':
-            tr = msg.task_response
-            res_obj = {"stdout": tr.stdout, "stderr": tr.stderr, "status": tr.status}
-            self.journal.fulfill(tr.task_id, res_obj)
-
-            # M6: Emit to EventBus for UI streaming
-            event_type = "task_complete" if tr.status == agent_pb2.TaskResponse.SUCCESS else "task_error"
-            self.registry.emit(node_id, event_type, res_obj, task_id=tr.task_id)
-
-        elif kind == 'skill_event':
-            se = msg.skill_event
-            data_kind = se.WhichOneof('data')
-            event_data = {"session_id": se.session_id, "task_id": se.task_id, "skill": "shell"}
-
-            if data_kind == 'terminal_out':
-                event_data["data"] = se.terminal_out
-                event_data["type"] = "output"
-                # NEW: Capture stream buffer in journal for tool callback/timeout
-                if se.task_id:
-                    self.journal.append_stream(se.task_id, se.terminal_out)
-            elif data_kind == 'prompt':
-                event_data["data"] = se.prompt
-                event_data["type"] = "prompt"
-
-                # EDGE INTELLIGENCE: Signal SubAgent a prompt was detected
-                if se.task_id:
-                    self.journal.signal_prompt(se.task_id)
-
-            self.registry.emit(node_id, "skill_event", event_data)
-
+        event_type = "task_complete" if tr.status == agent_pb2.TaskResponse.SUCCESS else "task_error"
+        self.registry.emit(node_id, event_type, res_obj, task_id=tr.task_id)
-        elif kind == 'file_sync':
-            fs = msg.file_sync
-            task_id = fs.task_id
-
-            if fs.HasField("file_data"):
-                # Keyed Lock per file to prevent write/hash/swap race
-                lock_key = f"{fs.session_id}:{fs.file_data.path}"
-                with self.io_locks_lock:
-                    if lock_key not in self.io_locks:
-                        self.io_locks[lock_key] = threading.Lock()
-                    lock = self.io_locks[lock_key]
-
-                with lock:
-                    self.mirror.write_file_chunk(fs.session_id, fs.file_data)
-
-                # M6: Clean up the lock entry after finalization to prevent memory leak
-                if fs.file_data.is_final:
-                    with self.io_locks_lock:
-                        if lock_key in self.io_locks:
-                            del self.io_locks[lock_key]
-
-                self.assistant.broadcast_file_chunk(fs.session_id, node_id, fs.file_data)
-                # M6: Emit sync progress (rarely to avoid flood, but good for large pushes)
-                if fs.file_data.chunk_index % 10 == 0:
-                    self.registry.emit(node_id, "sync_progress", {"path": fs.file_data.path, "chunk": fs.file_data.chunk_index})
-
-            elif fs.HasField("manifest"):
-                logger.debug(f"[📁📥] Received Manifest from {node_id} for {fs.session_id} (chunk: {fs.manifest.chunk_index}, final: {fs.manifest.is_final})")
-
-                key = (node_id, fs.session_id)
-                with self.manifest_accumulators_lock:
-                    if key not in self.manifest_accumulators:
-                        self.manifest_accumulators[key] = []
-                    self.manifest_accumulators[key].extend(fs.manifest.files)
-
-                is_final = fs.manifest.is_final
-                root_path = fs.manifest.root_path
-
-                if is_final:
-                    accumulated_files = self.manifest_accumulators.pop(key)
-
-                if is_final:
-                    # Create a complete manifest object for downstream processing
-                    full_manifest = agent_pb2.DirectoryManifest(
-                        root_path=root_path,
-                        files=accumulated_files
-                    )
-
-                    # M6: Handle interactive 'ls' result correlation
-                    if task_id and task_id.startswith("fs-ls-"):
-                        files = [
-                            {"path": f.path, "name": os.path.basename(f.path) or f.path, "is_dir": f.is_dir, "size": f.size}
-                            for f in accumulated_files
-                        ]
-                        self.journal.fulfill(task_id, {"files": files, "path": root_path})
-
-                    # M6: Only reconcile for real user sessions, not for the modular explorer.
-                    if fs.session_id != "__fs_explorer__":
-                        drifts = None
-                        # Do not reconcile on shallow manifests triggered by interactive FS tools
-                        if task_id and any(task_id.startswith(p) for p in ("fs-ls-", "fs-write-", "fs-rm-")):
-                            pass
-                        else:
-                            drifts = self.mirror.reconcile(fs.session_id, full_manifest)
-                        if drifts:
-                            logger.info(f"[📁🏃] Drift Detected (Node -> Server): Requesting {len(drifts)} files")
-                            # Request node to push these specific files
-                            # Priority 1: Drift Reconciliation Request
-                            self.mesh_core.dispatch(node_id, agent_pb2.ServerTaskMessage(
-                                file_sync=agent_pb2.FileSyncMessage(
-                                    session_id=fs.session_id,
-                                    control=agent_pb2.SyncControl(
-                                        action=agent_pb2.SyncControl.REFRESH_MANIFEST,
-                                        path=root_path,
-                                        request_paths=drifts
-                                    )
-                                )
-                            ), priority=1)
-                        else:
-                            self.registry.emit(node_id, "sync_status", {"message": "Synchronized (Node -> Server)", "code": 0})
+
+    def _handle_skill_event(self, msg, node_id, node):
+        se = msg.skill_event
+        data_kind = se.WhichOneof('data')
+        event_data = {"session_id": se.session_id, "task_id": se.task_id, "skill": "shell"}
+
+        if data_kind == 'terminal_out':
+            event_data["data"] = se.terminal_out
+            event_data["type"] = "output"
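+            # NEW: Capture stream buffer in journal for tool callback/timeout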
+            if se.task_id:
+                self.journal.append_stream(se.task_id, se.terminal_out)
+        elif data_kind == 'prompt':
+            event_data["data"] = se.prompt
+            event_data["type"] = "prompt"
+            if se.task_id:
+                self.journal.signal_prompt(se.task_id)
+
+        self.registry.emit(node_id, "skill_event", event_data)
-            elif fs.HasField("status"):
-                logger.debug(f"[📁] Sync Status from {node_id}: {fs.status.message}")
-
-                # M6: Handle interactive write/rm/ls/cat result correlation from node-side error status
-                if task_id and task_id.startswith("fs-"):
-                    success = fs.status.code == 0
-                    if not success:
-                        self.journal.fulfill(task_id, {"error": fs.status.message})
-                    elif task_id.startswith("fs-write-") or task_id.startswith("fs-rm-"):
-                        self.journal.fulfill(task_id, {"success": True, "message": fs.status.message})
-
-                self.registry.emit(node_id, "sync_status", {"message": fs.status.message, "code": fs.status.code})
-                if fs.status.code == agent_pb2.SyncStatus.RECONCILE_REQUIRED:
-                    # Phase 3: Notify active orchestrators of drift
-                    self.registry.emit(node_id, "mesh_observation", {
-                        "type": "drift_warning",
-                        "session_id": fs.session_id,
-                        "paths": list(fs.status.reconcile_paths),
-                        "message": f"Critical Drift: Node {node_id} has hash mismatches in {fs.session_id}."
-                    })
-                    for path in fs.status.reconcile_paths:
-                        self.assistant.push_file(node_id, fs.session_id, path)
+
+    def _handle_file_sync(self, msg, node_id, node):
+        fs = msg.file_sync
+        task_id = fs.task_id
+
+        if fs.HasField("file_data"):
+            self._process_file_data(fs, node_id, task_id)
+        elif fs.HasField("manifest"):
+            self._process_file_manifest(fs, node_id, task_id)
+        elif fs.HasField("status"):
+            self._process_file_status(fs, node_id, task_id)
+        elif fs.HasField("control"):
+            self._process_file_control(fs, node_id, task_id)
+
+    def _process_file_data(self, fs, node_id, task_id):
+        lock_key = f"{fs.session_id}:{fs.file_data.path}"
+        with self.io_locks_lock:
+            if lock_key not in self.io_locks:
+                self.io_locks[lock_key] = threading.Lock()
+            lock = self.io_locks[lock_key]
+
+        with lock:
+            self.mirror.write_file_chunk(fs.session_id, fs.file_data)
+
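+        # M6: Clean up the lock entry after finalization to prevent memory leak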
+        if fs.file_data.is_final:
+            with self.io_locks_lock:
+                if lock_key in self.io_locks:
+                    del self.io_locks[lock_key]
+
+        self.assistant.broadcast_file_chunk(fs.session_id, node_id, fs.file_data)
+        if fs.file_data.chunk_index % 10 == 0:
+            self.registry.emit(node_id, "sync_progress", {"path": fs.file_data.path, "chunk": fs.file_data.chunk_index})
+
+    def _process_file_manifest(self, fs, node_id, task_id):
+        logger.debug(f"[📁📥] Received Manifest from {node_id} for {fs.session_id} (chunk: {fs.manifest.chunk_index}, final: {fs.manifest.is_final})")
+
+        key = (node_id, fs.session_id)
+        with self.manifest_accumulators_lock:
+            if key not in self.manifest_accumulators:
+                self.manifest_accumulators[key] = []
+            self.manifest_accumulators[key].extend(fs.manifest.files)
-            elif fs.HasField("control"):
-                ctrl = fs.control
-                if ctrl.action == agent_pb2.SyncControl.DELETE:
-                    path_to_del = ctrl.path
-                    # Guard: never process deletes for internal temp/lock files.
-                    # Nodes may fire spurious DELETE events for these when os.replace() is used.
-                    if path_to_del.endswith(".cortex_tmp") or path_to_del.endswith(".cortex_lock"):
-                        print(f" [📁🚫] Ignored temp/lock DELETE from {node_id}: {path_to_del}")
-                    else:
-                        logger.debug(f"[📁🗑️] Node requested DELETE on mirror: {path_to_del}")
-                        self.mirror.delete_file(fs.session_id, path_to_del)
-                        # Broadcast delete to all other session nodes for mesh consistency
-                        self.assistant.broadcast_delete(fs.session_id, node_id, path_to_del)
+
+        is_final = fs.manifest.is_final
+        root_path = fs.manifest.root_path
+
+        if is_final:
+            accumulated_files = self.manifest_accumulators.pop(key)
+            full_manifest = agent_pb2.DirectoryManifest(
+                root_path=root_path,
+                files=accumulated_files
+            )
+
+            if task_id and task_id.startswith("fs-ls-"):
+                files = [
+                    {"path": f.path, "name": os.path.basename(f.path) or f.path, "is_dir": f.is_dir, "size": f.size}
+                    for f in accumulated_files
+                ]
+                self.journal.fulfill(task_id, {"files": files, "path": root_path})
+
+            if fs.session_id != "__fs_explorer__":
+                drifts = None
+                if task_id and any(task_id.startswith(p) for p in ("fs-ls-", "fs-write-", "fs-rm-")):
+                    pass
+                else:
+                    drifts = self.mirror.reconcile(fs.session_id, full_manifest)
+                if drifts:
+                    logger.info(f"[📁🏃] Drift Detected (Node -> Server): Requesting {len(drifts)} files")
+                    self.mesh_core.dispatch(node_id, agent_pb2.ServerTaskMessage(
+                        file_sync=agent_pb2.FileSyncMessage(
+                            session_id=fs.session_id,
+                            control=agent_pb2.SyncControl(
+                                action=agent_pb2.SyncControl.REFRESH_MANIFEST,
+                                path=root_path,
+                                request_paths=drifts
+                            )
+                        )
+                    ), priority=1)
+                else:
+                    self.registry.emit(node_id, "sync_status", {"message": "Synchronized (Node -> Server)", "code": 0})
+
+    def _process_file_status(self, fs, node_id, task_id):
+        logger.debug(f"[📁] Sync Status from {node_id}: {fs.status.message}")
+
+        if task_id and task_id.startswith("fs-"):
+            success = fs.status.code == 0
+            if not success:
+                self.journal.fulfill(task_id, {"error": fs.status.message})
+            elif task_id.startswith("fs-write-") or task_id.startswith("fs-rm-"):
+                self.journal.fulfill(task_id, {"success": True, "message": fs.status.message})
+
+        self.registry.emit(node_id, "sync_status", {"message": fs.status.message, "code": fs.status.code})
+        if fs.status.code == agent_pb2.SyncStatus.RECONCILE_REQUIRED:
+            self.registry.emit(node_id, "mesh_observation", {
+                "type": "drift_warning",
+                "session_id": fs.session_id,
+                "paths": list(fs.status.reconcile_paths),
+                "message": f"Critical Drift: Node {node_id} has hash mismatches in {fs.session_id}."
+            })
+            for path in fs.status.reconcile_paths:
+                self.assistant.push_file(node_id, fs.session_id, path)
+
+    def _process_file_control(self, fs, node_id, task_id):
+        ctrl = fs.control
+        if ctrl.action == agent_pb2.SyncControl.DELETE:
+            path_to_del = ctrl.path
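+            # Guard: never process deletes for internal temp/lock files.
+            # Nodes may fire spurious DELETE events for these when os.replace() is used.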
+            if path_to_del.endswith(".cortex_tmp") or path_to_del.endswith(".cortex_lock"):
+                logger.debug(f"[📁🚫] Ignored temp/lock DELETE from {node_id}: {path_to_del}")
+            else:
+                logger.debug(f"[📁🗑️] Node requested DELETE on mirror: {path_to_del}")
+                self.mirror.delete_file(fs.session_id, path_to_del)
+                # Broadcast delete to all other session nodes for mesh consistency
+                self.assistant.broadcast_delete(fs.session_id, node_id, path_to_del)

     def ReportHealth(self, request_iterator, context):
         """Collect Health Metrics and Feed Policy Updates."""