import threading
import os
import importlib.util
from concurrent import futures
from agent_node.skills.base import BaseSkill
from agent_node.config import MAX_SKILL_WORKERS
class SkillManager:
"""Orchestrates multiple modular skills and manages the task worker pool."""
def __init__(self, max_workers=MAX_SKILL_WORKERS, sync_mgr=None):
self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker")
self.active_tasks = {} # task_id -> future
self.sync_mgr = sync_mgr
self.skills = self._discover_skills(sync_mgr)
self.max_workers = max_workers
self.lock = threading.Lock()
def _discover_skills(self, sync_mgr):
"""Scans the skills/ directory for logic.py and loads skill implementations."""
# Find project root (assumes /app/agent_node/skills/manager.py)
# We need to go up from agent_node/skills/manager.py to /app/
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
skills_dir = os.path.join(project_root, "skills")
discovered = {}
if not os.path.exists(skills_dir):
print(f" [๐งโ ๏ธ] Skills directory not found: {skills_dir}")
return discovered
for skill_dir in os.listdir(skills_dir):
item_path = os.path.join(skills_dir, skill_dir)
if os.path.isdir(item_path):
logic_py = os.path.join(item_path, "logic.py")
if os.path.exists(logic_py):
# Dynamic import
try:
spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Find the first class that inherits from BaseSkill
for attr_name in dir(module):
attr = getattr(module, attr_name)
if isinstance(attr, type) and issubclass(attr, BaseSkill) and attr is not BaseSkill:
# We map the internal skill name (e.g. mesh_terminal_control)
# if we can find it in the module or assume it based on folder name
# For backward compatibility with task_type routing, we map common ones
instance = attr(sync_mgr=sync_mgr)
discovered[skill_dir] = instance
# Also map legacy names for the routing engine below
if "terminal" in skill_dir or "shell" in skill_dir:
discovered["shell"] = instance
if "browser" in skill_dir:
discovered["browser"] = instance
if "file" in skill_dir:
discovered["file"] = instance
break
except Exception as e:
print(f" [๐งโ] Failed to load skill from {logic_py}: {e}")
print(f" [๐ง] Discovered skills: {list(discovered.keys())}")
return discovered
def submit(self, task, sandbox, on_complete, on_event=None):
"""Routes a task to the appropriate skill and submits it to the thread pool."""
# --- 0. Transparent TTY Bypass (Gaming Performance) ---
# Keystrokes and Resizes should NEVER wait for a thread or be blocked by sandbox
if "shell" in self.skills and hasattr(self.skills["shell"], "handle_transparent_tty"):
if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event):
return True, "Accepted (Transparent)"
with self.lock:
if len(self.active_tasks) >= self.max_workers:
return False, "Node Capacity Reached"
# 1. Routing Engine
skill = None
if task.HasField("browser_action"):
skill = self.skills.get("browser")
elif task.task_type == "file":
skill = self.skills.get("file")
else:
# Default to the one that looks like a shell
skill = self.skills.get("shell")
if not skill:
return False, f"Target skill not available for task type: {task.task_type}"
# 2. Execution submission
future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event)
self.active_tasks[task.task_id] = future
# Cleanup hook
future.add_done_callback(lambda f: self._cleanup(task.task_id))
return True, "Accepted"
def cancel(self, task_id):
"""Attempts to cancel an active task through all registered skills."""
with self.lock:
cancelled = any(s.cancel(task_id) for s in self.skills.values())
return cancelled
def get_active_ids(self):
"""Returns the list of currently running task IDs."""
with self.lock:
return list(self.active_tasks.keys())
def _cleanup(self, task_id):
"""Internal callback to release capacity when a task finishes."""
with self.lock:
self.active_tasks.pop(task_id, None)
def shutdown(self):
"""Triggers shutdown for all skills and the worker pool."""
print("[๐ง] Shutting down Skill Manager...")
with self.lock:
# Use set to avoid shutting down the same instance multiple times due to alias mapping
for skill in set(self.skills.values()):
skill.shutdown()
# Shutdown thread pool
self.executor.shutdown(wait=True)