diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py
index bd4c443..c0f45f9 100644
--- a/ai-hub/app/core/grpc/services/assistant.py
+++ b/ai-hub/app/core/grpc/services/assistant.py
@@ -212,14 +212,13 @@
             )
         ), priority=2)
-        # M6: We MUST execute synchronously here instead of using the registry executor.
-        # If we offload to an unbounded thread pool, the inbound sender will push chunks faster
-        # than the outbound connection can send them, queuing gigabytes of protobufs in memory
-        # and causing the Hub to OOM instantly on large files.
-        # Running synchronously provides natural backpressure to the sender queue.
+        # M6: Use bounded registry executor for parallel mesh broadcast.
+        # Max queue size on the BoundedThreadPoolExecutor ensures backpressure, preventing Hub OOM on 2GB+ meshes.
         for nid in destinations:
-            _send_to_node(nid)
-
+            if getattr(self.registry, 'executor', None):
+                self.registry.executor.submit(_send_to_node, nid)
+            else:
+                _send_to_node(nid)
 
     def broadcast_delete(self, session_id: str, sender_node_id: str, rel_path: str):
         """Broadcasts a file deletion from one node to all other nodes in the session mesh."""
         with self.membership_lock:
diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py
index 49200e6..9a265d7 100644
--- a/ai-hub/app/core/services/node_registry.py
+++ b/ai-hub/app/core/services/node_registry.py
@@ -19,6 +19,18 @@
 from datetime import datetime
 from typing import Dict, Optional, List, Any
 from concurrent.futures import ThreadPoolExecutor
+import queue
+
+class BoundedThreadPoolExecutor(ThreadPoolExecutor):
+    """
+    A ThreadPoolExecutor with a bounded queue size to provide natural backpressure.
+    The default ThreadPoolExecutor uses an unbounded SimpleQueue which can cause OOMs
+    when a fast source produces tasks faster than workers can consume them.
+    """
+    def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=1000):
+        super().__init__(max_workers=max_workers, thread_name_prefix=thread_name_prefix)
+        # Bounding the internal work queue gives us backpressure when submit() is called
+        self._work_queue = queue.Queue(maxsize=max_queue_size)
 
 logger = logging.getLogger(__name__)
 
@@ -152,7 +164,8 @@
         self._FLAP_WINDOW_S = 60
         self._FLAP_THRESHOLD = 5
         # Shared Hub-wide work executor to prevent thread-spawning leaks
-        self.executor = ThreadPoolExecutor(max_workers=20, thread_name_prefix="RegistryWorker")
+        # max_queue_size=200 means we buffer at most 200 file chunks (~800MB) before backpressure hits the sender
+        self.executor = BoundedThreadPoolExecutor(max_workers=20, thread_name_prefix="RegistryWorker", max_queue_size=200)
 
     # ------------------------------------------------------------------ #
     # DB Helpers                                                         #
diff --git a/test_file_sync.py b/test_file_sync.py
new file mode 100644
index 0000000..e1a41ce
--- /dev/null
+++ b/test_file_sync.py
@@ -0,0 +1,15 @@
+import os
+import subprocess
+import time
+
+def cleanup():
+    # Remove files if they exist
+    if os.path.exists("test_10mb_A.bin"):
+        os.remove("test_10mb_A.bin")
+
+def generate_10mb_file():
+    with open("test_10mb_A.bin", "wb") as f:
+        f.write(os.urandom(10 * 1024 * 1024))
+
+if __name__ == "__main__":
+    generate_10mb_file()
diff --git a/test_loopback.py b/test_loopback.py
new file mode 100644
index 0000000..73643bd
--- /dev/null
+++ b/test_loopback.py
@@ -0,0 +1,10 @@
+import os
+import subprocess
+import time
+
+def generate_100mb_file():
+    with open("big_test_file_100mb.bin", "wb") as f:
+        f.write(os.urandom(100 * 1024 * 1024))
+
+if __name__ == "__main__":
+    generate_100mb_file()