import sys
import os
import unittest
from unittest.mock import MagicMock
# Add mesh_core dir directly to bypass __init__.py which requires protobuf
_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.join(_root, "mesh_core"))
# Mock agent_pb2 since we don't have protobuf installed
class MockProto:
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
def WhichOneof(self, name):
return self._kind
def SerializeToString(self, **kwargs):
return b""
mock_pb2 = MagicMock()
mock_pb2.ServerTaskMessage = lambda **kwargs: MockProto(_kind='task_request', **kwargs)
mock_pb2.ClientTaskMessage = lambda **kwargs: MockProto(_kind='announce', **kwargs)
sys.modules['agent_pb2'] = mock_pb2
sys.modules['mesh_core.models'] = MagicMock()
sys.modules['mesh_core.models.agent_pb2'] = mock_pb2
# Add mesh_core dir directly
_mesh_core_path = os.path.join(_root, "mesh_core")
sys.path.append(_mesh_core_path)
sys.path.append(os.path.join(_mesh_core_path, "engines"))
sys.path.append(os.path.join(_mesh_core_path, "transport"))
sys.path.append(os.path.join(_mesh_core_path, "utils"))
# Now import the modules directly
import server as server_mod
import node as node_mod
import data as data_mod
MeshServerCore = server_mod.MeshServerCore
MeshNodeCore = node_mod.MeshNodeCore
DataChunker = data_mod.DataChunker
DataReassembler = data_mod.DataReassembler
class TestMeshRobustness(unittest.TestCase):
def test_chunking_integrity(self):
"""Verifies that DataChunker and DataReassembler maintain byte integrity."""
data = b"Hello Mesh " * 100
import hashlib
h = hashlib.sha256(data).hexdigest()
chunks = list(DataChunker.chunk_bytes(data, 50))
reassembler = DataReassembler(expected_hash=h)
for i, c in enumerate(chunks):
reassembler.add_chunk(i, c)
self.assertEqual(reassembler.get_full_data(), data)
print("✅ Chunking Integrity Passed")
def test_server_node_registration(self):
"""Verifies that MeshServerCore correctly tracks nodes."""
server = MeshServerCore()
online = []
server.on_node_online = lambda node: online.append(node.node_id)
transport = MagicMock()
server.register_node("node-1", "user-1", {}, transport)
self.assertIn("node-1", online)
self.assertEqual(len(server.list_nodes()), 1)
print("✅ Node Registration Passed")
def test_idempotency_recovery_logic(self):
"""Verifies that we can identify idempotent tasks for recovery."""
# This tests the logic we added to TaskAssistant
idempotent_types = {"shell", "ls", "cat"}
tasks = [
{"id": "t1", "type": "shell"},
{"id": "t2", "type": "write"},
{"id": "t3", "type": "ls"}
]
recovered = [t["id"] for t in tasks if t["type"] in idempotent_types]
self.assertEqual(recovered, ["t1", "t3"])
print("✅ Recovery Logic Passed")
if __name__ == "__main__":
unittest.main()