Newer
Older
cortex-hub / ai-hub / app / core / orchestration / stream.py
import re
from typing import AsyncGenerator, Dict, Any, Optional

class StreamProcessor:
    """Handles logical processing of LLM streams: thinking tags and content routing.

    Text inside ``<thinking>...</thinking>`` is yielded as ``"reasoning"``
    events; everything else is yielded as ``"content"`` events, decorated by
    :meth:`_apply_turn_header`. Tags are matched ASCII-case-insensitively and
    may be split across chunk boundaries: a trailing fragment that could still
    grow into a tag is held in ``tag_buffer`` until the next chunk arrives (or
    until :meth:`flush` is called).
    """

    _OPEN_TAG = "<thinking>"
    _CLOSE_TAG = "</thinking>"
    # Search the original buffer rather than a ``.lower()`` copy: lowering can
    # change the string length for some non-ASCII characters (e.g. "İ" becomes
    # two code points), so offsets found in the copy would slice the original
    # at the wrong place. re.ASCII limits case-folding to the ASCII letters.
    _OPEN_TAG_RE = re.compile(re.escape(_OPEN_TAG), re.IGNORECASE | re.ASCII)
    _CLOSE_TAG_RE = re.compile(re.escape(_CLOSE_TAG), re.IGNORECASE | re.ASCII)

    def __init__(self, profile: Any):
        # True while between an opening and a closing thinking tag.
        self._in_thinking_tag = False
        # Text received but not yet emitted (between calls: at most a partial tag).
        self.tag_buffer = ""
        # Whether the current turn's header has already been prepended.
        self.header_sent = False
        # Output profile; this class reads .silent_stream and .strip_headers.
        self.profile = profile

    def reset_turn(self):
        """Start a new turn so the next content fragment gets a fresh header."""
        self.header_sent = False
        # Note: _in_thinking_tag and tag_buffer are deliberately kept; they might
        # need to persist if the turn ends abruptly, but usually the LLM closes
        # tags before tool calls.

    async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Processes a raw content chunk, yields UI events (content, reasoning).

        Args:
            content_chunk: Next piece of raw model output.
            turn: Turn number shown in the content header.

        Yields:
            ``{"type": "content" | "reasoning", "content": str}`` dicts in
            stream order. Empty fragments are not yielded.
        """
        self.tag_buffer += content_chunk
        turn_header = self._format_turn_header(turn)

        while self.tag_buffer:
            inside = self._in_thinking_tag
            tag = self._CLOSE_TAG if inside else self._OPEN_TAG
            tag_re = self._CLOSE_TAG_RE if inside else self._OPEN_TAG_RE

            match = tag_re.search(self.tag_buffer)
            if match:
                # Everything before the tag belongs to the current section;
                # the tag itself is consumed and flips the section.
                fragment = self.tag_buffer[:match.start()]
                self.tag_buffer = self.tag_buffer[match.end():]
                self._in_thinking_tag = not inside
            else:
                # No complete tag: emit everything except a trailing partial
                # tag, which is kept until more text arrives.
                hold_from = self._partial_tag_start(self.tag_buffer, tag)
                fragment = self.tag_buffer[:hold_from]
                self.tag_buffer = self.tag_buffer[hold_from:]

            # State is updated before yielding, so if the consumer stops
            # iterating at this event the emitted text is not left buffered.
            event = self._make_event(fragment, inside, turn_header)
            if event is not None:
                yield event

            if match is None:
                break

    async def flush(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Emit any text still held back as a possible partial tag.

        ``process_chunk`` withholds a trailing fragment such as ``"<"`` or
        ``"<think"`` in case the next chunk completes a tag. Call this once the
        upstream stream has ended so that fragment is not lost. The
        thinking-tag state is left unchanged.
        """
        leftover, self.tag_buffer = self.tag_buffer, ""
        event = self._make_event(leftover, self._in_thinking_tag, self._format_turn_header(turn))
        if event is not None:
            yield event

    @staticmethod
    def _format_turn_header(turn: int) -> str:
        """Return the markdown header prepended to a turn's first content."""
        return f"---\n### 🛰️ **[Turn {turn}] Master-Architect Analysis**\n"

    @staticmethod
    def _partial_tag_start(buffer: str, tag: str) -> int:
        """Return where a trailing incomplete ``tag`` starts in ``buffer``, else ``len(buffer)``.

        Both tags contain exactly one "<", at position 0, so only the text from
        the LAST "<" can be an unfinished tag; any earlier "<" (e.g. in an
        ``a < b`` comparison) is ordinary text.
        """
        idx = buffer.rfind("<")
        if idx != -1:
            tail = buffer[idx:]
            # ASCII-only case folding, consistent with the tag regexes.
            if tail.isascii() and tag.startswith(tail.lower()):
                return idx
        return len(buffer)

    def _make_event(self, text: str, reasoning: bool, header: str) -> Optional[Dict[str, Any]]:
        """Wrap ``text`` as a UI event, or return None when there is nothing to emit."""
        if not text:
            return None
        if reasoning:
            return {"type": "reasoning", "content": text}
        content = self._apply_turn_header(text, header)
        if not content:
            return None
        return {"type": "content", "content": content}

    def _apply_turn_header(self, text: str, header: str) -> Optional[str]:
        """Decorate a content fragment according to the profile.

        In silent-stream mode every regex in ``profile.strip_headers`` is
        removed (case-insensitively) and no header is added. Patterns are
        applied per fragment, so a header split across fragments may survive.
        Otherwise ``header`` is prepended to the first non-empty fragment of
        the turn.
        """
        if self.profile.silent_stream:
            # Aggressively strip any hallucinated headers defined in the profile
            for pattern in self.profile.strip_headers:
                text = re.sub(pattern, "", text, flags=re.IGNORECASE)
            return text

        if not text:
            return text

        if not self.header_sent:
            self.header_sent = True
            return header + text

        return text