# cortex-hub / ai-hub / app / core / orchestration / scheduler.py
import asyncio
import logging
from datetime import datetime, timedelta
import croniter
from sqlalchemy.orm import Session
from app.db.session import SessionLocal
from app.db.models.agent import AgentInstance, AgentTrigger
from app.core.orchestration.agent_loop import AgentExecutor

logger = logging.getLogger(__name__)

class AgentScheduler:
    """Background scheduler that recovers stalled agents and fires CRON triggers.

    :meth:`start` launches two long-running asyncio loops:

    * the zombie sweeper, which resets ``active`` agents whose
      ``last_heartbeat`` is older than three minutes back to ``idle``;
    * the CRON trigger loop, which starts ``AgentExecutor.run`` for every
      non-active agent whose ``cron`` trigger has come due.

    NOTE(review): both loops use the synchronous SQLAlchemy session directly
    inside coroutines, so each query blocks the event loop while it runs.
    """

    # Heartbeat age after which an 'active' agent is considered dead.
    _ZOMBIE_TIMEOUT = timedelta(minutes=3)
    # Seconds between zombie sweeps.
    _ZOMBIE_SWEEP_INTERVAL = 60
    # Seconds between CRON evaluations (scheduling resolution).
    _CRON_POLL_INTERVAL = 10
    # Look-back window used for a cron expression that has never fired.
    # It is wider than the poll interval so a boundary is not missed.
    _CRON_LOOKBACK = timedelta(seconds=35)

    def __init__(self, services):
        """Store the service container and initialise scheduler state.

        Args:
            services: Object exposing ``rag_service`` and ``user_service``;
                both are forwarded to ``AgentExecutor.run`` on every wake-up.
        """
        self.services = services
        self._running = False
        self._last_run_map = {}  # instance_id -> datetime of the last wake-up
        # Strong references to spawned tasks: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._tasks = set()

    async def start(self):
        """Task 4: Initialize the background agent scheduler.

        Idempotent: calling it again while already running does nothing.
        Must be awaited from inside a running event loop.
        """
        if self._running:
            return
        self._running = True

        # Area 4.1: The Zombie Sweeper
        self._spawn(self._zombie_sweeper_loop())

        # Area 3 / Area 4: Periodic / CRON Trigger Executor
        self._spawn(self._cron_trigger_loop())

        logger.info("[Scheduler] Agent background services (Zombie Sweeper & CRON) started.")

    def _spawn(self, coro):
        """Schedule *coro* as a task and keep it referenced until it finishes."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        """Drop a finished task and log any exception it raised."""
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error(f"[Scheduler] Background task failed: {exc}")

    @staticmethod
    def _is_due(cron_expr, last_run, now):
        """Return True when a trigger with *cron_expr* should fire at *now*.

        Args:
            cron_expr: Either a plain digit string, interpreted as an interval
                in seconds, or an expression understood by croniter.
            last_run: Datetime of the previous wake-up, or None if the trigger
                has not fired since this scheduler was created.
            now: Current time; must be comparable with *last_run* (the loops
                pass naive ``datetime.utcnow()`` values).

        Raises:
            Exception: Whatever ``int()`` or croniter raises for an expression
                they cannot parse; the caller logs it and skips the trigger.
        """
        if cron_expr.isdigit():
            # Simple integer "30" means "every 30 seconds".
            interval = int(cron_expr)
            base = last_run if last_run is not None else datetime.min
            return (now - base).total_seconds() >= interval

        # Standard CRON parsing. get_next() returns the first occurrence
        # strictly AFTER the base time, so the base must be the previous run
        # (or a short look-back), never `now`: based on `now` the next
        # occurrence is always in the future and the trigger never fires.
        # NOTE(review): with six fields croniter reads the LAST one as seconds
        # by default - confirm stored expressions follow that layout.
        base = last_run if last_run is not None else now - AgentScheduler._CRON_LOOKBACK
        return croniter.croniter(cron_expr, base).get_next(datetime) <= now

    async def _zombie_sweeper_loop(self):
        """Task 4.1: Detects dead agent loops and resets them to idle."""
        while self._running:
            try:
                db = SessionLocal()
                try:
                    # Find active agents that haven't heartbeat in 3+ minutes
                    cutoff = datetime.utcnow() - self._ZOMBIE_TIMEOUT
                    zombies = db.query(AgentInstance).filter(
                        AgentInstance.status == 'active',
                        AgentInstance.last_heartbeat < cutoff
                    ).all()

                    for zombie in zombies:
                        logger.warning(f"[Scheduler] Zombie Agent detected: {zombie.id}. Resetting to idle for recovery.")
                        # Once idle, the CRON loop is allowed to wake it again.
                        zombie.status = 'idle'

                    db.commit()
                finally:
                    # Always release the session, even if the query or commit failed.
                    db.close()
            except Exception as e:
                # Keep the sweeper alive; one failed pass must not stop it.
                logger.exception(f"[Scheduler] Zombie Sweeper iteration failed: {e}")

            await asyncio.sleep(self._ZOMBIE_SWEEP_INTERVAL)  # Run every minute

    async def _cron_trigger_loop(self):
        """Task 3: Handles periodic agent waking (e.g. a 30s test case)."""
        while self._running:
            try:
                db = SessionLocal()
                try:
                    self._fire_due_triggers(db)
                finally:
                    # Always release the session, even if a query failed.
                    db.close()
            except Exception as e:
                # Keep the loop alive; one failed pass must not stop it.
                logger.exception(f"[Scheduler] CRON Trigger loop error: {e}")

            await asyncio.sleep(self._CRON_POLL_INTERVAL)  # Resolution: check every 10 seconds

    def _fire_due_triggers(self, db):
        """Evaluate every cron trigger once and wake the agents that are due.

        Args:
            db: Open SQLAlchemy session; the caller is responsible for closing it.
        """
        # Fetch all agents with CRON triggers
        triggers = db.query(AgentTrigger).filter(AgentTrigger.trigger_type == 'cron').all()
        now = datetime.utcnow()

        for trigger in triggers:
            instance_id = trigger.instance_id
            cron_expr = trigger.cron_expression

            if not cron_expr:
                continue

            try:
                due = self._is_due(cron_expr, self._last_run_map.get(instance_id), now)
            except Exception as ce:
                logger.error(f"[Scheduler] Invalid cron expression '{cron_expr}' for agent {instance_id}: {ce}")
                continue

            if not due:
                continue

            instance = db.query(AgentInstance).filter(AgentInstance.id == instance_id).first()
            # Don't double-trigger if still active (or if the agent is gone).
            if not instance or instance.status == 'active':
                continue

            logger.info(f"[Scheduler] CRON WAKEUP: Triggering Agent {instance_id} (Cron: {cron_expr})")

            # Update last run BEFORE firing to prevent racing
            self._last_run_map[instance_id] = now

            # Default prompt; the trigger record does not carry one.
            prompt = "SYSTEM: CRON WAKEUP"

            # Fire! (Asynchronous background task, kept referenced by _spawn)
            self._spawn(AgentExecutor.run(
                instance_id,
                prompt,
                self.services.rag_service,
                self.services.user_service
            ))