import threading
import time
import logging
from typing import Any
from .transport import IMeshTransport, IMeshListener
logger = logging.getLogger(__name__)
class MockMeshTransport(IMeshTransport):
"""
Mock implementation of Mesh Transport for local testing.
Simulates Hub behavior without any network calls.
"""
def __init__(self, node_id: str):
self.node_id = node_id
self.listener = None
self._connected = False
self._stop_event = threading.Event()
def set_listener(self, listener: IMeshListener):
self.listener = listener
def connect(self):
self._connected = True
self._stop_event.clear()
logger.info(f"[*] Mock Mesh Transport Connected for {self.node_id}")
# Simulate initial server handshake if needed
# In a real test, we would use 'mock_server.trigger_message(msg)'
def send(self, message: Any):
kind = message.WhichOneof('payload')
logger.info(f"[Mock-Sent] To Hub: {kind}")
# Auto-responder logic for tests
if kind == 'announce':
pass # Server usually doesn't reply to announce directly via TaskStream
def close(self):
self._connected = False
self._stop_event.set()
self.listener.on_close()
def is_connected(self) -> bool:
return self._connected
def simulate_server_message(self, message: Any):
"""Helper for test suites to inject messages into the agent."""
if self._connected:
self.listener.on_message(message)