Newer
Older
cortex-hub / agent-node / src / agent_node / skills / manager.py
import threading
import os
import importlib.util
from concurrent import futures
from agent_node.skills.base import BaseSkill
from agent_node.config import MAX_SKILL_WORKERS

class SkillManager:
    """Orchestrates multiple modular skills and manages the task worker pool."""
    def __init__(self, max_workers=MAX_SKILL_WORKERS, sync_mgr=None):
        self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker")
        self.active_tasks = {} # task_id -> future
        self.sync_mgr = sync_mgr
        self.max_workers = max_workers
        self.lock = threading.Lock()
        
        # 1. Start with Hardcoded Bridges (Pure Programmatic Logic)
        self.skills = self._load_core_bridges(sync_mgr)
        
        # 2. Optionally supplement with dynamic skills from disk
        dynamic_skills = self._discover_skills(sync_mgr)
        self.skills.update(dynamic_skills)

    def _load_core_bridges(self, sync_mgr):
        """Hard-imports the core execution bridges to ensure they are always available."""
        bridges = {}
        # Shell Bridge
        try:
            from agent_node.skills.shell_bridge import ShellSkill
            instance = ShellSkill(sync_mgr=sync_mgr)
            bridges["shell"] = instance
            bridges["mesh-terminal-control"] = instance
            print("    [๐Ÿ”ง๐Ÿ“ฆ] Core Shell Bridge Loaded.")
        except ImportError as e:
            print(f"    [๐Ÿ”งโš ๏ธ] Fatal: Core Shell Bridge not found: {e}")
        
        # File Bridge
        try:
            from agent_node.skills.file_bridge import FileSkill
            instance = FileSkill(sync_mgr=sync_mgr)
            bridges["file"] = instance
            bridges["mesh-file-explorer"] = instance
            print("    [๐Ÿ”ง๐Ÿ“ฆ] Core File Bridge Loaded.")
        except ImportError: pass

        # Browser Bridge
        try:
            from agent_node.skills.browser_bridge import BrowserSkill
            instance = BrowserSkill(sync_mgr=sync_mgr)
            bridges["browser"] = instance
            bridges["browser-automation-agent"] = instance
            print("    [๐Ÿ”ง๐Ÿ“ฆ] Core Browser Bridge Loaded.")
        except ImportError: pass
            
        return bridges

    def _discover_skills(self, sync_mgr):
        """Scans the disk for additional logic.py plugins (supplemental)."""
        # Find candidate locations for skills
        candidates = [
            "/app/skills",
            "/app/node_skills"
        ]
        
        skills_dir = None
        for cand in candidates:
            if os.path.exists(cand) and os.path.isdir(cand):
                try:
                    if any(os.path.isdir(os.path.join(cand, d)) for d in os.listdir(cand)):
                        skills_dir = cand
                        break
                except OSError:
                    continue
        
        discovered = {}
        if not skills_dir:
            return discovered
            
        print(f"    [๐Ÿ”ง] Scanning supplemental skills: {skills_dir}")
        for skill_dir in os.listdir(skills_dir):
            if skill_dir in self.skills: continue # Skip if already hardcoded
            
            item_path = os.path.join(skills_dir, skill_dir)
            if os.path.isdir(item_path):
                logic_py = os.path.join(item_path, "logic.py")
                if os.path.exists(logic_py):
                    # Dynamic import
                    try:
                        spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py)
                        module = importlib.util.module_from_spec(spec)
                        if spec.loader:
                            spec.loader.exec_module(module)
                        
                        # Robust class detection
                        for attr_name in dir(module):
                            attr = getattr(module, attr_name)
                            if isinstance(attr, type) and any(b.__name__ == 'BaseSkill' for b in attr.__mro__) and attr.__name__ != 'BaseSkill':
                                try:
                                    instance = attr(sync_mgr=sync_mgr)
                                    discovered[skill_dir] = instance
                                    print(f"    [๐Ÿ”งโœ…] Loaded supplemental skill: {skill_dir}")
                                except Exception as e:
                                    print(f"    [๐Ÿ”งโŒ] Failed to instantiate skill {skill_dir}: {e}")
                                break 
                    except Exception as e:
                        print(f"    [๐Ÿ”งโŒ] Failed to load skill from {logic_py}: {e}")
        
        return discovered

    def submit(self, task, sandbox, on_complete, on_event=None):
        """Routes a task to the appropriate skill and submits it to the thread pool."""
        # --- 0. Transparent TTY Bypass (Gaming Performance) ---
        # Keystrokes and Resizes should NEVER wait for a thread or be blocked by sandbox
        if "shell" in self.skills and hasattr(self.skills["shell"], "handle_transparent_tty"):
            if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event):
                return True, "Accepted (Transparent)"

        with self.lock:
            if len(self.active_tasks) >= self.max_workers:
                return False, "Node Capacity Reached"
            
            # 1. Routing Engine
            skill = None
            if task.HasField("browser_action"):
                skill = self.skills.get("browser")
            elif task.task_type == "file":
                skill = self.skills.get("file")
            else:
                # Default to the one that looks like a shell
                skill = self.skills.get("shell")
            
            if not skill:
                return False, f"Target skill not available for task type: {task.task_type}"
            
            # 2. Execution submission
            future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event)
            self.active_tasks[task.task_id] = future
            
            # Cleanup hook
            future.add_done_callback(lambda f: self._cleanup(task.task_id))
            return True, "Accepted"

    def cancel(self, task_id):
        """Attempts to cancel an active task through all registered skills."""
        with self.lock:
            cancelled = any(s.cancel(task_id) for s in self.skills.values())
            return cancelled

    def get_active_ids(self):
        """Returns the list of currently running task IDs."""
        with self.lock:
            return list(self.active_tasks.keys())

    def _cleanup(self, task_id):
        """Internal callback to release capacity when a task finishes."""
        with self.lock: 
            self.active_tasks.pop(task_id, None)

    def shutdown(self):
        """Triggers shutdown for all skills and the worker pool."""
        print("[๐Ÿ”ง] Shutting down Skill Manager...")
        with self.lock:
            # Use set to avoid shutting down the same instance multiple times due to alias mapping
            for skill in set(self.skills.values()):
                skill.shutdown()
        # Shutdown thread pool
        self.executor.shutdown(wait=True)