Newer
Older
cortex-hub / ai-hub / integration_tests / test_interactive_terminal.py
import pytest
import os
import asyncio
import websockets
import json
import uuid
import httpx
from conftest import BASE_URL

def _headers():
    uid = os.getenv("SYNC_TEST_USER_ID", "")
    return {"X-User-ID": uid}

@pytest.mark.slow
@pytest.mark.asyncio
async def test_interactive_terminal():
    """
    Test interactive terminal control over WebSocket.
    1. Connect to WebSocket /nodes/{node_id}/stream
    2. Send terminal_in action with a command
    3. Verify output is received
    """
    node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
    user_id = os.getenv("SYNC_TEST_USER_ID", "")
    
    # Construct WebSocket URL
    # BASE_URL is like http://127.0.0.1:8002/api/v1
    ws_url = BASE_URL.replace("http://", "ws://") + f"/nodes/{node_id}/stream?user_id={user_id}"
    
    print(f"\n[test] Connecting to WebSocket: {ws_url}")
    
    try:
        async with websockets.connect(ws_url) as ws:
            # 1. Wait for initial snapshot
            msg = await ws.recv()
            data = json.loads(msg)
            print(f"[test] Received event: {data.get('event')}")
            assert data.get("event") in ["snapshot", "initial_snapshot"]
            
            # 2. Send terminal_in
            session_id = f"test-session-{uuid.uuid4().hex[:6]}"
            cmd_input = "echo 'Hello from WebSocket'\n"
            
            await ws.send(json.dumps({
                "action": "terminal_in",
                "data": cmd_input,
                "session_id": session_id
            }))
            print(f"[test] Sent terminal_in: {cmd_input.strip()}")
            
            # 3. Wait for output
            found_output = False
            for _ in range(30): # 30 seconds timeout
                try:
                    msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
                    data = json.loads(msg)
                    print(f"[test] Received event: {data.get('event')}")
                    
                    if data.get("event") == "skill_event":
                        skill_data = data.get("data", {})
                        if skill_data.get("type") == "output":
                            output_text = skill_data.get("data", "")
                            print(f"[test] Terminal Output: {output_text}")
                            if "Hello from WebSocket" in output_text:
                                found_output = True
                                break
                except asyncio.TimeoutError:
                    continue
                
            assert found_output, "Failed to receive expected output from terminal"
            
    except Exception as e:
        pytest.fail(f"WebSocket test failed: {e}")