import re
from typing import AsyncGenerator, Dict, Any, Optional
class StreamProcessor:
    """Routes streamed LLM text into ``content`` and ``reasoning`` UI events.

    Text between ``<thinking>`` and ``</thinking>`` (matched ASCII
    case-insensitively) is emitted as ``reasoning`` events; everything else is
    emitted as ``content`` events. Tags may be split across chunks: a trailing
    fragment that could still grow into the expected tag is held back in
    ``tag_buffer`` until a later chunk resolves it.

    NOTE(review): nothing visible in this class flushes a held-back fragment
    at end of stream, so a response that ends in e.g. ``"<"`` keeps that text
    in ``tag_buffer`` -- confirm whether the caller needs an explicit flush.
    """

    _OPEN_TAG = "<thinking>"
    _CLOSE_TAG = "</thinking>"
    # Length-preserving ASCII-only lowercase table. ``str.lower()`` can change
    # the length of non-ASCII text (e.g. "İ" becomes two code points), which
    # would make indices found in the lowered copy invalid for the original.
    _ASCII_LOWER = str.maketrans(
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ", "abcdefghijklmnopqrstuvwxyz"
    )

    def __init__(self, profile: Any):
        """Create a processor.

        Args:
            profile: Object exposing ``silent_stream`` (truthy to suppress the
                turn header) and ``strip_headers`` (iterable of regex patterns
                removed from content when ``silent_stream`` is set).
        """
        self._in_thinking_tag = False
        self.tag_buffer = ""
        self.header_sent = False
        self.profile = profile

    def reset_turn(self):
        """Allow the turn header to be emitted again on the next content."""
        self.header_sent = False
        # Note: _in_thinking_tag and tag_buffer are deliberately left alone;
        # they might need to persist if the turn ends abruptly, but usually
        # the LLM closes its tags before tool calls.

    async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Consume one raw chunk and yield the UI events it completes.

        Args:
            content_chunk: Newly received text; appended to ``tag_buffer``.
            turn: Turn number interpolated into the turn header.

        Yields:
            ``{"type": "content", "content": str}`` for text outside thinking
            tags and ``{"type": "reasoning", "content": str}`` for text inside
            them. Empty fragments are never yielded.
        """
        self.tag_buffer += content_chunk
        turn_header = f"---\n### 🛰️ **[Turn {turn}] Master-Architect Analysis**\n"

        while self.tag_buffer:
            buffer = self.tag_buffer
            # Outside a thinking block we wait for the opening tag, inside it
            # for the closing tag.
            tag = self._CLOSE_TAG if self._in_thinking_tag else self._OPEN_TAG
            lowered = buffer.translate(self._ASCII_LOWER)

            tag_idx = lowered.find(tag)
            if tag_idx != -1:
                # Complete tag: emit what precedes it, drop the tag, flip mode.
                event = self._make_event(buffer[:tag_idx], turn_header)
                # State is committed before yielding so that a consumer which
                # abandons the generator does not get this text replayed.
                self._in_thinking_tag = not self._in_thinking_tag
                self.tag_buffer = buffer[tag_idx + len(tag):]
                if event is not None:
                    yield event
                continue

            # Each tag contains exactly one "<" (its first character), so a
            # tag split across chunks can only start at the LAST "<" in the
            # buffer. Checking the first "<" would miss "<thi" in "a < b <thi".
            partial_idx = lowered.rfind("<")
            if partial_idx != -1 and tag.startswith(lowered[partial_idx:]):
                event = self._make_event(buffer[:partial_idx], turn_header)
                self.tag_buffer = buffer[partial_idx:]
                if event is not None:
                    yield event
                break  # Need more input to decide whether this is a tag.

            # No tag and no possible tag prefix: flush everything.
            event = self._make_event(buffer, turn_header)
            self.tag_buffer = ""
            if event is not None:
                yield event

    def _make_event(self, text: str, turn_header: str) -> Optional[Dict[str, Any]]:
        """Build the event for ``text`` in the current mode, or ``None``.

        Must be called before ``_in_thinking_tag`` is toggled, because the
        current mode decides whether ``text`` is reasoning or content.
        """
        if not text:
            return None
        if self._in_thinking_tag:
            return {"type": "reasoning", "content": text}
        visible = self._apply_turn_header(text, turn_header)
        if visible:
            return {"type": "content", "content": visible}
        return None

    def _apply_turn_header(self, text: str, header: str) -> Optional[str]:
        """Return ``text`` prepared for display as content.

        With ``profile.silent_stream`` set, every pattern in
        ``profile.strip_headers`` is removed (case-insensitively) and no
        header is added. Otherwise ``header`` is prepended to the first
        non-empty text seen since the last ``reset_turn``.

        NOTE(review): stripping runs on each emitted fragment separately, so a
        header that is split across chunks will not match -- confirm this is
        acceptable.
        """
        if self.profile.silent_stream:
            # Aggressively strip any hallucinated headers defined in the profile
            for pattern in self.profile.strip_headers:
                text = re.sub(pattern, "", text, flags=re.IGNORECASE)
            return text
        if not text:
            return text
        if not self.header_sent:
            self.header_sent = True
            return header + text
        return text