import sys
import os
import unittest
from unittest.mock import MagicMock
# Put the mesh_core directory itself on sys.path. Per the original author's
# note, the intent is to sidestep mesh_core/__init__.py, which needs protobuf.
_root = os.path.dirname(os.path.abspath(__file__))  # directory holding this file
_root = os.path.dirname(_root)                      # one level above it
sys.path.append(os.path.join(_root, "mesh_core"))
# Mock agent_pb2 since we don't have protobuf installed
class MockProto:
    """Minimal stand-in for a generated protobuf message.

    Every keyword argument passed to the constructor is stored as an instance
    attribute. The special ``_kind`` keyword controls what ``WhichOneof``
    reports.
    """

    # Default used when the caller does not pass ``_kind``. Real protobuf
    # messages return None from WhichOneof when no field of the oneof is set,
    # so mirror that instead of raising AttributeError.
    _kind = None

    def __init__(self, **kwargs):
        """Store each keyword argument as an attribute of the same name."""
        for k, v in kwargs.items():
            setattr(self, k, v)

    def WhichOneof(self, name):
        """Return the configured ``_kind``; the oneof group ``name`` is ignored."""
        return self._kind

    def SerializeToString(self, **kwargs):
        """Return an empty payload; keyword arguments are accepted and ignored."""
        return b""
# One fake ``agent_pb2`` module, shared by every import path registered below.
mock_pb2 = MagicMock()
# The two message constructors build MockProto objects with a fixed oneof kind;
# every other attribute of mock_pb2 stays an auto-created MagicMock.
mock_pb2.ServerTaskMessage = lambda **kwargs: MockProto(_kind='task_request', **kwargs)
mock_pb2.ClientTaskMessage = lambda **kwargs: MockProto(_kind='announce', **kwargs)
sys.modules['agent_pb2'] = mock_pb2
# ``from mesh_core.models import agent_pb2`` reads the attribute off the
# package object, so the package stub must expose the same mock. A bare
# MagicMock() would hand back an unrelated auto-created child instead.
sys.modules['mesh_core.models'] = MagicMock(agent_pb2=mock_pb2)
sys.modules['mesh_core.models.agent_pb2'] = mock_pb2
# Add mesh-sdk dir: _root is the directory one level above this file's own
# directory, so the ``mesh_core`` package name becomes importable.
sys.path.append(_root)
# Now import the modules directly. These imports must stay below the sys.path
# and sys.modules setup above, which has to be in place before they run.
import mesh_core.engines.server as server_mod
import mesh_core.engines.node as node_mod
import mesh_core.utils.data as data_mod
# Short module-level aliases for the classes under test.
MeshServerCore = server_mod.MeshServerCore
# NOTE(review): MeshNodeCore is not referenced by the tests visible in this file.
MeshNodeCore = node_mod.MeshNodeCore
DataChunker = data_mod.DataChunker
DataReassembler = data_mod.DataReassembler
class TestMeshRobustness(unittest.TestCase):
    """Smoke tests for chunking, node registration and small compatibility rules."""

    def test_chunking_integrity(self):
        """Chunk a payload, feed the chunks back in order, expect the same bytes."""
        import hashlib

        payload = b"Hello Mesh " * 100
        digest = hashlib.sha256(payload).hexdigest()
        # Materialise every chunk before the reassembler is created.
        pieces = [*DataChunker.chunk_bytes(payload, 50)]
        assembler = DataReassembler(expected_hash=digest)
        for position, piece in enumerate(pieces):
            assembler.add_chunk(position, piece)
        self.assertEqual(assembler.get_full_data(), payload)
        print("✅ Chunking Integrity Passed")

    def test_server_node_registration(self):
        """Registering a node fires the online callback and lists the node."""
        core = MeshServerCore()
        seen_ids = []

        def record_online(node):
            # Remember the id of every node the server reports as online.
            seen_ids.append(node.node_id)

        core.on_node_online = record_online
        link = MagicMock()
        core.register_node("node-1", "user-1", {}, link)
        self.assertIn("node-1", seen_ids)
        self.assertEqual(len(core.list_nodes()), 1)
        print("✅ Node Registration Passed")

    def test_idempotency_recovery_logic(self):
        """Only tasks whose type is in the idempotent set are picked for recovery."""
        # Per the original author's note this mirrors logic added to
        # TaskAssistant; here the rule is exercised on plain dicts only.
        replayable_types = {"shell", "ls", "cat"}
        queued = [
            {"id": "t1", "type": "shell"},
            {"id": "t2", "type": "write"},
            {"id": "t3", "type": "ls"},
        ]
        eligible = (task for task in queued if task["type"] in replayable_types)
        recovered = [task["id"] for task in eligible]
        self.assertEqual(recovered, ["t1", "t3"])
        print("✅ Recovery Logic Passed")

    def test_windows_terminal_compatibility(self):
        """Check the DEL-to-backspace rewrite and the NUL-separated env block."""
        # 1. Backspace check: DEL (0x7f) becomes BS (0x08).
        raw = b"dir\x7f"
        decoded = raw.decode('utf-8', errors='replace')
        self.assertEqual(decoded.replace('\x7f', '\x08'), "dir\x08")

        # 2. Env block conversion check: each "KEY=VALUE" entry is followed by
        #    NUL, and the whole block ends with one extra NUL.
        env = {"TERM": "dumb", "PATH": "C:\\Windows"}
        if isinstance(env, dict):
            entries = [f"{key}={value}\0" for key, value in env.items()]
            env_str = "".join(entries) + "\0"
        self.assertEqual(env_str, "TERM=dumb\0PATH=C:\\Windows\0\0")
        print("✅ Windows Compatibility Checks Passed")
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()