import os
import time
import threading
import logging
logger = logging.getLogger(__name__)


class SelfWatchdog:
    """Last-resort "panic" mechanism that keeps the agent from hanging forever.

    Core loops (e.g. Heartbeat or TaskStream) call :meth:`tick` to signal that
    they are alive. A daemon thread wakes every ``check_interval`` seconds and,
    if no check-in has happened for more than ``threshold`` seconds,
    force-terminates the process with ``os._exit(1)`` so that a system
    supervisor (launchd/systemd) can restart it from scratch.

    Staleness is measured on ``time.monotonic()`` so that system clock
    adjustments can neither trigger a false panic nor mask a real hang.

    Attributes:
        threshold: Maximum tolerated silence, in seconds, before the panic.
        check_interval: Seconds between two staleness checks.
        enabled: When False the thread keeps running but skips the check.
            Re-enabling does not reset the timer; call :meth:`tick` first if
            the loops were legitimately idle in the meantime.
        last_checkin: Wall-clock (``time.time()``) timestamp of the most
            recent check-in.
    """

    def __init__(self, threshold_secs=300, check_interval_secs=10):
        """Create a watchdog; no thread runs until :meth:`start` is called.

        Args:
            threshold_secs: Seconds without a check-in before the process is
                force-terminated.
            check_interval_secs: Seconds between staleness checks.
        """
        self.threshold = threshold_secs
        self.check_interval = check_interval_secs
        self.enabled = True
        self._stop_event = threading.Event()
        self._thread = None
        self._last_checkin_wall = 0.0
        self._last_checkin_mono = 0.0
        self.tick()

    @property
    def last_checkin(self):
        """Wall-clock timestamp (``time.time()``) of the latest check-in."""
        return self._last_checkin_wall

    @last_checkin.setter
    def last_checkin(self, wall_timestamp):
        # Direct assignment of a wall-clock timestamp is still honoured: its
        # age is translated onto the monotonic clock used for the decision.
        age = max(0.0, time.time() - wall_timestamp)
        self._last_checkin_wall = wall_timestamp
        self._last_checkin_mono = time.monotonic() - age

    def tick(self):
        """Called by core loops to signal they are alive."""
        self._last_checkin_mono = time.monotonic()
        self._last_checkin_wall = time.time()

    def start(self):
        """Start the monitoring thread; a no-op if it is already running.

        May be called again after :meth:`stop` to resume monitoring.
        """
        worker = self._thread
        if worker is not None and worker.is_alive():
            if not self._stop_event.is_set():
                return
            # A stop was requested but the old thread has not exited yet. It
            # wakes as soon as the event is set, so this join is short.
            worker.join()
        # Without clearing the event a restarted thread would exit at once.
        self._stop_event.clear()
        # Measure silence from now on, not from construction time; otherwise a
        # late start() could panic on the very first check.
        self.tick()
        self._thread = threading.Thread(
            target=self._run, daemon=True, name="SelfWatchdog"
        )
        self._thread.start()
        logger.info("[*] SelfWatchdog started (threshold: %ss)", self.threshold)

    def stop(self):
        """Ask the monitoring thread to exit; returns without waiting for it."""
        self._stop_event.set()

    def _elapsed(self):
        """Return the seconds elapsed since the last check-in."""
        return time.monotonic() - self._last_checkin_mono

    def _run(self):
        """Thread body: poll for staleness until :meth:`stop` is called."""
        # Event.wait() returns True as soon as stop() sets the event, so the
        # thread exits promptly and performs no further check after a stop.
        while not self._stop_event.wait(self.check_interval):
            if not self.enabled:
                continue
            elapsed = self._elapsed()
            if elapsed > self.threshold:
                self._panic(elapsed)

    def _panic(self, elapsed):
        """Print the panic banner and hard-terminate the process."""
        try:
            print("\n" + "!" * 80)
            print(" CRITICAL: SELF-WATCHDOG PANIC!")
            print(f" No activity detected for {int(elapsed)}s (threshold: {self.threshold}s).")
            print(" This usually indicates a PTY or gRPC deadlock.")
            print(" FORCING HARD TERMINATION FOR SYSTEM RESTART...")
            print("!" * 80 + "\n", flush=True)
        finally:
            # os._exit(1) bypasses all try/except, signal handlers, and
            # main.py loop persistence, so the process actually dies and
            # launchd/systemd can see it. It sits in ``finally`` so that a
            # failing print (e.g. closed stdout) cannot prevent termination.
            os._exit(1)
# Global singleton, built with the default 300-second threshold. Constructing
# it does not start the monitoring thread; call watchdog.start() for that.
watchdog = SelfWatchdog()