import os
import time
import json
import subprocess
import shlex
import threading
import shutil
import re
# Self-contained paths relative to this script
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# main() polls this directory for request files ending in ".json".
REQUESTS_DIR = os.path.join(BASE_DIR, "comms", "requests")
# NOTE(review): not referenced by any code visible in this file; responses are
# written to each request's own "response_file" path instead.
RESPONSES_DIR = os.path.join(BASE_DIR, "comms", "responses")
# tmux session that hosts every agent window / grid pane.
SESSION_NAME = "jetski_swarm"
# CLI launched (or typed) by the "create" and "resume" control actions.
CLI_PATH = "/google/bin/releases/jetski-devs/tools/cli"
# State tracking, keyed by tmux target (e.g. "grid.1"):
# { target: { "last_cmd_time": timestamp, "last_output_time": timestamp,
#             "last_output": str, "<target>_prompted": bool } }
# Read and written by check_hanging() and process_request(), both of which
# are called from the main polling loop.
AGENT_STATE = {}
HANGING_THRESHOLD = 30 # seconds of no output change while busy
BUSY_THRESHOLD = 300 # seconds max execution time
def get_active_agents():
    """Return the tmux targets of all agents in the SESSION_NAME session.

    Every window except the one named "orchestrator" counts as an agent. The
    window named "grid" is expanded into one target per pane, named
    "grid.<pane index>".

    Returns:
        A list of tmux target names, or [] if tmux could not be queried.
    """
    try:
        # Argument lists (no shell) so window names can never be parsed as
        # shell syntax.
        result = subprocess.run(
            ["tmux", "list-windows", "-t", SESSION_NAME, "-F", "#{window_name}"],
            capture_output=True, text=True, check=True,
        )
        agents = []
        for window in result.stdout.splitlines():
            # Skip blank lines so an empty string never becomes an agent name.
            if not window or window == "orchestrator":
                continue
            if window == "grid":
                # List panes
                panes = subprocess.run(
                    ["tmux", "list-panes", "-t", f"{SESSION_NAME}:grid", "-F", "#P"],
                    capture_output=True, text=True, check=True,
                ).stdout.splitlines()
                agents.extend(f"grid.{pane}" for pane in panes if pane)
            else:
                agents.append(window)
        print(f"Debug [get_active_agents]: {agents}", flush=True)
        return agents
    except Exception as e:
        print(f"Error listing agents: {e}")
        return []
# Approval-style phrases that mark a pane as waiting for user input.
_PROMPT_PATTERN = r"(?i)(approval needed|pending approval|confirm\?|yes/no|choose option|do you want to proceed\?)"


def _looks_like_prompt(output):
    """Return True if captured pane text appears to be waiting for input.

    Heuristic: both "1." and "2." in the last 10 lines (numbered options), or
    any _PROMPT_PATTERN phrase anywhere in the pane. The numbered-options test
    is broad and can also match ordinary numbered lists.
    """
    tail = "\n".join(output.split("\n")[-10:])
    if "1." in tail and "2." in tail:
        return True
    return re.search(_PROMPT_PATTERN, output) is not None


def _notify_orchestrator_of_prompt(agent):
    """Write a "prompt_approval" notification into the orchestrator's incoming dir."""
    notification = {
        "type": "prompt_approval",
        "agent": agent,
        "timestamp": time.time(),
        "message": f"Sub-agent {agent} needs approval."
    }
    # Anchor at BASE_DIR (like agent_dir_watcher) rather than the process CWD,
    # so the watcher that delivers orchestrator/incoming actually sees it.
    incoming_dir = os.path.join(BASE_DIR, "agents", "orchestrator", "incoming")
    notif_path = os.path.join(incoming_dir, f"notif_{agent}_{int(time.time())}.json")
    try:
        os.makedirs(incoming_dir, exist_ok=True)
        with open(notif_path, 'w') as f:
            json.dump(notification, f, indent=2)
        print(f"Notified orchestrator via {notif_path}")
    except Exception as e:
        print(f"Failed to write notification: {e}")


def check_hanging():
    """Scan every active agent pane for approval prompts and hung commands.

    For each agent from get_active_agents():
      * Prompt detection: the first time a pane looks like a prompt, sub-agents
        are reported to the orchestrator's incoming folder and the
        orchestrator ("grid.0") is reported via notify_master_prompt(). The
        flag is cleared once the pane no longer looks like a prompt.
      * Hang detection: while a command sent less than BUSY_THRESHOLD seconds
        ago is running, an unchanged pane for more than HANGING_THRESHOLD
        seconds triggers notify_master(), and the busy marker is cleared.
    """
    agents = get_active_agents()
    now = time.time()
    for agent in agents:
        state = AGENT_STATE.setdefault(agent, {})
        # process_request() may already have created this entry with only
        # "last_cmd_time"; fill in missing keys so the busy check below
        # cannot raise KeyError and abort the whole poll cycle.
        state.setdefault("last_cmd_time", 0)
        state.setdefault("last_output_time", now)
        state.setdefault("last_output", "")
        prompt_key = f"{agent}_prompted"
        try:
            result = subprocess.run(
                ["tmux", "capture-pane", "-p", "-t", f"{SESSION_NAME}:{agent}"],
                capture_output=True, text=True, check=True,
            )
            current_output = result.stdout
            print(f"Checking agent for prompt: {agent}", flush=True)
            is_prompt = _looks_like_prompt(current_output)
            print(f"Debug [check_hanging]: {agent} is_prompt={is_prompt}", flush=True)
            if is_prompt:
                print(f"Debug [check_hanging]: {agent} state={state.get(prompt_key)}", flush=True)
                # Notify only on the transition into the prompt state.
                if not state.get(prompt_key):
                    print(f"Detected prompt in {agent}.")
                    state[prompt_key] = True
                    if agent not in ("grid.0", "orchestrator"):
                        # Sub-agent prompt: notify the orchestrator via file.
                        _notify_orchestrator_of_prompt(agent)
                    else:
                        # Orchestrator prompt: notify the master.
                        notify_master_prompt(agent, current_output)
            else:
                # Re-arm so the next prompt in this pane is reported again.
                state[prompt_key] = False
        except Exception as e:
            print(f"Error capturing output for {agent}: {e}")
            continue
        # "Busy" means a command was sent within the last BUSY_THRESHOLD seconds.
        is_busy = (now - state["last_cmd_time"]) < BUSY_THRESHOLD
        print(f"Debug [check_hanging]: Agent: {agent}, is_busy: {is_busy}, now: {now}, last_cmd_time: {state['last_cmd_time']}")
        if not is_busy:
            continue
        print(f"Debug [check_hanging]: Agent: {agent}, current_output len: {len(current_output)}, prev_output len: {len(state['last_output'])}")
        if current_output != state["last_output"]:
            state["last_output"] = current_output
            state["last_output_time"] = now
            print(f"Debug [check_hanging]: Agent: {agent}, output changed.")
            continue
        silence_duration = now - state["last_output_time"]
        print(f"Debug [check_hanging]: Agent: {agent}, silence_duration: {silence_duration}")
        if silence_duration > HANGING_THRESHOLD:
            print(f"Detected hanging agent: {agent}")
            notify_master(agent, current_output)
            # Clear the busy marker so one hang is reported only once.
            state["last_cmd_time"] = 0
def add_notification(notification_type, agent, message):
    """Append one record to BASE_DIR/comms/master_notifications.json.

    The file holds a JSON list of {"timestamp", "type", "agent", "message"}
    records. An unreadable or non-list file is replaced by a fresh list.
    Failures to write are logged, not raised.

    Args:
        notification_type: short tag such as "hanging" or "prompt".
        agent: tmux target the notification is about.
        message: human-readable text.
    """
    notif_file = os.path.join(BASE_DIR, "comms", "master_notifications.json")
    notif_data = []
    if os.path.exists(notif_file):
        try:
            with open(notif_file, 'r') as f:
                notif_data = json.load(f)
        except Exception as e:
            print(f"Error reading notification file: {e}")
            notif_data = []
    # Valid JSON that is not a list would make .append raise below.
    if not isinstance(notif_data, list):
        print(f"Error reading notification file: expected a JSON list in {notif_file}")
        notif_data = []
    notif_data.append({
        "timestamp": time.time(),
        "type": notification_type,
        "agent": agent,
        "message": message
    })
    tmp_file = notif_file + ".tmp"
    try:
        os.makedirs(os.path.dirname(notif_file), exist_ok=True)
        # Write to a temp file and atomically swap it in, so a reader never
        # sees a half-written file.
        with open(tmp_file, 'w') as f:
            json.dump(notif_data, f, indent=2)
        os.replace(tmp_file, notif_file)
        print(f"Added notification to {notif_file}")
    except Exception as e:
        print(f"Failed to write notification: {e}")
def notify_master(agent, last_output):
    """Record a "hanging" notification for *agent* via add_notification().

    Args:
        agent: tmux target whose output stopped changing.
        last_output: the agent's most recently captured pane text.
    """
    print(f"Logging notification about hanging agent {agent}...")
    # Captured pane text may end in blank padding lines; drop trailing
    # whitespace first so the 10-line snippet shows real output.
    last_snippet = "\n".join(last_output.rstrip().split('\n')[-10:])
    message = f"Warning: Agent {agent} seems to be hanging.\nLast output snippet:\n---\n{last_snippet}\n---"
    add_notification("hanging", agent, message)
def notify_master_prompt(agent, last_output):
    """Record a "prompt" notification (the orchestrator needs input).

    Args:
        agent: tmux target showing the prompt.
        last_output: the agent's most recently captured pane text.
    """
    print(f"Logging notification about prompt in agent {agent}...")
    # Captured pane text may end in blank padding lines; drop trailing
    # whitespace first so the 10-line snippet shows the actual prompt.
    last_snippet = "\n".join(last_output.rstrip().split('\n')[-10:])
    message = f"Orchestrator Brain needs input.\nPrompt snippet:\n---\n{last_snippet}\n---"
    add_notification("prompt", agent, message)
# Lines whose stripped text starts with one of these are CLI UI chrome.
_UI_LINE_PREFIXES = ('▸', '●', '⚠', '▀', '█', '──────────────────────────')
# Lines containing one of these are CLI UI hints.
_UI_LINE_MARKERS = ('Keyboard: ↑/↓ Navigate', '? for shortcuts')


def extract_payload(text):
    """Extract an agent's answer from captured pane text.

    If the text contains ===RESULT=== ... ===END_RESULT=== blocks, the content
    of the LAST one is returned (stripped). The last one is used because the
    instruction text that asks agents to use these markers contains the
    marker pair itself, and the pane may show that instruction before the
    real answer. Otherwise, CLI UI lines are filtered out and the remaining
    text is returned, stripped.
    """
    results = re.findall(r'===RESULT===(.*?)===END_RESULT===', text, re.DOTALL)
    if results:
        return results[-1].strip()
    filtered_lines = []
    for line in text.split('\n'):
        if line.strip().startswith(_UI_LINE_PREFIXES):
            continue
        if any(marker in line for marker in _UI_LINE_MARKERS):
            continue
        filtered_lines.append(line)
    return '\n'.join(filtered_lines).strip()
# Seconds an unparseable request file is left alone (its writer may still be
# writing it) before it is renamed to "<name>.invalid".
_REQUEST_PARSE_GRACE_SECONDS = 10


def _request_tmux(args, capture=False):
    """Run `tmux <args...>` without a shell; return stdout when capture=True.

    Raises subprocess.CalledProcessError if tmux exits non-zero.
    """
    result = subprocess.run(["tmux", *args], capture_output=capture, text=True, check=True)
    return result.stdout if capture else None


def _request_capture_pane(target):
    """Return the text of pane SESSION_NAME:target, or "" if capture fails."""
    try:
        return _request_tmux(["capture-pane", "-p", "-t", f"{SESSION_NAME}:{target}"], capture=True)
    except Exception:
        return ""


def _request_quarantine(file_path, force=False):
    """Rename an unusable request file to "<name>.invalid" so it is not re-polled.

    Unless *force* is set, files modified less than _REQUEST_PARSE_GRACE_SECONDS
    ago are left in place, since they may still be mid-write.
    """
    try:
        age = time.time() - os.path.getmtime(file_path)
    except OSError:
        return  # Already gone; nothing to do.
    if not force and age < _REQUEST_PARSE_GRACE_SECONDS:
        return
    bad_path = file_path + ".invalid"
    try:
        os.replace(file_path, bad_path)
        print(f"Moved unusable request {file_path} to {bad_path}")
    except OSError as e:
        print(f"Could not move unusable request {file_path}: {e}")


def _request_load(file_path):
    """Read and delete a request file; return its dict, or None if unusable."""
    try:
        with open(file_path, 'r') as f:
            req = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Error processing {file_path}: {e}")
        _request_quarantine(file_path)
        return None
    if not isinstance(req, dict):
        print(f"Error processing {file_path}: request must be a JSON object")
        _request_quarantine(file_path, force=True)
        return None
    # Delete the request file immediately to prevent processing loops.
    try:
        os.remove(file_path)
    except OSError as e:
        print(f"Error processing {file_path}: {e}")
        return None
    return req


def _request_check_routing(req_id, sender, target, trace):
    """Enforce loop detection and sender->target boundary rules.

    Raises:
        ValueError: describing the violated rule.
    """
    # Loop Detection
    if target in trace:
        raise ValueError(f"Loop detected! Target agent {target} is already in the trace: {trace}")
    if target == "master":
        raise ValueError("Master agent cannot be triggered via file request!")
    if sender == "master" and target != "orchestrator":
        raise ValueError("Master agent can trigger orchestrator only!")
    if sender and sender.startswith(("grid.", "agent_")):
        # Subagent can trigger subagent or orchestrator
        if target != "orchestrator" and not target.startswith(("grid.", "agent_")):
            raise ValueError("Subagents can only trigger subagent or orchestrator!")
    if not sender:
        print(f"Warning: Request {req_id} missing 'sender' field. Boundary rules not fully enforced.")


def _request_map_target(target):
    """Map logical names to grid panes: "orchestrator" -> "grid.0", "agent_N" -> "grid.N"."""
    if target == "orchestrator":
        print("Mapped orchestrator to grid.0")
        return "grid.0"
    if target.startswith("agent_"):
        try:
            mapped = f"grid.{int(target.split('_')[1])}"
        except ValueError:
            return target
        print(f"Mapped {target} to {mapped}")
        return mapped
    return target


def _request_handle_prompt(req, target):
    """Type req["prompt"] into *target* and collect its answer; return (status, output)."""
    prompt = req.get("prompt")
    if not isinstance(prompt, str):
        return "error", "Missing or non-string 'prompt' field for prompt request type."
    now = time.time()
    # Initialize every key check_hanging() reads, not just last_cmd_time.
    state = AGENT_STATE.setdefault(target, {})
    state["last_cmd_time"] = now
    state.setdefault("last_output_time", now)
    state.setdefault("last_output", "")
    pane = f"{SESSION_NAME}:{target}"
    # Capture output before sending to detect if it starts working.
    output_before = _request_capture_pane(target)
    # The prompt travels as a single argv element: no shell quoting involved.
    _request_tmux(["send-keys", "-t", pane, prompt, "C-m"])
    time.sleep(1)
    if output_before == _request_capture_pane(target):
        print(f"Agent {target} seems stuck, sending extra Enter...")
        _request_tmux(["send-keys", "-t", pane, "C-m"])
        time.sleep(1)
    # The agent may answer by writing {"output": ...} to the response file.
    response_file = req.get("response_file")
    if response_file:
        # Poll for response file for up to 15 seconds.
        for _ in range(15):
            if os.path.exists(response_file):
                print(f"Sub-agent response file found: {response_file}")
                try:
                    with open(response_file, 'r') as f:
                        resp_data = json.load(f)
                    output = resp_data.get("output", "")
                except Exception as e:
                    print(f"Error reading response file: {e}")
                else:
                    try:
                        # Delete the file after reading it; it is rewritten
                        # below with this request's final response.
                        os.remove(response_file)
                    except OSError as e:
                        print(f"Error reading response file: {e}")
                    return "success", output
            time.sleep(1)
    print(f"Fallback: Capturing pane for {target}")
    pane_text = _request_tmux(["capture-pane", "-p", "-t", pane], capture=True)
    return "success", extract_payload(pane_text)


def _request_handle_control(req, target):
    """Run a window-management action on *target*; return (status, output)."""
    action = req.get("action")
    print(f"Control Action: {action}")
    pane = f"{SESSION_NAME}:{target}"
    if action == "create":
        # NOTE(review): this directory is named after the mapped target (e.g.
        # "grid.3"), whereas agent_dir_watcher uses get_agent_dir_name() (e.g.
        # "agent_3") — confirm which layout is intended.
        agent_dir = os.path.join(BASE_DIR, "agents", target)
        os.makedirs(agent_dir, exist_ok=True)
        conv_id = req.get("conversation_id")
        cli_cmd = f"{CLI_PATH} -cli=true"
        if conv_id:
            # tmux hands this string to a shell, so quote the caller's value.
            cli_cmd += f" -conversation={shlex.quote(str(conv_id))}"
        _request_tmux(["new-window", "-t", SESSION_NAME, "-n", target, "-c", agent_dir, cli_cmd])
        output = f"Created agent {target} in window and directory."
        if conv_id:
            output += f" Resumed conversation {conv_id}."
        return "success", output
    if action == "kill":
        _request_tmux(["kill-window", "-t", pane])
        return "success", f"Killed agent {target} (window)."
    if action == "clear":
        _request_tmux(["send-keys", "-t", pane, "C-l"])
        return "success", f"Sent clear screen to {target}."
    if action == "resume":
        conv_id = req.get("conversation_id")
        # Typed into the pane's shell, so quote the caller-supplied id.
        _request_tmux(["send-keys", "-t", pane, f"{CLI_PATH} -conversation={shlex.quote(str(conv_id))}", "C-m"])
        return "success", f"Resumed conversation {conv_id} in {target}."
    return "error", f"Unknown control action: {action}"


def _request_handle_tmux(req):
    """Run req["command"] as a tmux command line; return (status, output).

    The command is tokenized with shlex and run without a shell, so ';', '|'
    or '$(...)' cannot chain extra shell commands. NOTE: tmux commands such as
    run-shell can still execute arbitrary programs; only trusted senders
    should be allowed to use this request type.
    """
    command = req.get("command")
    if not command:
        return "error", "Missing 'command' field for tmux request type."
    if not command.startswith("tmux "):
        command = "tmux " + command
    try:
        subprocess.run(shlex.split(command), check=True)
        return "success", f"Executed tmux command: {command}"
    except Exception as e:
        return "error", f"Failed to execute tmux command: {e}"


def _request_dispatch(req):
    """Validate routing, map the target and run the handler; return (status, output)."""
    req_id = req.get("request_id")
    req_type = req.get("type", "prompt")
    target = req.get("target_agent") or ""
    trace = req.get("trace") or []
    _request_check_routing(req_id, req.get("sender"), target, trace)
    target = _request_map_target(target)
    print(f"Processing {req_id} (Type: {req_type}) for {target}...")
    if req_type == "prompt":
        return _request_handle_prompt(req, target)
    if req_type == "control":
        return _request_handle_control(req, target)
    if req_type == "tmux":
        return _request_handle_tmux(req)
    return "error", f"Unknown request type: {req_type}"


def _request_write_response(resp_file, resp_data):
    """Write *resp_data* as JSON to *resp_file*, creating its directory if needed."""
    if not resp_file:
        print(f"Warning: request {resp_data.get('request_id')} has no 'response_file'; response not written.")
        return
    resp_dir = os.path.dirname(resp_file)
    if resp_dir:  # A bare filename has no directory to create.
        os.makedirs(resp_dir, exist_ok=True)
    with open(resp_file, 'w') as f:
        json.dump(resp_data, f, indent=2)


def process_request(file_path):
    """Consume one request file and write {"request_id","status","output"} to its response_file.

    The request file is deleted once parsed. Unparseable files are renamed to
    "<name>.invalid" after a grace period. Errors — including routing-rule
    violations — are reported in the response with status "error" instead of
    being dropped silently.
    """
    req = _request_load(file_path)
    if req is None:
        return
    req_id = req.get("request_id")
    resp_file = req.get("response_file")
    try:
        status, output = _request_dispatch(req)
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        status, output = "error", str(e)
    try:
        _request_write_response(resp_file, {
            "request_id": req_id,
            "status": status,
            "output": output
        })
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return
    print(f"Completed {req_id}")
def get_agent_dir_name(agent_name):
    """Translate a tmux target into its directory name under agents/.

    "grid.0" is the orchestrator. Any other "grid.<n>" whose first segment
    after "grid." parses as an int becomes "agent_<n>". Every other name is
    returned unchanged.
    """
    if agent_name == "grid.0":
        return "orchestrator"
    if not agent_name.startswith("grid."):
        return agent_name
    index_text = agent_name[len("grid."):].split(".")[0]
    try:
        return "agent_{}".format(int(index_text))
    except ValueError:
        return agent_name
# Appended to the "prompt" of every message forwarded to a sub-agent.
_SWARM_RULES_REMINDER = "\n\n=== REMINDER: SWARM COMMUNICATION RULES ===\n1. DO NOT print blocking prompts or clarification requests to console. You MUST raise prompts via file communication in your 'output/' folder to inform upstream.\n2. You MUST write a response file to your 'output/' folder when you complete a task or need upper layer attention.\n3. Wrap your final answer in ===RESULT=== and ===END_RESULT=== markers.\n=========================================="

# Outgoing message names: to_<agent_N|orchestrator|master>_<msg id>.json
_OUTGOING_NAME_RE = re.compile(r"^to_(agent_\d+|orchestrator|master)_([a-zA-Z0-9_]+)\.json$")


def _watcher_append_rules(src_path):
    """Append the rules reminder to a JSON message's string "prompt", in place.

    Skipped if the reminder is already present, so a retried move cannot
    duplicate it.
    """
    try:
        with open(src_path, 'r') as file_in:
            msg_data = json.load(file_in)
        prompt = msg_data.get("prompt")
        if isinstance(prompt, str) and _SWARM_RULES_REMINDER not in prompt:
            msg_data["prompt"] = prompt + _SWARM_RULES_REMINDER
            with open(src_path, 'w') as file_out:
                json.dump(msg_data, file_out, indent=2)
    except Exception as e:
        print(f"Error appending rules: {e}")


def _watcher_route_outputs(agent, dir_name, output_dir):
    """Move each routable .json in *output_dir* into its target's incoming/ dir.

    Files named to_<target>_<id>.json go to that target. The orchestrator's
    res_<id>.json files fall back to the master. Files are renamed
    from_<agent>_<id>.json on delivery.
    """
    files = os.listdir(output_dir)
    if files:
        print(f"Debug [{dir_name} output]: {files}", flush=True)
    for f in files:
        if not f.endswith(".json"):
            continue
        match = _OUTGOING_NAME_RE.match(f)
        if match:
            target, msg_id = match.group(1), match.group(2)
            target_dir_name = get_agent_dir_name(target)
            note = ""
        elif dir_name == "orchestrator" and f.startswith("res_"):
            # Fallback for Orchestrator Brain results
            target, msg_id, target_dir_name = "master", f[4:-5], "master"
            note = " (fallback)"
        else:
            print(f"Ignored file not matching pattern: {f}")
            continue
        target_inc_dir = os.path.join(BASE_DIR, "agents", target_dir_name, "incoming")
        print(f"Debug [target_inc_dir]: {target_inc_dir}", flush=True)
        if not os.path.exists(target_inc_dir):
            print(f"Target dir not found for {target_dir_name}")
            continue
        src_path = os.path.join(output_dir, f)
        if target.startswith("agent_"):
            _watcher_append_rules(src_path)
        # NOTE(review): a file still being written by the agent may be moved
        # mid-write; consider having agents write to a temp name first.
        dst_path = os.path.join(target_inc_dir, f"from_{agent}_{msg_id}.json")
        try:
            shutil.move(src_path, dst_path)
            print(f"Moved {f} from {agent}/output to {target_dir_name}/incoming{note}")
        except Exception as e:
            print(f"Error moving file: {e}")


def _watcher_deliver_incoming(agent, dir_name, incoming_dir, processing_dir):
    """Move each incoming .json to processing/ and tell the agent's pane to read it."""
    for f in os.listdir(incoming_dir):
        if not f.endswith(".json"):
            continue
        src_path = os.path.join(incoming_dir, f)
        dst_path = os.path.join(processing_dir, f)
        try:
            # processing/ may not exist yet; without it every move would fail.
            os.makedirs(processing_dir, exist_ok=True)
            shutil.move(src_path, dst_path)
            print(f"Moved {f} to {dir_name}/processing")
            # Argument list (no shell): file names cannot inject shell syntax.
            subprocess.run(
                ["tmux", "send-keys", "-t", f"{SESSION_NAME}:{agent}",
                 f"Read file agents/{dir_name}/processing/{f}", "C-m"],
                check=True,
            )
        except Exception as e:
            print(f"Error processing incoming file: {e}")


def agent_dir_watcher(agent):
    """Poll *agent*'s mailbox directories once per second, forever.

    Directories live under BASE_DIR/agents/<get_agent_dir_name(agent)>/.
    Each iteration routes output/ messages to other agents' incoming/ dirs,
    then hands incoming/ messages to this agent via processing/.
    Runs as a daemon thread started by main(); errors are logged and polling
    continues.
    """
    dir_name = get_agent_dir_name(agent)
    agent_dir = os.path.join(BASE_DIR, "agents", dir_name)
    incoming_dir = os.path.join(agent_dir, "incoming")
    processing_dir = os.path.join(agent_dir, "processing")
    output_dir = os.path.join(agent_dir, "output")
    print(f"Thread started for agent: {agent} (dir: {dir_name})")
    while True:
        try:
            # 1. Watch Output
            if os.path.exists(output_dir):
                _watcher_route_outputs(agent, dir_name, output_dir)
            # 2. Watch Incoming
            if os.path.exists(incoming_dir):
                _watcher_deliver_incoming(agent, dir_name, incoming_dir, processing_dir)
        except Exception as e:
            print(f"Error in thread for {agent}: {e}")
        time.sleep(1)
def main():
    """Run the orchestrator polling loop forever (every 2 seconds).

    Each cycle:
      1. starts an agent_dir_watcher daemon thread for newly seen agents,
      2. runs check_hanging(),
      3. processes *.json request files in REQUESTS_DIR in name order.
    Each phase is guarded separately, so a failure in one (e.g. a tmux error)
    does not skip the others for that cycle.
    """
    print(f"Swarm Orchestrator started polling in {REQUESTS_DIR}...")
    # Without this, os.listdir fails on every cycle until someone creates it.
    os.makedirs(REQUESTS_DIR, exist_ok=True)
    spawned_agents = set()
    while True:
        # Dynamic discovery of agents
        try:
            for agent in get_active_agents():
                if agent not in spawned_agents:
                    threading.Thread(target=agent_dir_watcher, args=(agent,), daemon=True).start()
                    spawned_agents.add(agent)
        except Exception as e:
            print(f"Polling error: {e}")
        # Check hanging agents
        try:
            check_hanging()
        except Exception as e:
            print(f"Polling error: {e}")
        # Check requests
        try:
            for file in sorted(os.listdir(REQUESTS_DIR)):
                if file.endswith(".json"):
                    process_request(os.path.join(REQUESTS_DIR, file))
        except Exception as e:
            print(f"Polling error: {e}")
        time.sleep(2)
if __name__ == "__main__":
    # Script entry point: start the orchestrator's polling loop.
    main()