diff --git a/.agent/workflows/deploy_to_production.md b/.agent/workflows/deploy_to_production.md index 62c56ac..a79db0e 100644 --- a/.agent/workflows/deploy_to_production.md +++ b/.agent/workflows/deploy_to_production.md @@ -79,8 +79,8 @@ ``` - **Triggering Client Updates**: To force all active agents (like the Mac Mini) to pull new code or skills, bump the version in `/app/agent-node/VERSION`. - **Auto-Update**: Once deployed, agents check for version mismatches every 5 minutes and will automatically re-bootstrap themselves if a newer version is detected on the Hub. -5. **Migrate & Rebuild**: Overwrite production files and run `bash scripts/local_rebuild.sh` on the server. -6. **Post-Deployment Health Check**: Perform a backend connectivity check (Python Trick). Only use `/frontend_tester` as a last resort if UI behavior is suspect. +6. **Migrate & Rebuild**: Overwrite production files and run `bash scripts/local_rebuild.sh` on the server. Use `--fast` to skip container rebuilds for pure code/Python updates. +7. **Post-Deployment Health Check**: Perform a backend connectivity check (Python Trick). Only use `/frontend_tester` as a last resort if UI behavior is suspect. ### Automated Command ```bash @@ -89,6 +89,9 @@ # If you want to deploy quickly without rebuilding the standalone agent binaries: bash /app/scripts/remote_deploy.sh --skip-binaries + +# ULTRA-FAST: Skip container rebuilds AND binary builds (best for code-only tweaks): +bash /app/scripts/remote_deploy.sh --fast ``` --- diff --git a/agent-node/scripts/build_binaries.sh b/agent-node/scripts/build_binaries.sh index 2754e8b..cfe1fb0 100755 --- a/agent-node/scripts/build_binaries.sh +++ b/agent-node/scripts/build_binaries.sh @@ -7,24 +7,63 @@ echo "šŸ—ļø Setting up builder..." cd "$(dirname "$0")/../.." 
+# Default to current machine's architecture +ARCH_AMD64=false +ARCH_ARM64=false +BUILD_ALL=false + +for arg in "$@"; do + if [ "$arg" == "--all" ]; then + BUILD_ALL=true + ARCH_AMD64=true + ARCH_ARM64=true + fi + if [ "$arg" == "--arch=amd64" ]; then + ARCH_AMD64=true + fi + if [ "$arg" == "--arch=arm64" ]; then + ARCH_ARM64=true + fi +done + +# If no arch is specified, use current host arch +if [ "$BUILD_ALL" = false ] && [ "$ARCH_AMD64" = false ] && [ "$ARCH_ARM64" = false ]; then + CURRENT_ARCH=$(uname -m) + if [ "$CURRENT_ARCH" == "x86_64" ]; then + ARCH_AMD64=true + echo "Detected x86_64, building only AMD64 binary..." + elif [ "$CURRENT_ARCH" == "aarch64" ] || [ "$CURRENT_ARCH" == "arm64" ]; then + ARCH_ARM64=true + echo "Detected ARM64, building only ARM64 binary..." + else + echo "Unknown architecture $CURRENT_ARCH, building both..." + ARCH_AMD64=true + ARCH_ARM64=true + fi +fi + # Ensure buildx is available docker buildx create --use --name cortex-builder || true -echo "šŸ”Ø Building Linux AMD64 Binary..." -docker buildx build \ - --platform linux/amd64 \ - --build-arg ARCH=amd64 \ - -f agent-node/Dockerfile.binary \ - --output type=local,dest=agent-node/dist/linux_amd64 \ - . +if [ "$ARCH_AMD64" = true ]; then + echo "šŸ”Ø Building Linux AMD64 Binary..." + docker buildx build \ + --platform linux/amd64 \ + --build-arg ARCH=amd64 \ + -f agent-node/Dockerfile.binary \ + --output type=local,dest=agent-node/dist/linux_amd64 \ + . +fi -echo "šŸ”Ø Building Linux ARM64 Binary..." -docker buildx build \ - --platform linux/arm64 \ - --build-arg ARCH=arm64 \ - -f agent-node/Dockerfile.binary \ - --output type=local,dest=agent-node/dist/linux_arm64 \ - . +if [ "$ARCH_ARM64" = true ]; then + echo "šŸ”Ø Building Linux ARM64 Binary..." + docker buildx build \ + --platform linux/arm64 \ + --build-arg ARCH=arm64 \ + -f agent-node/Dockerfile.binary \ + --output type=local,dest=agent-node/dist/linux_arm64 \ + . +fi echo "āœ… Build complete! 
Binaries are in agent-node/dist/" chmod +x agent-node/dist/*/cortex-agent || true diff --git a/ai-hub/app/config.py b/ai-hub/app/config.py index 2c04bf0..f208010 100644 --- a/ai-hub/app/config.py +++ b/ai-hub/app/config.py @@ -253,7 +253,7 @@ # Legacy back-compat fields for the active one active_tts = self.TTS_PROVIDERS.get(self.TTS_PROVIDER, {}) self.TTS_VOICE_NAME: str = active_tts.get("voice_name") or "Kore" - self.TTS_MODEL_NAME: str = active_tts.get("model_name") or "gemini-2.5-flash-preview-tts" + self.TTS_MODEL_NAME: str = active_tts.get("model_name") # No hardcoded default self.TTS_API_KEY: Optional[str] = active_tts.get("api_key") or self.GEMINI_API_KEY # 4. Resolve STT (Agnostic) @@ -271,7 +271,7 @@ (list(self.STT_PROVIDERS.keys())[0] if self.STT_PROVIDERS else "google_gemini") active_stt = self.STT_PROVIDERS.get(self.STT_PROVIDER, {}) - self.STT_MODEL_NAME: str = active_stt.get("model_name") or "gemini-2.5-flash" + self.STT_MODEL_NAME: str = active_stt.get("model_name") # No hardcoded default self.STT_API_KEY: Optional[str] = active_stt.get("api_key") or \ (self.OPENAI_API_KEY if self.STT_PROVIDER == "openai" else self.GEMINI_API_KEY) diff --git a/ai-hub/app/core/_regex.py b/ai-hub/app/core/_regex.py new file mode 100644 index 0000000..ee95aeb --- /dev/null +++ b/ai-hub/app/core/_regex.py @@ -0,0 +1,5 @@ +import re + +# Pre-compiled ANSI Escape regex for reuse across Rag, Registry, and Architects +# This prevents O(N * tokens) compilation overhead during intensive streaming +ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index b8d299c..a5b96c9 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -68,37 +68,39 @@ from app.db.session import get_db_session from app.db.models import Session + # M6: Use a local list to release DB lock ASAP + active_ids = [] with get_db_session() 
as db: # Fetch all workspace IDs for non-archived sessions active_sessions = db.query(Session).filter( Session.is_archived == False, Session.sync_workspace_id.isnot(None) ).all() - active_ids = [s.sync_workspace_id for s in active_sessions] + active_ids = [s.sync_workspace_id for s in active_sessions] + # Performance: Purge and broadcast outside the DB session + if self.mirror: + self.mirror.purge_orphaned(active_ids) - if self.mirror: - self.mirror.purge_orphaned(active_ids) - - # M6: Distributed Lock Scavenging - for sid in active_ids: - self.mirror.purge_stale_locks(sid, ttl=60) # 1-minute TTL + # M6: Distributed Lock Scavenging + for sid in active_ids: + self.mirror.purge_stale_locks(sid, ttl=60) # 1-minute TTL - # M6: Broadcast CLEANUP to all connected nodes to do local cleanup - live_nodes = self.registry.list_nodes() - for _node in live_nodes: - try: - _node.send_message(agent_pb2.ServerTaskMessage( - file_sync=agent_pb2.FileSyncMessage( - session_id="system", - control=agent_pb2.SyncControl( - action=agent_pb2.SyncControl.CLEANUP, - request_paths=active_ids - ) + # M6: Broadcast CLEANUP to all connected nodes to do local cleanup + live_nodes = self.registry.list_nodes() + for _node in live_nodes: + try: + _node.send_message(agent_pb2.ServerTaskMessage( + file_sync=agent_pb2.FileSyncMessage( + session_id="system", + control=agent_pb2.SyncControl( + action=agent_pb2.SyncControl.CLEANUP, + request_paths=active_ids ) - ), priority=0) - except Exception as e: - print(f"[šŸ“āš ļø] Failed to broadcast CLEANUP to {_node.node_id}: {e}") + ) + ), priority=0) + except Exception as e: + print(f"[šŸ“āš ļø] Failed to broadcast CLEANUP to {_node.node_id}: {e}") except Exception as e: print(f"[šŸ“āš ļø] Mirror Cleanup Thread Error: {e}") diff --git a/ai-hub/app/core/orchestration/agent_loop.py b/ai-hub/app/core/orchestration/agent_loop.py index 53b4e4a..2dfae19 100644 --- a/ai-hub/app/core/orchestration/agent_loop.py +++ b/ai-hub/app/core/orchestration/agent_loop.py @@ -93,6 +93,11 @@ instance.last_reasoning = "" db.commit()
+ # Buffers for real-time streaming to avoid O(N^2) regex and DB hammering + content_buffer = "" + last_db_sync_time = time.time() + sync_token_count = 0 + # We consume the generator completely to let it execute all tools and generate reasoning async for event in rag_service.chat_with_rag( db=db, @@ -113,24 +118,39 @@ final_output_tokens += usage.get("completion_tokens", 0) elif event.get("type") in ("reasoning", "content"): # Stream real-time thoughts for UI observability - instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() - content = event.get("content", "") - current_reasoning = instance.last_reasoning or "" - # Apply live compression to ensure 'inplace' feeling for turn headers & boilerplate - instance.last_reasoning = AgentExecutor._compress_reasoning(current_reasoning + content) + new_content = event.get("content", "") + content_buffer += new_content + sync_token_count += 1 + # Debounce DB writes: every 2 seconds or 50 tokens + now = time.time() + if now - last_db_sync_time > 2.0 or sync_token_count >= 50: + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + # IMPORTANT: No longer calling _compress_reasoning in the loop. + # We just append to last_reasoning to maintain high performance + # and avoid blocking the event loop on every 50 tokens. 
+ instance.last_reasoning = (instance.last_reasoning or "") + content_buffer + content_buffer = "" + last_db_sync_time = now + sync_token_count = 0 + db.commit() + # Forward to Swarm Registry so the Node List/Swarm Control UI sees it registry = getattr(rag_service, "node_registry_service", None) if registry and instance.mesh_node_id: registry.emit(instance.mesh_node_id, "reasoning", { - "content": content, + "content": new_content, "agent_id": agent_id, "session_id": instance.session_id }) - - # Persist reasoning immediately for real-time dashboard observability - db.commit() + # Final flush of buffered content/reasoning before finishing + if content_buffer: + instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() + instance.last_reasoning = (instance.last_reasoning or "") + content_buffer + db.commit() + content_buffer = "" + # Execution complete instance = db.query(AgentInstance).filter(AgentInstance.id == agent_id).first() if instance.status == "active": @@ -210,25 +230,36 @@ @staticmethod def _compress_reasoning(text: str) -> str: - """Deduplicates turn markers and aggressively collapses boilerplate.""" - import re + """Deduplicates turn markers and collapses boilerplate using high-perf string logic.""" if not text: return "" - # Remove consecutive 'Strategy' boilerplate and turn headers - # Use a non-greedy approach to collapse blocks of redundant turn signaling - boilerplate = r"Strategy:\s*Executing\s*orchestrated\s*tasks\s*in\s*progress\.*" - - # 1. Deduplicate consecutive turn headers (e.g. Turn 1 followed by Turn 2) - turn_pattern = r"(?s)(\n\n---\nšŸ›°ļø \*\*\[Turn \d+\] thinking\.\.\.\*\*\n\n)(?=\n\n---\nšŸ›°ļø \*\*\[Turn \d+\] thinking\.\.\.\*\*\n\n)" - text = re.sub(turn_pattern, "", text) - - # 2. Collapse sequences of Turn + Boilerplate Strategy (common during autonomous waits) - # Keeps only the last one in a sequence of turns that did nothing. 
- text = re.sub(rf"(?s)(\n\n---\nšŸ›°ļø \*\*\[Turn \d+\] thinking\.\.\.\*\*\n\n{boilerplate}\n?)+", - r"\1", text) - - # 3. Final cleanup of repeating Strategy chunks that might have survived - text = re.sub(rf"({boilerplate}\s*)+", r"Strategy: Executing orchestrated tasks in progress...\n", text) - - return text.strip() + lines = text.splitlines(keepends=True) + cleaned = [] + last_important_line = "" + + # Collapse repeating Turn headers and Strategy boilerplate without regex + for line in lines: + l_strip = line.strip() + if not l_strip: + cleaned.append(line) + continue + + l_lower = l_strip.lower() + + # Deduplicate the system thinking marker + is_turn_marker = "šŸ›°ļø" in line and "[turn" in l_lower and "thinking" in l_lower + if is_turn_marker: + if "šŸ›°ļø" in last_important_line and "[turn" in last_important_line.lower(): + continue + + # Collapse Strategy boilerplate + is_strategy = "strategy:" in l_lower and "executing orchestrated tasks" in l_lower + if is_strategy: + if "strategy:" in last_important_line.lower() and "executing orchestrated tasks" in last_important_line.lower(): + continue + + cleaned.append(line) + last_important_line = l_strip + + return "".join(cleaned).strip() diff --git a/ai-hub/app/core/orchestration/architect.py b/ai-hub/app/core/orchestration/architect.py index 168ed07..5f5eca9 100644 --- a/ai-hub/app/core/orchestration/architect.py +++ b/ai-hub/app/core/orchestration/architect.py @@ -2,6 +2,7 @@ import queue import time from typing import List, Dict, Any, Optional + from app.db import models from .memory import ContextManager from .stream import StreamProcessor @@ -11,7 +12,7 @@ class Architect: """ The Master-Architect Orchestrator. - Decomposed successor to RagPipeline. + Decomposed successor to RagPipeline. 100% REGEX-FREE. 
""" def __init__(self, context_manager: Optional[ContextManager] = None): @@ -46,9 +47,6 @@ # DEBUG: Log the total prompt size to detect bloated contexts total_chars = sum(len(m.get("content", "") or "") for m in messages) logging.info(f"[Architect] Starting autonomous loop (Turn 1). Prompt Size: {total_chars} chars across {len(messages)} messages.") - for i, m in enumerate(messages): - content = m.get("content", "") or "" - logging.info(f"[Architect] Msg {i} ({m['role']}): {content[:500]}...") # 2. Setup Mesh Observation mesh_bridge = queue.Queue() @@ -64,12 +62,10 @@ profile = get_profile(feature_name) self.stream = StreamProcessor(profile=profile) turn = 0 - session_start_time = time.time() try: while turn < profile.autonomous_limit: turn += 1 - turn_start_time = time.time() self.stream.reset_turn() # A. Cancellation / Memory check @@ -84,8 +80,11 @@ yield {"type": "status", "content": f"Turn {turn}: architecting next step..."} # C. LLM Call + logging.info(f"[Architect] Turn {turn}: Calling LLM (Messages: {len(messages)})") + llm_start_time = time.time() prediction = await self._call_llm(llm_provider, messages, tools) if not prediction: + logging.error(f"[Architect] Turn {turn}: LLM Provider returned None") yield {"type": "error", "content": "LLM Provider failed to generate a response."} return @@ -98,9 +97,6 @@ chunk_count = 0 async for chunk in prediction: chunk_count += 1 - if chunk_count == 1: - logging.info(f"[Architect] First chunk received after {time.time() - turn_start_time:.2f}s") - if getattr(chunk, "usage", None): yield {"type": "token_counted", "usage": getattr(chunk, "usage").model_dump() if hasattr(getattr(chunk, "usage"), "model_dump") else getattr(chunk, "usage")} @@ -124,93 +120,68 @@ if not profile.buffer_content: yield event else: - # In buffered mode (voice), we yield reasoning immediately but hold content if event["type"] == "reasoning": yield event - # Tool delta accumulation self._accumulate_tool_calls(delta, tool_calls_map) # End Stream 
& Flush buffers - # IMPORTANT: In buffered (voice) mode, do NOT yield end_stream events directly. - # They get accumulated into accumulated_content and then yielded ONCE by the - # buffer_content block below. Yielding here AND there causes the response to repeat twice. async for event in self.stream.end_stream(turn): if event["type"] == "content": accumulated_content += event["content"] if event["type"] == "reasoning": accumulated_reasoning += event["content"] - # Only forward events immediately in non-buffered (chat) mode if not profile.buffer_content: yield event - # Heartbeat Fallback: If no content was sent but tools are being called, force a bridge sentence + # Heartbeat Fallback if tool_calls_map and not self.stream.header_sent and not profile.silent_stream: fallback_text = f"Strategy: Executing orchestrated tasks in progress..." async for event in self.stream.process_chunk(fallback_text, turn): if event["type"] == "content": accumulated_content += event["content"] yield event - # Ensure the fallback (and potential header) is flushed async for event in self.stream.end_stream(turn): if event["type"] == "content": accumulated_content += event["content"] yield event # Branch: Tools or Exit? if not tool_calls_map: - # 1. Truncation Guard: If the model hit a length limit, we MUST continue if finish_reason == "length": - yield {"type": "reasoning", "content": "\n> **āš ļø System Note:** Response was truncated by output limits. Prompting for continuation...\n"} - messages.append({"role": "user", "content": "You were cut off mid-sentence/action. Please continue immediately from where you left off. If you were trying to write a large file, use a terminal command instead of mesh_file_explorer."}) + yield {"type": "reasoning", "content": "\n> **āš ļø System Note:** Response was truncated. Prompting continuation...\n"} + messages.append({"role": "user", "content": "You were cut off. Please continue."}) continue - # 2. 
Final Turn: Yield the accumulated content if it was empty if not accumulated_content.strip(): - import re - # Priority 1: If we have reasoning but no content, the LLM likely put the answer in reasoning. - # This is common for O-series models. We use a cleaned version of the reasoning. if accumulated_reasoning.strip(): - # Clean the reasoning slightly for readability as an answer - fallback = accumulated_reasoning.strip() - # If it looks like raw internal chatter, we might still want to prepend a note + # Clean reasoning without regex + fallback = self.stream._apply_header_stripping(accumulated_reasoning.strip()) + if not fallback.strip(): + fallback = "I've completed the requested task. Please check the thought trace for details." else: fallback = "I've completed the requested task. Please check the thought trace for details." - # In voice mode (buffered), we apply specialized stripping - if profile.buffer_content: - yield {"type": "content", "content": fallback} - else: - # In chat mode, just send the fallback if no content ever came through - yield {"type": "content", "content": fallback} + yield {"type": "content", "content": fallback} elif profile.buffer_content: - # Standard buffered yield — yield the full accumulated content ONCE - import re - content_to_yield = accumulated_content - for pattern in profile.strip_headers: - content_to_yield = re.sub(pattern, "", content_to_yield, flags=re.IGNORECASE) + content_to_yield = self.stream._apply_header_stripping(accumulated_content) yield {"type": "content", "content": content_to_yield.strip()} - # Watchdog Check if safety.should_activate_watchdog(self._get_assistant(tool_service), sync_workspace_id): yield {"type": "status", "content": "Watchdog: tasks remain. continuing..."} - messages.append({"role": "user", "content": "WATCHDOG: .ai_todo.md has open items. Please continue until all are marked [COMPLETED]."}) + messages.append({"role": "user", "content": "WATCHDOG: .ai_todo.md is not empty. 
Proceed."}) continue - # Natural Exit yield {"type": "status", "content": ""} return # Natural exit # F. Execute Tools processed_tc = list(tool_calls_map.values()) if safety.detect_loop(processed_tc): - yield {"type": "reasoning", "content": "\n> **🚨 Loop Guard:** Repetitive plan detected. Retrying with warnings.\n"} - messages.append({"role": "user", "content": "LOOP GUARD: You are stuck. Change strategy."}) + yield {"type": "reasoning", "content": "\n> **🚨 Loop Guard:** Loop detected.\n"} + messages.append({"role": "user", "content": "STUCK: Change strategy."}) continue - yield {"type": "status", "content": f"Architect analysis complete. Dispatching {len(processed_tc)} tools..."} + yield {"type": "status", "content": f"Dispatching {len(processed_tc)} tools..."} - # Append assistant message to history messages.append(self._format_assistant_msg(accumulated_content, accumulated_reasoning, processed_tc)) - # Run parallel execution - # Run parallel execution executor = ToolExecutor( tool_service, user_id, db, sync_workspace_id, session_id, provider_name=getattr(llm_provider, "provider_name", None) @@ -220,14 +191,12 @@ messages.append(event) else: yield event - - pass except Exception as e: import traceback logging.error(f"[Architect] CRITICAL FAULT:\n{traceback.format_exc()}") yield {"type": "status", "content": "Fatal Orchestration Error"} - yield {"type": "content", "content": f"\n\n> **🚨 Core Orchestrator Fault:** A fatal exception abruptly halted the execution loop: `{str(e)}`\n\nThe AI's process terminated unexpectedly. 
You may need to retry your request or check server logs."} + yield {"type": "content", "content": f"\n\n> **🚨 Core Orchestrator Fault:** `{str(e)}`"} finally: if registry and user_id: registry.unsubscribe_user(user_id, mesh_bridge) @@ -248,16 +217,19 @@ def _update_turn_marker(self, messages, turn): if messages[0]["role"] == "system": - base = messages[0]["content"].split("[System:")[0].strip() - messages[0]["content"] = base + f"\n\n[System: Current Turn: {turn}]" + content = messages[0]["content"] + marker_anchor = "[System: Current Turn:" + if marker_anchor in content: + messages[0]["content"] = content.split(marker_anchor)[0].strip() + f"\n\n{marker_anchor} {turn}]" + else: + messages[0]["content"] = content + f"\n\n{marker_anchor} {turn}]" async def _call_llm(self, llm_provider, messages, tools): - kwargs = {"stream": True, "stream_options": {"include_usage": True}} + kwargs = {"stream": True, "stream_options": {"include_usage": True}, "max_tokens": 4096} if tools: kwargs["tools"] = tools kwargs["tool_choice"] = "auto" try: - kwargs["max_tokens"] = 4096 return await llm_provider.acompletion(messages=messages, timeout=60, **kwargs) except Exception as e: logging.error(f"[Architect] LLM Exception: {e}") @@ -277,10 +249,8 @@ def _format_assistant_msg(self, content, reasoning, tool_calls): if content: - import re - # Strip system-injected turn headers from history to avoid LLM hallucination - content = re.sub(r"(?i)---\n### šŸ›°ļø \*\*\[Turn \d+\] .*?\*\*\n", "", content) - content = content.strip() + # Fast string cleaning instead of regex for assistant message formatting + content = self.stream._apply_header_stripping(content).strip() clean_tc = [] for tc in tool_calls: diff --git a/ai-hub/app/core/orchestration/memory.py b/ai-hub/app/core/orchestration/memory.py index b25f9aa..f8f301f 100644 --- a/ai-hub/app/core/orchestration/memory.py +++ b/ai-hub/app/core/orchestration/memory.py @@ -1,12 +1,11 @@ from typing import List, Dict, Any, Optional, Callable import 
logging -import re import json from sqlalchemy.orm import Session from app.db import models class ContextManager: - """Handles prompt assembly and conversation history management.""" + """Handles prompt assembly and conversation history management WITHOUT regex.""" def __init__(self, history_formatter: Optional[Callable] = None, context_postprocessor: Optional[Callable] = None): self.history_formatter = history_formatter or self._default_history_formatter @@ -70,8 +69,6 @@ system_prompt = system_prompt.replace("{{session_id}}", "{session_id}") # Enforce Tool-belt awareness — only inject when tools actually exist - # With zero tools, appending "You have 0 tools: [NONE]" causes the AI to refuse - # all creative/conversational requests thinking it has no capabilities. if available_skills: system_prompt += f"\n\n## šŸ› ļø ACTIVE TOOL-BELT (FORBIDDEN to mention others):\n" system_prompt += f"You have ONLY these {len(available_skills)} calibrated tools: [{skill_list_str}].\n" @@ -83,7 +80,7 @@ ] async def compress_history(self, messages: List[Dict[str, Any]], llm_provider=None) -> List[Dict[str, Any]]: - """Summarizes middle turns using the LLM to preserve context window while preventing amnesia.""" + """Summarizes middle turns using the LLM to preserve context window.""" if len(messages) < 60: return messages @@ -111,11 +108,9 @@ summary_text = "\n".join(summary_lines) - # If we have an LLM provider, do a fast, dense compression if llm_provider: try: - prompt = f"Please read the following timeline of chronological actions and compress them into a dense, highly factual summary paragraph. Retain critical paths, file names, errors, and system state outcomes. Omit pleasantries and reasoning fluff.\n\n{summary_text}" - + prompt = f"Please compress the following timeline of chronological actions into a dense, factual summary paragraph. 
Omit fluff.\n\n{summary_text}" comp_res = await llm_provider.acompletion( messages=[{"role": "user", "content": prompt}], max_tokens=500, @@ -126,13 +121,11 @@ try: llm_summary = comp_res.choices[0].message.content except AttributeError: - # Fallback for weird dictionary responses llm_summary = comp_res.choices[0].get("message", {}).get("content") - if llm_summary: summary_text = llm_summary.strip() except Exception as e: - logging.warning(f"LLM Summary compression failed, falling back to python truncation: {e}") + logging.warning(f"LLM Summary compression failed: {e}") history_msg = { "role": "user", @@ -149,21 +142,27 @@ return "\n\n".join(contexts) or "No context provided." def _default_history_formatter(self, history: List[models.Message]) -> str: + """High-performance history formatting using string methods instead of regex.""" lines = [] - # Pattern to catch our injected headers - header_pattern = r"(?i)---\n###\s*.*\[Turn\s*\d+\]\s*Master-Architect\s*Analysis\n" - for msg in history: role = "Human" if msg.sender == "user" else "Assistant" content = msg.content or "" - # If assistant message is empty, try to summarize its action if role == "Assistant": if not content: content = "[Action: Calling tools or internal reasoning...]" else: - # Strip system headers from history to prevent AI imitation - content = re.sub(header_pattern, "", content) + # Strip turn headers without regex + content_lines = content.splitlines(keepends=True) + cleaned_content_lines = [] + for line in content_lines: + l_lower = line.lower().strip() + if "---" in l_lower and ("turn" in l_lower or "analysis" in l_lower): + continue + if l_lower.startswith("###") and ("analysis" in l_lower or "turn" in l_lower): + continue + cleaned_content_lines.append(line) + content = "".join(cleaned_content_lines).strip() lines.append(f"{role}: {content.strip()}") return "\n".join(lines) diff --git a/ai-hub/app/core/orchestration/stream.py b/ai-hub/app/core/orchestration/stream.py index 8cb7092..88f05b3 
100644 --- a/ai-hub/app/core/orchestration/stream.py +++ b/ai-hub/app/core/orchestration/stream.py @@ -1,8 +1,7 @@ -import re from typing import AsyncGenerator, Dict, Any, Optional class StreamProcessor: - """Handles logical processing of LLM streams: thinking tags and content routing.""" + """Handles logical processing of LLM streams WITHOUT regex and WITHOUT manual turn-header injection.""" def __init__(self, profile: Any): self._in_thinking_tag = False @@ -18,7 +17,6 @@ async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]: """Processes a raw content chunk, yields UI events (content, reasoning).""" self.tag_buffer += content_chunk - turn_header = f"\n\n---\nšŸ›°ļø **[Turn {turn}] thinking...**\n\n" while self.tag_buffer: if not self._in_thinking_tag: @@ -28,8 +26,7 @@ if start_tag_idx != -1: strategy_part = self.tag_buffer[:start_tag_idx] if strategy_part: - # Flush prefix buffer if any before tag - async for event in self._flush_prefix(strategy_part, turn_header): + async for event in self._flush_prefix(strategy_part): yield event self._in_thinking_tag = True @@ -40,45 +37,36 @@ if potential_idx != -1 and "".startswith(self.tag_buffer[potential_idx:].lower()): strategy_part = self.tag_buffer[:potential_idx] if strategy_part: - async for event in self._flush_prefix(strategy_part, turn_header): + async for event in self._flush_prefix(strategy_part): yield event self.tag_buffer = self.tag_buffer[potential_idx:] break else: - # Not in thinking tag, accumulate in prefix buffer if header not sent + # Capture strategy_text BEFORE clearing tag_buffer + strategy_text = self.tag_buffer + if not self.header_sent: self.prefix_buffer += self.tag_buffer self.tag_buffer = "" - # If buffer is large enough, or we have a reason to flush if len(self.prefix_buffer) > 200: - async for event in self._flush_prefix("", turn_header): + async for event in self._flush_prefix(""): yield event - strategy_text = self.tag_buffer - # Update: If 
this is the start of a turn, always emit header to reasoning first - if not self.header_sent: - self.header_sent = True - yield {"type": "reasoning", "content": turn_header} - + else: self.tag_buffer = "" - # Note: (?i) MUST be at the start of the regex string in Python 3.11+ - strip_boilerplate = r"Strategy:\s*Executing\s*orchestrated\s*tasks\s*in\s*progress\.*" - - # Separate boilerplate strategy from real content - if re.fullmatch(rf"(?i)\s*{strip_boilerplate}\s*", strategy_text): + l_strategy = strategy_text.lower().strip() + if "strategy:" in l_strategy and "executing orchestrated tasks" in l_strategy: yield {"type": "reasoning", "content": strategy_text} else: - processed = self._apply_turn_header(strategy_text) + processed = self._apply_header_stripping(strategy_text) if processed: yield {"type": "content", "content": processed} else: # Inside thinking tag if not self.header_sent: - async for event in self._flush_prefix("", turn_header): + async for event in self._flush_prefix(""): yield event - # reasoning should not be prefix-buffered - lower_buf = self.tag_buffer.lower() end_tag_idx = lower_buf.find("") @@ -102,65 +90,67 @@ yield {"type": "reasoning", "content": self.tag_buffer} self.tag_buffer = "" - async def _flush_prefix(self, extra_text: str, header: str) -> AsyncGenerator[Dict[str, Any], None]: + async def _flush_prefix(self, extra_text: str) -> AsyncGenerator[Dict[str, Any], None]: if not self.header_sent: self.header_sent = True - yield {"type": "reasoning", "content": header} full_text = self.prefix_buffer + extra_text - self.prefix_buffer = "" # Clear it - processed = self._apply_turn_header(full_text) + self.prefix_buffer = "" + processed = self._apply_header_stripping(full_text) if processed: yield {"type": "content", "content": processed} - def _apply_turn_header(self, text: str) -> Optional[str]: - # List of patterns to strip (hallucinated headers from various LLM generations) - strip_patterns = [ - r"(?i)^.*\[Turn\s*\d+\].*$", - 
r"(?i)Turn\s*\d+:\s*architecting\s*next\s*step\.*", - r"(?i)Strategy: Executing orchestrated tasks in progress\.*", - r"(?i)šŸ—ļø\s*BRIDGE\s*ANALYSIS:?", - r"(?i)Analysis: ", - r"(?i)---" - ] - - if self.profile.silent_stream: - # Aggressively strip any hallucinated headers defined in the profile - for pattern in self.profile.strip_headers: - text = re.sub(pattern, "", text, flags=re.IGNORECASE | re.MULTILINE) + def _apply_header_stripping(self, text: str) -> Optional[str]: + """Strips orchestrator internal markers but does NOT inject its own.""" + if not text: return "" + + if len(text) < 5 and self.header_sent: return text - # Even in non-silent mode, strip the patterns from the LLM's raw text to avoid duplication - for pattern in strip_patterns: - text = re.sub(pattern, "", text, flags=re.IGNORECASE | re.MULTILINE) - - # Dynamic Title Extraction (We skip this for now as we separated reasoning, but keep the strip logic) - # 1. Primary: Look for "Title: [Title Text]" - title_match = re.search(r"(?i)Title:\s*(.*?)\n", text) - if title_match: - text = text[:title_match.start()] + text[title_match.end():] - else: - # 2. Secondary: If AI uses a Markdown header like "### šŸš€ My Title" - md_header_match = re.search(r"(?i)^###\s*.*?\s*(.*?)\n", text, re.MULTILINE) - if md_header_match: - text = text[:md_header_match.start()] + text[md_header_match.end():] - - return text - async def end_stream(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]: - """Flushes any remaining buffered text at the very end of the stream.""" - turn_header = f"\n\n---\nšŸ›°ļø **[Turn {turn}] thinking...**\n\n" + lines = text.splitlines(keepends=True) + cleaned_lines = [] - # 1. 
Flush prefix buffer if we haven't sent the header yet + skip_words = [ + "---", + "### [turn", + "thinking...", + "master-architect analysis", + "architecting next step", + "strategy: executing orchestrated tasks", + "šŸ—ļø bridge analysis", + "title:" + ] + + for line in lines: + l_strip = line.strip() + if not l_strip: + cleaned_lines.append(line) + continue + l_lower = l_strip.lower() + + matched_skip = False + for word in skip_words: + if word in l_lower: + matched_skip = True + break + if matched_skip: continue + if l_strip.startswith("###"): continue + if "šŸ›°ļø" in line and "[turn" in l_lower: continue + + cleaned_lines.append(line) + + return "".join(cleaned_lines) + + async def end_stream(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]: if not self.header_sent and self.prefix_buffer: - async for event in self._flush_prefix("", turn_header): + async for event in self._flush_prefix(""): yield event - # 2. Flush any leftover tag buffer (rarely happens if LLM cuts off) if self.tag_buffer: if self._in_thinking_tag: yield {"type": "reasoning", "content": self.tag_buffer} else: - processed = self._apply_turn_header(self.tag_buffer) + processed = self._apply_header_stripping(self.tag_buffer) if processed: yield {"type": "content", "content": processed} self.tag_buffer = "" diff --git a/ai-hub/app/core/providers/factory.py b/ai-hub/app/core/providers/factory.py index 46b6f1c..254dce1 100644 --- a/ai-hub/app/core/providers/factory.py +++ b/ai-hub/app/core/providers/factory.py @@ -77,38 +77,39 @@ if is_empty(providerKey): if base_provider_for_keys == "gemini": providerKey = settings.GEMINI_API_KEY elif base_provider_for_keys == "deepseek": providerKey = settings.DEEPSEEK_API_KEY - - # 2. Resolve Model Name + + # 2. Resolve Model Name (always runs, regardless of key resolution path) modelName = model_name if not modelName: - # Priority 1: Extract model from provider_name if it contains one (e.g. 
"gemini/gemini-2.5-flash") + # Priority 1: Extract model from provider_name if it contains one (e.g. "gemini/gemini-1.5-flash") if "/" in provider_name: modelName = provider_name.split("/", 1)[1] - # Priority 2: If we have a suffixed name like "gemini_gemini-2.5-flash" + # Priority 2: If we have a suffixed name like "gemini_gemini-1.5-flash" if not modelName and "_" in provider_name: - parts = provider_name.split("_") - if len(parts) > 1 and parts[0] in ["gemini", "openai", "deepseek", "anthropic"]: - potential_model = "_".join(parts[1:]) - if "flash" in potential_model or "gpt" in potential_model or "chat" in potential_model: - modelName = potential_model + parts = provider_name.split("_") + if len(parts) > 1 and parts[0] in ["gemini", "openai", "deepseek", "anthropic"]: + potential_model = "_".join(parts[1:]) + if "flash" in potential_model or "gpt" in potential_model or "chat" in potential_model: + modelName = potential_model # Priority 3: Check settings using base provider if not modelName: modelName = settings.LLM_PROVIDERS.get(base_provider_for_keys, {}).get("model") - + + if not modelName: + raise ValueError(f"[factory] Could not resolve model name for provider '{provider_name}'. Check config.yaml llm_providers.") # Extract base type (e.g. 
'gemini_2' -> 'gemini') litellm_providers = [p.value for p in litellm.LlmProviders] base_type = kwargs.get("provider_type") or resolve_provider_info(base_provider_for_keys, "llm", _llm_providers, litellm_providers) - - # Task: Prevent doubling like 'gemini/gemini/gemini-2.5-flash' + + # Prevent doubling like 'gemini/gemini/gemini-1.5-flash' if '/' in modelName: full_model = modelName else: full_model = f'{base_type}/{modelName}' - - # Pass the optional system_prompt and kwargs to the GeneralProvider constructor + return GeneralProvider(model_name=full_model, api_key=providerKey, system_prompt=system_prompt, **kwargs) def get_tts_provider(provider_name: str, api_key: str, model_name: str, voice_name: str, **kwargs) -> TTSProvider: diff --git a/ai-hub/app/core/providers/stt/transcribe_audio_with_gemini.sh b/ai-hub/app/core/providers/stt/transcribe_audio_with_gemini.sh index eeee2b1..458ef43 100644 --- a/ai-hub/app/core/providers/stt/transcribe_audio_with_gemini.sh +++ b/ai-hub/app/core/providers/stt/transcribe_audio_with_gemini.sh @@ -81,7 +81,7 @@ EOF # Step 4: Send transcription request -RESPONSE=$(curl -s "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent" \ +RESPONSE=$(curl -s "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent" \ -H "x-goog-api-key: $GEMINI_API_KEY" \ -H "Content-Type: application/json" \ -X POST \ diff --git a/ai-hub/app/core/providers/tts/gemini.py b/ai-hub/app/core/providers/tts/gemini.py index 632d077..8e6e32b 100644 --- a/ai-hub/app/core/providers/tts/gemini.py +++ b/ai-hub/app/core/providers/tts/gemini.py @@ -40,12 +40,7 @@ voice_name: str = "Kore", **kwargs): from app.config import settings raw_model = model_name or settings.TTS_MODEL_NAME - # Strip any provider prefix (e.g. 
"vertex_ai/model" or "gemini/model") → keep only the model id model_id = raw_model.split("/")[-1] - # Normalise short names: "flash-tts" → "gemini-1.5-flash-preview-tts" - if model_id in ("gemini-2-flash-tts", "gemini-2.5-flash-tts", "flash-tts", "gemini-1.5-flash", "gemini-1.5-flash-preview-tts"): - model_id = "gemini-1.5-flash-preview-tts" - logger.info(f"Normalised model name to: {model_id}") # Route to Vertex AI ONLY when the key is a Vertex service-account key (starting with "AQ.") # AI Studio keys start with "AIza" and must use the generativelanguage endpoint. diff --git a/ai-hub/app/core/services/node_registry.py b/ai-hub/app/core/services/node_registry.py index ca82e70..c5295f1 100644 --- a/ai-hub/app/core/services/node_registry.py +++ b/ai-hub/app/core/services/node_registry.py @@ -413,18 +413,17 @@ if not is_tty_char: node.terminal_history.append(f"$ {cmd}\n") elif event_type == "task_stdout" and isinstance(data, str): - # NEW: Strip ANSI codes and CAP size to 100KB per chunk to prevent memory bloat - ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - clean_output = ansi_escape.sub('', data) + # Use pre-compiled global regex to avoid overhead on every token + from app.core._regex import ANSI_ESCAPE + clean_output = ANSI_ESCAPE.sub('', data) if len(clean_output) > 100_000: clean_output = clean_output[:100_000] + "\n[... Output Truncated ...]\n" node.terminal_history.append(clean_output) elif event_type == "skill_event" and isinstance(data, dict): if data.get("type") == "output": output_data = data.get("data", "") - # Strip ANSI codes and CAP size - ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - clean_output = ansi_escape.sub('', output_data) + from app.core._regex import ANSI_ESCAPE + clean_output = ANSI_ESCAPE.sub('', output_data) if len(clean_output) > 100_000: clean_output = clean_output[:100_000] + "\n[... 
Output Truncated ...]\n" node.terminal_history.append(clean_output) diff --git a/ai-hub/app/core/services/rag.py b/ai-hub/app/core/services/rag.py index 170f31e..2d57f47 100644 --- a/ai-hub/app/core/services/rag.py +++ b/ai-hub/app/core/services/rag.py @@ -56,7 +56,7 @@ session.title = prompt[:60].strip() + ("..." if len(prompt) > 60 else "") # Resolve provider - extract base key for settings lookup - # e.g. "gemini/gemini-2.5-flash" -> base key "gemini" + # e.g. "gemini/gemini-1.5-flash" -> base key "gemini" base_provider_key = provider_name.split("/")[0] if "/" in provider_name else provider_name llm_prefs = {} @@ -74,7 +74,7 @@ api_key_override = llm_prefs.get("api_key") - # If provider_name already contains an explicit model (e.g. "gemini/gemini-2.5-flash"), + # If provider_name already contains an explicit model (e.g. "gemini/gemini-1.5-flash"), # do NOT override it with the model from system settings (which might be "gemini-1.5-flash") if "/" in provider_name: model_name_override = "" # Let factory extract model from provider_name @@ -159,13 +159,21 @@ live = registry.get_node(node.node_id) if live and live.terminal_history: # Grab recent chunks and join - history_blob = "".join(live.terminal_history[-40:]) + # defensive join: only take enough chunks for ~4000 chars total + chunks = [] + total_len = 0 + for chunk in reversed(live.terminal_history[-40:]): + chunks.insert(0, chunk) + total_len += len(chunk) + if total_len > 4000: break - # Extreme Sanity Check: Strip ANSI again just in case, and limit total size - ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') - clean_history = ansi_escape.sub('', history_blob) + history_blob = "".join(chunks) - # Limit to 2000 chars to avoid bloating the context / breaking LLMs + # Use pre-compiled regex from utility + from app.core._regex import ANSI_ESCAPE + clean_history = ANSI_ESCAPE.sub('', history_blob) + + # Limit to 2000 chars to avoid bloating the context if len(clean_history) > 2000: clean_history 
= "...[truncated]...\n" + clean_history[-2000:] @@ -263,8 +271,9 @@ "type": event["type"] }) - # Commit every 5 chunks to provide smooth UI streaming without hammering the DB - if (input_tokens + output_tokens) % 5 == 0: + # Commit every 50 tokens or when it makes sense UI-wise. + # Frequent commits block the async event loop with synchronous disk I/O. + if (input_tokens + output_tokens) % 50 == 0: try: db.commit() except: diff --git a/ai-hub/app/db/session.py b/ai-hub/app/db/session.py index fb50db0..de9fdb0 100644 --- a/ai-hub/app/db/session.py +++ b/ai-hub/app/db/session.py @@ -53,17 +53,25 @@ """ Context-manager database session for use outside of FastAPI request scope. Used by background services (e.g. NodeRegistryService) that need their own session. + VERSION M6: No auto-commit. Caller must session.commit() explicitly. """ db = SessionLocal() try: yield db - db.commit() except Exception: db.rollback() raise finally: db.close() +import asyncio +from functools import partial + +async def async_db_op(func, *args, **kwargs): + """Refactored M6: Run a DB-blocking function in a separate thread to keep the event loop alive.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, partial(func, *args, **kwargs)) + def create_db_and_tables(): """ Creates all database tables defined by models inheriting from Base. 
diff --git a/ai-hub/test.db-shm b/ai-hub/test.db-shm deleted file mode 100644 index 07b4149..0000000 --- a/ai-hub/test.db-shm +++ /dev/null Binary files differ diff --git a/ai-hub/test.db-wal b/ai-hub/test.db-wal deleted file mode 100644 index 92c4478..0000000 --- a/ai-hub/test.db-wal +++ /dev/null Binary files differ diff --git a/ai-hub/test_cortex.db-shm b/ai-hub/test_cortex.db-shm deleted file mode 100644 index 61c619d..0000000 --- a/ai-hub/test_cortex.db-shm +++ /dev/null Binary files differ diff --git a/ai-hub/test_cortex.db-wal b/ai-hub/test_cortex.db-wal deleted file mode 100644 index eee36ad..0000000 --- a/ai-hub/test_cortex.db-wal +++ /dev/null Binary files differ diff --git a/scripts/local_rebuild.sh b/scripts/local_rebuild.sh index cdd0b69..0a99682 100755 --- a/scripts/local_rebuild.sh +++ b/scripts/local_rebuild.sh @@ -24,10 +24,15 @@ echo "šŸš€ Starting AI Hub deployment process..." BUILD_BINARIES=true +FAST_DEPLOY=false for arg in "$@"; do if [ "$arg" == "--skip-binaries" ]; then BUILD_BINARIES=false fi + if [ "$arg" == "--fast" ]; then + FAST_DEPLOY=true + BUILD_BINARIES=false + fi done # 1. Base compose file @@ -46,11 +51,15 @@ COMPOSE_FILES="$COMPOSE_FILES -f $PROJECT_DIR/deployment/test-nodes/docker-compose.test-nodes.yml" fi -echo "šŸ›‘ Stopping and removing old Docker containers and networks..." -sudo $DOCKER_CMD $COMPOSE_FILES down --remove-orphans || true - -echo "šŸ—ļø Building and starting new containers..." -sudo $DOCKER_CMD $COMPOSE_FILES up -d --build > /tmp/deploy_log.txt 2>&1 +if [ "$FAST_DEPLOY" = true ]; then + echo "⚔ Fast Deployment: Skipping 'down' and '--build'..." + sudo $DOCKER_CMD $COMPOSE_FILES up -d > /tmp/deploy_log.txt 2>&1 +else + echo "šŸ›‘ Stopping and removing old Docker containers and networks..." + sudo $DOCKER_CMD $COMPOSE_FILES down --remove-orphans || true + echo "šŸ—ļø Building and starting new containers..." 
+ sudo $DOCKER_CMD $COMPOSE_FILES up -d --build > /tmp/deploy_log.txt 2>&1 +fi echo "āœ… Containers started! Checking status..." cat /tmp/deploy_log.txt