import asyncio
import logging
from datetime import datetime, timedelta
import croniter
from sqlalchemy.orm import Session
from app.db.session import SessionLocal
from app.db.models.agent import AgentInstance, AgentTrigger
from app.core.orchestration.agent_loop import AgentExecutor
# Module-level logger, named after this module, used by the scheduler below.
logger = logging.getLogger(__name__)
class AgentScheduler:
    """Background scheduler for agent instances.

    Runs two long-lived asyncio loops once :meth:`start` is awaited:

    * a zombie sweeper that resets 'active' agents whose heartbeat is stale
      back to 'idle', and
    * a CRON trigger loop that wakes agents whose trigger is due by
      scheduling ``AgentExecutor.run`` as a background task.
    """

    # Look-back window used for standard cron expressions when an agent has
    # no recorded run yet: a boundary crossed within this window counts as due.
    _CRON_LOOKBACK = timedelta(seconds=35)

    def __init__(self, services):
        """Store the service container and initialise scheduler state.

        Args:
            services: Object exposing ``rag_service`` and ``user_service``,
                which are forwarded to ``AgentExecutor.run``.
        """
        self.services = services
        self._running = False
        self._last_run_map = {}  # instance_id -> last_run_timestamp
        # Strong references to in-flight tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks = set()

    def _spawn(self, coro):
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start(self):
        """Task 4: Initialize the background agent scheduler.

        Idempotent: calling it again while already running does nothing.
        """
        if self._running:
            return
        self._running = True
        # Area 4.1: The Zombie Sweeper
        self._spawn(self._zombie_sweeper_loop())
        # Area 3 / Area 4: Periodic / CRON Trigger Executor
        self._spawn(self._cron_trigger_loop())
        logger.info("[Scheduler] Agent background services (Zombie Sweeper & CRON) started.")

    async def _zombie_sweeper_loop(self):
        """Task 4.1: Detect dead agent loops and reset them to idle.

        Once a minute, every 'active' agent whose ``last_heartbeat`` is more
        than 3 minutes old is set to 'idle'. A failed iteration is logged and
        the loop carries on.
        """
        while self._running:
            try:
                db = SessionLocal()
                try:
                    # Find active agents that haven't heartbeat in 3+ minutes
                    timeout = datetime.utcnow() - timedelta(minutes=3)
                    zombies = db.query(AgentInstance).filter(
                        AgentInstance.status == 'active',
                        AgentInstance.last_heartbeat < timeout
                    ).all()
                    for zombie in zombies:
                        logger.warning(f"[Scheduler] Zombie Agent detected: {zombie.id}. Resetting to idle for recovery.")
                        zombie.status = 'idle'  # The CRON/Webhook will pick it back up
                    db.commit()
                finally:
                    # Always release the session, even when the query or
                    # commit raised, so failures do not leak connections.
                    db.close()
            except Exception as e:
                logger.exception("[Scheduler] Zombie Sweeper iteration failed: %s", e)
            await asyncio.sleep(60)  # Run every minute

    def _is_due(self, instance_id, cron_expr, now):
        """Return True if the trigger for *instance_id* should fire at *now*.

        Args:
            instance_id: Key into the last-run map.
            cron_expr: Either a plain integer string (interval in seconds)
                or an expression parsed by ``croniter``.
            now: The current time used for this scheduling pass.

        Raises:
            Exception: Whatever ``int``/``croniter`` raise for an expression
                they cannot parse; the caller treats that as invalid.
        """
        # Fallback for simple integer "30" representing seconds
        if cron_expr.isdigit():
            interval = int(cron_expr)
            last_run = self._last_run_map.get(instance_id, datetime.min)
            return (now - last_run).total_seconds() >= interval

        # Standard CRON parsing. The iterator is anchored at the last run (or
        # the look-back window) rather than at `now`: get_next() yields a
        # time after its start, so anchoring at `now` could never be due.
        last_run = self._last_run_map.get(instance_id, now - self._CRON_LOOKBACK)
        schedule = croniter.croniter(cron_expr, last_run)
        return schedule.get_next(datetime) <= now

    async def _cron_trigger_loop(self):
        """Task 3: Handles periodic agent waking (e.g., your 30s test case).

        Every 10 seconds, loads all 'cron' triggers and, for each one that is
        due and whose agent is not currently 'active', schedules
        ``AgentExecutor.run`` in the background with a default wake-up prompt.
        """
        while self._running:
            try:
                db = SessionLocal()
                try:
                    # Fetch all agents with CRON triggers
                    triggers = db.query(AgentTrigger).filter(AgentTrigger.trigger_type == 'cron').all()
                    now = datetime.utcnow()
                    for trigger in triggers:
                        instance_id = trigger.instance_id
                        cron_expr = trigger.cron_expression
                        if not cron_expr:
                            continue

                        try:
                            due = self._is_due(instance_id, cron_expr, now)
                        except Exception as ce:
                            logger.error(f"[Scheduler] Invalid cron expression '{cron_expr}' for agent {instance_id}: {ce}")
                            continue
                        if not due:
                            continue

                        instance = db.query(AgentInstance).filter(AgentInstance.id == instance_id).first()
                        # Don't double-trigger if still active
                        if not instance or instance.status == 'active':
                            continue

                        logger.info(f"[Scheduler] CRON WAKEUP: Triggering Agent {instance_id} (Cron: {cron_expr})")
                        # Update last run BEFORE firing to prevent racing
                        self._last_run_map[instance_id] = now

                        # Default prompt; in future it could be stored on the
                        # Trigger record.
                        prompt = "SYSTEM: CRON WAKEUP"

                        # Fire! (Asynchronous background task, kept referenced)
                        self._spawn(AgentExecutor.run(
                            instance_id,
                            prompt,
                            self.services.rag_service,
                            self.services.user_service
                        ))
                finally:
                    # Always release the session, even on a mid-loop failure.
                    db.close()
            except Exception as e:
                logger.exception("[Scheduler] CRON Trigger loop error: %s", e)
            await asyncio.sleep(10)  # Resolution: Check every 10 seconds