diff --git a/ai-hub/app/app.py b/ai-hub/app/app.py index 6bcb6d4..c8587c6 100644 --- a/ai-hub/app/app.py +++ b/ai-hub/app/app.py @@ -57,11 +57,11 @@ try: from app.core.grpc.services.grpc_server import serve_grpc registry = app.state.services.node_registry_service - server, orchestrator = serve_grpc(registry, port=50051) + server, orchestrator = serve_grpc(registry, port=settings.GRPC_PORT) app.state.grpc_server = server app.state.orchestrator = orchestrator app.state.services.with_service("orchestrator", orchestrator) - logger.info("[M6] Agent Orchestrator gRPC server started on port 50051.") + logger.info(f"[M6] Agent Orchestrator gRPC server started on port {settings.GRPC_PORT}.") # Launch periodic Ghost Mirror cleanup asyncio.create_task(_periodic_mirror_cleanup(orchestrator)) @@ -96,7 +96,7 @@ # --- Stop gRPC Orchestrator --- if hasattr(app.state, 'grpc_server'): logger.info("[M6] Stopping gRPC server...") - app.state.grpc_server.stop(0) + app.state.grpc_server.stop(5) # Access the vector_store from the application state to save it if hasattr(app.state, 'vector_store'): app.state.vector_store.save_index() @@ -269,7 +269,7 @@ services.with_service("node_registry_service", service=node_registry_service) # Initialize Browser Service Client - browser_endpoint = os.getenv("BROWSER_SERVICE_ENDPOINT", "browser-service:50052") + browser_endpoint = settings.BROWSER_SERVICE_ENDPOINT browser_service = BrowserServiceClient(endpoint=browser_endpoint) services.with_service("browser_service", service=browser_service) @@ -310,10 +310,9 @@ api_router = create_api_router(services=services) app.include_router(api_router, prefix="/api/v1") - cors_origins = os.getenv("CORS_ORIGINS", "http://localhost:8000,http://localhost:8080,http://localhost:3000").split(",") - hub_url = os.getenv("HUB_PUBLIC_URL") - if hub_url and hub_url not in cors_origins: - cors_origins.append(hub_url) + cors_origins = settings.CORS_ORIGINS + if settings.HUB_PUBLIC_URL and settings.HUB_PUBLIC_URL not in cors_origins: + cors_origins.append(settings.HUB_PUBLIC_URL) app.add_middleware( CORSMiddleware, diff --git a/ai-hub/app/config.py b/ai-hub/app/config.py index 55edefa..dee8679 100644 --- a/ai-hub/app/config.py +++ b/ai-hub/app/config.py @@ -159,6 +159,12 @@ "/app/data" # Hardcoded default for production Docker deployment self.SKILLS_DIR: str = os.path.join(self.DATA_DIR, "skills") + self.BROWSER_SERVICE_ENDPOINT: str = os.getenv("BROWSER_SERVICE_ENDPOINT", "browser-service:50052") + self.GRPC_PORT: int = int(os.getenv("GRPC_PORT", "50051")) + self.MIRROR_MAX_WORKERS: int = int(os.getenv("MIRROR_MAX_WORKERS", "32")) + self.CORS_ORIGINS: list[str] = (os.getenv("CORS_ORIGINS") or "http://localhost:8000,http://localhost:8080,http://localhost:3000").split(",") + self.HUB_PUBLIC_URL: Optional[str] = os.getenv("HUB_PUBLIC_URL") + # --- Database Settings --- self.DB_MODE: str = os.getenv("DB_MODE") or \ get_from_yaml(["database", "mode"]) or \ diff --git a/ai-hub/app/core/grpc/core/mirror.py b/ai-hub/app/core/grpc/core/mirror.py index 5e389c4..2ffe413 100644 --- a/ai-hub/app/core/grpc/core/mirror.py +++ b/ai-hub/app/core/grpc/core/mirror.py @@ -7,6 +7,7 @@ from typing import Dict, List from app.core.grpc.shared_core.ignore import CortexIgnore from app.protos import agent_pb2 +from app.config import settings class GhostMirrorManager: """Manages local server-side copies of node workspaces.""" @@ -387,7 +388,7 @@ return None files = [] - with ThreadPoolExecutor(max_workers=min(32, len(raw_files) or 1)) as executor: + with ThreadPoolExecutor(max_workers=min(settings.MIRROR_MAX_WORKERS, len(raw_files) or 1)) as executor: results = list(executor.map(_hash_file, raw_files)) files = [r for r in results if r is not None] @@ -482,7 +483,7 @@ return None needs_update = [] - with ThreadPoolExecutor(max_workers=32) as executor: + with ThreadPoolExecutor(max_workers=settings.MIRROR_MAX_WORKERS) as executor: drift_results = list(executor.map(_check_drift, remote_manifest.files)) needs_update = [d for d in drift_results if d is not None] diff --git a/ai-hub/app/core/orchestration/agent_loop.py b/ai-hub/app/core/orchestration/agent_loop.py index 3877e50..c78442a 100644 --- a/ai-hub/app/core/orchestration/agent_loop.py +++ b/ai-hub/app/core/orchestration/agent_loop.py @@ -1,10 +1,14 @@ import asyncio import time from datetime import datetime +import logging from sqlalchemy.orm import Session +from sqlalchemy.orm.exc import ObjectDeletedError from tenacity import retry, wait_exponential, stop_after_attempt import json +logger = logging.getLogger(__name__) + from app.db.session import SessionLocal from app.db.models.agent import AgentInstance, AgentTemplate from app.db.models import Message @@ -230,12 +234,16 @@ sync_token_count = 0 if not safe_commit(): return - if registry and instance.mesh_node_id: - registry.emit(instance.mesh_node_id, "reasoning", { - "content": new_content, - "agent_id": agent_id, - "session_id": instance.session_id - }) + try: + if registry and instance.mesh_node_id: + registry.emit(instance.mesh_node_id, "reasoning", { + "content": new_content, + "agent_id": agent_id, + "session_id": instance.session_id + }) + except ObjectDeletedError: + logger.info(f"Agent {agent_id} was deleted during execution. Stopping loop.") + return # Final flush if content_buffer: diff --git a/ai-hub/app/core/services/auth.py b/ai-hub/app/core/services/auth.py index b31d2be..e5c900f 100644 --- a/ai-hub/app/core/services/auth.py +++ b/ai-hub/app/core/services/auth.py @@ -46,6 +46,8 @@ token_response = await client.post(oidc_urls['token'], data=token_data, timeout=30.0) token_response.raise_for_status() response_json = token_response.json() + id_token = response_json.get("id_token") + except httpx.HTTPStatusError as e: logger.error(f"OIDC Token exchange failed with status {e.response.status_code}: {e.response.text}") raise HTTPException(status_code=500, detail=f"OIDC Token exchange failed: {e.response.text}") diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index a4630b4..60bb6e9 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -89,9 +89,9 @@ def _blocking_put(): try: - # 300s timeout provides absolute maximum backpressure for large file sync across slow VPNs - # preventing dropped chunks while still protecting against permanent deadlocks. - self.queue.put(item, block=True, timeout=300.0) + # Reduced timeout from 300s to 5s to avoid blocking gRPC threads for too long. + # 5s timeout provides backpressure while protecting against deadlocks. + self.queue.put(item, block=True, timeout=5.0) except queue.Full: logger.warning(f"[📋⚠️] Message dropped for {self.node_id}: outbound queue FULL. Node may be unresponsive.") except Exception as e: @@ -101,11 +101,8 @@ # Check if we are in an async loop (FastAPI context) loop = asyncio.get_running_loop() if loop.is_running(): - if self._registry_executor: - self._registry_executor.submit(_blocking_put) - else: - # Fallback to fire-and-forget thread if executor not yet ready - threading.Thread(target=_blocking_put, daemon=True).start() + # Run blocking_put in the default executor (unbounded) to avoid blocking event loop + loop.run_in_executor(None, _blocking_put) return except RuntimeError: pass # Not in async loop @@ -168,6 +165,8 @@ # Shared Hub-wide work executor to prevent thread-spawning leaks # max_queue_size=200 means we max buffer 200 file chunks = ~800MB before putting backpressure on sender self.executor = BoundedThreadPoolExecutor(max_workers=20, thread_name_prefix="RegistryWorker", max_queue_size=200) + # Separate unbounded executor for DB operations to avoid deadlocks on slow DB + self.db_executor = ThreadPoolExecutor(max_workers=5, thread_name_prefix="DBWorker") # ------------------------------------------------------------------ # # DB Helpers # @@ -308,7 +307,7 @@ self._nodes[node_id] = record # Persist to DB asynchronously to avoid blocking gRPC stream setup during NFS lag - self.executor.submit(self._db_upsert_node, node_id, user_id, metadata) + self.db_executor.submit(self._db_upsert_node, node_id, user_id, metadata) logger.info(f"[📋] NodeRegistry: Registered {node_id} (owner: {user_id}) | Stats enabled") self.emit(node_id, "node_online", record.to_dict()) @@ -367,7 +366,7 @@ if stats.get("cpu_usage_percent", 0) > 0 or stats.get("memory_usage_percent", 0) > 0: logger.debug(f"[💓] Heartbeat {node_id}: CPU {stats.get('cpu_usage_percent')}% | MEM {stats.get('memory_usage_percent')}%") # Persist heartbeat timestamp to DB asynchronously - self.executor.submit(self._db_update_heartbeat, node_id) + self.db_executor.submit(self._db_update_heartbeat, node_id) # Emit heartbeat event to live UI self.emit(node_id, "heartbeat", stats) diff --git a/ai-hub/integration_tests/test_coworker_flow.py b/ai-hub/integration_tests/test_coworker_flow.py index 111e42d..32deecf 100644 --- a/ai-hub/integration_tests/test_coworker_flow.py +++ b/ai-hub/integration_tests/test_coworker_flow.py @@ -89,7 +89,7 @@ deploy_payload = { "name": "SC-3 Limit Agent", "system_prompt": "You are a test agent. Create a simple hello world python script.", - "max_loop_iterations": 5, + "max_loop_iterations": 2, "mesh_node_id": node_id, "provider_name": "gemini", "model_name": "gemini-1.5-flash", @@ -119,7 +119,7 @@ print(f"\n[test] Waiting for agent {instance_id} to reach 'failed_limit' status...") failed_limit = False latest_score = None - for _ in range(120): # 240s timeout + for _ in range(180): # 360s timeout r_agents = client.get(f"{BASE_URL}/agents", headers=_headers()) if r_agents.status_code == 200: agents = r_agents.json() @@ -163,7 +163,7 @@ "trigger_type": "webhook", "co_worker_quality_gate": True, "max_rework_attempts": 3, - "rework_threshold": 100, + "rework_threshold": 101, "default_prompt": "Create app.py that prints hello, but deliberately make a syntax error on your first try.", } r_deploy = client.post(f"{BASE_URL}/agents/deploy", json=deploy_payload, headers=_headers()) diff --git a/ai-hub/tests/core/services/test_auth.py b/ai-hub/tests/core/services/test_auth.py new file mode 100644 index 0000000..c0bfc47 --- /dev/null +++ b/ai-hub/tests/core/services/test_auth.py @@ -0,0 +1,52 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from app.core.services.auth import AuthService +from app.config import settings + +@pytest.mark.asyncio +async def test_handle_callback_success(): + services = MagicMock() + # Mock user_service.save_user + services.user_service.save_user.return_value = ("user_123", True) + + auth_service = AuthService(services) + + # Mock db + db = MagicMock() + + # Mock OIDC responses + mock_token_resp = MagicMock() + mock_token_resp.json.return_value = {"id_token": "mock_id_token"} + mock_token_resp.raise_for_status = MagicMock() + + mock_jwks_resp = MagicMock() + mock_jwks_resp.json.return_value = {"keys": [{"kid": "key_1", "kty": "RSA"}]} + mock_jwks_resp.raise_for_status = MagicMock() + + async def mock_post(*args, **kwargs): + return mock_token_resp + + async def mock_get(*args, **kwargs): + return mock_jwks_resp + + # We need to mock AsyncClient instance methods + mock_client = MagicMock() + mock_client.post = AsyncMock(side_effect=mock_post) + mock_client.get = AsyncMock(side_effect=mock_get) + + # Mock __aenter__ and __aexit__ for context manager + mock_client.__aenter__.return_value = mock_client + mock_client.__aexit__.return_value = None + + with patch('httpx.AsyncClient', return_value=mock_client): + with patch('jwt.get_unverified_header', return_value={"kid": "key_1"}): + with patch('jwt.PyJWKSet.from_dict') as mock_jwk_set: + mock_key = MagicMock() + mock_key.key = "publicKey" + mock_jwk_set.return_value.__getitem__.return_value = mock_key + + with patch('jwt.decode', return_value={"sub": "oidc_123", "email": "user@example.com"}): + result = await auth_service.handle_callback(code="test_code", db=db) + + assert result["user_id"] == "user_123" + assert result["linked"] is True diff --git a/deployment/test-nodes/docker-compose.test-nodes.yml b/deployment/test-nodes/docker-compose.test-nodes.yml index a0d8f98..c9cba88 100644 --- a/deployment/test-nodes/docker-compose.test-nodes.yml +++ b/deployment/test-nodes/docker-compose.test-nodes.yml @@ -1,10 +1,11 @@ -# docker-compose.test-nodes.yml -# Internal testing setup for multiple Agent Nodes (e.g. Test Node 1, Test Node 2). -# This is NOT meant for end-user deployment. +networks: + default: + name: cortex-hub_default + external: true services: test-node-1: build: - context: ./agent-node + context: ../../agent-node container_name: cortex-test-1 environment: - AGENT_NODE_ID=test-node-1 @@ -28,7 +29,7 @@ test-node-2: build: - context: ./agent-node + context: ../../agent-node container_name: cortex-test-2 environment: - AGENT_NODE_ID=test-node-2 diff --git a/docker-compose.yml b/docker-compose.yml index 4a83fc4..3b43bf2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -58,6 +58,7 @@ - "50053:50052" environment: - SHM_PATH=/dev/shm/cortex_browser + - PYTHONPATH=/app:/app/protos volumes: - ./browser-service:/app - browser_shm:/dev/shm:rw @@ -69,6 +70,7 @@ cpus: '2.0' memory: 2G + # Generic named volume using local driver volumes: ai_hub_data: diff --git a/run_integration_tests.sh b/run_integration_tests.sh index 9bf7bb6..a964fae 100755 --- a/run_integration_tests.sh +++ b/run_integration_tests.sh @@ -35,20 +35,20 @@ # Check if docker daemon is reachable (i.e., not inside DevContainer without DIND) or if specifically skipped DOCKER_AVAILABLE=false -if docker info >/dev/null 2>&1 && [ "$SKIP_DOCKER_NODES" != "true" ]; then +if docker info >/dev/null 2>&1 && [ "$SKIP_DOCKER_NODES" != "true" ] && [ "$NATIVE_MODE" != "true" ]; then DOCKER_AVAILABLE=true export SKIP_DOCKER_NODES=false else - echo "Docker skipping or not reachable (likely Native mode). Switching to Native Python mode..." + echo "Docker skipping, not reachable, or Native mode requested. Skipping Docker nodes..." export SKIP_DOCKER_NODES=true - export SYNC_TEST_BASE_URL="http://127.0.0.1:8000/api/v1" + DOCKER_AVAILABLE=false fi if [ "$NO_REBUILD" = true ] && [ "$IS_RUNNING" = true ]; then echo "Service is already running and --no-rebuild flag provided." echo "Skipping rebuild and starting tests directly..." else - if [ "$DOCKER_AVAILABLE" = true ]; then + if [ "$DOCKER_AVAILABLE" = true ] && [ "$NATIVE_MODE" = false ]; then # 2. Clean start: purge the database / volumes echo "Purging database and old containers..." docker compose down -v --remove-orphans @@ -77,7 +77,7 @@ export ENVIRONMENT="development" export PATH_PREFIX="/api/v1" # Purge local test database - rm -f ai-hub/test.db + rm -f /tmp/cortex_hub_test.db mkdir -p ai-hub/data pkill -f uvicorn || true @@ -89,14 +89,15 @@ elif [ -d "/app/ai-hub" ]; then cd /app/ai-hub fi - DATA_DIR=./data DATABASE_URL=sqlite:///./test.db AGENT_NODE_SRC_DIR=../agent-node SKILLS_SRC_DIR=../skills uvicorn app.main:app --host 0.0.0.0 --port 8000 > native_hub.log 2>&1 & + source ../venv/bin/activate || source venv/bin/activate || source /tmp/venv2/bin/activate || source /tmp/venv/bin/activate || echo "No venv found for uvicorn" + PYTHONDONTWRITEBYTECODE=1 GRPC_PORT=50055 DATA_DIR=./data DATABASE_URL=sqlite:////tmp/cortex_hub_test.db AGENT_NODE_SRC_DIR=../agent-node SKILLS_SRC_DIR=../skills uvicorn app.main:app --host 0.0.0.0 --port 8010 > native_hub.log 2>&1 & HUB_PID=$! cd - > /dev/null # Wait for healthy echo "Waiting for AI Hub to be ready..." sleep 2 - until curl -I -s http://localhost:8000/api/v1/users/login/local | grep -q "405"; do + until curl -I -s http://localhost:8010/api/v1/users/login/local | grep -q "405"; do echo "Waiting for AI Hub Backend natively..." sleep 2 done @@ -109,12 +110,13 @@ echo "==========================================" echo " EXECUTING E2E INTEGRATION SUITE " echo "==========================================" -if [ "$NATIVE_MODE" = true ] || [ "$NATIVE" = 1 ]; then - export SYNC_TEST_BASE_URL="http://127.0.0.1:8000/api/v1" - export TEST_HUB_URL="http://127.0.0.1:8000" - export TEST_GRPC_ENDPOINT="127.0.0.1:50051" +if [ "$NATIVE_MODE" = true ] || [ "$NATIVE" = 1 ] || [ "$DOCKER_AVAILABLE" = false ]; then + export SYNC_TEST_BASE_URL="http://127.0.0.1:8010/api/v1" + export TEST_HUB_URL="http://127.0.0.1:8010" + export TEST_GRPC_ENDPOINT="127.0.0.1:50055" fi -source /tmp/venv/bin/activate || echo "No venv found, hoping pytest is in global PATH." +source /tmp/venv2/bin/activate || source venv/bin/activate || source /tmp/venv/bin/activate || echo "No venv found, hoping pytest is in global PATH." + TEST_TARGETS=() for arg in "$@"; do @@ -127,7 +129,9 @@ TEST_TARGETS=("ai-hub/integration_tests/") fi -pytest "${TEST_TARGETS[@]}" -v + PYTHONPATH=ai-hub python3 -m pytest "${TEST_TARGETS[@]}" -v + + if [ "$NO_REBUILD" = false ] || [ "$IS_RUNNING" = false ]; then echo "=========================================="