# cortex-hub / ai-hub / app / core / orchestration / stream.py
import re
from typing import AsyncGenerator, Dict, Any, Optional

class StreamProcessor:
    """Handles logical processing of LLM streams: thinking tags and content routing.

    Splits a raw LLM token stream into ``content`` and ``reasoning`` UI events
    by tracking case-insensitive ``<thinking>...</thinking>`` tags, holding back
    partial tags that are split across chunks, stripping hallucinated headers,
    and prepending one authoritative per-turn header to the visible content.
    """

    # Hallucinated header patterns stripped from raw LLM text.
    # Compiled once at class level instead of rebuilt on every call.
    _STRIP_PATTERNS = [
        re.compile(p, re.IGNORECASE | re.MULTILINE)
        for p in (
            r"^.*\[Turn\s*\d+\].*$",
            r"Turn\s*\d+:\s*architecting\s*next\s*step\.*",
            r"🏗️\s*BRIDGE\s*ANALYSIS:?",
            r"Analysis: ",
            r"---",
        )
    ]

    def __init__(self, profile: Any):
        self._in_thinking_tag = False  # True while between <thinking> and </thinking>
        self.tag_buffer = ""           # Unprocessed stream text (may end in a partial tag)
        self.prefix_buffer = ""        # Accumulates start of content to catch split headers
        self.header_sent = False       # Whether the turn header was already emitted
        self.profile = profile         # Reads .silent_stream and .strip_headers

    def reset_turn(self):
        """Reset per-turn state before a new turn starts.

        NOTE(review): tag_buffer/_in_thinking_tag are left intact, apparently so
        a thinking block can span turn boundaries — confirm with callers.
        """
        self.header_sent = False
        self.prefix_buffer = ""

    async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Processes a raw content chunk, yields UI events (content, reasoning)."""
        self.tag_buffer += content_chunk
        turn_header = f"---\n### 🛰️ **[Turn {turn}] Thinking...**\n"

        while self.tag_buffer:
            if not self._in_thinking_tag:
                lower_buf = self.tag_buffer.lower()
                start_tag_idx = lower_buf.find("<thinking>")

                if start_tag_idx != -1:
                    strategy_part = self.tag_buffer[:start_tag_idx]
                    if strategy_part:
                        # Flush prefix buffer if any before the tag
                        async for event in self._flush_prefix(strategy_part, turn_header):
                            yield event

                    self._in_thinking_tag = True
                    self.tag_buffer = self.tag_buffer[start_tag_idx + len("<thinking>"):]
                    continue

                # A trailing "<..." may be a tag split across chunks.  Only the
                # LAST "<" can start a prefix of "<thinking>" (a tag prefix has
                # no second "<"); find() was used before and dropped the partial
                # tag whenever an earlier literal "<" preceded it in the buffer.
                potential_idx = self.tag_buffer.rfind("<")
                if potential_idx != -1 and "<thinking>".startswith(self.tag_buffer[potential_idx:].lower()):
                    strategy_part = self.tag_buffer[:potential_idx]
                    if strategy_part:
                        async for event in self._flush_prefix(strategy_part, turn_header):
                            yield event
                    self.tag_buffer = self.tag_buffer[potential_idx:]
                    break  # Wait for more data to resolve the partial tag
                else:
                    # Plain content: accumulate in the prefix buffer until the
                    # header is sent, so split hallucinated headers are caught.
                    if not self.header_sent:
                        self.prefix_buffer += self.tag_buffer
                        self.tag_buffer = ""
                        # If buffer is large enough, or we have a reason to flush
                        if len(self.prefix_buffer) > 200:
                            async for event in self._flush_prefix("", turn_header):
                                yield event
                    else:
                        strategy_part = self._apply_turn_header(self.tag_buffer, turn_header)
                        if strategy_part:
                            yield {"type": "content", "content": strategy_part}
                        self.tag_buffer = ""

            else:
                # Inside thinking tag - reasoning should not be prefix-buffered,
                # so flush any pending prefix first.
                if not self.header_sent and self.prefix_buffer:
                    async for event in self._flush_prefix("", turn_header):
                        yield event

                lower_buf = self.tag_buffer.lower()
                end_tag_idx = lower_buf.find("</thinking>")

                if end_tag_idx != -1:
                    reasoning_part = self.tag_buffer[:end_tag_idx]
                    if reasoning_part:
                        yield {"type": "reasoning", "content": reasoning_part}

                    self._in_thinking_tag = False
                    self.tag_buffer = self.tag_buffer[end_tag_idx + len("</thinking>"):]
                    continue

                # Same split-tag handling as above, for the closing tag.
                potential_idx = self.tag_buffer.rfind("<")
                if potential_idx != -1 and "</thinking>".startswith(self.tag_buffer[potential_idx:].lower()):
                    reasoning_part = self.tag_buffer[:potential_idx]
                    if reasoning_part:
                        yield {"type": "reasoning", "content": reasoning_part}
                    self.tag_buffer = self.tag_buffer[potential_idx:]
                    break
                else:
                    yield {"type": "reasoning", "content": self.tag_buffer}
                    self.tag_buffer = ""

    async def _flush_prefix(self, extra_text: str, header: str) -> AsyncGenerator[Dict[str, Any], None]:
        """Emit the accumulated prefix (plus extra_text) as a single content event."""
        full_text = self.prefix_buffer + extra_text
        self.prefix_buffer = ""  # Clear it
        processed = self._apply_turn_header(full_text, header)
        if processed:
            yield {"type": "content", "content": processed}

    def _apply_turn_header(self, text: str, header: str) -> Optional[str]:
        """Strip hallucinated headers from *text*; on the first visible chunk
        of a turn, extract any model-supplied title and prepend the system's
        authoritative header.

        Returns None when nothing should be emitted yet, "" when the text
        stripped to nothing after the header was sent, else the text to emit.
        """
        if self.profile.silent_stream:
            # Aggressively strip any hallucinated headers defined in the profile
            for pattern in self.profile.strip_headers:
                text = re.sub(pattern, "", text, flags=re.IGNORECASE | re.MULTILINE)
            return text

        # Even in non-silent mode, strip the patterns from the LLM's raw text to avoid duplication
        for pattern in self._STRIP_PATTERNS:
            text = pattern.sub("", text)

        # Dynamic Title Extraction
        if not self.header_sent:
            # 1. Primary: Look for "Title: [Title Text]"
            title_match = re.search(r"(?i)Title:\s*(.*?)\n", text)
            if title_match:
                custom_title = title_match.group(1).strip()
                # str.replace, not re.sub: the title is untrusted LLM text, and
                # re.sub interprets "\" / "\1" in the replacement string, which
                # would corrupt the header or raise re.error.
                header = header.replace("Thinking...", custom_title)
                text = text[:title_match.start()] + text[title_match.end():]
            else:
                # 2. Secondary: If AI uses a Markdown header like "### 🚀 My Title"
                md_header_match = re.search(r"(?i)^###\s*.*?\s*(.*?)\n", text, re.MULTILINE)
                if md_header_match:
                    custom_title = md_header_match.group(1).strip()
                    header = header.replace("Thinking...", custom_title)
                    text = text[:md_header_match.start()] + text[md_header_match.end():]

        if not text.strip():
            # If after stripping the text is empty, don't send anything yet (unless it's the very first chunk)
            if not self.header_sent:
                return None
            return ""

        if not self.header_sent:
            self.header_sent = True
            # Prepend the system's authoritative header
            return header + text.lstrip()

        return text

    async def end_stream(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Flushes any remaining buffered text at the very end of the stream."""
        turn_header = f"---\n### 🛰️ **[Turn {turn}] Thinking...**\n"

        # 1. Flush prefix buffer if we haven't sent the header yet
        if not self.header_sent and self.prefix_buffer:
            async for event in self._flush_prefix("", turn_header):
                yield event

        # 2. Flush any leftover tag buffer (rarely happens if LLM cuts off)
        if self.tag_buffer:
            if self._in_thinking_tag:
                yield {"type": "reasoning", "content": self.tag_buffer}
            else:
                processed = self._apply_turn_header(self.tag_buffer, turn_header)
                if processed:
                    yield {"type": "content", "content": processed}
            self.tag_buffer = ""