diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index 54089aa..c17bc44 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -404,9 +404,18 @@ self._send_response(task_id, agent_pb2.TaskResponse(task_id=task_id, stdout=result.get("stdout", ""), stderr=result.get("stderr", ""), status=result.get("status", 0), trace_id=trace_id)) def _send_response(self, task_id, response, status_override=None): - """Helper to queue a standard task response.""" + """Helper to queue a standard task response with memory safety caps.""" res = response or agent_pb2.TaskResponse(task_id=task_id) if status_override: res.status = status_override + + # Redundant safety cap (5MB) to protect gRPC/Queue memory + SAFETY_CAP = 5 * 1024 * 1024 + for field in ['stdout', 'stderr']: + val = getattr(res, field, "") + if len(val) > SAFETY_CAP: + logger.warning(f"Truncating excessive {field} ({len(val):,} bytes) for task {task_id}") + setattr(res, field, val[:SAFETY_CAP // 2] + "\n... [TRUNCATED DUE TO SIZE] ...\n" + val[-(SAFETY_CAP // 2):]) + self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=res)) def _send_sync_ok(self, sid, tid, msg): diff --git a/agent-node/src/agent_node/skills/shell_bridge.py b/agent-node/src/agent_node/skills/shell_bridge.py index 43cb3b2..ea23015 100644 --- a/agent-node/src/agent_node/skills/shell_bridge.py +++ b/agent-node/src/agent_node/skills/shell_bridge.py @@ -32,6 +32,10 @@ # Patterns moved to core/regex_patterns.py self.PROMPT_PATTERNS = COMPILED_PROMPT_PATTERNS + # --- M8: Memory Safety Constants --- + self.MAX_RESULT_BUFFER = 10 * 1024 * 1024 # 10MB hard cap for final result capture + self.MAX_STDOUT_RETURN = 2 * 1024 * 1024 # 2MB cap for what we send back to Hub + # --- M7: Idle Session Reaper --- self.reaper_thread = threading.Thread(target=self._session_reaper, daemon=True, name="ShellReaper") self.reaper_thread.start() @@ -128,8 +132,16 @@ bracket_start_fence = f"[[1337;TaskStart;id={active_tid}]]" bracket_end_fence_prefix = f"[[1337;TaskEnd;id={active_tid};exit=" - sess["buffer_file"].write(decoded) - sess["buffer_file"].flush() + # M8: Enforce buffer hard cap to prevent memory exhaustion + if sess.get("buffer_size", 0) < self.MAX_RESULT_BUFFER: + sess["buffer_file"].write(decoded) + sess["buffer_file"].flush() + sess["buffer_size"] = sess.get("buffer_size", 0) + len(decoded) + elif not sess.get("buffer_truncated"): + sess["buffer_truncated"] = True + trunc_msg = f"\n\n[!!!] TERMINAL OUTPUT BUFFER EXCEEDED {self.MAX_RESULT_BUFFER // 1024**2}MB. RESPONSES WILL BE TRUNCATED.\n\n" + sess["buffer_file"].write(trunc_msg) + sess["buffer_file"].flush() sess["tail_buffer"] = (sess.get("tail_buffer", "") + decoded)[-16384:] clean_tail = ANSI_ESCAPE.sub('', sess["tail_buffer"]) @@ -146,7 +158,16 @@ bf = sess["buffer_file"] bf.seek(0) - clean_full_raw = ANSI_ESCAPE.sub('', bf.read()) + + # M8: Chunked processing to avoid loading multi-GB files into RAM + clean_full_raw = "" + while True: + chunk = bf.read(128 * 1024) # 128KB chunks + if not chunk: break + clean_full_raw += ANSI_ESCAPE.sub('', chunk) + # Safety break if string aggregation still grows too much (though unlikely with 10MB cap) + if len(clean_full_raw) > self.MAX_RESULT_BUFFER * 1.5: + break # Extract content between fences if active_start_fence in clean_full_raw: @@ -158,6 +179,12 @@ content = ECHO_CLEANUP_ANSI.sub('', content) content = ECHO_CLEANUP_BRACKET.sub('', content).strip() + # M8: Truncate very large stdout before sending to Hub + if len(content) > self.MAX_STDOUT_RETURN: + head_part = content[:self.MAX_STDOUT_RETURN // 2] + tail_part = content[-(self.MAX_STDOUT_RETURN // 2):] + content = head_part + f"\n\n[... output truncated: {len(content) - self.MAX_STDOUT_RETURN:,} bytes omitted ...]\n\n" + tail_part + sess["result"]["stdout"] = content sess["result"]["status"] = 0 if exit_code == 0 else 1 @@ -283,6 +310,8 @@ _tmp = tempfile.NamedTemporaryFile("w+", encoding="utf-8", prefix=f"cortex_task_{tid}_", delete=False, suffix=".tmp") sess["buffer_file"] = _tmp sess["buffer_file_path"] = _tmp.name + sess["buffer_size"] = 0 + sess["buffer_truncated"] = False sess["tail_buffer"] = "" sess["result"] = result_container sess["cancel_event"] = cancel_event diff --git a/ai-hub/app/core/orchestration/agent_loop.py b/ai-hub/app/core/orchestration/agent_loop.py index 8b7a9c0..1d93e00 100644 --- a/ai-hub/app/core/orchestration/agent_loop.py +++ b/ai-hub/app/core/orchestration/agent_loop.py @@ -138,12 +138,14 @@ from app.core.orchestration.harness_evaluator import HarnessEvaluator self.evaluator = HarnessEvaluator(self.db, self.agent_id, self.instance.mesh_node_id, workspace_id, provider, self.services) - # Update UI status self.instance.status = "starting" self.instance.evaluation_status = "📋 Co-Worker: Initiating parallel rubric & mission setup..." self.instance.current_rework_attempt = 0 await self._safe_commit() + # Deterministic setup of the .cortex state directory + await self.evaluator.initialize_cortex() + # Parallel rubric generation task mesh_node_id = str(self.instance.mesh_node_id) return asyncio.create_task(self._rubric_generator_bg(prompt, provider, workspace_id, mesh_node_id)) @@ -168,7 +170,6 @@ from app.core.orchestration.harness_evaluator import HarnessEvaluator try: bg_evaluator = HarnessEvaluator(bg_db, self.agent_id, mesh_node_id, workspace_id, provider, self.services) - await bg_evaluator.initialize_cortex() return await bg_evaluator.generate_rubric(prompt) except Exception as e: logger.error(f"[AgentExecutor] Rubric generation failed: {e}") diff --git a/ai-hub/integration_tests/test_coworker_flow.py b/ai-hub/integration_tests/test_coworker_flow.py index 792a0db..95d45f3 100644 --- a/ai-hub/integration_tests/test_coworker_flow.py +++ b/ai-hub/integration_tests/test_coworker_flow.py @@ -60,7 +60,7 @@ # 4. Use the /nodes/{id}/fs/ls API to verify the .cortex folder existence print("[test] Verifying .cortex mirror files...") files_found = False - for _ in range(30): + for _ in range(60): params = {"path": ".cortex", "session_id": sync_workspace_id} r_ls = client.get(f"{BASE_URL}/nodes/{node_id}/fs/ls", params=params, headers=_headers()) if r_ls.status_code == 200: diff --git a/scripts/move_orphans.exp b/scripts/move_orphans.exp new file mode 100755 index 0000000..c3b63d3 --- /dev/null +++ b/scripts/move_orphans.exp @@ -0,0 +1,44 @@ +#!/usr/bin/expect -f +set timeout 60 +set password "a6163484a" + +# 1. Create backup directory +spawn ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113 "mkdir -p /home/coder/project/cortex-hub/data/orphans_backup/" +expect { + "password:" { + send "$password\r" + exp_continue + } + eof +} + +# 2. Move files +set orphans { + "/home/coder/project/cortex-hub/ai-hub-dump.db" + "/home/coder/project/cortex-hub/ai-hub/ai-hub/test.db" + "/home/coder/project/cortex-hub/ai-hub/ai-hub/data/ai_hub.db" + "/home/coder/project/cortex-hub/ai-hub/app/db/data/ai_hub.db" + "/home/coder/project/cortex-hub/ai-hub/cortex_integration_test.db" + "/home/coder/project/cortex-hub/data/ai-hub-copy.db" +} + +foreach file $orphans { + spawn ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113 "mv $file /home/coder/project/cortex-hub/data/orphans_backup/" + expect { + "password:" { + send "$password\r" + exp_continue + } + eof + } +} + +# 3. Verify +spawn ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113 "ls -lh /home/coder/project/cortex-hub/data/orphans_backup/" +expect { + "password:" { + send "$password\r" + exp_continue + } + eof +} diff --git a/scripts/nas_exec.exp b/scripts/nas_exec.exp new file mode 100755 index 0000000..82d2eca --- /dev/null +++ b/scripts/nas_exec.exp @@ -0,0 +1,15 @@ +#!/usr/bin/expect -f +set timeout 300 +set password [lindex $argv 0] +set cmd [lindex $argv 1] +spawn ssh -o StrictHostKeyChecking=no -p 2222 axieyangb@192.168.68.90 "$cmd" +expect { + "password:" { + send "$password\r" + exp_continue + } + "axieyangb@synology-nas:*" { + # Already logged in or similar + } + eof +} diff --git a/scripts/remote_exec.exp b/scripts/remote_exec.exp new file mode 100755 index 0000000..b034fe5 --- /dev/null +++ b/scripts/remote_exec.exp @@ -0,0 +1,15 @@ +#!/usr/bin/expect -f +set timeout 300 +set password "a6163484a" +set user "axieyangb" +set host "192.168.68.113" +set cmd [lindex $argv 0] + +spawn ssh -o StrictHostKeyChecking=no $user@$host "$cmd" +expect { + "password:" { + send "$password\r" + exp_continue + } + eof +} diff --git a/scripts/remote_sync.exp b/scripts/remote_sync.exp new file mode 100755 index 0000000..b35b181 --- /dev/null +++ b/scripts/remote_sync.exp @@ -0,0 +1,16 @@ +#!/usr/bin/expect -f +set timeout 300 +set password "a6163484a" +set user "axieyangb" +set host "192.168.68.113" +set local_dir [lindex $argv 0] +set remote_path [lindex $argv 1] + +spawn rsync -avz --delete --exclude ".git" --exclude "node_modules" --exclude "__pycache__" -e "ssh -o StrictHostKeyChecking=no" $local_dir $user@$host:$remote_path +expect { + "password:" { + send "$password\r" + exp_continue + } + eof +} diff --git a/scripts/test_eval_loop.py b/scripts/test_eval_loop.py new file mode 100644 index 0000000..6ae2621 --- /dev/null +++ b/scripts/test_eval_loop.py @@ -0,0 +1,68 @@ +import requests +import time +import sys + +BASE_URL = "http://192.168.68.113:8000/api/v1" +HEADERS = {"X-User-ID": "585cd6e9-05e5-42ac-83a5-93029c6cb038"} + +def check(): + print("1. Fetching agents...") + resp = requests.get(f"{BASE_URL}/agents", headers=HEADERS) + if not resp.ok: + print(f"Failed to fetch agents: {resp.text}") + sys.exit(1) + + agents = resp.json() + if not agents: + print("No agents found for user.") + sys.exit(1) + + agent_id = agents[0]["id"] + print(f"2. Injecting test task to Agent {agent_id}...") + + inject_resp = requests.post(f"{BASE_URL}/agents/{agent_id}/run", json={"prompt": "list the current directory on your node please"}, headers=HEADERS) + if not inject_resp.ok: + print(f"Inject failed: {inject_resp.text}") + sys.exit(1) + + print("3. Polling for evaluation completion...") + start_time = time.time() + + last_status = "" + last_eval_status = "" + + while time.time() - start_time < 120: + a_resp = requests.get(f"{BASE_URL}/agents", headers=HEADERS) + if not a_resp.ok: continue + + alist = a_resp.json() + me = next((a for a in alist if a["id"] == agent_id), None) + if not me: continue + + curr_status = me.get("status", "unknown") + curr_eval = me.get("evaluation_status", "") + + if curr_status != last_status or curr_eval != last_eval_status: + print(f"[{int(time.time() - start_time)}s] Status: {curr_status} | Eval: {curr_eval}") + last_status = curr_status + last_eval_status = curr_eval + + if curr_status in ("idle", "error_suspended") and time.time() - start_time > 5: + # Check if eval was passed + print("\nFinal State Reached.") + print(f"Successful Runs: {me.get('successful_runs', 0)}") + print(f"Latest Score: {me.get('latest_quality_score', 0)}") + + # Print last message + if me.get("session_id"): + m_resp = requests.get(f"{BASE_URL}/sessions/{me['session_id']}/messages", headers=HEADERS) + if m_resp.ok: + msgs = m_resp.json().get("messages", []) + if msgs: + print(f"Last Assistant Msg: {msgs[-1]['content'][:200]}...") + break + + time.sleep(2) + +if __name__ == "__main__": + check() diff --git a/scripts/tmp_check_db.exp b/scripts/tmp_check_db.exp new file mode 100755 index 0000000..86f108e --- /dev/null +++ b/scripts/tmp_check_db.exp @@ -0,0 +1,11 @@ +#!/usr/bin/expect -f +set timeout 30 +set password "a6163484a" +spawn ssh -o StrictHostKeyChecking=no axieyangb@192.168.68.113 "ls -lh --time-style=long-iso /home/coder/project/cortex-hub/ai-hub-dump.db /home/coder/project/cortex-hub/ai-hub/ai-hub/test.db /home/coder/project/cortex-hub/ai-hub/ai-hub/data/ai_hub.db /home/coder/project/cortex-hub/ai-hub/app/db/data/ai_hub.db /home/coder/project/cortex-hub/ai-hub/cortex_integration_test.db /home/coder/project/cortex-hub/data/ai-hub-copy.db /home/coder/project/cortex-hub/data/ai-hub.db" +expect { + "password:" { + send "$password\r" + exp_continue + } + eof +} diff --git a/scripts/verify_coworker.sh b/scripts/verify_coworker.sh new file mode 100755 index 0000000..f532e22 --- /dev/null +++ b/scripts/verify_coworker.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Local verification for Co-Worker performance and non-blocking behavior + +cd ai-hub +export DB_MODE=sqlite +export LOCAL_DB_PATH="data/verify_ai_hub.db" +export LOG_LEVEL="INFO" +export SKIP_DOCKER_NODES=true +export TEST_HUB_URL="http://127.0.0.1:8001" +export TEST_GRPC_ENDPOINT="127.0.0.1:50051" +export SYNC_TEST_BASE_URL="http://127.0.0.1:8001/api/v1" + +# Step 1: Start Hub Server +echo "Starting local Hub server..." +python3 -m uvicorn app.main:app --host 127.0.0.1 --port 8001 & +HUB_PID=$! + +# Step 2: Start gRPC Node (Simulated) +# In a real integration test, conftest.py handles this if SKIP_DOCKER_NODES=true +# but we need to ensure dependencies are installed. + +# Step 3: Run pytest +echo "Running Co-Worker integration tests..." +python3 -m pytest -v integration_tests/test_coworker_flow.py + +# Cleanup +kill $HUB_PID