============================= test session starts ==============================
platform darwin -- Python 3.12.7, pytest-9.0.3, pluggy-1.6.0 -- /Users/axieyangb/Project/CortexAI/cortex-ai/bin/python3
cachedir: .pytest_cache
rootdir: /Users/axieyangb/Project/CortexAI/ai-hub
configfile: pytest.ini (WARNING: ignoring pytest config in pyproject.toml!)
plugins: anyio-4.13.0, tornasync-0.6.0.post2, mock-3.15.1, asyncio-1.3.0, trio-0.8.0
asyncio: mode=Mode.STRICT, debug=False, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function
collecting ... collected 41 items
ai-hub/integration_tests/test_advanced_fs.py::TestAdvancedFS::test_mesh_move_atomic PASSED [ 2%]
ai-hub/integration_tests/test_advanced_fs.py::TestAdvancedFS::test_mesh_copy_atomic PASSED [ 4%]
ai-hub/integration_tests/test_advanced_fs.py::TestAdvancedFS::test_mesh_stat_speed PASSED [ 7%]
ai-hub/integration_tests/test_agents.py::test_agent_lifecycle_and_api_coverage FAILED [ 9%]
ai-hub/integration_tests/test_agents.py::test_agent_webhook_trigger FAILED [ 12%]
ai-hub/integration_tests/test_agents.py::test_agent_metrics_reset FAILED [ 14%]
ai-hub/integration_tests/test_audio.py::test_tts_voices PASSED [ 17%]
ai-hub/integration_tests/test_audio.py::test_tts_to_stt_lifecycle FAILED [ 19%]
ai-hub/integration_tests/test_browser_llm.py::test_browser_skill_weather FAILED [ 21%]
ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc1_mirror_check FAILED [ 24%]
ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc3_limit_check FAILED [ 26%]
ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc2_rework_loop FAILED [ 29%]
ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc4_context_compaction FAILED [ 31%]
ai-hub/integration_tests/test_coworker_full_journey.py::test_coworker_full_journey FAILED [ 34%]
ai-hub/integration_tests/test_file_sync.py::TestSmallFileSync::test_case1_write_from_node1_visible_on_node2_and_server PASSED [ 36%]
ai-hub/integration_tests/test_file_sync.py::TestSmallFileSync::test_case2_write_from_server_visible_on_all_nodes PASSED [ 39%]
ai-hub/integration_tests/test_file_sync.py::TestSmallFileSync::test_case3_delete_from_server_purges_client_nodes PASSED [ 41%]
ai-hub/integration_tests/test_file_sync.py::TestSmallFileSync::test_case4_delete_from_node2_purges_server_and_node1 PASSED [ 43%]
ai-hub/integration_tests/test_file_sync.py::TestSmallFileSync::test_case9_cat_deleted_file_returns_quickly_not_timeout PASSED [ 46%]
ai-hub/integration_tests/test_file_sync.py::TestSmallFileSync::test_case11_hub_pseudo_node_write_visibility PASSED [ 48%]
ai-hub/integration_tests/test_file_sync.py::TestNodeResync::test_case10_node_resync_after_restart SKIPPED [ 51%]
ai-hub/integration_tests/test_file_sync.py::TestLargeFileSync::test_case5_large_file_from_node1_to_server_and_node2 PASSED [ 53%]
ai-hub/integration_tests/test_file_sync.py::TestLargeFileSync::test_case6_large_file_from_server_to_all_nodes PASSED [ 56%]
ai-hub/integration_tests/test_file_sync.py::TestLargeFileSync::test_case7_delete_large_file_from_server_purges_nodes PASSED [ 58%]
ai-hub/integration_tests/test_file_sync.py::TestLargeFileSync::test_case8_delete_large_file_from_node2_purges_server_and_node1 PASSED [ 60%]
ai-hub/integration_tests/test_file_sync.py::TestGigabyteFileSync::test_case_1gb_sync_from_client_to_server_and_node FAILED [ 63%]
ai-hub/integration_tests/test_file_sync.py::TestSessionAutoPurge::test_session_lifecycle_cleanup PASSED [ 65%]
ai-hub/integration_tests/test_llm_chat.py::test_create_session_and_chat_gemini PASSED [ 68%]
ai-hub/integration_tests/test_login.py::test_login_success PASSED [ 70%]
ai-hub/integration_tests/test_login.py::test_login_failure_invalid_password PASSED [ 73%]
ai-hub/integration_tests/test_login.py::test_login_failure_invalid_user PASSED [ 75%]
ai-hub/integration_tests/test_node_registration.py::test_node_full_lifecycle_and_api_coverage FAILED [ 78%]
ai-hub/integration_tests/test_parallel_coworker.py::test_parallel_rubric_generation FAILED [ 80%]
ai-hub/integration_tests/test_provider_config.py::test_verify_llm_success_gemini PASSED [ 82%]
ai-hub/integration_tests/test_provider_config.py::test_verify_llm_failure_invalid_key PASSED [ 85%]
ai-hub/integration_tests/test_provider_config.py::test_update_user_llm_preferences PASSED [ 87%]
ai-hub/integration_tests/test_provider_config.py::test_verify_llm_success_gemini_masked_key_fallback PASSED [ 90%]
ai-hub/integration_tests/test_provider_config.py::test_verify_llm_unrecognized_provider PASSED [ 92%]
ai-hub/integration_tests/test_provider_config.py::test_get_provider_models PASSED [ 95%]
ai-hub/integration_tests/test_tools.py::test_mesh_file_explorer_none_path_and_session PASSED [ 97%]
ai-hub/integration_tests/test_tools.py::test_tool_service_node_id_validation PASSED [100%]
=================================== FAILURES ===================================
____________________ test_agent_lifecycle_and_api_coverage _____________________
def test_agent_lifecycle_and_api_coverage():
"""
Test suite covering Agent API endpoints:
1. Register Node
2. Register Template
3. Register Session
4. Register Instance
5. Register Trigger
6. Verify Agent Periodical Execution via Session Messages
7. List Agents
8. Stop Agent
9. Remove Agent
"""
node_id = f"test-agent-node-{uuid.uuid4().hex[:8]}"
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
with httpx.Client(timeout=10.0) as client:
# 1. Register a test node specifically for this agent testing
node_payload = {
"node_id": node_id,
"display_name": "Agent Test Node",
"is_active": True,
"skill_config": {"shell": {"enabled": True}}
}
r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload)
# If conflicts, clear first
if r_node.status_code in (400, 409):
client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": admin_id})
r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload)
assert r_node.status_code == 200, f"Node registration failed: {r_node.text}"
# 2. Deploy Agent using the unified endpoint (matching UI behavior)
deploy_payload = {
"name": "Cron Print Agent",
"description": "Periodically prints to the node console",
"system_prompt": "You are a cron agent. Run shell tasks periodically.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"trigger_type": "interval",
"interval_seconds": 5,
"default_prompt": "Hello test agent! Just reply the word 'Acknowledged' and nothing else.",
"initial_prompt": None
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy unified endpoint failed: {r_deploy.text}"
deploy_res = r_deploy.json()
> instance_id = deploy_res["instance_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_agents.py:60: TypeError
__________________________ test_agent_webhook_trigger __________________________
def test_agent_webhook_trigger():
"""
Test Agent Webhook Triggering:
1. Deploy agent with webhook trigger
2. Obtain secret token
3. Call webhook with custom prompt + token
4. Verify response contains custom prompt indicator
"""
node_id = f"test-webhook-node-{uuid.uuid4().hex[:8]}"
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
with httpx.Client(timeout=10.0) as client:
# 1. Register a test node
node_payload = {
"node_id": node_id,
"display_name": "Webhook Test Node",
"is_active": True,
"skill_config": {"shell": {"enabled": True}}
}
client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload)
# 2. Deploy Agent with Webhook Trigger
deploy_payload = {
"name": "Webhook Agent",
"system_prompt": "You are a helpful assistant. Just acknowledge the received webhook prompt.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"trigger_type": "webhook",
"default_prompt": "Standard Webhook Prompt",
"initial_prompt": None
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
deploy_res = r_deploy.json()
> instance_id = deploy_res["instance_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_agents.py:150: TypeError
___________________________ test_agent_metrics_reset ___________________________
def test_agent_metrics_reset():
"""
Test Agent Metrics Tracking and Reset:
1. Deploy agent
2. Trigger sync webhook to generate metrics (tokens)
3. Verify metrics are non-zero in agent instance
4. Call /metrics/reset
5. Verify metrics are zeroed
"""
node_id = f"test-metrics-node-{uuid.uuid4().hex[:8]}"
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
with httpx.Client(timeout=30.0) as client:
# 1. Register a test node
node_payload = {
"node_id": node_id,
"display_name": "Metrics Test Node",
"is_active": True,
"skill_config": {"shell": {"enabled": True}}
}
client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json=node_payload)
# 2. Deploy Agent
deploy_payload = {
"name": "Metrics Agent",
"system_prompt": "You are a helpful assistant.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"trigger_type": "webhook",
"default_prompt": "Hello",
"initial_prompt": None
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200
> instance_id = r_deploy.json()["instance_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_agents.py:237: TypeError
__________________________ test_tts_to_stt_lifecycle ___________________________
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
global HTTPCORE_EXC_MAP
if len(HTTPCORE_EXC_MAP) == 0:
HTTPCORE_EXC_MAP = _load_httpcore_exceptions()
try:
> yield
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:101:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:250: in handle_request
resp = self._pool.handle_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/connection_pool.py:256: in handle_request
raise exc from None
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/connection_pool.py:236: in handle_request
response = connection.handle_request(
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/connection.py:103: in handle_request
return self._connection.handle_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:136: in handle_request
raise exc
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:106: in handle_request
) = self._receive_response_headers(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:177: in _receive_response_headers
event = self._receive_event(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:217: in _receive_event
data = self._network_stream.read(
cortex-ai/lib/python3.12/site-packages/httpcore/_backends/sync.py:126: in read
with map_exceptions(exc_map):
^^^^^^^^^^^^^^^^^^^^^^^
/opt/anaconda3/lib/python3.12/contextlib.py:158: in __exit__
self.gen.throw(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
map = {<class 'TimeoutError'>: <class 'httpcore.ReadTimeout'>, <class 'OSError'>: <class 'httpcore.ReadError'>}
@contextlib.contextmanager
def map_exceptions(map: ExceptionMapping) -> typing.Iterator[None]:
try:
yield
except Exception as exc: # noqa: PIE786
for from_exc, to_exc in map.items():
if isinstance(exc, from_exc):
> raise to_exc(exc) from exc
E httpcore.ReadTimeout: timed out
cortex-ai/lib/python3.12/site-packages/httpcore/_exceptions.py:14: ReadTimeout
The above exception was the direct cause of the following exception:
def test_tts_to_stt_lifecycle():
"""
Test generating speech from text (TTS), then transcribing that audio
back to text (STT) to verify the full audio processing pipeline.
"""
user_id = os.environ.get("SYNC_TEST_USER_ID", "")
assert user_id, "User ID not found in environment from conftest."
test_phrase = "Hello from integration test audio pipeline."
with httpx.Client(timeout=30.0) as client:
# Step 1: Generate speech (TTS)
tts_payload = {
"text": test_phrase
}
r_tts = client.post(
f"{BASE_URL}/speech",
params={"stream": False},
headers=_headers(),
json=tts_payload
)
assert r_tts.status_code == 200, f"TTS failed: {r_tts.text}"
# Ensure we got audio bytes back
audio_content = r_tts.content
assert len(audio_content) > 1000, "TTS audio content seems too small"
# Step 2: Transcribe the generated audio (STT)
files = {
"audio_file": ("test_audio_pipeline.wav", audio_content, "audio/wav")
}
> r_stt = client.post(
f"{BASE_URL}/stt/transcribe",
headers=_headers(),
files=files
)
ai-hub/integration_tests/test_audio.py:51:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:1144: in post
return self.request(
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:825: in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:914: in send
response = self._send_handling_auth(
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:942: in _send_handling_auth
response = self._send_handling_redirects(
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:979: in _send_handling_redirects
response = self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:1014: in _send_single_request
response = transport.handle_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:249: in handle_request
with map_httpcore_exceptions():
^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/anaconda3/lib/python3.12/contextlib.py:158: in __exit__
self.gen.throw(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
global HTTPCORE_EXC_MAP
if len(HTTPCORE_EXC_MAP) == 0:
HTTPCORE_EXC_MAP = _load_httpcore_exceptions()
try:
yield
except Exception as exc:
mapped_exc = None
for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
if not isinstance(exc, from_exc):
continue
# We want to map to the most specific exception we can find.
# Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
# `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
if mapped_exc is None or issubclass(to_exc, mapped_exc):
mapped_exc = to_exc
if mapped_exc is None: # pragma: no cover
raise
message = str(exc)
> raise mapped_exc(message) from exc
E httpx.ReadTimeout: timed out
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:118: ReadTimeout
__________________________ test_browser_skill_weather __________________________
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
global HTTPCORE_EXC_MAP
if len(HTTPCORE_EXC_MAP) == 0:
HTTPCORE_EXC_MAP = _load_httpcore_exceptions()
try:
> yield
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:101:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:250: in handle_request
resp = self._pool.handle_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/connection_pool.py:256: in handle_request
raise exc from None
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/connection_pool.py:236: in handle_request
response = connection.handle_request(
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/connection.py:103: in handle_request
return self._connection.handle_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:136: in handle_request
raise exc
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:106: in handle_request
) = self._receive_response_headers(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:177: in _receive_response_headers
event = self._receive_event(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpcore/_sync/http11.py:217: in _receive_event
data = self._network_stream.read(
cortex-ai/lib/python3.12/site-packages/httpcore/_backends/sync.py:126: in read
with map_exceptions(exc_map):
^^^^^^^^^^^^^^^^^^^^^^^
/opt/anaconda3/lib/python3.12/contextlib.py:158: in __exit__
self.gen.throw(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
map = {<class 'TimeoutError'>: <class 'httpcore.ReadTimeout'>, <class 'OSError'>: <class 'httpcore.ReadError'>}
@contextlib.contextmanager
def map_exceptions(map: ExceptionMapping) -> typing.Iterator[None]:
try:
yield
except Exception as exc: # noqa: PIE786
for from_exc, to_exc in map.items():
if isinstance(exc, from_exc):
> raise to_exc(exc) from exc
E httpcore.ReadTimeout: timed out
cortex-ai/lib/python3.12/site-packages/httpcore/_exceptions.py:14: ReadTimeout
The above exception was the direct cause of the following exception:
@pytest.mark.skipif(os.getenv("SKIP_DOCKER_NODES", "false").lower() == "true", reason="Browser skill requires a fully-loaded Docker container environment to access Chromium.")
def test_browser_skill_weather():
"""
Test explicitly asking the LLM context to leverage its browser skill
to fetch real-time data indicating that tool resolution and execution works.
"""
user_id = os.environ.get("SYNC_TEST_USER_ID", "")
assert user_id, "User ID not found in environment from conftest."
with httpx.Client(timeout=45.0) as client:
# Step 1: Create a new session bound to Gemini
session_payload = {
"user_id": user_id,
"provider_name": "gemini",
"feature_name": "agent_harness"
}
> r_sess = client.post(f"{BASE_URL}/sessions/", headers=_headers(), json=session_payload)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ai-hub/integration_tests/test_browser_llm.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:1144: in post
return self.request(
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:825: in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:914: in send
response = self._send_handling_auth(
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:942: in _send_handling_auth
response = self._send_handling_redirects(
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:979: in _send_handling_redirects
response = self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpx/_client.py:1014: in _send_single_request
response = transport.handle_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:249: in handle_request
with map_httpcore_exceptions():
^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/anaconda3/lib/python3.12/contextlib.py:158: in __exit__
self.gen.throw(value)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
global HTTPCORE_EXC_MAP
if len(HTTPCORE_EXC_MAP) == 0:
HTTPCORE_EXC_MAP = _load_httpcore_exceptions()
try:
yield
except Exception as exc:
mapped_exc = None
for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
if not isinstance(exc, from_exc):
continue
# We want to map to the most specific exception we can find.
# Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
# `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
if mapped_exc is None or issubclass(to_exc, mapped_exc):
mapped_exc = to_exc
if mapped_exc is None: # pragma: no cover
raise
message = str(exc)
> raise mapped_exc(message) from exc
E httpx.ReadTimeout: timed out
cortex-ai/lib/python3.12/site-packages/httpx/_transports/default.py:118: ReadTimeout
________________________ test_coworker_sc1_mirror_check ________________________
def test_coworker_sc1_mirror_check():
"""
SC-1 (Mirror Check):
1. Deploy an agent with co_worker_quality_gate=True.
2. Wait for the agent to initialize (Status: evaluating).
3. Use the /nodes/{id}/fs/ls API to verify the .cortex folder existence.
"""
node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=90.0) as client:
try:
# 2. Deploy Agent with co_worker_quality_gate=True
deploy_payload = {
"name": "SC-1 Mirror Agent",
"description": "Tests .cortex mirror initialization",
"system_prompt": "You are a test agent. Create a simple hello world python script.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-3-flash-preview",
"trigger_type": "interval",
"interval_seconds": 60,
"co_worker_quality_gate": True,
"initial_prompt": "Create app.py that prints hello.",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
> assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
E AssertionError: Deploy failed: Internal Server Error
E assert 500 == 200
E + where 500 = <Response [500 Internal Server Error]>.status_code
ai-hub/integration_tests/test_coworker_flow.py:40: AssertionError
________________________ test_coworker_sc3_limit_check _________________________
def test_coworker_sc3_limit_check():
"""
SC-3 (Limit Check):
1. Deploy an agent with max_rework_attempts=1 and rework_threshold=100.
2. Trigger a run.
3. Poll the /agents endpoint until evaluation_status == 'failed_limit'.
4. Verify the latest_quality_score is present in the response.
"""
node_id = os.getenv("SYNC_TEST_NODE2", "test-node-2")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=90.0) as client:
try:
# 2. Deploy Agent with max_rework_attempts=1 and rework_threshold=100
deploy_payload = {
"name": "SC-3 Limit Agent",
"system_prompt": "You are a test agent. Create a simple hello world python script.",
"max_loop_iterations": 2,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-3-flash-preview",
"trigger_type": "webhook", # Use webhook to trigger manually
"co_worker_quality_gate": True,
"max_rework_attempts": 1,
"rework_threshold": 100, # Impossible to pass
"default_prompt": "Create app.py that prints hello.",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
> instance_id = r_deploy.json()["instance_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_coworker_flow.py:113: TypeError
________________________ test_coworker_sc2_rework_loop _________________________
def test_coworker_sc2_rework_loop():
"""
SC-2 (The Rework Loop):
1. Deploy agent with a conflicting requirement to force at least one failure.
2. Poll for evaluation_status to be 'reworking'.
3. Verify history.log has a failed entry.
"""
node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=90.0) as client:
try:
# 2. Deploy Agent with rework loop
deploy_payload = {
"name": "SC-2 Rework Agent",
"system_prompt": "You are a stubborn tester.",
"max_loop_iterations": 2,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-3-flash-preview",
"trigger_type": "webhook",
"co_worker_quality_gate": True,
"max_rework_attempts": 3,
"rework_threshold": 101,
"default_prompt": "Create app.py that prints hello, but deliberately make a syntax error on your first try.",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200
> instance_id = r_deploy.json()["instance_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_coworker_flow.py:180: TypeError
_____________________ test_coworker_sc4_context_compaction _____________________
def test_coworker_sc4_context_compaction():
"""
SC-4 (Context Compaction):
1. Simply deploy an agent and mock/simulate conditions if actual compaction is hard to trigger.
(This is a placeholder test that checks the agent can take higher max_rework_attempts bounds.)
"""
node_id = os.getenv("SYNC_TEST_NODE2", "test-node-2")
admin_id = os.getenv("SYNC_TEST_USER_ID", "")
instance_id = None
with httpx.Client(timeout=90.0) as client:
try:
# Register node check removed (leveraging session-scoped nodes)
deploy_payload = {
"name": "SC-4 Compaction Agent",
"system_prompt": "Tester",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-3-flash-preview",
"trigger_type": "interval",
"co_worker_quality_gate": True,
"max_rework_attempts": 5,
"rework_threshold": 95,
"default_prompt": "Placeholder",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200
> instance_id = r_deploy.json()["instance_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_coworker_flow.py:236: TypeError
__________________________ test_coworker_full_journey __________________________
def test_coworker_full_journey():
"""
CO-WORKER FULL JOURNEY INTEGRATION TEST:
1. Deploy agent with High Threshold (95%) to force rework.
2. Trigger via webhook.
3. Monitor 'evaluation_status' for real-time indicators.
4. Verify 'latest_quality_score' changes across attempts.
5. Audit .cortex/history.log for:
- 'duration' fields for every step.
- 'type': 'attempt' entries for rework tracking.
- 'type': 'event' entries for Rubric/Agent execution.
6. Ensure final result is delivered successfully.
"""
node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
admin_id = os.getenv("SYNC_TEST_USER_ID", "9a333ccd-9c3f-432f-a030-7b1e1284a436")
instance_id = None
# We use a prompt that is simple but specific to ensure we can verify correctness
test_prompt = "Create a file named 'secret.txt' containing the word 'PHOENIX'. Then verify it exists."
with httpx.Client(timeout=60.0) as client:
try:
# 1. Register Node (noop if exists, but ensures it's in DB for this user)
client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": admin_id}, json={
"node_id": node_id, "is_active": True, "skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
})
# 2. Deploy Agent with High Threshold
deploy_payload = {
"name": "Full Journey Agent",
"system_prompt": "You are an infrastructure specialist.",
"max_loop_iterations": 5,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-3-flash-preview",
"trigger_type": "webhook",
"co_worker_quality_gate": True,
"max_rework_attempts": 2,
"rework_threshold": 101, # Impossible to pass to force multiple rework rounds
"default_prompt": test_prompt,
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
res = r_deploy.json()
> instance_id = res["instance_id"]
^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_coworker_full_journey.py:56: TypeError
____ TestGigabyteFileSync.test_case_1gb_sync_from_client_to_server_and_node ____
self = <test_file_sync.TestGigabyteFileSync object at 0x10b24bd10>
sync_client = <httpx.Client object at 0x11f907320>
swarm_session = 'session-33-4bb12ccb'
def test_case_1gb_sync_from_client_to_server_and_node(
self, sync_client, swarm_session
):
"""
Creates a 1 GB file on test-node-1 using the shell command `dd`.
Verifies that it syncs to both the server mirror and test-node-2.
"""
filename = _unique("gigabyte")
workspace = swarm_session
print(f"\\n[Case 1GB] Disabling memory limit checks and triggering 1GB creation on {NODE_1}...")
# Create a 512MB file consisting of zeros (highly compressible over the network) on NODE_1 directly.
# This will trigger the Inotify watcher to push chunks back up to the Hub.
# We output to the active session workspace path on the node.
is_native = os.environ.get("SKIP_DOCKER_NODES") == "true"
sync_dir = f"/tmp/cortex-sync-{NODE_1}" if is_native else "/tmp/cortex-sync"
dd_command = f"dd if=/dev/zero of={sync_dir}/{workspace}/{filename} bs=1M count=512"
r_disp = sync_client.post(
f"{NODES_PATH}/{NODE_1}/dispatch",
params={"user_id": _get_user_id()},
json={"command": dd_command},
headers=_headers(),
timeout=300.0
)
assert r_disp.status_code == 200, f"Failed to dispatch 512MB write to {NODE_1}"
# Give the agent node ample time to write to disk and push chunks over gRPC.
# Wait up to 300 seconds.
def _check_node2_ls():
r = sync_client.get(
f"{NODES_PATH}/{NODE_2}/fs/ls",
params={"path": ".", "session_id": workspace},
headers=_headers(),
timeout=30.0
)
if r.status_code != 200:
return False
for f in r.json().get("files", []):
# 512 MB (512 * 1024 * 1024 = 536870912)
if f.get("name") == filename and f.get("size", 0) >= 536870912:
return f
return False
print(f"[Case 512MB] Polling {NODE_2} for the file...")
node2_file = _poll_until(_check_node2_ls, timeout=600)
> assert node2_file, f"512MB file {filename} did not reach {NODE_2} within 600s in full size."
E AssertionError: 512MB file gigabyte_372d7c5c.txt did not reach test-node-2 within 600s in full size.
E assert False
ai-hub/integration_tests/test_file_sync.py:815: AssertionError
----------------------------- Captured stdout call -----------------------------
\n[Case 1GB] Disabling memory limit checks and triggering 1GB creation on test-node-1...
[Case 512MB] Polling test-node-2 for the file...
__________________ test_node_full_lifecycle_and_api_coverage ___________________
@pytest.mark.skipif(os.getenv("SKIP_DOCKER_NODES", "false").lower() == "true", reason="Node Lifecycle integration test requires Docker to spawn nodes imperatively.")
def test_node_full_lifecycle_and_api_coverage():
user_id = _get_user_id()
node_id = f"test-integration-node-{uuid.uuid4().hex[:8]}"
display_name = f"Integration Lifecycle {node_id}"
payload = {
"node_id": node_id,
"display_name": display_name,
"is_active": True,
"skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
}
try:
with httpx.Client(timeout=15.0) as client:
# --- ADMIN ENDPOINTS ---
# POST /nodes/admin
r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, json=payload, headers=_headers())
assert r_node.status_code == 200
node_data = r_node.json()
invite_token = node_data["invite_token"]
# GET /nodes/admin
r_list = client.get(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, headers=_headers())
assert r_list.status_code == 200
assert any(n["node_id"] == node_id for n in r_list.json())
# GET /nodes/admin/{node_id}
r_get = client.get(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers())
assert r_get.status_code == 200
# PATCH /nodes/admin/{node_id}
r_patch = client.patch(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, json={"display_name": "Updated Name"}, headers=_headers())
assert r_patch.status_code == 200
# POST /nodes/admin/{node_id}/access
group_r = client.post(f"{BASE_URL}/users/admin/groups", json={"name": f"Group for {node_id}"}, headers=_headers())
group_id = group_r.json()["id"]
client.put(f"{BASE_URL}/users/admin/users/{user_id}/group", json={"group_id": group_id}, headers=_headers())
acc_r = client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())
assert acc_r.status_code == 200
# DELETE /nodes/admin/{node_id}/access/{group_id} (Revoke and re-grant for test)
rev_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}/access/{group_id}", params={"admin_id": user_id}, headers=_headers())
assert rev_r.status_code == 200
client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())
# GET /nodes/admin/{node_id}/config.yaml
conf_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/config.yaml", params={"admin_id": user_id}, headers=_headers())
assert conf_r.status_code == 200
# GET /nodes/provision/{node_id}
prov_r = client.get(f"{BASE_URL}/nodes/provision/{node_id}", params={"token": invite_token}, headers=_headers())
assert prov_r.status_code == 200
# GET /nodes/admin/{node_id}/download
dl_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/download", params={"admin_id": user_id}, headers=_headers())
> assert dl_r.status_code == 200
E assert 500 == 200
E + where 500 = <Response [500 Internal Server Error]>.status_code
ai-hub/integration_tests/test_node_registration.py:76: AssertionError
During handling of the above exception, another exception occurred:
@pytest.mark.skipif(os.getenv("SKIP_DOCKER_NODES", "false").lower() == "true", reason="Node Lifecycle integration test requires Docker to spawn nodes imperatively.")
def test_node_full_lifecycle_and_api_coverage():
user_id = _get_user_id()
node_id = f"test-integration-node-{uuid.uuid4().hex[:8]}"
display_name = f"Integration Lifecycle {node_id}"
payload = {
"node_id": node_id,
"display_name": display_name,
"is_active": True,
"skill_config": {"shell": {"enabled": True}, "sync": {"enabled": True}}
}
try:
with httpx.Client(timeout=15.0) as client:
# --- ADMIN ENDPOINTS ---
# POST /nodes/admin
r_node = client.post(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, json=payload, headers=_headers())
assert r_node.status_code == 200
node_data = r_node.json()
invite_token = node_data["invite_token"]
# GET /nodes/admin
r_list = client.get(f"{BASE_URL}/nodes/admin", params={"admin_id": user_id}, headers=_headers())
assert r_list.status_code == 200
assert any(n["node_id"] == node_id for n in r_list.json())
# GET /nodes/admin/{node_id}
r_get = client.get(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers())
assert r_get.status_code == 200
# PATCH /nodes/admin/{node_id}
r_patch = client.patch(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, json={"display_name": "Updated Name"}, headers=_headers())
assert r_patch.status_code == 200
# POST /nodes/admin/{node_id}/access
group_r = client.post(f"{BASE_URL}/users/admin/groups", json={"name": f"Group for {node_id}"}, headers=_headers())
group_id = group_r.json()["id"]
client.put(f"{BASE_URL}/users/admin/users/{user_id}/group", json={"group_id": group_id}, headers=_headers())
acc_r = client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())
assert acc_r.status_code == 200
# DELETE /nodes/admin/{node_id}/access/{group_id} (Revoke and re-grant for test)
rev_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}/access/{group_id}", params={"admin_id": user_id}, headers=_headers())
assert rev_r.status_code == 200
client.post(f"{BASE_URL}/nodes/admin/{node_id}/access", params={"admin_id": user_id}, json={"group_id": group_id, "access_level": "use"}, headers=_headers())
# GET /nodes/admin/{node_id}/config.yaml
conf_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/config.yaml", params={"admin_id": user_id}, headers=_headers())
assert conf_r.status_code == 200
# GET /nodes/provision/{node_id}
prov_r = client.get(f"{BASE_URL}/nodes/provision/{node_id}", params={"token": invite_token}, headers=_headers())
assert prov_r.status_code == 200
# GET /nodes/admin/{node_id}/download
dl_r = client.get(f"{BASE_URL}/nodes/admin/{node_id}/download", params={"admin_id": user_id}, headers=_headers())
assert dl_r.status_code == 200
# POST /nodes/validate-token (Internal)
val_r = client.post(f"{BASE_URL}/nodes/validate-token", params={"token": invite_token, "node_id": node_id}, headers=_headers())
assert val_r.status_code == 200
# --- SPAWN NODE IMPERATIVELY ---
network = "cortexai_default"
image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True)
image_id = image_proc.stdout.strip()
subprocess.run([
"docker", "run", "-d", "--name", node_id, "--network", network,
"-e", f"HUB_URL=http://ai-hub:8000", "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={invite_token}",
"-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "AGENT_TLS_ENABLED=false",
"-v", f"{node_id}_sync:/tmp/cortex-sync", image_id
], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Wait for connection
connected = False
for _ in range(150): # 300s timeout
st = client.get(f"{BASE_URL}/nodes/{node_id}/status", headers=_headers())
if st.status_code == 200 and st.json().get("status") == "online":
connected = True
break
time.sleep(1)
assert connected, "Node never successfully connected to Hub"
# --- USER ENDPOINTS ---
# GET /nodes/
n_list = client.get(f"{BASE_URL}/nodes/", params={"user_id": user_id}, headers=_headers())
assert n_list.status_code == 200
# GET /nodes/{node_id}/status (Already tested above)
# GET /nodes/{node_id}/terminal
term_r = client.get(f"{BASE_URL}/nodes/{node_id}/terminal", headers=_headers())
assert term_r.status_code == 200
# POST /nodes/{node_id}/dispatch
dp_r = client.post(f"{BASE_URL}/nodes/{node_id}/dispatch", params={"user_id": user_id}, json={"command": "ls -la /"}, headers=_headers())
assert dp_r.status_code == 200
task_id = dp_r.json().get("task_id")
# POST /nodes/{node_id}/cancel
can_r = client.post(f"{BASE_URL}/nodes/{node_id}/cancel", params={"task_id": task_id}, headers=_headers())
assert can_r.status_code == 200
# PATCH & GET /nodes/preferences
pref_p = client.patch(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, json={"default_node_ids": [node_id]}, headers=_headers())
assert pref_p.status_code == 200
pref_g = client.get(f"{BASE_URL}/nodes/preferences", params={"user_id": user_id}, headers=_headers())
assert pref_g.status_code == 200
# --- FILE NAVIGATOR ENDPOINTS ---
r_sess = client.post(f"{BASE_URL}/sessions/", headers=_headers(), json={"user_id": user_id, "provider_name": "gemini", "feature_name": "swarm_control"})
sess_id = str(r_sess.json().get("id"))
# POST /fs/touch
test_fname = f"test_file_{uuid.uuid4().hex[:6]}.txt"
test_file_path = test_fname # NO LEADING SLASH
fs_touch = client.post(f"{BASE_URL}/nodes/{node_id}/fs/touch", json={"path": test_file_path, "is_dir": False, "session_id": sess_id}, headers=_headers())
assert fs_touch.status_code == 200, fs_touch.text
# GET /fs/ls (POLL)
found = False
for _ in range(5):
fs_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params={"path": "/", "session_id": sess_id}, headers=_headers())
assert fs_ls.status_code == 200
items = fs_ls.json().get("files", [])
if any(item.get("name") == test_fname for item in items):
found = True
break
time.sleep(1)
assert found, f"Expected {test_fname} not found in ls output: {items}"
# POST /fs/upload
files = {"file": ("test_file2.txt", b"Hello Cortex!")}
fs_up = client.post(f"{BASE_URL}/nodes/{node_id}/fs/upload", params={"path": "/", "session_id": sess_id}, files=files, headers=_headers())
assert fs_up.status_code == 200
# GET /fs/cat (POLL)
found_content = False
for _ in range(5):
fs_cat = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
if fs_cat.status_code == 200 and getattr(fs_cat, "text", "") and "Hello Cortex!" in getattr(fs_cat, "text", ""):
found_content = True
break
time.sleep(1)
assert found_content, "Uploaded content not returned by cat"
# GET /fs/download
fs_dl = client.get(f"{BASE_URL}/nodes/{node_id}/fs/download", params={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
assert fs_dl.status_code == 200
assert fs_dl.content == b"Hello Cortex!"
# POST /fs/rm (Delete both files)
fs_rm = client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": test_file_path, "session_id": sess_id}, headers=_headers())
assert fs_rm.status_code == 200
client.post(f"{BASE_URL}/nodes/{node_id}/fs/rm", json={"path": "test_file2.txt", "session_id": sess_id}, headers=_headers())
# Verify deletion with GET /fs/cat returning 404 (POLL)
deleted = False
last_err = ""
for _ in range(5):
fs_cat_404 = client.get(f"{BASE_URL}/nodes/{node_id}/fs/cat", params={"path": test_file_path, "session_id": sess_id}, headers=_headers())
if fs_cat_404.status_code == 404:
deleted = True
break
else:
last_err = f"Code: {fs_cat_404.status_code}, Text: {fs_cat_404.text}"
time.sleep(1)
assert deleted, f"File was not deleted. Last response: {last_err}"
# --- TEARDOWN ---
# DELETE /nodes/admin/{node_id}
del_r = client.delete(f"{BASE_URL}/nodes/admin/{node_id}", params={"admin_id": user_id}, headers=_headers())
assert del_r.status_code == 200
finally:
> subprocess.run(["docker", "rm", "-f", node_id], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
ai-hub/integration_tests/test_node_registration.py:196:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/opt/anaconda3/lib/python3.12/subprocess.py:548: in run
with Popen(*popenargs, **kwargs) as process:
^^^^^^^^^^^^^^^^^^^^^^^^^^^
/opt/anaconda3/lib/python3.12/subprocess.py:1026: in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Popen: returncode: 255 args: ['docker', 'rm', '-f', 'test-integration-node-...>
args = ['docker', 'rm', '-f', 'test-integration-node-911915b2']
executable = b'docker', preexec_fn = None, close_fds = True, pass_fds = ()
cwd = None, env = None, startupinfo = None, creationflags = 0, shell = False
p2cread = -1, p2cwrite = -1, c2pread = -1, c2pwrite = 15, errread = -1
errwrite = 15, restore_signals = True, gid = None, gids = None, uid = None
umask = -1, start_new_session = False, process_group = -1
def _execute_child(self, args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals,
gid, gids, uid, umask,
start_new_session, process_group):
"""Execute program (POSIX version)"""
if isinstance(args, (str, bytes)):
args = [args]
elif isinstance(args, os.PathLike):
if shell:
raise TypeError('path-like args is not allowed when '
'shell is true')
args = [args]
else:
args = list(args)
if shell:
# On Android the default shell is at '/system/bin/sh'.
unix_shell = ('/system/bin/sh' if
hasattr(sys, 'getandroidapilevel') else '/bin/sh')
args = [unix_shell, "-c"] + args
if executable:
args[0] = executable
if executable is None:
executable = args[0]
sys.audit("subprocess.Popen", executable, args, cwd, env)
if (_USE_POSIX_SPAWN
and os.path.dirname(executable)
and preexec_fn is None
and not close_fds
and not pass_fds
and cwd is None
and (p2cread == -1 or p2cread > 2)
and (c2pwrite == -1 or c2pwrite > 2)
and (errwrite == -1 or errwrite > 2)
and not start_new_session
and process_group == -1
and gid is None
and gids is None
and uid is None
and umask < 0):
self._posix_spawn(args, executable, env, restore_signals,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
return
orig_executable = executable
# For transferring possible exec failure from child to parent.
# Data format: "exception name:hex errno:description"
# Pickle is not used; it is complex and involves memory allocation.
errpipe_read, errpipe_write = os.pipe()
# errpipe_write must not be in the standard io 0, 1, or 2 fd range.
low_fds_to_close = []
while errpipe_write < 3:
low_fds_to_close.append(errpipe_write)
errpipe_write = os.dup(errpipe_write)
for low_fd in low_fds_to_close:
os.close(low_fd)
try:
try:
# We must avoid complex work that could involve
# malloc or free in the child process to avoid
# potential deadlocks, thus we do all this here.
# and pass it to fork_exec()
if env is not None:
env_list = []
for k, v in env.items():
k = os.fsencode(k)
if b'=' in k:
raise ValueError("illegal environment variable name")
env_list.append(k + b'=' + os.fsencode(v))
else:
env_list = None # Use execv instead of execve.
executable = os.fsencode(executable)
if os.path.dirname(executable):
executable_list = (executable,)
else:
# This matches the behavior of os._execvpe().
executable_list = tuple(
os.path.join(os.fsencode(dir), executable)
for dir in os.get_exec_path(env))
fds_to_keep = set(pass_fds)
fds_to_keep.add(errpipe_write)
self.pid = _fork_exec(
args, executable_list,
close_fds, tuple(sorted(map(int, fds_to_keep))),
cwd, env_list,
p2cread, p2cwrite, c2pread, c2pwrite,
errread, errwrite,
errpipe_read, errpipe_write,
restore_signals, start_new_session,
process_group, gid, gids, uid, umask,
preexec_fn, _USE_VFORK)
self._child_created = True
finally:
# be sure the FD is closed no matter what
os.close(errpipe_write)
self._close_pipe_fds(p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
# Wait for exec to fail or succeed; possibly raising an
# exception (limited in size)
errpipe_data = bytearray()
while True:
part = os.read(errpipe_read, 50000)
errpipe_data += part
if not part or len(errpipe_data) > 50000:
break
finally:
# be sure the FD is closed no matter what
os.close(errpipe_read)
if errpipe_data:
try:
pid, sts = os.waitpid(self.pid, 0)
if pid == self.pid:
self._handle_exitstatus(sts)
else:
self.returncode = sys.maxsize
except ChildProcessError:
pass
try:
exception_name, hex_errno, err_msg = (
errpipe_data.split(b':', 2))
# The encoding here should match the encoding
# written in by the subprocess implementations
# like _posixsubprocess
err_msg = err_msg.decode()
except ValueError:
exception_name = b'SubprocessError'
hex_errno = b'0'
err_msg = 'Bad exception data from child: {!r}'.format(
bytes(errpipe_data))
child_exception_type = getattr(
builtins, exception_name.decode('ascii'),
SubprocessError)
if issubclass(child_exception_type, OSError) and hex_errno:
errno_num = int(hex_errno, 16)
if err_msg == "noexec:chdir":
err_msg = ""
# The error must be from chdir(cwd).
err_filename = cwd
elif err_msg == "noexec":
err_msg = ""
err_filename = None
else:
err_filename = orig_executable
if errno_num != 0:
err_msg = os.strerror(errno_num)
if err_filename is not None:
> raise child_exception_type(errno_num, err_msg, err_filename)
E FileNotFoundError: [Errno 2] No such file or directory: 'docker'
/opt/anaconda3/lib/python3.12/subprocess.py:1955: FileNotFoundError
_______________________ test_parallel_rubric_generation ________________________
def test_parallel_rubric_generation():
"""
Verifies that rubric generation and main agent execution happen in parallel.
We check for specific status transitions that indicate parallel work.
"""
node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1")
instance_id = None
with httpx.Client(timeout=30.0) as client:
try:
# 1. Deploy Agent with co_worker_quality_gate=True
deploy_payload = {
"name": "Parallel Coworker Test",
"description": "Tests parallel rubric generation",
"system_prompt": "You are a helpful assistant. Provide a brief summary of the history of the internet.",
"max_loop_iterations": 1,
"mesh_node_id": node_id,
"provider_name": "gemini",
"model_name": "gemini-3-flash-preview",
"trigger_type": "webhook",
"co_worker_quality_gate": True,
"default_prompt": "Tell me about the history of the internet.",
}
r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers())
assert r_deploy.status_code == 200, f"Deploy failed: {r_deploy.text}"
> instance_id = r_deploy.json()["instance_id"]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E TypeError: 'NoneType' object is not subscriptable
ai-hub/integration_tests/test_parallel_coworker.py:36: TypeError
=========================== short test summary info ============================
FAILED ai-hub/integration_tests/test_agents.py::test_agent_lifecycle_and_api_coverage
FAILED ai-hub/integration_tests/test_agents.py::test_agent_webhook_trigger - ...
FAILED ai-hub/integration_tests/test_agents.py::test_agent_metrics_reset - Ty...
FAILED ai-hub/integration_tests/test_audio.py::test_tts_to_stt_lifecycle - ht...
FAILED ai-hub/integration_tests/test_browser_llm.py::test_browser_skill_weather
FAILED ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc1_mirror_check
FAILED ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc3_limit_check
FAILED ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc2_rework_loop
FAILED ai-hub/integration_tests/test_coworker_flow.py::test_coworker_sc4_context_compaction
FAILED ai-hub/integration_tests/test_coworker_full_journey.py::test_coworker_full_journey
FAILED ai-hub/integration_tests/test_file_sync.py::TestGigabyteFileSync::test_case_1gb_sync_from_client_to_server_and_node
FAILED ai-hub/integration_tests/test_node_registration.py::test_node_full_lifecycle_and_api_coverage
FAILED ai-hub/integration_tests/test_parallel_coworker.py::test_parallel_rubric_generation
============ 13 failed, 27 passed, 1 skipped in 11504.85s (3:11:44) ============