diff --git a/ai-hub/app/core/orchestration/agent_loop.py b/ai-hub/app/core/orchestration/agent_loop.py index 2dfae19..ce5a73d 100644 --- a/ai-hub/app/core/orchestration/agent_loop.py +++ b/ai-hub/app/core/orchestration/agent_loop.py @@ -53,10 +53,46 @@ db.commit() return + # Configuration for Rework Loop + co_worker_enabled = getattr(template, "co_worker_quality_gate", False) + rework_threshold = getattr(template, "rework_threshold", 80) + max_rework_attempts = getattr(template, "max_rework_attempts", 3) + + # --- Phase 1: Pre-Execution Initialization (Harness Mirror) --- + from app.core.orchestration.harness_evaluator import HarnessEvaluator + evaluator = None + rubric_content = "" + + if co_worker_enabled: + from app.core.providers.factory import get_llm_provider + # For Evaluation, we use the same provider/model as the main task for consistency + # Load provider settings + from app.db.models.session import Session as SessionModel + agent_session = db.query(SessionModel).filter(SessionModel.id == instance.session_id).first() + provider_name = getattr(agent_session, "provider_name", None) + if not provider_name and user_service: + from app.config import settings + provider_name = settings.ACTIVE_LLM_PROVIDER + + # We need api_key etc. 
For brevity we fall back to the default if not found
Prompt length: {len(prompt)}") - loop_start = time.time() - # Iterate the RAG architecture to solve the prompt - try: - final_tool_counts = {} - final_input_tokens = 0 - final_output_tokens = 0 - final_answer = "" - - instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - instance.last_reasoning = "" - db.commit() + current_prompt = prompt + current_attempt = 0 + final_result = None - # Buffers for real-time streaming to avoid O(N^2) regex and DB hammering - content_buffer = "" - last_db_sync_time = time.time() - sync_token_count = 0 + # --- MAIN REWORK LOOP --- + while True: + loop_start = time.time() + try: + final_tool_counts = {} + final_input_tokens = 0 + final_output_tokens = 0 + final_answer = "" + + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + instance.last_reasoning = "" + db.commit() - # We consume the generator completely to let it execute all tools and generate reasoning - async for event in rag_service.chat_with_rag( - db=db, - session_id=session_id, - prompt=prompt, - provider_name=provider_name, - load_faiss_retriever=False, - user_service=user_service - ): - if event.get("type") == "finish": - final_tool_counts = event.get("tool_counts", {}) - final_input_tokens = event.get("input_tokens", 0) - final_output_tokens = event.get("output_tokens", 0) - final_answer = event.get("full_answer", "") - elif event.get("type") == "token_counted": - usage = event.get("usage", {}) - final_input_tokens += usage.get("prompt_tokens", 0) - final_output_tokens += usage.get("completion_tokens", 0) - elif event.get("type") in ("reasoning", "content"): - # Stream real-time thoughts for UI observability - new_content = event.get("content", "") - content_buffer += new_content - sync_token_count += 1 + # Buffers for real-time streaming to avoid O(N^2) regex and DB hammering + content_buffer = "" + last_db_sync_time = time.time() + sync_token_count = 0 + + async for event in rag_service.chat_with_rag( + db=db, + 
session_id=session_id, + prompt=current_prompt, + provider_name=provider_name, + load_faiss_retriever=False, + user_service=user_service + ): + if event.get("type") == "finish": + final_tool_counts = event.get("tool_counts", {}) + final_input_tokens = event.get("input_tokens", 0) + final_output_tokens = event.get("output_tokens", 0) + final_answer = event.get("full_answer", "") + elif event.get("type") == "token_counted": + usage = event.get("usage", {}) + final_input_tokens += usage.get("prompt_tokens", 0) + final_output_tokens += usage.get("completion_tokens", 0) + elif event.get("type") in ("reasoning", "content"): + new_content = event.get("content", "") + content_buffer += new_content + sync_token_count += 1 + + now = time.time() + if now - last_db_sync_time > 2.0 or sync_token_count >= 50: + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + instance.last_reasoning = (instance.last_reasoning or "") + content_buffer + content_buffer = "" + last_db_sync_time = now + sync_token_count = 0 + db.commit() + + registry = getattr(rag_service, "node_registry_service", None) + if registry and instance.mesh_node_id: + registry.emit(instance.mesh_node_id, "reasoning", { + "content": new_content, + "agent_id": agent_id, + "session_id": instance.session_id + }) + + # Final flush + if content_buffer: + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + instance.last_reasoning = (instance.last_reasoning or "") + content_buffer + db.commit() + content_buffer = "" + + # Execution complete + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + # 4.3: Post-processing to compress boilerplate from reasoning + final_reasoning = AgentExecutor._compress_reasoning(instance.last_reasoning or "") + + final_result = { + "response": final_answer, + "reasoning": final_reasoning + } + + # --- EVALUATION PHASE (Co-Worker Loop) --- + if evaluator and final_answer: + instance.evaluation_status = 
"evaluating" + db.commit() - # Debounce DB writes: every 2 seconds or 50 tokens - now = time.time() - if now - last_db_sync_time > 2.0 or sync_token_count >= 50: - instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - # IMPORTANT: No longer calling _compress_reasoning in the loop. - # We just append to last_reasoning to maintain high performance - # and avoid blocking the event loop on every 50 tokens. - instance.last_reasoning = (instance.last_reasoning or "") + content_buffer - content_buffer = "" - last_db_sync_time = now - sync_token_count = 0 - db.commit() - - # Forward to Swarm Registry so the Node List/Swarm Control UI sees it + # 📡 Emit status update to Swarm registry registry = getattr(rag_service, "node_registry_service", None) if registry and instance.mesh_node_id: - registry.emit(instance.mesh_node_id, "reasoning", { - "content": new_content, - "agent_id": agent_id, - "session_id": instance.session_id - }) - - # Final flush of buffered content/reasoning before finishing - if content_buffer: - instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - instance.last_reasoning = (instance.last_reasoning or "") + content_buffer - db.commit() - content_buffer = "" + registry.emit(instance.mesh_node_id, "status_update", {"evaluation_status": "evaluating"}) - # Execution complete - instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - if instance.status == "active": - instance.status = "idle" # Completed work - instance.successful_runs = (instance.successful_runs or 0) + 1 - - elapsed = int(time.time() - loop_start) - instance.total_running_time_seconds = (instance.total_running_time_seconds or 0) + elapsed - instance.total_input_tokens = (instance.total_input_tokens or 0) + final_input_tokens - instance.total_output_tokens = (instance.total_output_tokens or 0) + final_output_tokens - - if final_tool_counts: - # Deep copy the current counts to ensure SQLAlchemy detects change - 
import copy - current_counts = copy.deepcopy(instance.tool_call_counts or {}) + # Stage 2A: Blind Rating + blind_eval = await evaluator.evaluate_blind(prompt, rubric_content) + score = blind_eval.get("score", 0) + justification = blind_eval.get("justification", "") - for k, v in final_tool_counts.items(): - # Upgrade legacy single-integer counts into rich metric dicts - if k in current_counts and isinstance(current_counts[k], int): - current_counts[k] = {"calls": current_counts[k], "successes": current_counts[k], "failures": 0} - if not isinstance(v, dict): - v = {"calls": v, "successes": v, "failures": 0} - if k not in current_counts: - current_counts[k] = {"calls": 0, "successes": 0, "failures": 0} + # Update instance with latest score + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + instance.latest_quality_score = score + db.commit() - current_counts[k]["calls"] += v.get("calls", 0) - current_counts[k]["successes"] += v.get("successes", 0) - current_counts[k]["failures"] += v.get("failures", 0) - - instance.tool_call_counts = current_counts - # Explicitly mark as modified for JSON column persistence - from sqlalchemy.orm.attributes import flag_modified - flag_modified(instance, "tool_call_counts") - - # 4.3: Post-processing to compress boilerplate from reasoning - final_reasoning = AgentExecutor._compress_reasoning(instance.last_reasoning or "") + # Check Threshold + if score >= rework_threshold: + instance.evaluation_status = "passed" + db.commit() + await evaluator.log_round(current_attempt + 1, score, "Passed quality gate.") + break # Success! 
+ + # Check Rework Limits + if current_attempt >= max_rework_attempts: + instance.evaluation_status = "failed_limit" + db.commit() + await evaluator.log_round(current_attempt + 1, score, "Failed quality gate after max attempts.") + break # No more reworks + + # Stage 2B: Delta Analysis & Directive Generation + instance.evaluation_status = "reworking" + instance.current_rework_attempt = current_attempt + 1 + db.commit() + + # We pass the history log and transcript (last_reasoning) to the delta analyst + # We need to fetch the history log first + cmd_res = await evaluator.assistant.dispatch_single(instance.mesh_node_id, "cat .cortex/history.log", session_id=evaluator.sync_workspace_id) + hist_log = [] + try: hist_log = json.loads(cmd_res.get("stdout", "[]")) + except: pass + + directive_feedback = await evaluator.evaluate_delta(prompt, rubric_content, justification, hist_log, final_reasoning) + + # Log this round + await evaluator.log_round(current_attempt + 1, score, justification) + + # Trigger next iteration + current_prompt = f"### CO-WORKER DIRECTIVE (ATTEMPT {current_attempt + 1})\n\n{directive_feedback}\n\nProceed with rework." 
+ current_attempt += 1 + + if registry and instance.mesh_node_id: + registry.emit(instance.mesh_node_id, "status_update", { + "evaluation_status": "reworking", + "attempt": current_attempt, + "score": score + }) + + print(f"[AgentExecutor] Triggering Rework Round {current_attempt} for agent {agent_id} (Score: {score})") + continue # Start next loop iteration + else: + break # No co-worker or no answer + + except Exception as e: + import traceback + print(f"[AgentExecutor] RAG attempt failed for {agent_id}: {e}") + print(traceback.format_exc()) + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + if instance: + instance.status = "error_suspended" + instance.last_error = str(e) + db.commit() + return { + "status": "error", + "response": f"Execution failed: {str(e)}", + "reasoning": instance.last_reasoning if instance else "" + } + + # Final loop cleanup & Stats + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + if instance.status == "active": + instance.status = "idle" # Completed work + instance.successful_runs = (instance.successful_runs or 0) + 1 - # Clear reasoning as the task is now complete - instance.last_reasoning = None - db.commit() + elapsed = int(time.time() - loop_start) + instance.total_running_time_seconds = (instance.total_running_time_seconds or 0) + elapsed + instance.total_input_tokens = (instance.total_input_tokens or 0) + (final_input_tokens if final_result else 0) + instance.total_output_tokens = (instance.total_output_tokens or 0) + (final_output_tokens if final_result else 0) + + if final_tool_counts: + import copy + current_counts = copy.deepcopy(instance.tool_call_counts or {}) + for k, v in final_tool_counts.items(): + if k in current_counts and isinstance(current_counts[k], int): + current_counts[k] = {"calls": current_counts[k], "successes": current_counts[k], "failures": 0} + if not isinstance(v, dict): + v = {"calls": v, "successes": v, "failures": 0} + if k not in 
current_counts: + current_counts[k] = {"calls": 0, "successes": 0, "failures": 0} + current_counts[k]["calls"] += v.get("calls", 0) + current_counts[k]["successes"] += v.get("successes", 0) + current_counts[k]["failures"] += v.get("failures", 0) + instance.tool_call_counts = current_counts + from sqlalchemy.orm.attributes import flag_modified + flag_modified(instance, "tool_call_counts") + + # Clear reasoning as the task is now complete + instance.last_reasoning = None + db.commit() - return { - "response": final_answer, - "reasoning": final_reasoning - } - - except Exception as e: - import traceback - print(f"[AgentExecutor] RAG execution failed for {agent_id}: {e}") - print(traceback.format_exc()) - instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - if instance: - instance.status = "error_suspended" - instance.last_error = str(e) - db.commit() - return { - "status": "error", - "response": f"Execution failed: {str(e)}", - "reasoning": instance.last_reasoning if instance else "" - } + return final_result except Exception as e: + import traceback print(f"[AgentExecutor] Unhandled loop error: {e}") + print(traceback.format_exc()) instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() if instance: instance.status = "error_suspended" diff --git a/ai-hub/app/core/orchestration/harness_evaluator.py b/ai-hub/app/core/orchestration/harness_evaluator.py new file mode 100644 index 0000000..97ad806 --- /dev/null +++ b/ai-hub/app/core/orchestration/harness_evaluator.py @@ -0,0 +1,258 @@ +import logging +import json +import time +from typing import Dict, Any, List, Optional +import os + +from app.db.models.agent import AgentInstance, AgentTemplate +from app.db import models +from app.core.orchestration import Architect + +logger = logging.getLogger(__name__) + +class HarnessEvaluator: + def __init__(self, db, agent_id, mesh_node_id, sync_workspace_id, llm_provider, rag_service): + self.db = db + self.agent_id = agent_id + 
self.mesh_node_id = mesh_node_id + self.sync_workspace_id = sync_workspace_id + self.llm_provider = llm_provider + self.rag_service = rag_service + + # Resolve orchestrator assistant from rag_service internals + self.orchestrator = None + self.assistant = None + + tool_service = getattr(rag_service, "tool_service", None) + if tool_service and hasattr(tool_service, "_services"): + self.orchestrator = getattr(tool_service._services, "orchestrator", None) + if self.orchestrator: + self.assistant = self.orchestrator.assistant + + async def initialize_cortex(self): + """Creates .cortex/ directory and an empty history.log in the agent's jail.""" + if not self.assistant or not self.mesh_node_id: + logger.warning(f"[HarnessEvaluator] Assistant or mesh_node_id missing for agent {self.agent_id}; skipping .cortex init.") + return + + logger.info(f"[HarnessEvaluator] Initializing .cortex/ for agent {self.agent_id} on {self.mesh_node_id}") + + # Ensure directory exists + await self.assistant.dispatch_single( + self.mesh_node_id, + "mkdir -p .cortex", + session_id=self.sync_workspace_id + ) + + # Initialize history.log and clean feedback.md for the new round + await self.assistant.write( + self.mesh_node_id, + ".cortex/history.log", + "[]", + session_id=self.sync_workspace_id + ) + await self.assistant.write( + self.mesh_node_id, + ".cortex/feedback.md", + "# New Round Started\n", + session_id=self.sync_workspace_id + ) + + async def generate_rubric(self, initial_prompt: str): + """Stage 1: Pre-Execution. Generate a task-specific rubric.md.""" + if not self.assistant: return None + + system_prompt = """You are a Quality Control Architect. +Your task is to analyze a user request and generate a specific Evaluation Rubric in Markdown. + +The Rubric MUST include: +1. **Expectations**: A checklist of specific results the agent should satisfy for this specific task. +2. 
**Core Rubric**: A quantitative scoring guide (0-100) across these dimensions: + - **Quality**: Tone, structure, and readability. + - **Accuracy**: Completeness and technical correctness. + - **Efficiency (Non-AI Alike)**: Adherence to edicts: No gold-plating, no unnecessary refactors, no redundant docstrings. + +Format as a clean Markdown file with exactly one '# Evaluation Rubric' title.""" + + user_prompt = f"Target Prompt: \"{initial_prompt}\"\n\nConstruct the request-specific rubric.md now." + + try: + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + # Use acompletion directly + prediction = await self.llm_provider.acompletion(messages=messages, stream=False) + rubric_content = prediction.choices[0].message.content + + # Save to node + await self.assistant.write( + self.mesh_node_id, + ".cortex/rubric.md", + rubric_content, + session_id=self.sync_workspace_id + ) + return rubric_content + except Exception as e: + logger.error(f"[HarnessEvaluator] Rubric generation failed: {e}") + return None + + async def evaluate_blind(self, initial_prompt: str, rubric_content: str) -> Dict[str, Any]: + """Stage 2A: The Blind Rating (Absolute Objectivity). Uses tools to inspect results.""" + + system_prompt = f"""You are the Co-Worker Evaluator (Blind Auditor). +Your goal is to perform a BLIND evaluation of the Main Agent's work. +You have NO knowledge of previous rounds or internal reasoning. You only see the goal and the result. + +Original Request: {initial_prompt} + +Rubric: +{rubric_content} + +EDICTS: +- Don't add features/refactors beyond what was asked. +- Don't add docstrings/comments not explicitly requested. +- Don't create helpers/utilities for one-time operations. + +MISSION: +Explore the workspace using your tools (ls, cat, etc.) to verify truth. +THEN assign a numerical score (0-100) and a brief justification. 
+Your final response MUST end with exactly: FINAL_SCORE: [number]""" + + return await self._run_evaluator_agent(system_prompt, "Perform Blind Evaluation of the result state.") + + async def evaluate_delta(self, initial_prompt: str, rubric_content: str, blind_justification: str, history_log: List[Dict[str, Any]], transcript: str) -> str: + """Stage 2B: The Delta Analysis. Identifies gaps by comparing result to reasoning transcript.""" + + historical_context = "Historical Rework Instructions (Gap Context):\n" + for entry in history_log: + historical_context += f"- Attempt {entry['round']}: {entry.get('reason', 'N/A')}\n" + + system_prompt = f"""You are the Co-Worker Quality Architect (Delta Analyst). +The Blind Evaluator assigned a score based solely on the file result, but now we must bridge the gap. +You see the FULL mental transcript of how the Main Agent reached this state. + +Original Request: {initial_prompt} + +Rubric: +{rubric_content} + +Blind Evaluation Justification: +{blind_justification} + +{historical_context} + +Main Agent Execution Transcript: +--- +{transcript} +--- + +MISSION: +1. Identify the 'Delta' (what was intended vs. what was actually committed). +2. Spot 'Gold-Plating' (did they do extra work not asked for?). +3. Format feedback as ACTIONABLE COMMANDS (Directives). +Example: "Directive: Refactor auth.py:L10 to use settings.API_KEY instead of hardcoded string." + +Format as Markdown. 
Start with '# Rework Instructions'.""" + + try: + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": "Analyze the delta and generate the Directive-based rework feedback."} + ] + prediction = await self.llm_provider.acompletion(messages=messages, stream=False) + feedback = prediction.choices[0].message.content + + # Save to node + await self.assistant.write( + self.mesh_node_id, + ".cortex/feedback.md", + feedback, + session_id=self.sync_workspace_id + ) + return feedback + except Exception as e: + logger.error(f"[HarnessEvaluator] Delta analysis failed: {e}") + return f"Error during Delta Analysis: {str(e)}" + + async def _run_evaluator_agent(self, system_prompt: str, user_request: str) -> Dict[str, Any]: + """Utility to run a context-stripped Architect loop for verification.""" + architect = Architect() + + # Resolve tools for the evaluator (same as parent session) + tool_service = getattr(self.rag_service, "tool_service", None) + tools = [] + user_id = "agent-system" + if tool_service: + instance = self.db.query(AgentInstance).filter(AgentInstance.id == self.agent_id).first() + if instance and instance.session: + user_id = instance.session.user_id + tools = tool_service.get_available_tools(self.db, user_id, feature="agent_harness", session_id=instance.session_id) + + final_answer = "" + score = 0 + + # Run Architect with a strictly limited profile to ensure snappy evaluation + # We pass no history to ensure "Blind" context + async for event in architect.run( + question=user_request, + context_chunks=[], + history=[], + llm_provider=self.llm_provider, + tool_service=tool_service, + tools=tools, + db=self.db, + user_id=user_id, + sync_workspace_id=self.sync_workspace_id, + session_id=None, # Evaluation shouldn't append to session Message table + feature_name="agent_harness", + session_override=system_prompt + ): + if event["type"] == "content": + final_answer += event["content"] + elif event["type"] == "error": + 
logger.error(f"[HarnessEvaluator] Sub-evaluator fault: {event['content']}") + + import re + score_match = re.search(r"FINAL_SCORE:\s*(\d+)", final_answer) + if score_match: + try: score = int(score_match.group(1)) + except: score = 0 + + return { + "score": score, + "justification": final_answer + } + + async def log_round(self, round_num: int, score: int, reason: str): + """Append-only record-keeping in history.log.""" + if not self.assistant: return + try: + # Atomic Read-Modify-Write for the JSON log on the node + cmd_res = await self.assistant.dispatch_single( + self.mesh_node_id, + "cat .cortex/history.log", + session_id=self.sync_workspace_id + ) + history = [] + if cmd_res.get("status") == "SUCCESS": + try: + history = json.loads(cmd_res.get("stdout", "[]")) + except: + history = [] + + history.append({ + "round": round_num, + "score": score, + "reason": reason[:200] + "..." if len(reason) > 200 else reason, + "timestamp": time.time() + }) + + await self.assistant.write( + self.mesh_node_id, + ".cortex/history.log", + json.dumps(history, indent=2), + session_id=self.sync_workspace_id + ) + except Exception as e: + logger.error(f"[HarnessEvaluator] Failed to log round: {e}")