import re
from typing import AsyncGenerator, Dict, Any, Optional
class StreamProcessor:
    """Handles logical processing of LLM streams: thinking tags and content routing.

    Splits raw LLM output into ``content`` and ``reasoning`` UI events by
    detecting ``<thinking>...</thinking>`` spans case-insensitively, even when
    a tag is split across chunk boundaries. Also strips hallucinated turn
    headers from the model's text and injects a single authoritative per-turn
    header of the form ``---\\n### 🛰️ **[Turn N] Thinking...**``.
    """

    # Hallucinated-header patterns stripped from raw LLM text.
    # Compiled once at class level instead of rebuilding the list per call.
    _STRIP_PATTERNS = tuple(
        re.compile(p, re.IGNORECASE | re.MULTILINE)
        for p in (
            r"(?i)^.*\[Turn\s*\d+\].*$",
            r"(?i)Turn\s*\d+:\s*architecting\s*next\s*step\.*",
            r"(?i)🏗️\s*BRIDGE\s*ANALYSIS:?",
            r"(?i)Analysis: ",
            r"(?i)---",
        )
    )

    def __init__(self, profile: Any):
        # True while we are between <thinking> and </thinking>.
        self._in_thinking_tag = False
        # Raw text not yet classified (may end with a partial tag).
        self.tag_buffer = ""
        # Accumulates start of content to catch split headers.
        self.prefix_buffer = ""
        # Set once the authoritative turn header has been emitted.
        self.header_sent = False
        # Expected to expose .silent_stream (bool) and .strip_headers (regex list).
        self.profile = profile

    def reset_turn(self) -> None:
        """Reset per-turn state so the next turn emits a fresh header.

        NOTE(review): deliberately does NOT clear tag_buffer/_in_thinking_tag,
        so a thinking span may straddle a turn boundary — confirm intended.
        """
        self.header_sent = False
        self.prefix_buffer = ""

    @staticmethod
    def _find_partial_tag(buf: str, tag: str) -> int:
        """Return the index of the first ``<`` that begins a prefix of ``tag``.

        Bugfix: the original logic only tested the FIRST ``<`` in the buffer,
        so content such as ``"a < b <thin"`` leaked the trailing partial tag
        downstream and the tag was never recognized once completed by the next
        chunk. This scans every ``<`` position. Returns -1 if no candidate.
        """
        idx = buf.find("<")
        while idx != -1:
            if tag.startswith(buf[idx:].lower()):
                return idx
            idx = buf.find("<", idx + 1)
        return -1

    async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Process a raw content chunk, yielding UI events (content / reasoning).

        Args:
            content_chunk: Raw text delta from the LLM stream.
            turn: 1-based turn number used to build the turn header.

        Yields:
            Dicts of the form ``{"type": "content"|"reasoning", "content": str}``.
        """
        self.tag_buffer += content_chunk
        turn_header = f"---\n### 🛰️ **[Turn {turn}] Thinking...**\n"
        while self.tag_buffer:
            if not self._in_thinking_tag:
                start_tag_idx = self.tag_buffer.lower().find("<thinking>")
                if start_tag_idx != -1:
                    strategy_part = self.tag_buffer[:start_tag_idx]
                    if strategy_part:
                        # Flush prefix buffer (plus text before the tag) as content.
                        async for event in self._flush_prefix(strategy_part, turn_header):
                            yield event
                    self._in_thinking_tag = True
                    self.tag_buffer = self.tag_buffer[start_tag_idx + len("<thinking>"):]
                    continue
                # A partial "<thinking>" may be sitting at the end of the buffer.
                partial_idx = self._find_partial_tag(self.tag_buffer, "<thinking>")
                if partial_idx != -1:
                    strategy_part = self.tag_buffer[:partial_idx]
                    if strategy_part:
                        async for event in self._flush_prefix(strategy_part, turn_header):
                            yield event
                    self.tag_buffer = self.tag_buffer[partial_idx:]
                    break  # wait for the rest of the tag in the next chunk
                # Plain content — no complete or partial tag present.
                if not self.header_sent:
                    # Accumulate so a header split across chunks can be caught.
                    self.prefix_buffer += self.tag_buffer
                    self.tag_buffer = ""
                    # If the buffer is large enough, flush it.
                    if len(self.prefix_buffer) > 200:
                        async for event in self._flush_prefix("", turn_header):
                            yield event
                else:
                    strategy_part = self._apply_turn_header(self.tag_buffer, turn_header)
                    if strategy_part:
                        yield {"type": "content", "content": strategy_part}
                    self.tag_buffer = ""
            else:
                # Inside a thinking tag — reasoning is never prefix-buffered,
                # but flush any pending content prefix first to keep ordering.
                if not self.header_sent and self.prefix_buffer:
                    async for event in self._flush_prefix("", turn_header):
                        yield event
                end_tag_idx = self.tag_buffer.lower().find("</thinking>")
                if end_tag_idx != -1:
                    reasoning_part = self.tag_buffer[:end_tag_idx]
                    if reasoning_part:
                        yield {"type": "reasoning", "content": reasoning_part}
                    self._in_thinking_tag = False
                    self.tag_buffer = self.tag_buffer[end_tag_idx + len("</thinking>"):]
                    continue
                # A partial "</thinking>" may end the buffer (same bugfix as above).
                partial_idx = self._find_partial_tag(self.tag_buffer, "</thinking>")
                if partial_idx != -1:
                    reasoning_part = self.tag_buffer[:partial_idx]
                    if reasoning_part:
                        yield {"type": "reasoning", "content": reasoning_part}
                    self.tag_buffer = self.tag_buffer[partial_idx:]
                    break
                yield {"type": "reasoning", "content": self.tag_buffer}
                self.tag_buffer = ""

    async def _flush_prefix(self, extra_text: str, header: str) -> AsyncGenerator[Dict[str, Any], None]:
        """Emit prefix_buffer + extra_text as a single content event (if non-empty)."""
        full_text = self.prefix_buffer + extra_text
        self.prefix_buffer = ""  # Clear it
        processed = self._apply_turn_header(full_text, header)
        if processed:
            yield {"type": "content", "content": processed}

    def _apply_turn_header(self, text: str, header: str) -> Optional[str]:
        """Strip hallucinated headers from ``text`` and prepend the real header.

        Returns:
            None when the stripped text is empty and the header has not been
            sent yet (caller should wait for more text); "" when it is empty
            but the header already went out; otherwise the processed text,
            with ``header`` prepended exactly once per turn.
        """
        if self.profile.silent_stream:
            # Aggressively strip any hallucinated headers defined in the profile.
            for pattern in self.profile.strip_headers:
                text = re.sub(pattern, "", text, flags=re.IGNORECASE | re.MULTILINE)
            return text
        # Even in non-silent mode, strip the known patterns to avoid duplication.
        for pattern in self._STRIP_PATTERNS:
            text = pattern.sub("", text)
        # Dynamic title extraction (only before the header has been emitted).
        if not self.header_sent:
            # 1. Primary: look for "Title: [Title Text]".
            title_match = re.search(r"(?i)Title:\s*(.*?)\n", text)
            if title_match:
                custom_title = title_match.group(1).strip()
                # Bugfix: use str.replace, not re.sub — an LLM title containing
                # backslashes (e.g. "\g") would break as a regex replacement.
                header = header.replace("Thinking...", custom_title)
                text = text[:title_match.start()] + text[title_match.end():]
            else:
                # 2. Secondary: a Markdown header like "### 🚀 My Title".
                # NOTE(review): the lazy ".*?\s*" matches empty, so group(1)
                # captures the whole header text including any emoji — confirm
                # whether the emoji was meant to be skipped.
                md_header_match = re.search(r"(?i)^###\s*.*?\s*(.*?)\n", text, re.MULTILINE)
                if md_header_match:
                    custom_title = md_header_match.group(1).strip()
                    header = header.replace("Thinking...", custom_title)
                    text = text[:md_header_match.start()] + text[md_header_match.end():]
        if not text.strip():
            # Empty after stripping: hold off entirely before the header has
            # been sent; after it, return "" (caller skips empty strings).
            if not self.header_sent:
                return None
            return ""
        if not self.header_sent:
            self.header_sent = True
            # Prepend the system's authoritative header.
            return header + text.lstrip()
        return text

    async def end_stream(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Flush any remaining buffered text at the very end of the stream."""
        turn_header = f"---\n### 🛰️ **[Turn {turn}] Thinking...**\n"
        # 1. Flush prefix buffer if we haven't sent the header yet.
        if not self.header_sent and self.prefix_buffer:
            async for event in self._flush_prefix("", turn_header):
                yield event
        # 2. Flush any leftover tag buffer (rarely happens if LLM cuts off).
        if self.tag_buffer:
            if self._in_thinking_tag:
                yield {"type": "reasoning", "content": self.tag_buffer}
            else:
                processed = self._apply_turn_header(self.tag_buffer, turn_header)
                if processed:
                    yield {"type": "content", "content": processed}
            self.tag_buffer = ""