import json
import os
import shlex
import subprocess
import time
# All paths are anchored at this script's own directory, so the
# orchestrator behaves the same no matter where it is launched from.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
REQUESTS_DIR, RESPONSES_DIR = (
    os.path.join(BASE_DIR, "comms", leaf) for leaf in ("requests", "responses")
)
SESSION_NAME = "jetski_swarm"  # tmux session whose windows are the agents
CLI_PATH = "/google/bin/releases/jetski-devs/tools/cli"

# Per-agent bookkeeping, keyed by tmux target (window name or "grid.<pane>"):
#   "last_cmd_time":    epoch seconds when a command was last sent
#   "last_output_time": epoch seconds when the captured pane text last changed
#   "last_output":      most recent pane capture (str)
AGENT_STATE = dict()

HANGING_THRESHOLD = 30   # seconds of unchanged output (while busy) before warning
BUSY_THRESHOLD = 5 * 60  # seconds after a command during which an agent counts as busy
def get_active_agents():
    """Return the tmux targets of every worker agent in ``SESSION_NAME``.

    Every window counts as an agent except ``master`` and ``orchestrator``.
    The special ``grid`` window is expanded into one target per pane
    (``grid.<pane index>``).

    Returns:
        A list of tmux target strings relative to the session, or an empty
        list if tmux could not be queried (the error is printed).
    """
    try:
        # Argument lists instead of shell strings: no quoting pitfalls.
        result = subprocess.run(
            ["tmux", "list-windows", "-t", SESSION_NAME, "-F", "#{window_name}"],
            capture_output=True,
            text=True,
            check=True,
        )
        agents = []
        # splitlines() yields [] for empty output, whereas
        # strip().split('\n') would produce a bogus "" agent.
        for window in result.stdout.splitlines():
            if not window or window in ("master", "orchestrator"):
                continue
            if window == "grid":
                panes = subprocess.run(
                    ["tmux", "list-panes", "-t", f"{SESSION_NAME}:grid", "-F", "#P"],
                    capture_output=True,
                    text=True,
                    check=True,
                )
                agents.extend(f"grid.{pane}" for pane in panes.stdout.splitlines() if pane)
            else:
                agents.append(window)
        return agents
    except Exception as e:  # best-effort: the poll loop must keep running
        print(f"Error listing agents: {e}")
        return []
def check_hanging():
    """Warn the master window about busy agents whose output has stalled.

    An agent is "busy" for ``BUSY_THRESHOLD`` seconds after its
    ``last_cmd_time``. For each busy agent the pane is captured; if the text
    has not changed for more than ``HANGING_THRESHOLD`` seconds (measured from
    the later of the last output change and the last command),
    ``notify_master`` is called and the agent is marked idle, so it is
    reported once per command. Per-agent errors are printed and do not stop
    the scan.
    """
    now = time.time()
    for agent in get_active_agents():
        state = AGENT_STATE.setdefault(agent, {})
        # An entry created by process_request() may hold only "last_cmd_time";
        # fill in the rest so the lookups below cannot raise KeyError on every
        # poll (which silently disabled hang detection for that agent).
        state.setdefault("last_cmd_time", 0)
        state.setdefault("last_output_time", now)
        state.setdefault("last_output", "")

        is_busy = (now - state["last_cmd_time"]) < BUSY_THRESHOLD
        print(f"Debug [check_hanging]: Agent: {agent}, is_busy: {is_busy}, now: {now}, last_cmd_time: {state['last_cmd_time']}")
        if not is_busy:
            continue

        try:
            result = subprocess.run(
                ["tmux", "capture-pane", "-p", "-t", f"{SESSION_NAME}:{agent}"],
                capture_output=True,
                text=True,
                check=True,
            )
            current_output = result.stdout
            print(f"Debug [check_hanging]: Agent: {agent}, current_output len: {len(current_output)}, prev_output len: {len(state['last_output'])}")

            if current_output != state["last_output"]:
                state["last_output"] = current_output
                state["last_output_time"] = now
                print(f"Debug [check_hanging]: Agent: {agent}, output changed.")
                continue

            # A newer command restarts the silence clock even if the pane
            # text happens to be unchanged.
            silence_duration = now - max(state["last_output_time"], state["last_cmd_time"])
            print(f"Debug [check_hanging]: Agent: {agent}, silence_duration: {silence_duration}")
            if silence_duration > HANGING_THRESHOLD:
                print(f"Detected hanging agent: {agent}")
                notify_master(agent, current_output)
                # Mark idle so the same hang is not reported every poll.
                state["last_cmd_time"] = 0
        except Exception as e:  # best-effort: one bad agent must not stop the scan
            print(f"Error checking agent {agent}: {e}")
def notify_master(agent, last_output):
    """Type a hang warning about *agent* into the master window and submit it.

    Args:
        agent: tmux target of the silent agent (e.g. ``"grid.2"``).
        last_output: The agent's latest pane capture; its last five lines
            (after trailing whitespace is trimmed) are quoted in the warning.

    Failures to run tmux are printed and otherwise ignored.
    """
    print(f"Notifying master about hanging agent {agent}...")
    # capture-pane pads its output with empty lines up to the pane height, so
    # trim trailing whitespace first; otherwise the "last five lines" are
    # usually blank.
    last_snippet = "\n".join(last_output.rstrip().split("\n")[-5:])
    message = (
        f"\n[System] Warning: Agent {agent} seems to be hanging.\n"
        f"Last output snippet:\n---\n{last_snippet}\n---\nWhat should we do?"
    )
    try:
        # Pass an argument list (no shell). The snippet is arbitrary pane text,
        # and escaping only '"' still let `$(...)`, backticks and backslashes
        # be interpreted by /bin/sh.
        subprocess.run(
            ["tmux", "send-keys", "-t", f"{SESSION_NAME}:master", message, "C-m"],
            check=True,
        )
    except (subprocess.CalledProcessError, OSError, ValueError) as e:
        print(f"Failed to notify master: {e}")
def _map_grid_target(target):
    """Return the grid pane target for an ``agent_N`` name, else *target*.

    ``agent_N`` is 1-based and maps to pane ``grid.{N-1}`` (this assumes
    tmux's default pane-base-index of 0). Names whose ``_``-suffix is not a
    positive integer are returned unchanged.
    """
    if not target.startswith("agent_"):
        return target
    try:
        agent_num = int(target.split("_")[1])
    except ValueError:
        return target
    if agent_num < 1:
        # agent_0 would otherwise map to the nonexistent pane "grid.-1".
        return target
    return f"grid.{agent_num - 1}"


def _send_prompt(target, prompt):
    """Type *prompt* into *target*'s pane, wait, and return the pane text.

    Records the command in ``AGENT_STATE`` so ``check_hanging`` treats the
    agent as busy.

    Raises:
        ValueError: if *prompt* is missing.
        subprocess.CalledProcessError: if a tmux command fails.
    """
    if prompt is None:
        raise ValueError("Missing 'prompt' for prompt request")
    now = time.time()
    state = AGENT_STATE.setdefault(target, {})
    state["last_cmd_time"] = now
    # Restart the silence clock, and make sure every key check_hanging()
    # reads exists even for a target it has not listed yet.
    state["last_output_time"] = now
    state.setdefault("last_output", "")
    pane = f"{SESSION_NAME}:{target}"
    # Argument list, no shell: prompts routinely contain quotes, `$` and
    # backticks. "--" stops tmux from parsing a prompt beginning with "-"
    # as an option.
    subprocess.run(["tmux", "send-keys", "-t", pane, "--", str(prompt), "C-m"], check=True)
    # Give the agent a moment before snapshotting (this blocks the poll loop).
    time.sleep(5)
    result = subprocess.run(
        ["tmux", "capture-pane", "-p", "-t", pane],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout


def _run_control(action, target, conv_id):
    """Carry out a ``control`` request and return a ``(status, output)`` pair.

    Raises:
        subprocess.CalledProcessError: if a tmux command fails.
    """
    print(f"Control Action: {action}")
    if action == "create":
        # Keep the per-agent directory inside BASE_DIR/agents.
        if os.path.basename(target) != target or target in (".", ".."):
            return "error", f"Invalid agent name for create: {target}"
        agent_dir = os.path.join(BASE_DIR, "agents", target)
        os.makedirs(agent_dir, exist_ok=True)
        cli_cmd = f"{CLI_PATH} -cli=true"
        if conv_id:
            # tmux runs this string through a shell; quote the request-supplied id.
            cli_cmd += f" -conversation={shlex.quote(str(conv_id))}"
        subprocess.run(
            ["tmux", "new-window", "-t", SESSION_NAME, "-n", target, "-c", agent_dir, cli_cmd],
            check=True,
        )
        output = f"Created agent {target} in window and directory."
        if conv_id:
            output += f" Resumed conversation {conv_id}."
        return "success", output
    if action == "kill":
        subprocess.run(["tmux", "kill-window", "-t", f"{SESSION_NAME}:{target}"], check=True)
        return "success", f"Killed agent {target} (window)."
    if action == "clear":
        subprocess.run(["tmux", "send-keys", "-t", f"{SESSION_NAME}:{target}", "C-l"], check=True)
        return "success", f"Sent clear screen to {target}."
    if action == "resume":
        if not conv_id:
            return "error", "Missing conversation_id for resume"
        # The command is typed into the pane's shell, so quote the id.
        resume_cmd = f"{CLI_PATH} -conversation={shlex.quote(str(conv_id))}"
        subprocess.run(
            ["tmux", "send-keys", "-t", f"{SESSION_NAME}:{target}", resume_cmd, "C-m"],
            check=True,
        )
        return "success", f"Resumed conversation {conv_id} in {target}."
    return "error", f"Unknown control action: {action}"


def _write_response(resp_file, resp_data):
    """Write *resp_data* as JSON to *resp_file* atomically (temp file + rename)."""
    resp_dir = os.path.dirname(resp_file)
    if resp_dir:  # os.makedirs("") raises for a bare filename
        os.makedirs(resp_dir, exist_ok=True)
    tmp_path = f"{resp_file}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(resp_data, f, indent=2)
    # A reader polling for resp_file never sees a half-written document.
    os.replace(tmp_path, resp_file)


def _remove_request(file_path):
    """Delete a handled request file, tolerating it already being gone."""
    try:
        os.remove(file_path)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"Could not remove request {file_path}: {e}")


def process_request(file_path):
    """Execute one JSON request file and write a JSON response for it.

    Recognised request keys: ``request_id``, ``type`` (``"prompt"``, the
    default, or ``"control"``), ``target_agent``, ``response_file``,
    ``prompt``, ``action`` and ``conversation_id``. The response
    ``{"request_id", "status", "output"}`` is written atomically to
    ``response_file``; if that key is absent, it goes to a same-named file
    in ``RESPONSES_DIR``.

    A file that cannot be read or parsed (e.g. still being written) is left
    in place so a later poll retries it. Every other request is consumed
    exactly once: failures produce a ``"status": "error"`` response rather
    than an endless retry that re-runs side effects.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            req = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Error processing {file_path}: {e}")
        return
    if not isinstance(req, dict):
        print(f"Error processing {file_path}: request is not a JSON object; discarding")
        _remove_request(file_path)
        return

    req_id = req.get("request_id")
    req_type = req.get("type", "prompt")
    target = req.get("target_agent")
    resp_file = req.get("response_file") or os.path.join(
        RESPONSES_DIR, os.path.basename(file_path)
    )

    status = "success"
    output = ""
    try:
        if not isinstance(target, str) or not target:
            raise ValueError("Missing or invalid 'target_agent'")
        # NOTE(review): the agent_N -> grid mapping applies to every request
        # type, including "create" — confirm that is intended.
        mapped = _map_grid_target(target)
        if mapped != target:
            print(f"Mapped {target} to {mapped}")
            target = mapped
        print(f"Processing {req_id} (Type: {req_type}) for {target}...")
        if req_type == "prompt":
            output = _send_prompt(target, req.get("prompt"))
        elif req_type == "control":
            status, output = _run_control(req.get("action"), target, req.get("conversation_id"))
        else:
            status = "error"
            output = f"Unknown request type: {req_type}"
    except Exception as e:  # report any failure back to the requester
        print(f"Error processing {file_path}: {e}")
        status = "error"
        output = str(e)

    try:
        _write_response(resp_file, {"request_id": req_id, "status": status, "output": output})
    except Exception as e:
        print(f"Error writing response for {file_path}: {e}")
    # Consume the request even if the response could not be written: it has
    # already been acted on, and leaving it would re-run it on every poll.
    _remove_request(file_path)
    print(f"Completed {req_id}")
def main():
    """Poll ``REQUESTS_DIR`` forever, checking for hung agents and serving requests.

    Every two seconds: run ``check_hanging()``, then hand each ``*.json`` file
    in ``REQUESTS_DIR`` to ``process_request()`` in sorted filename order.
    Errors in either phase are printed and the loop continues.
    """
    print(f"Swarm Orchestrator started polling in {REQUESTS_DIR}...")
    # Without this, os.listdir() raises on every cycle until the dir exists.
    os.makedirs(REQUESTS_DIR, exist_ok=True)
    while True:
        # Separate guards so a failing hang check cannot block request handling.
        try:
            check_hanging()
        except Exception as e:
            print(f"Polling error: {e}")
        try:
            # os.listdir() order is arbitrary; sort for a deterministic order.
            for file in sorted(os.listdir(REQUESTS_DIR)):
                if file.endswith(".json"):
                    process_request(os.path.join(REQUESTS_DIR, file))
        except Exception as e:
            print(f"Polling error: {e}")
        time.sleep(2)


if __name__ == "__main__":
    main()