diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index 807e996..4054a9a 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -17,10 +17,10 @@ import uuid import asyncio import collections +import itertools from datetime import datetime from typing import Dict, Optional, List, Any from concurrent.futures import ThreadPoolExecutor -import queue class BoundedThreadPoolExecutor(ThreadPoolExecutor): """ @@ -68,6 +68,7 @@ self.metadata = metadata # desc, caps (capabilities dict) # Bounded queue to prevent memory ballooning; 100 * 4MB chunks = 400MB max memory per connected node self.queue: queue.PriorityQueue = queue.PriorityQueue(maxsize=100) + self._send_counter = itertools.count() self.stats: dict = { "active_worker_count": 0, "cpu_usage_percent": 0.0, @@ -95,7 +96,7 @@ logger.warning(f"[Mesh] Dispatch via MeshCore failed for {self.node_id}: {e}. Falling back to legacy queue.") # Legacy Fallback - item = (priority, time.time(), msg) + item = (priority, next(self._send_counter), msg) self._dispatch_to_queue(item) def _dispatch_to_queue(self, item): diff --git a/mesh-sdk/mesh_core/transport/grpc.py b/mesh-sdk/mesh_core/transport/grpc.py index 65c3a29..0f227c6 100644 --- a/mesh-sdk/mesh_core/transport/grpc.py +++ b/mesh-sdk/mesh_core/transport/grpc.py @@ -6,6 +6,7 @@ import urllib.error import json import os +import itertools from typing import Any, Optional, Callable, Union from ..models import agent_pb2, agent_pb2_grpc from .base import IMeshTransport, IMeshListener @@ -33,7 +34,7 @@ self._connected = False self._health_thread_started = False self.last_activity = 0 - self._send_seq = 0 # tiebreaker so protobuf messages are never compared + self._send_counter = itertools.count() # thread-safe atomic counter; avoids protobuf comparison in heapq def handshake(self) -> bool: self._refresh_stub() @@ -182,8 +183,7 @@ time.sleep(backoff) def send(self, message: Any, priority: int = 1): - self._send_seq += 1 - self.send_queue.put((priority, self._send_seq, message)) + self.send_queue.put((priority, next(self._send_counter), message)) def send_health(self, heartbeat: Any): """Sends a heartbeat via the dedicated health stream.""" @@ -241,7 +241,7 @@ self.send_queue = queue.PriorityQueue() self.listener = None self.signer = signer - self._send_seq = 0 + self._send_counter = itertools.count() def handshake(self) -> bool: return True # Handshake handled by Servicer @@ -259,8 +259,7 @@ msg_bytes = message.SerializeToString(deterministic=True) message.signature = self.signer(msg_bytes) - self._send_seq += 1 - self.send_queue.put((priority, self._send_seq, message)) + self.send_queue.put((priority, next(self._send_counter), message)) def close(self): try: