Newer
Older
cortex-hub / agent-node / agent_node / skills / manager.py
import threading
import os
import importlib.util
from concurrent import futures
from agent_node.skills.base import BaseSkill
from agent_node.config import MAX_SKILL_WORKERS

class SkillManager:
    """Orchestrates multiple modular skills and manages the task worker pool."""
    def __init__(self, max_workers=MAX_SKILL_WORKERS, sync_mgr=None):
        self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker")
        self.active_tasks = {} # task_id -> future
        self.sync_mgr = sync_mgr
        self.skills = self._discover_skills(sync_mgr)
        self.max_workers = max_workers
        self.lock = threading.Lock()

    def _discover_skills(self, sync_mgr):
        """Scans the skills/ directory for logic.py and loads skill implementations."""
        # Find candidate locations for skills
        # 1. Monorepo root (../../../skills from this file)
        # 2. Agent-node local (../../skills from this file)
        # 3. Docker standard (/app/skills)
        candidates = [
            os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../skills")),
            os.path.abspath(os.path.join(os.path.dirname(__file__), "../../skills")),
            "/app/skills",
            "/app/node_skills"
        ]
        
        skills_dir = None
        for cand in candidates:
            if os.path.exists(cand) and os.path.isdir(cand):
                # Ensure it's not a broken symlink and has actual content
                try:
                    if any(os.path.isdir(os.path.join(cand, d)) for d in os.listdir(cand)):
                        skills_dir = cand
                        break
                except OSError:
                    continue
        
        discovered = {}
        if not skills_dir:
            print(f"    [๐Ÿ”งโš ๏ธ] Skills directory not found in candidate locations: {candidates}")
            return discovered
            
        print(f"    [๐Ÿ”ง] Using skills directory: {skills_dir}")
        for skill_dir in os.listdir(skills_dir):
            item_path = os.path.join(skills_dir, skill_dir)
            if os.path.isdir(item_path):
                logic_py = os.path.join(item_path, "logic.py")
                if os.path.exists(logic_py):
                    # Dynamic import
                    try:
                        spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py)
                        module = importlib.util.module_from_spec(spec)
                        spec.loader.exec_module(module)
                        
                        # Find the first class that inherits from BaseSkill
                        for attr_name in dir(module):
                            attr = getattr(module, attr_name)
                            if isinstance(attr, type) and issubclass(attr, BaseSkill) and attr is not BaseSkill:
                                # We map the internal skill name (e.g. mesh_terminal_control) 
                                # if we can find it in the module or assume it based on folder name
                                # For backward compatibility with task_type routing, we map common ones
                                instance = attr(sync_mgr=sync_mgr)
                                discovered[skill_dir] = instance
                                # Also map legacy names for the routing engine below
                                if "terminal" in skill_dir or "shell" in skill_dir:
                                    discovered["shell"] = instance
                                if "browser" in skill_dir:
                                    discovered["browser"] = instance
                                if "file" in skill_dir:
                                    discovered["file"] = instance
                                break
                    except Exception as e:
                        print(f"    [๐Ÿ”งโŒ] Failed to load skill from {logic_py}: {e}")
        
        print(f"    [๐Ÿ”ง] Discovered skills: {list(discovered.keys())}")
        return discovered

    def submit(self, task, sandbox, on_complete, on_event=None):
        """Routes a task to the appropriate skill and submits it to the thread pool."""
        # --- 0. Transparent TTY Bypass (Gaming Performance) ---
        # Keystrokes and Resizes should NEVER wait for a thread or be blocked by sandbox
        if "shell" in self.skills and hasattr(self.skills["shell"], "handle_transparent_tty"):
            if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event):
                return True, "Accepted (Transparent)"

        with self.lock:
            if len(self.active_tasks) >= self.max_workers:
                return False, "Node Capacity Reached"
            
            # 1. Routing Engine
            skill = None
            if task.HasField("browser_action"):
                skill = self.skills.get("browser")
            elif task.task_type == "file":
                skill = self.skills.get("file")
            else:
                # Default to the one that looks like a shell
                skill = self.skills.get("shell")
            
            if not skill:
                return False, f"Target skill not available for task type: {task.task_type}"
            
            # 2. Execution submission
            future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event)
            self.active_tasks[task.task_id] = future
            
            # Cleanup hook
            future.add_done_callback(lambda f: self._cleanup(task.task_id))
            return True, "Accepted"

    def cancel(self, task_id):
        """Attempts to cancel an active task through all registered skills."""
        with self.lock:
            cancelled = any(s.cancel(task_id) for s in self.skills.values())
            return cancelled

    def get_active_ids(self):
        """Returns the list of currently running task IDs."""
        with self.lock:
            return list(self.active_tasks.keys())

    def _cleanup(self, task_id):
        """Internal callback to release capacity when a task finishes."""
        with self.lock: 
            self.active_tasks.pop(task_id, None)

    def shutdown(self):
        """Triggers shutdown for all skills and the worker pool."""
        print("[๐Ÿ”ง] Shutting down Skill Manager...")
        with self.lock:
            # Use set to avoid shutting down the same instance multiple times due to alias mapping
            for skill in set(self.skills.values()):
                skill.shutdown()
        # Shutdown thread pool
        self.executor.shutdown(wait=True)