diff --git a/ai-hub/app/core/pipelines/rag_pipeline.py b/ai-hub/app/core/pipelines/rag_pipeline.py index db0723a..b840698 100644 --- a/ai-hub/app/core/pipelines/rag_pipeline.py +++ b/ai-hub/app/core/pipelines/rag_pipeline.py @@ -165,62 +165,61 @@ # If no tools, this is the final answer for this forward pass. return - # Parallel dispatch logic for tools - processed_tool_calls = list(tool_calls_map.values()) - - # Reconstruct the tool call list and message object for the next turn - assistant_msg = { - "role": "assistant", - "content": accumulated_content or None, - "tool_calls": processed_tool_calls - } - if accumulated_reasoning: - assistant_msg["reasoning_content"] = accumulated_reasoning - - messages.append(assistant_msg) - - # Dispatch all tool calls simultaneously - tool_tasks = [] - for tc in processed_tool_calls: - func_name = tc.function.name - func_args = {} - try: - import json - func_args = json.loads(tc.function.arguments) - except: pass + # 3. Parallel dispatch logic for tools + processed_tool_calls = list(tool_calls_map.values()) + + # Reconstruct the tool call list and message object for the next turn + assistant_msg = { + "role": "assistant", + "content": accumulated_content or None, + "tool_calls": processed_tool_calls + } + if accumulated_reasoning: + assistant_msg["reasoning_content"] = accumulated_reasoning + + messages.append(assistant_msg) + + # A. Dispatch all tool calls simultaneously + tool_tasks = [] + for tc in processed_tool_calls: + func_name = tc.function.name + func_args = {} + try: + import json + func_args = json.loads(tc.function.arguments) + except: pass - # --- M7 Parallel PTY Optimization --- - # If the tool is terminal control and no session is provided, - # use a unique session ID per SUBAGENT task to avoid PTY SERIALIZATION. - if func_name == "mesh_terminal_control" and "session_id" not in func_args: - func_args["session_id"] = f"subagent-{tc.id[:8]}" + # --- M7 Parallel PTY Optimization --- + # If the tool is terminal control and no session is provided, + # use a unique session ID per SUBAGENT task to avoid PTY SERIALIZATION. + if func_name == "mesh_terminal_control" and "session_id" not in func_args: + func_args["session_id"] = f"subagent-{tc.id[:8]}" - yield {"type": "status", "content": f"AI decided to use tool: {func_name}"} - logging.info(f"[🔧] Agent calling tool (PARALLEL): {func_name} with {func_args}") - + yield {"type": "status", "content": f"AI decided to use tool: {func_name}"} + logging.info(f"[🔧] Agent calling tool (PARALLEL): {func_name} with {func_args}") - if tool_service: - # Notify UI about tool execution start - yield {"type": "tool_start", "name": func_name, "args": func_args} - - # Create an async task for each tool call - tool_tasks.append(asyncio.create_task( - tool_service.call_tool(func_name, func_args, db=db, user_id=user_id) - )) - else: - # Treat as failure immediately if no service - tool_tasks.append(asyncio.sleep(0, result={"success": False, "error": "Tool service not available"})) + if tool_service: + # Notify UI about tool execution start + yield {"type": "tool_start", "name": func_name, "args": func_args} + + # Create an async task for each tool call + tool_tasks.append(asyncio.create_task( + tool_service.call_tool(func_name, func_args, db=db, user_id=user_id) + )) + else: + # Treat as failure immediately if no service + tool_tasks.append(asyncio.sleep(0, result={"success": False, "error": "Tool service not available"})) - - # 3. HEARTBEAT WAIT: Wait for all sub-agent tasks to fulfill + # B. HEARTBEAT WAIT: Wait for all sub-agent tasks to fulfill in parallel wait_start = time.time() - while not all(t.done() for t in tool_tasks): - elapsed = int(time.time() - wait_start) - # This status fulfills the requirement: "internal wait seconds (showing this wait seconds in chat)" - yield {"type": "status", "content": f"Waiting for nodes result... ({elapsed}s)"} - await asyncio.sleep(1) + if tool_tasks: + while not all(t.done() for t in tool_tasks): + elapsed = int(time.time() - wait_start) + # This status fulfills the requirement: "internal wait seconds (showing this wait seconds in chat)" + yield {"type": "status", "content": f"Waiting for nodes result... ({elapsed}s)"} + await asyncio.sleep(1) - # 4. Collect results and populate history turn + # C. Collect results and populate history turn for i, task in enumerate(tool_tasks): tc = processed_tool_calls[i] func_name = tc.function.name @@ -236,8 +235,8 @@ "content": json.dumps(result) if isinstance(result, dict) else str(result) }) - yield {"type": "error", "content": "Agent loop reached maximum turns without a final response."} - + # --- Loop finished without return --- + yield {"type": "error", "content": "Agent loop reached maximum turns (5) without a final response."} def _build_prompt(self, context, history, question):