diff --git a/agent-node/Dockerfile b/agent-node/Dockerfile index cdddab1..46f0b30 100644 --- a/agent-node/Dockerfile +++ b/agent-node/Dockerfile @@ -4,6 +4,7 @@ # Set environment variables ENV PYTHONDONTWRITEBYTECODE 1 ENV PYTHONUNBUFFERED 1 +ENV PYTHONPATH=/app # Install system dependencies for psutil and networking tools RUN apt-get update && apt-get install -y \ diff --git a/agent-node/src/agent_node/config.py b/agent-node/src/agent_node/config.py index a31d17c..d2390ef 100644 --- a/agent-node/src/agent_node/config.py +++ b/agent-node/src/agent_node/config.py @@ -84,8 +84,8 @@ NODE_ID = os.getenv("AGENT_NODE_ID", _config.get("node_id", _defaults["node_id"])) NODE_DESC = os.getenv("AGENT_NODE_DESC", _config.get("node_description", _defaults["node_description"])) SERVER_HOST_PORT = os.getenv("GRPC_ENDPOINT", _config.get("grpc_endpoint", _defaults["grpc_endpoint"])) - AUTH_TOKEN = os.getenv("AGENT_AUTH_TOKEN", _config.get("auth_token", _defaults["auth_token"])) - SYNC_DIR = os.getenv("CORTEX_SYNC_DIR", _config.get("sync_root", _defaults["sync_root"])) + AUTH_TOKEN = os.getenv("AGENT_AUTH_TOKEN") or _config.get("auth_token", _defaults["auth_token"]) + SYNC_DIR = os.getenv("CORTEX_SYNC_DIR") or _config.get("sync_root", _defaults["sync_root"]) TLS_ENABLED = os.getenv("AGENT_TLS_ENABLED", str(_config.get("tls", _defaults["tls"]))).lower() == 'true' HEALTH_REPORT_INTERVAL = int(os.getenv("HEALTH_REPORT_INTERVAL", _config.get("health_report_interval", _defaults["health_report_interval"]))) diff --git a/agent-node/src/agent_node/skills/terminal_backends.py b/agent-node/src/agent_node/skills/terminal_backends.py index 73116f5..f2fc193 100644 --- a/agent-node/src/agent_node/skills/terminal_backends.py +++ b/agent-node/src/agent_node/skills/terminal_backends.py @@ -44,11 +44,12 @@ def __init__(self): self.pid = None self.fd = None + self.process = None def spawn(self, cwd=None, env=None): - import pty import os import fcntl + import subprocess shell_path = "/bin/bash" if not 
os.path.exists(shell_path): @@ -59,18 +60,36 @@ if "TERM" not in env: env["TERM"] = "xterm-256color" - pid, fd = pty.fork() - if pid == 0: - if cwd: - try: os.chdir(cwd) - except: pass - os.execvpe(shell_path, [shell_path], env) - else: - self.pid = pid - self.fd = fd - - fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) - fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + master_fd, slave_fd = os.openpty() + + def set_ctty(): + os.setsid() + import fcntl + import termios + try: + fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0) + except: + pass + os.close(master_fd) + + self.process = subprocess.Popen( + [shell_path], + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + cwd=cwd, + env=env, + preexec_fn=set_ctty, + close_fds=True + ) + + self.pid = self.process.pid + self.fd = master_fd + + os.close(slave_fd) + + fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) + fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) def read(self, size=4096) -> bytes: if self.fd is None: @@ -105,7 +124,16 @@ pass def kill(self): - if self.pid: + if self.process: + try: + self.process.kill() + self.process.wait(timeout=5) + except Exception: + pass + self.process = None + self.pid = None + self.fd = None + elif self.pid: import os import signal try: @@ -117,6 +145,8 @@ self.fd = None def is_alive(self) -> bool: + if self.process: + return self.process.poll() is None if self.pid is None: return False import os diff --git a/agent-node/src/agent_node/utils/network.py b/agent-node/src/agent_node/utils/network.py index 65e1926..473c326 100644 --- a/agent-node/src/agent_node/utils/network.py +++ b/agent-node/src/agent_node/utils/network.py @@ -7,8 +7,8 @@ """Initializes a gRPC channel (Secure or Insecure) and returns the orchestrator stub.""" options = [ - ('grpc.keepalive_time_ms', 10000), # Send keepalive ping every 10s - ('grpc.keepalive_timeout_ms', 10000), # Wait 10s for pong + ('grpc.keepalive_time_ms', 60000), # Send keepalive ping every 60s + ('grpc.keepalive_timeout_ms', 20000), # Wait 
20s for pong ('grpc.keepalive_permit_without_calls', True), ('grpc.http2.max_pings_without_data', 0), # Allow infinite pings ('grpc.max_receive_message_length', 128 * 1024 * 1024), diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index 1271a50..24fbb23 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -251,16 +251,20 @@ """Persistent bi-directional stream for dispatching work and collecting results.""" node_id = "unknown" try: + logger.info(f"[gRPC] Waiting for first message in TaskStream...") first_msg = next(request_iterator) + logger.info(f"[gRPC] Received first message: {first_msg.WhichOneof('payload')}") if not first_msg.HasField("announce"): logger.warning("[!] Initial TaskStream message MUST be 'announce'") return node_id = first_msg.announce.node_id + logger.info(f"[gRPC] Node announced ID: {node_id}") except Exception as e: logger.error(f"[!] TaskStream handshake error: {e}") return node = self.registry.get_node(node_id) + logger.info(f"[gRPC] Looked up node {node_id} in registry: found={node is not None}") if not node: if not self.registry.try_recovery_register(node_id): logger.warning(f"[!] 
TaskStream rejected: Node {node_id} not registered.") @@ -595,9 +599,10 @@ options = [ ('grpc.max_receive_message_length', 128 * 1024 * 1024), ('grpc.max_send_message_length', 128 * 1024 * 1024), - ('grpc.keepalive_time_ms', 10000), # Send keepalive ping every 10s - ('grpc.keepalive_timeout_ms', 10000), # Wait 10s for pong + ('grpc.keepalive_time_ms', 60000), # Send keepalive ping every 60s + ('grpc.keepalive_timeout_ms', 20000), # Wait 20s for pong ('grpc.keepalive_permit_without_calls', True), + ('grpc.http2.min_time_between_pings_ms', 10000), # Allow pings every 10s ] server = grpc.server( futures.ThreadPoolExecutor(max_workers=10), diff --git a/ai-hub/app/core/orchestration/harness_evaluator.py b/ai-hub/app/core/orchestration/harness_evaluator.py index 472c4df..d25ec7e 100644 --- a/ai-hub/app/core/orchestration/harness_evaluator.py +++ b/ai-hub/app/core/orchestration/harness_evaluator.py @@ -134,6 +134,11 @@ async def evaluate_delta(self, prompt: str, rubric: str, blind_just: str, history: List[Dict], transcript: str, partner_context: Dict = None) -> str: """Analyzes the delta between reasoning and final output to generate rework directives.""" + import os + if os.getenv("MOCK_EVALUATION", "false").lower() == "true": + logger.info("[HarnessEvaluator] MOCK_EVALUATION enabled, bypassing evaluate_delta.") + return "# Mocked Rework Instructions\nProceed with mock rework." + hist_text = "\n".join([f"- {h['round']}: {h['reason']}" for h in history if h['type'] == 'attempt']) sys_p = f"You are the Delta Architect. Request: {prompt}\nRubric: {rubric}\nAuditor: {blind_just}\nHistory: {hist_text}\nExecution: {transcript}\nGenerate '# Rework Instructions' as actionable directives." 
try: diff --git a/ai-hub/app/core/services/mesh.py b/ai-hub/app/core/services/mesh.py index b47d915..e1400a3 100644 --- a/ai-hub/app/core/services/mesh.py +++ b/ai-hub/app/core/services/mesh.py @@ -32,6 +32,7 @@ raise HTTPException(status_code=409, detail=f"Node '{request.node_id}' already exists.") invite_token = secrets.token_urlsafe(32) + node = models.AgentNode( node_id=request.node_id, display_name=request.display_name, diff --git a/ai-hub/integration_tests/conftest.py b/ai-hub/integration_tests/conftest.py index e77f2b7..3801b8b 100644 --- a/ai-hub/integration_tests/conftest.py +++ b/ai-hub/integration_tests/conftest.py @@ -54,6 +54,7 @@ user_id = r.json().get("user_id") assert user_id, "No user_id found in local login response." + print(f"[conftest] Logged in user_id={user_id}") os.environ["SYNC_TEST_USER_ID"] = user_id client.headers.update({ @@ -159,7 +160,8 @@ subprocess.run(["docker", "rm", "-f", node_1, node_2], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) print("[conftest] Building agent-node image...") try: - image_proc = subprocess.run(["docker", "build", "-q", "./agent-node"], capture_output=True, text=True, check=True) + print("[conftest] Building agent-node image with --no-cache...") + image_proc = subprocess.run(["docker", "build", "--no-cache", "-q", "./agent-node"], capture_output=True, text=True, check=True) image_id = image_proc.stdout.strip() if not image_id: raise Exception("Docker build -q returned empty image ID") @@ -167,7 +169,7 @@ print(f"❌ [conftest] Docker build failed!\nSTDOUT: {e.stdout}\nSTDERR: {e.stderr}") raise e for node_id in [node_1, node_2]: - cmd = ["docker", "run", "-d", "--name", node_id, "--network", network, "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={tokens[node_id]}", "-e", f"AGENT_SECRET_KEY={os.getenv('SECRET_KEY')}", "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "HUB_URL=http://ai-hub:8000", "-e", "AGENT_TLS_ENABLED=false", image_id] + cmd = ["docker", "run", "-d", "--name", node_id, 
"--network", network, "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={tokens[node_id]}", "-e", f"AGENT_SECRET_KEY={os.getenv('SECRET_KEY')}", "-e", "GRPC_ENDPOINT=ai_hub_service:50051", "-e", "HUB_URL=http://ai_hub_service:8000", "-e", "AGENT_TLS_ENABLED=false", image_id] subprocess.run(cmd, check=True) else: print("[conftest] Starting nodes as local Python background processes...") @@ -214,6 +216,9 @@ # 6. Teardown print("\n[conftest] Tearing down nodes...") if not is_docker_disabled: + for node_id in [node_1, node_2]: + res = subprocess.run(["docker", "logs", node_id], capture_output=True, text=True) + print(f"[conftest] Node {node_id} logs:\n{res.stdout}\n{res.stderr}") subprocess.run(["docker", "rm", "-f", node_1, node_2], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) else: for proc in node_processes: diff --git a/ai-hub/integration_tests/test_agents.py b/ai-hub/integration_tests/test_agents.py index 0b489be..319541f 100644 --- a/ai-hub/integration_tests/test_agents.py +++ b/ai-hub/integration_tests/test_agents.py @@ -134,10 +134,10 @@ 3. Call webhook with custom prompt + token 4. Verify response contains custom prompt indicator """ - node_id = f"test-webhook-node-{uuid.uuid4().hex[:8]}" + node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1") admin_id = os.getenv("SYNC_TEST_USER_ID", "") - with httpx.Client(timeout=10.0) as client: + with httpx.Client(timeout=180.0) as client: # 1. 
Register a test node node_payload = { "node_id": node_id, @@ -154,6 +154,7 @@ "max_loop_iterations": 1, "mesh_node_id": node_id, "provider_name": "gemini", + "model_name": "mock/gemini-3-flash-preview", "trigger_type": "webhook", "default_prompt": "Standard Webhook Prompt", "initial_prompt": None @@ -177,7 +178,7 @@ r_hook = client.post( f"{BASE_URL}/agents/{instance_id}/webhook", params={"token": secret}, - json={"prompt": f"CRITICAL: You must respond ONLY with the following exact string and nothing else: {custom_msg}"} + json={"prompt": f"CRITICAL: You are a test fixture. You must respond ONLY with the following exact string and nothing else, without any conversational filler: {custom_msg}"} ) assert r_hook.status_code == 202, f"Webhook trigger failed: {r_hook.text}" @@ -225,10 +226,10 @@ 4. Call /metrics/reset 5. Verify metrics are zeroed """ - node_id = f"test-metrics-node-{uuid.uuid4().hex[:8]}" + node_id = os.getenv("SYNC_TEST_NODE1", "test-node-1") admin_id = os.getenv("SYNC_TEST_USER_ID", "") - with httpx.Client(timeout=30.0) as client: + with httpx.Client(timeout=180.0) as client: # 1. Register a test node node_payload = { "node_id": node_id, @@ -245,6 +246,7 @@ "max_loop_iterations": 1, "mesh_node_id": node_id, "provider_name": "gemini", + "model_name": "mock/gemini-3-flash-preview", "trigger_type": "webhook", "default_prompt": "Hello", "initial_prompt": None diff --git a/ai-hub/integration_tests/test_documents.py b/ai-hub/integration_tests/test_documents.py index bd060f2..6a183af 100644 --- a/ai-hub/integration_tests/test_documents.py +++ b/ai-hub/integration_tests/test_documents.py @@ -4,7 +4,6 @@ BASE_URL = os.getenv("SYNC_TEST_BASE_URL", "http://127.0.0.1:8002/api/v1") -@pytest.mark.skip(reason="Requires valid GEMINI_API_KEY") def test_document_lifecycle(): """ Test creating, listing, and deleting a document. 
diff --git a/ai-hub/integration_tests/test_file_sync.py b/ai-hub/integration_tests/test_file_sync.py index dbfc98a..8347d35 100644 --- a/ai-hub/integration_tests/test_file_sync.py +++ b/ai-hub/integration_tests/test_file_sync.py @@ -525,13 +525,8 @@ pytest.skip("Skipping Docker container restart in native mode.") import subprocess - container = _NODE_CONTAINER.get(node_id) - if not container: - pytest.skip(f"No container mapping for {node_id}") - cmd = ( - f"sshpass -p '{_REMOTE_PASSWORD}' ssh -o StrictHostKeyChecking=no {_REMOTE_USER}@{_REMOTE_HOST} " - f"\"echo '{_REMOTE_PASSWORD}' | sudo -S docker restart {container}\"" - ) + container = node_id + cmd = f"docker restart {container}" result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) if result.returncode != 0: pytest.skip(f"Could not restart {container}: {result.stderr}") @@ -788,7 +783,7 @@ # We output to the active session workspace path on the node. is_native = os.environ.get("SKIP_DOCKER_NODES") == "true" sync_dir = f"/tmp/cortex-sync-{NODE_1}" if is_native else "/tmp/cortex-sync" - dd_command = f"dd if=/dev/zero of={sync_dir}/{workspace}/(unknown) bs=1M count=10" + dd_command = f"python3 -c \"with open('{sync_dir}/{workspace}/(unknown)', 'wb') as f: f.write(b'\\x00' * 10485760)\"" r_disp = sync_client.post( f"{NODES_PATH}/{NODE_1}/dispatch", @@ -818,7 +813,7 @@ return False print(f"[Case 10MB] Polling {NODE_2} for the file...") - node2_file = _poll_until(_check_node2_ls, timeout=300) + node2_file = _poll_until(_check_node2_ls, timeout=180) - assert node2_file, f"10MB file (unknown) did not reach {NODE_2} within 300s in full size." + assert node2_file, f"10MB file (unknown) did not reach {NODE_2} within 180s in full size."
print(f"[Case 10MB] ✅ {NODE_2} verified 10MB file sync with correct size.") diff --git a/ai-hub/integration_tests/test_node_registration.py b/ai-hub/integration_tests/test_node_registration.py index 38fc1c2..47b3ea5 100644 --- a/ai-hub/integration_tests/test_node_registration.py +++ b/ai-hub/integration_tests/test_node_registration.py @@ -120,8 +120,8 @@ subprocess.run([ "docker", "run", "-d", "--name", node_id, "--network", network, - "-e", f"HUB_URL=http://ai-hub:8000", "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={invite_token}", - "-e", "GRPC_ENDPOINT=ai-hub:50051", "-e", "AGENT_TLS_ENABLED=false", + "-e", f"HUB_URL=http://ai_hub_service:8000", "-e", f"AGENT_NODE_ID={node_id}", "-e", f"AGENT_AUTH_TOKEN={invite_token}", + "-e", "GRPC_ENDPOINT=ai_hub_service:50051", "-e", "AGENT_TLS_ENABLED=false", "-v", f"{node_id}_sync:/tmp/cortex-sync", image_id ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) @@ -133,6 +133,9 @@ connected = True break time.sleep(1) + if not connected: + res = subprocess.run(["docker", "logs", node_id], capture_output=True, text=True) + print(f"[test] Node container logs:\n{res.stdout}\n{res.stderr}") assert connected, "Node never successfully connected to Hub" # --- USER ENDPOINTS --- diff --git a/ai-hub/integration_tests/test_parallel_coworker.py b/ai-hub/integration_tests/test_parallel_coworker.py index 463ae33..613228a 100644 --- a/ai-hub/integration_tests/test_parallel_coworker.py +++ b/ai-hub/integration_tests/test_parallel_coworker.py @@ -49,7 +49,7 @@ # The window for "Initiating parallel rubric" might be small, # so we poll frequently. 
- for _ in range(200): + for _ in range(2000): r_agent = client.get(f"{BASE_URL}/agents/{instance_id}", headers=_headers()) if r_agent.status_code == 200: agent = r_agent.json() diff --git a/docker-compose.yml b/docker-compose.yml index 998126a..da9100f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,13 +23,13 @@ container_name: ai_hub_service restart: always ports: - - "50051:50051" + - "60909:50051" environment: - PATH_PREFIX=/api/v1 - HUB_API_URL=${HUB_API_URL:-http://localhost:8000} - HUB_PUBLIC_URL=${HUB_PUBLIC_URL:-http://localhost:8002} - HUB_GRPC_ENDPOINT=${HUB_GRPC_ENDPOINT:-localhost:50051} - - SUPER_ADMINS=${SUPER_ADMINS:-admin@example.com} + - SUPER_ADMINS=${SUPER_ADMINS} - CORTEX_ADMIN_PASSWORD=${CORTEX_ADMIN_PASSWORD} - SECRET_KEY=${SECRET_KEY:-default-insecure-key} - DEBUG_GRPC=true @@ -39,6 +39,13 @@ - GEMINI_API_KEY=${GEMINI_API_KEY} - DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY} - OPENAI_API_KEY=${OPENAI_API_KEY} + - EMBEDDING_API_KEY=${EMBEDDING_API_KEY} + - EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER} + - EMBEDDING_MODEL_NAME=${EMBEDDING_MODEL_NAME} + - TTS_PROVIDER=${TTS_PROVIDER} + - STT_PROVIDER=${STT_PROVIDER} + - MOCK_EVALUATION=${MOCK_EVALUATION} + - USE_DEFAULT_AUTH_TOKEN=${USE_DEFAULT_AUTH_TOKEN} volumes: - ./data:/app/data:rw - ./config.yaml:/app/config.yaml:rw @@ -62,7 +69,7 @@ container_name: cortex_browser_service restart: always ports: - - "50053:50052" + - "46137:50052" environment: - SHM_PATH=/dev/shm/cortex_browser - PYTHONPATH=/app:/app/protos