import asyncio
import logging
from datetime import datetime, timedelta
import croniter
from sqlalchemy.orm import Session
from app.db.session import SessionLocal
from app.db.models.agent import AgentInstance, AgentTrigger
from app.core.orchestration.agent_loop import AgentExecutor
logger = logging.getLogger(__name__)
class AgentScheduler:
    """Background scheduler that recovers stalled agents and fires timed triggers.

    Two long-running asyncio tasks are started by :meth:`start`:

    * the zombie sweeper, which flips ``active`` agents whose heartbeat is
      older than ``ZOMBIE_TIMEOUT`` back to ``idle``;
    * the trigger loop, which wakes agents that have ``cron`` or ``interval``
      triggers by scheduling ``AgentExecutor.run``.

    NOTE(review): the SQLAlchemy calls below are synchronous and therefore
    block the event loop while they run; consider moving them to a worker
    thread if the queries become slow.
    """

    # How long an 'active' agent may go without a heartbeat before it is reset.
    ZOMBIE_TIMEOUT = timedelta(minutes=3)
    # Pause between two zombie sweeps, in seconds.
    ZOMBIE_SWEEP_SECONDS = 60
    # Pause between two trigger evaluations, in seconds.
    TRIGGER_POLL_SECONDS = 10
    # Look-back window used for a cron trigger that has never fired. It is
    # wider than the poll period so a scheduled tick cannot fall between polls.
    CRON_LOOKBACK = timedelta(seconds=35)

    def __init__(self, services):
        """Store the service container and initialise the scheduler state.

        Args:
            services: object exposing ``rag_service`` and ``user_service``;
                both are forwarded to ``AgentExecutor.run``.
        """
        self.services = services
        self._running = False
        # instance_id -> naive-UTC datetime of the last wake-up we scheduled.
        # NOTE(review): keyed by instance, so a cron and an interval trigger
        # on the same instance share one timestamp.
        self._last_run_map = {}
        # Strong references to in-flight tasks; the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._tasks = set()

    async def start(self):
        """Task 4: Initialize the background agent scheduler.

        Calling this while the scheduler is already running is a no-op.
        """
        if self._running:
            return
        self._running = True
        # Area 4.1: The Zombie Sweeper
        self._spawn(self._zombie_sweeper_loop())
        # Area 3 / Area 4: Periodic / CRON Trigger Executor
        self._spawn(self._cron_trigger_loop())
        logger.info("[Scheduler] Agent background services (Zombie Sweeper & CRON) started.")

    def _spawn(self, coro):
        """Schedule *coro* as a task and hold a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _zombie_sweeper_loop(self):
        """Task 4.1: Detects dead agent loops and resets them to idle/active retry."""
        while self._running:
            try:
                self._sweep_zombies()
            except Exception as e:
                # Top-level boundary: one failed sweep must not kill the loop.
                logger.exception(f"[Scheduler] Zombie Sweeper iteration failed: {e}")
            await asyncio.sleep(self.ZOMBIE_SWEEP_SECONDS)

    def _sweep_zombies(self):
        """Reset every 'active' agent with a stale heartbeat to 'idle'."""
        db = SessionLocal()
        try:
            cutoff = datetime.utcnow() - self.ZOMBIE_TIMEOUT
            zombies = db.query(AgentInstance).filter(
                AgentInstance.status == 'active',
                AgentInstance.last_heartbeat < cutoff
            ).all()
            for zombie in zombies:
                logger.warning(f"[Scheduler] Zombie Agent detected: {zombie.id}. Resetting to idle for recovery.")
                # Once idle, the trigger loop below is free to wake it again.
                zombie.status = 'idle'
            if zombies:
                db.commit()
        finally:
            # Always release the session, even when a query or commit fails.
            db.close()

    async def _cron_trigger_loop(self):
        """Task 3: Handles periodic agent waking for both CRON and Interval triggers."""
        while self._running:
            try:
                db = SessionLocal()
                try:
                    now = datetime.utcnow()
                    self._process_cron_triggers(db, now)
                    self._process_interval_triggers(db, now)
                finally:
                    # Always release the session, even when a query fails.
                    db.close()
            except Exception as e:
                # Top-level boundary: one failed pass must not kill the loop.
                logger.exception(f"[Scheduler] CRON/Interval Trigger loop error: {e}")
            await asyncio.sleep(self.TRIGGER_POLL_SECONDS)  # Resolution of the scheduler

    @staticmethod
    def _schedule_is_due(cron_expr, last_run, now):
        """Return True when a 'cron' trigger should fire at *now*.

        Args:
            cron_expr: either a string of digits, read as an interval in
                seconds, or a cron expression understood by ``croniter``.
            last_run: datetime of the previous wake-up, or None if the
                trigger has not fired since this scheduler was created.
            now: the current time, same timezone convention as *last_run*.

        Raises:
            Exception: whatever ``int`` or ``croniter`` raise for an
                expression they cannot parse.
        """
        if cron_expr.isdigit():
            since = last_run if last_run is not None else datetime.min
            return (now - since).total_seconds() >= int(cron_expr)
        # Anchor the iterator at the previous run (or a short look-back) and
        # ask whether a scheduled tick has occurred since then. Anchoring at
        # `now` would always yield a future tick and the trigger would never fire.
        anchor = last_run if last_run is not None else now - AgentScheduler.CRON_LOOKBACK
        schedule = croniter.croniter(cron_expr, anchor)
        return schedule.get_next(datetime) <= now

    def _wake_agent(self, instance_id, prompt, now):
        """Record the wake-up time and schedule an executor run for the agent."""
        self._last_run_map[instance_id] = now
        self._spawn(AgentExecutor.run(
            instance_id, prompt,
            self.services.rag_service, self.services.user_service
        ))

    def _process_cron_triggers(self, db, now):
        """Wake every non-active agent whose 'cron' trigger is due."""
        cron_triggers = db.query(AgentTrigger).filter(AgentTrigger.trigger_type == 'cron').all()
        for trigger in cron_triggers:
            instance_id = trigger.instance_id
            cron_expr = trigger.cron_expression
            if not cron_expr:
                continue
            try:
                should_fire = self._schedule_is_due(
                    cron_expr, self._last_run_map.get(instance_id), now
                )
            except Exception as ce:
                # A malformed expression only disables its own trigger.
                logger.error(f"[Scheduler] Invalid cron expression '{cron_expr}' for agent {instance_id}: {ce}")
                continue
            if not should_fire:
                continue
            instance = db.query(AgentInstance).filter(AgentInstance.id == instance_id).first()
            # Skip unknown instances and agents that are still running.
            if instance and instance.status != 'active':
                prompt = trigger.default_prompt or "SYSTEM: CRON WAKEUP"
                logger.info(f"[Scheduler] CRON WAKEUP: Triggering Agent {instance_id} (Cron: {cron_expr})")
                self._wake_agent(instance_id, prompt, now)

    def _process_interval_triggers(self, db, now):
        """Wake every non-active agent whose 'interval' trigger has elapsed."""
        interval_triggers = db.query(AgentTrigger).filter(AgentTrigger.trigger_type == 'interval').all()
        for trigger in interval_triggers:
            instance_id = trigger.instance_id
            # A missing or zero interval falls back to 60 seconds.
            wait_seconds = trigger.interval_seconds or 60
            instance = db.query(AgentInstance).filter(AgentInstance.id == instance_id).first()
            # Only fire if the agent exists and is not mid-run.
            if not instance or instance.status == 'active':
                continue
            last_run = self._last_run_map.get(instance_id, datetime.min)
            elapsed = (now - last_run).total_seconds()
            if elapsed >= wait_seconds:
                prompt = trigger.default_prompt or "SYSTEM: INTERVAL WAKEUP"
                logger.info(f"[Scheduler] INTERVAL WAKEUP: Triggering Agent {instance_id} (Wait: {wait_seconds}s, Elapsed: {elapsed:.0f}s)")
                self._wake_agent(instance_id, prompt, now)