import os
import re
import shlex
import subprocess
import time
# Name of the tmux session that every tmux command in this script targets
# (passed as the session part of each ``-t`` argument).
SESSION_NAME = "jetski_swarm"
def get_windows():
    """List the windows of the tmux session named by ``SESSION_NAME``.

    Runs ``tmux list-windows`` with the format ``#I:#W`` so each output
    line has the form ``"<window index>:<window name>"``.

    Returns:
        list of str: One entry per non-empty output line. An empty list is
        returned (after printing an error) when tmux cannot be started or
        exits with a non-zero status.
    """
    # An argument list avoids shell parsing of the session name and format.
    cmd = ["tmux", "list-windows", "-t", SESSION_NAME, "-F", "#I:#W"]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError:
        # Without a shell, a missing tmux binary raises instead of yielding
        # a non-zero exit status; treat it as the same failure.
        result = None
    if result is None or result.returncode != 0:
        print("Error listing windows. Is tmux running?")
        return []
    # Drop blank entries so empty output gives [] rather than [''].
    return [line for line in result.stdout.strip().split("\n") if line]
def get_panes(window_id):
    """List the pane indices of one window in the ``SESSION_NAME`` session.

    Runs ``tmux list-panes`` with the format ``#P``.

    Args:
        window_id: Window identifier, used as ``SESSION_NAME:window_id``.

    Returns:
        list of str: One entry per non-empty output line. An empty list is
        returned when tmux cannot be started or exits with a non-zero
        status.
    """
    # An argument list avoids shell parsing of the target and format.
    cmd = [
        "tmux", "list-panes",
        "-t", f"{SESSION_NAME}:{window_id}",
        "-F", "#P",
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError:
        # tmux binary missing or not executable: same outcome as a failure.
        return []
    if result.returncode != 0:
        return []
    # Drop blank entries so callers never build a target such as "3.".
    return [line for line in result.stdout.strip().split("\n") if line]
def capture_pane(target):
    """Return the text that ``tmux capture-pane -p`` prints for a target.

    Args:
        target: Window or ``window.pane`` identifier, used as
            ``SESSION_NAME:target``.

    Returns:
        str: The command's standard output. The exit status is not
        checked, so a failed capture returns whatever tmux wrote to
        stdout. An empty string is returned when tmux cannot be started.
    """
    # An argument list avoids shell parsing of the target.
    cmd = ["tmux", "capture-pane", "-p", "-t", f"{SESSION_NAME}:{target}"]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError:
        # tmux binary missing or not executable: nothing to analyse.
        return ""
    return result.stdout
def send_keys(target, keys):
    """Send keys to a tmux target with ``tmux send-keys``.

    Args:
        target: Window or ``window.pane`` identifier, used as
            ``SESSION_NAME:target``.
        keys: Whitespace-separated key arguments, e.g. ``"y Enter"``.
            Quoting follows POSIX shell rules, so ``"'hello world' Enter"``
            passes two key arguments.

    The command's exit status is ignored. Problems starting tmux or
    parsing ``keys`` are printed rather than raised.
    """
    try:
        # Tokenise as the shell would, but without running a shell, so
        # characters such as ';', '*' or '$' reach tmux literally.
        key_args = shlex.split(keys)
    except ValueError as err:
        print(f"Could not parse keys {keys!r}: {err}")
        return
    cmd = ["tmux", "send-keys", "-t", f"{SESSION_NAME}:{target}", *key_args]
    try:
        subprocess.run(cmd)
    except OSError as err:
        print(f"Could not run tmux send-keys: {err}")
def analyze_and_respond(target, content):
    """Detect a known interactive prompt in pane text and answer it.

    Heuristics are tried in order; the first match wins:

    1. File access prompt (``"Allow access to this file?"``): send the
       number of the option whose line mentions "always allow", or ``2``
       when no such option is found.
    2. ``[Y/n]`` confirmation (default yes): send ``y``.
    3. ``[y/n]`` confirmation in any other letter case: send ``y``.

    Args:
        target: tmux target the answer is sent to via ``send_keys``.
        content: Captured text of the pane.

    Returns:
        bool: True if a prompt was detected and keys were sent, else False.
    """
    # Heuristic 1: File Access Prompt
    if "Allow access to this file?" in content:
        print(f"Detected File Access prompt in target {target}")
        # Look for a numbered option whose line mentions "always allow".
        match = re.search(
            r"([0-9]+)\..*always allow.*", content, re.IGNORECASE
        )
        if match:
            option = match.group(1)
            print(f"Found 'always allow' option: {option}. Sending...")
        else:
            # Fallback to option 2 if not found but prompt exists
            option = "2"
            print(
                "Could not find specific 'always allow' option. "
                "Defaulting to 2."
            )
        send_keys(target, f"{option} Enter")
        return True

    # Heuristic 2: Confirmation Prompt with default Yes [Y/n].
    # This must come before the case-insensitive check below, which also
    # matches "[Y/n]" and would otherwise make this branch unreachable.
    if "[Y/n]" in content:
        print(f"Detected [Y/n] prompt in target {target}")
        send_keys(target, "y Enter")
        return True

    # Heuristic 3: Confirmation Prompt [y/n] in any other letter case
    if re.search(r"\[y/n\]", content, re.IGNORECASE):
        print(f"Detected [y/N] prompt in target {target}")
        send_keys(target, "y Enter")
        return True

    return False
def process_target(target, attempts=3, poll_interval=1, settle_delay=2):
    """Poll a tmux target for a known prompt and answer the first one seen.

    Args:
        target: Window or ``window.pane`` identifier passed to
            ``capture_pane`` and ``analyze_and_respond``.
        attempts: Maximum number of captures to try (default 3).
        poll_interval: Seconds to wait between unsuccessful captures, to
            give the pane a chance to settle or show a prompt (default 1).
        settle_delay: Seconds to wait after keys were sent so the screen
            can update (default 2).

    Returns:
        bool: True if a prompt was answered, False if none was detected
        within ``attempts`` captures.
    """
    for attempt in range(attempts):
        if attempt:
            # Wait only between polls; sleeping after the final failed
            # poll would just delay the caller.
            time.sleep(poll_interval)
        content = capture_pane(target)
        if analyze_and_respond(target, content):
            # Wait a bit after sending keys for screen to update
            time.sleep(settle_delay)
            return True
    return False
def main():
    """Check every window of the session once and answer pending prompts.

    Windows come from ``get_windows`` as ``"<index>:<name>"`` strings.
    The window named ``orchestrator`` is skipped. For the window named
    ``grid`` each pane is checked individually. Every other window is
    checked as a whole via its index.
    """
    print("Starting Advanced Swarm Auto Aligner...")
    windows = get_windows()
    if not windows:
        return

    for win in windows:
        if not win:
            continue
        # Split on the first ':' only, so a window whose name itself
        # contains ':' is still processed instead of silently skipped.
        win_id, sep, win_name = win.partition(":")
        if not sep:
            continue

        # Skip orchestrator
        if win_name == "orchestrator":
            continue

        print(f"Checking Window: {win_id} ({win_name})")
        if win_name == "grid":
            for pane in get_panes(win_id):
                if not pane:
                    # A blank entry would give the malformed target "<id>.".
                    continue
                target = f"{win_id}.{pane}"
                print(f"Checking Grid Pane: {target}")
                process_target(target)
        else:
            process_target(win_id)


if __name__ == "__main__":
    main()