diff --git a/agent-node/agent_node/node.py b/agent-node/agent_node/node.py index 6decb03..4e11d56 100644 --- a/agent-node/agent_node/node.py +++ b/agent-node/agent_node/node.py @@ -4,8 +4,11 @@ import sys import os import hashlib +import logging import psutil from protos import agent_pb2, agent_pb2_grpc + +logger = logging.getLogger(__name__) from agent_node.skills.manager import SkillManager from agent_node.core.sandbox import SandboxEngine from agent_node.core.sync import NodeSyncManager @@ -206,44 +209,88 @@ # --- M6: FS Explorer Handlers --- elif ctrl.action == agent_pb2.SyncControl.LIST: print(f" [📁📂] List Directory: {ctrl.path}") - self._push_full_manifest(sid, ctrl.path) + self._push_full_manifest(sid, ctrl.path, task_id=fs.task_id, shallow=True) elif ctrl.action == agent_pb2.SyncControl.READ: print(f" [📁📄] Read File: {ctrl.path}") - self._push_file(sid, ctrl.path) + self._push_file(sid, ctrl.path, task_id=fs.task_id) elif ctrl.action == agent_pb2.SyncControl.WRITE: print(f" [📁💾] Write File: {ctrl.path} (is_dir={ctrl.is_dir})") - self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir) + self._handle_fs_write(sid, ctrl.path, ctrl.content, ctrl.is_dir, task_id=fs.task_id) elif ctrl.action == agent_pb2.SyncControl.DELETE: print(f" [📁🗑️] Delete Fragment: {ctrl.path}") - self._handle_fs_delete(sid, ctrl.path) + self._handle_fs_delete(sid, ctrl.path, task_id=fs.task_id) - def _push_full_manifest(self, session_id, rel_path="."): + def _push_full_manifest(self, session_id, rel_path=".", task_id="", shallow=False): """Pushes the current local manifest back to the server.""" - print(f" [📁📤] Pushing Full Manifest for {session_id}") - watch_path = rel_path if os.path.isabs(rel_path) else os.path.join(self.sync_mgr.get_session_dir(session_id), rel_path) + print(f" [📁📤] Pushing {'Shallow' if shallow else 'Full'} Manifest for {session_id}") - # We need a manifest generator similar to GhostMirrorManager but on the node - # For Phase 3, we'll implement a simple one here + # M6: If __fs_explorer__, we are browsing the root sync dir, otherwise session-scoped + if session_id == "__fs_explorer__": + base_dir = self.sync_mgr.base_sync_dir + else: + base_dir = self.sync_mgr.get_session_dir(session_id) + + watch_path = rel_path if os.path.isabs(rel_path) else os.path.normpath(os.path.join(base_dir, rel_path)) + + if not os.path.exists(watch_path): + self.task_queue.put(agent_pb2.ClientTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id=session_id, + task_id=task_id, + status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=f"Path {rel_path} not found") + ) + )) + return + files = [] - for root, dirs, filenames in os.walk(watch_path): - for filename in filenames: - abs_path = os.path.join(root, filename) - r_path = os.path.relpath(abs_path, watch_path) - with open(abs_path, "rb") as f: - h = hashlib.sha256(f.read()).hexdigest() - files.append(agent_pb2.FileInfo(path=r_path, size=os.path.getsize(abs_path), hash=h)) - + try: + if shallow: + # Optimized for Explorer: immediate children only, no hashing + with os.scandir(watch_path) as it: + for entry in it: + is_dir = entry.is_dir() + # Use metadata only + try: + stats = entry.stat() + size = stats.st_size if not is_dir else 0 + except: size = 0 + files.append(agent_pb2.FileInfo(path=entry.name, size=size, hash="", is_dir=is_dir)) + else: + # Deep walk with full hashes for reconciliation + for root, dirs, filenames in os.walk(watch_path): + for filename in filenames: + abs_path = os.path.join(root, filename) + r_path = os.path.relpath(abs_path, watch_path) + try: + with open(abs_path, "rb") as f: + h = hashlib.sha256(f.read()).hexdigest() + files.append(agent_pb2.FileInfo(path=r_path, size=os.path.getsize(abs_path), hash=h, is_dir=False)) + except Exception as e: + print(f" [⚠️] Failed to hash {abs_path}: {e}") + + for d in dirs: + abs_path = os.path.join(root, d) + r_path = os.path.relpath(abs_path, watch_path) + files.append(agent_pb2.FileInfo(path=r_path, size=0, hash="", is_dir=True)) + except Exception as e: + logger.error(f"Manifest generation error: {e}") + self.task_queue.put(agent_pb2.ClientTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, + task_id=task_id, manifest=agent_pb2.DirectoryManifest(root_path=rel_path, files=files) ) )) - def _handle_fs_write(self, session_id, rel_path, content, is_dir): + def _handle_fs_write(self, session_id, rel_path, content, is_dir, task_id=""): """Modular FS Write/Create.""" try: - base_dir = self.sync_mgr.get_session_dir(session_id) + if session_id == "__fs_explorer__": + base_dir = self.sync_mgr.base_sync_dir + else: + base_dir = self.sync_mgr.get_session_dir(session_id) + target_path = os.path.normpath(os.path.join(base_dir, rel_path)) if not target_path.startswith(base_dir): raise Exception("Path traversal attempt blocked") @@ -259,23 +306,29 @@ self.task_queue.put(agent_pb2.ClientTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, + task_id=task_id, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"{'Directory' if is_dir else 'File'} written") ) )) # Trigger manifest refresh so UI updates - self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".") + self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True) except Exception as e: self.task_queue.put(agent_pb2.ClientTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, + task_id=task_id, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e)) ) )) - def _handle_fs_delete(self, session_id, rel_path): + def _handle_fs_delete(self, session_id, rel_path, task_id=""): """Modular FS Delete.""" try: - base_dir = self.sync_mgr.get_session_dir(session_id) + if session_id == "__fs_explorer__": + base_dir = self.sync_mgr.base_sync_dir + else: + base_dir = self.sync_mgr.get_session_dir(session_id) + target_path = os.path.normpath(os.path.join(base_dir, rel_path)) if not target_path.startswith(base_dir): raise Exception("Path traversal attempt blocked") @@ -293,27 +346,36 @@ self.task_queue.put(agent_pb2.ClientTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, + task_id=task_id, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.OK, message=f"Deleted {rel_path}") ) )) # Trigger manifest refresh so UI updates - self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".") + self._push_full_manifest(session_id, os.path.dirname(rel_path) or ".", task_id=task_id, shallow=True) except Exception as e: self.task_queue.put(agent_pb2.ClientTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, + task_id=task_id, status=agent_pb2.SyncStatus(code=agent_pb2.SyncStatus.ERROR, message=str(e)) ) )) - def _push_file(self, session_id, rel_path): + def _push_file(self, session_id, rel_path, task_id=""): """Pushes a specific file from node to server.""" - watch_path = self.watcher.get_watch_path(session_id) - if not watch_path: - # Fallback to sync dir if watcher not started - watch_path = self.sync_mgr.get_session_dir(session_id) + if session_id == "__fs_explorer__": + watch_path = self.sync_mgr.base_sync_dir + else: + watch_path = self.watcher.get_watch_path(session_id) + if not watch_path: + # Fallback to sync dir if watcher not started + watch_path = self.sync_mgr.get_session_dir(session_id) - abs_path = os.path.join(watch_path, rel_path) + abs_path = os.path.normpath(os.path.join(watch_path, rel_path)) + if not abs_path.startswith(watch_path): + print(f" [📁🚫] Blocked traversal attempt in _push_file: {rel_path}") + return + if not os.path.exists(abs_path): print(f" [📁❓] Requested file {rel_path} not found on node") return @@ -331,6 +393,7 @@ self.task_queue.put(agent_pb2.ClientTaskMessage( file_sync=agent_pb2.FileSyncMessage( session_id=session_id, + task_id=task_id, file_data=agent_pb2.FilePayload( path=rel_path, chunk=chunk, diff --git a/ai-hub/app/app.py b/ai-hub/app/app.py index d49b894..418648e 100644 --- a/ai-hub/app/app.py +++ b/ai-hub/app/app.py @@ -50,6 +50,7 @@ server, orchestrator = serve_grpc(registry, port=50051) app.state.grpc_server = server app.state.orchestrator = orchestrator + app.state.services.with_service("orchestrator", orchestrator) logger.info("[M6] Agent Orchestrator gRPC server started on port 50051.") except Exception as e: logger.error(f"[M6] Failed to start gRPC server: {e}")