from typing import AsyncGenerator, Dict, Any, Optional
class StreamProcessor:
    """Split a streamed LLM response into ``content`` and ``reasoning`` events.

    Text between ``<thinking>`` and ``</thinking>`` (matched case-insensitively)
    is emitted as ``{"type": "reasoning", ...}`` events; everything else is
    emitted as ``{"type": "content", ...}`` events after orchestrator marker
    lines have been stripped. No regular expressions are used and no turn
    header is injected by this class.

    Tags may arrive split across chunks, so an incomplete tag at the end of
    the buffer is held back until the next chunk (or ``end_stream``).
    """

    _OPEN_TAG = "<thinking>"
    _CLOSE_TAG = "</thinking>"

    # The start of a turn is accumulated until it exceeds this many characters
    # so that marker lines split across several chunks can still be stripped.
    _PREFIX_FLUSH_THRESHOLD = 200

    # Lower-cased substrings; any line containing one of them is dropped.
    _SKIP_MARKERS = (
        "---",
        "### [turn",
        "thinking...",
        "master-architect analysis",
        "architecting next step",
        "strategy: executing orchestrated tasks",
        "🏗️ bridge analysis",
        "title:",
    )

    def __init__(self, profile: Any):
        """Create a processor with empty buffers.

        Args:
            profile: Stored on the instance as ``self.profile``; it is not
                read anywhere else in this class.
        """
        self._in_thinking_tag = False
        self.tag_buffer = ""
        self.prefix_buffer = ""  # Accumulates start of content to catch split headers
        self.header_sent = False
        self.profile = profile

    def reset_turn(self):
        """Re-arm prefix buffering for a new turn.

        Only ``header_sent`` and ``prefix_buffer`` are reset; ``tag_buffer``
        and the in-thinking flag are left as they are.
        """
        self.header_sent = False
        self.prefix_buffer = ""

    @staticmethod
    def _find_tag(buffer: str, tag: str) -> int:
        """Return the index of the first case-insensitive *tag* in *buffer*, or -1.

        The search runs on the original buffer and lower-cases only the
        candidate window, because ``str.lower()`` can change the length of a
        string and would otherwise shift the returned index.
        """
        idx = buffer.find("<")
        while idx != -1:
            if buffer[idx:idx + len(tag)].lower() == tag:
                return idx
            idx = buffer.find("<", idx + 1)
        return -1

    @staticmethod
    def _partial_tag_start(buffer: str, tag: str) -> int:
        """Return where an incomplete trailing *tag* starts in *buffer*, or -1.

        A tag contains exactly one ``<``, so an unfinished tag at the end of
        the buffer can only begin at the last ``<``.
        """
        idx = buffer.rfind("<")
        if idx != -1 and tag.startswith(buffer[idx:].lower()):
            return idx
        return -1

    async def process_chunk(self, content_chunk: str, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Consume one raw chunk and yield the UI events it completes.

        Args:
            content_chunk: Raw text appended to the internal tag buffer.
            turn: Accepted for interface compatibility; not used here.

        Yields:
            Dicts of the form ``{"type": "content" | "reasoning", "content": str}``.

        Buffer state is always advanced before an event is yielded, so a
        consumer that stops iterating early cannot cause the same text to be
        emitted again by a later call.
        """
        self.tag_buffer += content_chunk

        while self.tag_buffer:
            if self._in_thinking_tag:
                # Anything buffered before the tag must go out ahead of reasoning.
                if not self.header_sent:
                    async for event in self._flush_prefix(""):
                        yield event

                close_idx = self._find_tag(self.tag_buffer, self._CLOSE_TAG)
                if close_idx != -1:
                    reasoning = self.tag_buffer[:close_idx]
                    self._in_thinking_tag = False
                    self.tag_buffer = self.tag_buffer[close_idx + len(self._CLOSE_TAG):]
                    if reasoning:
                        yield {"type": "reasoning", "content": reasoning}
                    continue

                partial_idx = self._partial_tag_start(self.tag_buffer, self._CLOSE_TAG)
                if partial_idx != -1:
                    # Hold the possible "</thinking>" fragment for the next chunk.
                    reasoning = self.tag_buffer[:partial_idx]
                    self.tag_buffer = self.tag_buffer[partial_idx:]
                    if reasoning:
                        yield {"type": "reasoning", "content": reasoning}
                    break

                reasoning = self.tag_buffer
                self.tag_buffer = ""
                yield {"type": "reasoning", "content": reasoning}
                continue

            # ---- outside a thinking tag ----
            open_idx = self._find_tag(self.tag_buffer, self._OPEN_TAG)
            if open_idx != -1:
                before = self.tag_buffer[:open_idx]
                self._in_thinking_tag = True
                self.tag_buffer = self.tag_buffer[open_idx + len(self._OPEN_TAG):]
                if before:
                    async for event in self._flush_prefix(before):
                        yield event
                continue

            partial_idx = self._partial_tag_start(self.tag_buffer, self._OPEN_TAG)
            if partial_idx != -1:
                # Hold the possible "<thinking>" fragment for the next chunk.
                before = self.tag_buffer[:partial_idx]
                self.tag_buffer = self.tag_buffer[partial_idx:]
                if before:
                    async for event in self._flush_prefix(before):
                        yield event
                break

            # Plain text with no tag in sight.
            text = self.tag_buffer
            self.tag_buffer = ""

            if not self.header_sent:
                # Still collecting the start of the turn; emit once it is long enough.
                self.prefix_buffer += text
                if len(self.prefix_buffer) > self._PREFIX_FLUSH_THRESHOLD:
                    async for event in self._flush_prefix(""):
                        yield event
                continue

            lowered = text.lower().strip()
            if "strategy:" in lowered and "executing orchestrated tasks" in lowered:
                # Orchestrator status text is routed to the reasoning channel.
                yield {"type": "reasoning", "content": text}
            else:
                processed = self._apply_header_stripping(text)
                if processed:
                    yield {"type": "content", "content": processed}

    async def _flush_prefix(self, extra_text: str) -> AsyncGenerator[Dict[str, Any], None]:
        """Emit the buffered prefix plus *extra_text* as one content event.

        Marks the header as sent and empties ``prefix_buffer``. The combined
        text is emitted even when the header was already sent, so text that
        precedes a tag is never dropped. Nothing is yielded when the text is
        empty after marker stripping.
        """
        self.header_sent = True
        full_text = self.prefix_buffer + extra_text
        self.prefix_buffer = ""
        processed = self._apply_header_stripping(full_text)
        if processed:
            yield {"type": "content", "content": processed}

    def _apply_header_stripping(self, text: str) -> str:
        """Strips orchestrator internal markers but does NOT inject its own.

        Works line by line: blank lines are kept; a line is dropped when it
        contains one of ``_SKIP_MARKERS`` (case-insensitive), starts with
        ``###``, or contains both the satellite emoji and ``[turn``.

        Returns:
            The remaining lines joined back together (``""`` for empty input).
        """
        if not text:
            return ""
        # Very short fragments after the header cannot be judged as a marker
        # line on their own, so they are passed through untouched.
        if len(text) < 5 and self.header_sent:
            return text

        kept = []
        for line in text.splitlines(keepends=True):
            stripped = line.strip()
            if not stripped:
                kept.append(line)
                continue
            lowered = stripped.lower()
            if any(marker in lowered for marker in self._SKIP_MARKERS):
                continue
            if stripped.startswith("###"):
                continue
            if "🛰️" in line and "[turn" in lowered:
                continue
            kept.append(line)
        return "".join(kept)

    async def end_stream(self, turn: int) -> AsyncGenerator[Dict[str, Any], None]:
        """Flush whatever is still buffered when the stream finishes.

        Args:
            turn: Accepted for interface compatibility; not used here.

        Yields:
            First the pending prefix (if the header was never sent), then any
            leftover tag buffer: as reasoning when still inside a thinking
            tag, otherwise as marker-stripped content.
        """
        if not self.header_sent and self.prefix_buffer:
            async for event in self._flush_prefix(""):
                yield event

        if self.tag_buffer:
            leftover = self.tag_buffer
            self.tag_buffer = ""
            if self._in_thinking_tag:
                yield {"type": "reasoning", "content": leftover}
            else:
                processed = self._apply_header_stripping(leftover)
                if processed:
                    yield {"type": "content", "content": processed}