import pytest
import os
import asyncio
import websockets
import json
import uuid
import httpx
from conftest import BASE_URL
def _headers():
    """Return the HTTP headers identifying the sync test user.

    The user id is read from the ``SYNC_TEST_USER_ID`` environment variable
    and falls back to an empty string when the variable is not set.
    """
    return {"X-User-ID": os.environ.get("SYNC_TEST_USER_ID", "")}
@pytest.mark.slow
@pytest.mark.asyncio
async def test_interactive_terminal():
    """Test interactive terminal control over WebSocket.

    Steps:
      1. Connect to WebSocket ``/nodes/{node_id}/stream``.
      2. Wait (bounded) for the initial snapshot event.
      3. Send a ``terminal_in`` action with a command.
      4. Verify the command's output is received as a ``skill_event`` of
         type ``output`` within an overall 30 second budget.

    The node id and user id are read from the ``SYNC_TEST_NODE1`` and
    ``SYNC_TEST_USER_ID`` environment variables.

    Exceptions (connection errors, assertion failures) are deliberately left
    to propagate so pytest reports the original traceback and assertion
    details instead of a flattened message.
    """
    # Imported locally so this test does not depend on a file-level import.
    from urllib.parse import urlencode

    snapshot_timeout = 10.0  # seconds to wait for the first event
    output_timeout = 30.0  # overall seconds to wait for the command output
    expected_output = "Hello from WebSocket"

    node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
    user_id = os.getenv("SYNC_TEST_USER_ID", "")

    # Construct WebSocket URL.
    # BASE_URL is like http://127.0.0.1:8002/api/v1; map http -> ws and
    # https -> wss so TLS deployments get a valid WebSocket scheme too.
    if BASE_URL.startswith("https://"):
        ws_base = "wss://" + BASE_URL[len("https://"):]
    else:
        ws_base = BASE_URL.replace("http://", "ws://", 1)
    # urlencode escapes characters in user_id that would corrupt the query.
    query = urlencode({"user_id": user_id})
    ws_url = f"{ws_base}/nodes/{node_id}/stream?{query}"
    print(f"\n[test] Connecting to WebSocket: {ws_url}")

    async with websockets.connect(ws_url) as ws:
        # 1. Wait for initial snapshot (bounded, so a silent server cannot
        #    hang the test run forever).
        try:
            msg = await asyncio.wait_for(ws.recv(), timeout=snapshot_timeout)
        except asyncio.TimeoutError:
            pytest.fail(
                f"No initial snapshot received within {snapshot_timeout:.0f}s"
            )
        data = json.loads(msg)
        print(f"[test] Received event: {data.get('event')}")
        assert data.get("event") in ("snapshot", "initial_snapshot")

        # 2. Send terminal_in
        session_id = f"test-session-{uuid.uuid4().hex[:6]}"
        cmd_input = "echo 'Hello from WebSocket'\n"
        await ws.send(json.dumps({
            "action": "terminal_in",
            "data": cmd_input,
            "session_id": session_id,
        }))
        print(f"[test] Sent terminal_in: {cmd_input.strip()}")

        # 3. Wait for output.
        # Use a wall-clock deadline rather than counting iterations: a busy
        # stream of unrelated events must not shorten the wait.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + output_timeout
        collected_output = ""  # output may be split across several messages
        found_output = False
        while not found_output:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                msg = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                break
            data = json.loads(msg)
            print(f"[test] Received event: {data.get('event')}")
            if data.get("event") != "skill_event":
                continue
            skill_data = data.get("data", {})
            if skill_data.get("type") != "output":
                continue
            output_text = skill_data.get("data", "")
            print(f"[test] Terminal Output: {output_text}")
            collected_output += output_text
            found_output = expected_output in collected_output

        assert found_output, "Failed to receive expected output from terminal"