import re
from typing import AsyncGenerator, Dict, Any, Optional
class StreamProcessor:
    """Handles logical processing of LLM streams: thinking tags and content routing.

    Raw text chunks are appended to ``tag_buffer`` and split into UI events:
    text between ``<thinking>`` and ``</thinking>`` (matched ASCII
    case-insensitively) is emitted as ``"reasoning"`` events, everything else
    as ``"content"`` events. A turn header is prepended to the first content
    event of a turn unless the text already looks like it carries one.

    Tags may be split across chunks: a trailing fragment that could still grow
    into the expected tag is kept in ``tag_buffer`` until more text arrives.
    """

    _OPEN_TAG = "<thinking>"
    _CLOSE_TAG = "</thinking>"

    # ASCII-only case folding. Unlike ``str.lower()`` this never changes the
    # length of the string, so indices found in the folded copy are always
    # valid indices into the original buffer.
    _ASCII_LOWER = str.maketrans(
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ", "abcdefghijklmnopqrstuvwxyz"
    )

    # Heuristic for "this text already contains a turn header".
    _EXISTING_HEADER_RE = re.compile(
        r"Turn\s+\d+|Master-Architect\s+Analysis|###\s+🛰️", re.IGNORECASE
    )

    def __init__(self):
        # True while the stream position is between <thinking> and </thinking>.
        self._in_thinking_tag = False
        # Text received but not yet emitted (e.g. a possibly incomplete tag).
        self.tag_buffer = ""
        # True once the turn header has been emitted (or detected) this turn.
        self.header_sent = False

    def reset_turn(self):
        """Allow the turn header to be emitted again for the next turn."""
        self.header_sent = False
        # Note: _in_thinking_tag and tag_buffer are deliberately left alone;
        # they might need to persist if the turn ends abruptly, but usually
        # the LLM closes tags before tool calls.

    async def process_chunk(
        self, content_chunk: str, turn: int
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Processes a raw content chunk, yields UI events (content, reasoning).

        Args:
            content_chunk: Newly received text; appended to ``tag_buffer``.
            turn: Turn number used when building the turn header.

        Yields:
            ``{"type": "content" | "reasoning", "content": <non-empty str>}``.
        """
        self.tag_buffer += content_chunk
        turn_header = self._turn_header(turn)

        while self.tag_buffer:
            in_thinking = self._in_thinking_tag
            # Outside a thinking block we look for the opener, inside for the closer.
            tag = self._CLOSE_TAG if in_thinking else self._OPEN_TAG
            folded = self.tag_buffer.translate(self._ASCII_LOWER)

            tag_idx = folded.find(tag)
            if tag_idx != -1:
                # Complete tag: emit what precedes it, flip state, drop the tag.
                event = self._make_event(
                    self.tag_buffer[:tag_idx], in_thinking, turn_header
                )
                if event:
                    yield event
                self._in_thinking_tag = not in_thinking
                self.tag_buffer = self.tag_buffer[tag_idx + len(tag):]
                continue

            # A partially received tag must be a suffix of the buffer starting
            # with "<"; since the tags contain no further "<", that suffix can
            # only start at the LAST "<" in the buffer.
            partial_idx = folded.rfind("<")
            if partial_idx != -1 and tag.startswith(folded[partial_idx:]):
                event = self._make_event(
                    self.tag_buffer[:partial_idx], in_thinking, turn_header
                )
                if event:
                    yield event
                # Hold the fragment back until the next chunk resolves it.
                self.tag_buffer = self.tag_buffer[partial_idx:]
                break

            # No tag and no possible tag fragment: emit the whole buffer.
            event = self._make_event(self.tag_buffer, in_thinking, turn_header)
            if event:
                yield event
            self.tag_buffer = ""

    async def flush(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Emit any text still held back in ``tag_buffer`` and clear it.

        Optional end-of-stream helper: a trailing fragment such as ``"<"`` or
        ``"<thi"`` is held back by ``process_chunk`` in case it becomes a tag;
        once no more chunks will arrive it is ordinary text. The thinking
        state itself is not modified.

        Args:
            turn: Turn number used when building the turn header.
        """
        leftover, self.tag_buffer = self.tag_buffer, ""
        event = self._make_event(
            leftover, self._in_thinking_tag, self._turn_header(turn)
        )
        if event:
            yield event

    @staticmethod
    def _turn_header(turn: int) -> str:
        """Build the markdown header announcing ``turn``."""
        return f"---\n### 🛰️ **[Turn {turn}] Master-Architect Analysis**\n"

    def _make_event(
        self, text: str, in_thinking: bool, turn_header: str
    ) -> Optional[Dict[str, Any]]:
        """Wrap ``text`` in a UI event dict, or return None for empty text."""
        if not text:
            return None
        if in_thinking:
            return {"type": "reasoning", "content": text}
        return {
            "type": "content",
            "content": self._apply_turn_header(text, turn_header),
        }

    def _apply_turn_header(self, text: str, header: str) -> Optional[str]:
        """Prepend ``header`` to ``text`` once per turn.

        Empty text, or any text after the header has been handled, is returned
        unchanged. Otherwise the header is prepended unless ``text`` already
        appears to contain one; in both cases the header is then marked as sent.
        """
        if not text or self.header_sent:
            return text
        # Idempotency check
        if not self._EXISTING_HEADER_RE.search(text):
            text = "\n\n" + header + text
        self.header_sent = True
        return text