diff --git a/mesh-sdk/mesh_core/transport/grpc.py b/mesh-sdk/mesh_core/transport/grpc.py index 2cd8716..65c3a29 100644 --- a/mesh-sdk/mesh_core/transport/grpc.py +++ b/mesh-sdk/mesh_core/transport/grpc.py @@ -33,6 +33,7 @@ self._connected = False self._health_thread_started = False self.last_activity = 0 + self._send_seq = 0 # tiebreaker so protobuf messages are never compared def handshake(self) -> bool: self._refresh_stub() @@ -141,7 +142,7 @@ last_heartbeat = time.time() while not self._stop_event.is_set(): try: - # PriorityQueue returns (priority, ts, msg) + # PriorityQueue returns (priority, seq, msg) item = self.send_queue.get(timeout=1.0) yield item[2] except queue.Empty: @@ -181,10 +182,8 @@ time.sleep(backoff) def send(self, message: Any, priority: int = 1): - # PriorityQueue expects (priority, timestamp, item) - # Note: Client messages are not currently signed by the SDK, - # but this is where we would add it if needed. - self.send_queue.put((priority, time.time(), message)) + self._send_seq += 1 + self.send_queue.put((priority, self._send_seq, message)) def send_health(self, heartbeat: Any): """Sends a heartbeat via the dedicated health stream.""" @@ -242,6 +241,7 @@ self.send_queue = queue.PriorityQueue() self.listener = None self.signer = signer + self._send_seq = 0 def handshake(self) -> bool: return True # Handshake handled by Servicer @@ -259,8 +259,8 @@ msg_bytes = message.SerializeToString(deterministic=True) message.signature = self.signer(msg_bytes) - # PriorityQueue expects (priority, timestamp, item) to ensure stable ordering - self.send_queue.put((priority, time.time(), message)) + self._send_seq += 1 + self.send_queue.put((priority, self._send_seq, message)) def close(self): try: