# Newer
# Older
# cortex-hub / test_terminal.py
import asyncio
import websockets
import json
import uuid

async def test_terminal(*, recv_timeout: float = 10.0, overall_timeout: float = 60.0) -> bool:
    """Dispatch a ``hostname`` command through the Hub WebSocket and wait for its output.

    Connects to the node stream endpoint, reads the first message the server
    sends, dispatches ``hostname`` to the configured node under a fresh
    session id, then reads events until a matching one arrives.

    Args:
        recv_timeout: Maximum seconds to wait for any single message.
        overall_timeout: Maximum total seconds to wait for the matching
            event after the command has been sent. Without this bound a
            steady stream of unrelated events would keep the loop alive
            forever, because each one resets the per-message timeout.

    Returns:
        True when an event named ``task_complete`` or ``skill_event`` whose
        ``data`` contains the text ``hostname`` is received; False on
        timeout or when the connection closes first.
    """
    user_id = "37471c66-9da0-42a5-8b00-8b2f5bb46baa"
    node_id = "media-windows-server"
    uri = f"wss://ai.jerxie.com/api/v1/nodes/stream/all?user_id={user_id}"

    async with websockets.connect(uri) as websocket:
        print("Connected to Hub WebSocket")

        try:
            # 1. Wait for the initial snapshot (bounded, so a silent server
            #    cannot hang the script).
            try:
                msg = await asyncio.wait_for(websocket.recv(), timeout=recv_timeout)
            except asyncio.TimeoutError:
                print("FAILED: Timeout waiting for initial snapshot")
                return False
            print(f"Recv: {msg[:100]}...")

            # 2. Start a terminal session (implicitly, by using a session id
            #    the server has not seen before).
            session_id = str(uuid.uuid4())
            print(f"Using Session ID: {session_id}")

            # 3. Send a command
            cmd_msg = {
                "action": "dispatch",
                "node_id": node_id,
                "session_id": session_id,
                "command": "hostname",
            }
            await websocket.send(json.dumps(cmd_msg))
            print("Sent hostname command")

            # 4. Wait for output, bounded both per message and overall.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + overall_timeout
            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    print("FAILED: Timeout waiting for output")
                    return False
                try:
                    msg = await asyncio.wait_for(
                        websocket.recv(), timeout=min(recv_timeout, remaining)
                    )
                except asyncio.TimeoutError:
                    print("FAILED: Timeout waiting for output")
                    return False

                # Frames that are not JSON objects cannot carry the event we
                # are looking for; skip them instead of crashing.
                try:
                    data = json.loads(msg)
                except ValueError:
                    print(f"Skipping non-JSON frame: {msg[:100]!r}")
                    continue
                if not isinstance(data, dict):
                    print(f"Skipping non-object frame: {str(data)[:100]}")
                    continue

                print(f"Event: {data.get('event')} | Data: {str(data.get('data'))[:100]}")
                if data.get("event") not in ("task_complete", "skill_event"):
                    continue
                # NOTE(review): this matches the literal text "hostname", so an
                # echo of the command itself would satisfy it -- confirm this
                # is the intended success criterion.
                if "hostname" in str(data.get("data")):
                    print("SUCCESS: Received hostname in output!")
                    return True
        except websockets.ConnectionClosed as exc:
            # Report a dropped connection as a test failure, not a traceback.
            print(f"FAILED: Connection closed before output arrived: {exc}")
            return False

if __name__ == "__main__":
    # Propagate the test outcome as the process exit status so shells and CI
    # can detect a failure: 0 when test_terminal() returns a truthy value,
    # 1 otherwise.
    raise SystemExit(0 if asyncio.run(test_terminal()) else 1)