diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index 69dd04a..5285f09 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -173,7 +173,7 @@ self.last_sync[rel_path] = file_hash file_size = os.path.getsize(abs_path) - chunk_size = 1024 * 1024 # 1MB buffer for hashing/stream + chunk_size = 4 * 1024 * 1024 # 4MB buffer for hashing/stream total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1 print(f" [📁📤] Streaming Sync Started: {rel_path} ({file_size} bytes)") diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index 8b54593..d56b806 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -33,7 +33,8 @@ self.sync_mgr = NodeSyncManager() self.skills = SkillManager(max_workers=config.MAX_SKILL_WORKERS, sync_mgr=self.sync_mgr) self.watcher = WorkspaceWatcher(self._on_sync_delta) - self.task_queue = queue.Queue(maxsize=10000) # Prevents fast PTY/Worker deadlock + # Bounded queue to prevent memory ballooning; 250 * 4MB chunks = 1GB max memory. + self.task_queue = queue.Queue(maxsize=250) self.stub = None self.channel = None self._stop_event = threading.Event() diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 5c9e8f9..bd4c443 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -212,13 +212,13 @@ ) ), priority=2) - # M6: Use registry executor if available for parallel mesh broadcast - if self.registry.executor: - for nid in destinations: - self.registry.executor.submit(_send_to_node, nid) - else: - for nid in destinations: - _send_to_node(nid) + # M6: We MUST execute synchronously here instead of using the registry executor.
+ # If we offload to an unbounded thread pool, the inbound sender will push chunks faster + # than the outbound connection can send them, queuing gigabytes of protobufs in memory + # and causing the Hub to OOM instantly on large files. + # Running synchronously provides natural backpressure to the sender queue. + for nid in destinations: + _send_to_node(nid) def broadcast_delete(self, session_id: str, sender_node_id: str, rel_path: str): """Broadcasts a file deletion from one node to all other nodes in the session mesh.""" diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index 971fb02..49200e6 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -52,8 +52,8 @@ self.node_id = node_id self.user_id = user_id # Owner — maps node to a Hub user self.metadata = metadata # desc, caps (capabilities dict) - # Increased queue size to 1000 to handle high-concurrency file sync without dropping interactive tasks - self.queue: queue.PriorityQueue = queue.PriorityQueue(maxsize=1000) + # Bounded queue to prevent memory ballooning; 100 * 4MB chunks = 400MB max memory per connected node + self.queue: queue.PriorityQueue = queue.PriorityQueue(maxsize=100) self.stats: dict = { "active_worker_count": 0, "cpu_usage_percent": 0.0, @@ -75,7 +75,9 @@ def _blocking_put(): try: - self.queue.put(item, block=True, timeout=2.0) + # 300s timeout provides absolute maximum backpressure for large file sync across slow VPNs + # preventing dropped chunks while still protecting against permanent deadlocks. + self.queue.put(item, block=True, timeout=300.0) except queue.Full: logger.warning(f"[📋⚠️] Message dropped for {self.node_id}: outbound queue FULL. Node may be unresponsive.") except Exception as e: