import threading
import os
import importlib.util
from concurrent import futures
from agent_node.skills.base import BaseSkill
from agent_node.config import MAX_SKILL_WORKERS
class SkillManager:
"""Orchestrates multiple modular skills and manages the task worker pool."""
def __init__(self, max_workers=MAX_SKILL_WORKERS, sync_mgr=None):
self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker")
self.active_tasks = {} # task_id -> future
self.sync_mgr = sync_mgr
self.max_workers = max_workers
self.lock = threading.Lock()
# 1. Start with Hardcoded Bridges (Pure Programmatic Logic)
self.skills = self._load_core_bridges(sync_mgr)
# 2. Optionally supplement with dynamic skills from disk
dynamic_skills = self._discover_skills(sync_mgr)
self.skills.update(dynamic_skills)
def _load_core_bridges(self, sync_mgr):
"""Hard-imports the core execution bridges to ensure they are always available."""
bridges = {}
# Shell Bridge
try:
from agent_node.skills.shell_bridge import ShellSkill
instance = ShellSkill(sync_mgr=sync_mgr)
bridges["shell"] = instance
bridges["mesh-terminal-control"] = instance
print(" [๐ง๐ฆ] Core Shell Bridge Loaded.")
except ImportError as e:
print(f" [๐งโ ๏ธ] Fatal: Core Shell Bridge not found: {e}")
# File Bridge
try:
from agent_node.skills.file_bridge import FileSkill
instance = FileSkill(sync_mgr=sync_mgr)
bridges["file"] = instance
bridges["mesh-file-explorer"] = instance
print(" [๐ง๐ฆ] Core File Bridge Loaded.")
except ImportError: pass
# Browser Bridge
try:
from agent_node.skills.browser_bridge import BrowserSkill
instance = BrowserSkill(sync_mgr=sync_mgr)
bridges["browser"] = instance
bridges["browser-automation-agent"] = instance
print(" [๐ง๐ฆ] Core Browser Bridge Loaded.")
except ImportError: pass
return bridges
def _discover_skills(self, sync_mgr):
"""Scans the disk for additional logic.py plugins (supplemental)."""
# Find candidate locations for skills
candidates = [
"/app/skills",
"/app/node_skills"
]
skills_dir = None
for cand in candidates:
if os.path.exists(cand) and os.path.isdir(cand):
try:
if any(os.path.isdir(os.path.join(cand, d)) for d in os.listdir(cand)):
skills_dir = cand
break
except OSError:
continue
discovered = {}
if not skills_dir:
return discovered
print(f" [๐ง] Scanning supplemental skills: {skills_dir}")
for skill_dir in os.listdir(skills_dir):
if skill_dir in self.skills: continue # Skip if already hardcoded
item_path = os.path.join(skills_dir, skill_dir)
if os.path.isdir(item_path):
logic_py = os.path.join(item_path, "logic.py")
if os.path.exists(logic_py):
# Dynamic import
try:
spec = importlib.util.spec_from_file_location(f"skill_{skill_dir}", logic_py)
module = importlib.util.module_from_spec(spec)
if spec.loader:
spec.loader.exec_module(module)
# Robust class detection
for attr_name in dir(module):
attr = getattr(module, attr_name)
if isinstance(attr, type) and any(b.__name__ == 'BaseSkill' for b in attr.__mro__) and attr.__name__ != 'BaseSkill':
try:
instance = attr(sync_mgr=sync_mgr)
discovered[skill_dir] = instance
print(f" [๐งโ
] Loaded supplemental skill: {skill_dir}")
except Exception as e:
print(f" [๐งโ] Failed to instantiate skill {skill_dir}: {e}")
break
except Exception as e:
print(f" [๐งโ] Failed to load skill from {logic_py}: {e}")
return discovered
def submit(self, task, sandbox, on_complete, on_event=None):
"""Routes a task to the appropriate skill and submits it to the thread pool."""
# --- 0. Transparent TTY Bypass (Gaming Performance) ---
# Keystrokes and Resizes should NEVER wait for a thread or be blocked by sandbox
if "shell" in self.skills and hasattr(self.skills["shell"], "handle_transparent_tty"):
if self.skills["shell"].handle_transparent_tty(task, on_complete, on_event):
return True, "Accepted (Transparent)"
with self.lock:
if len(self.active_tasks) >= self.max_workers:
return False, "Node Capacity Reached"
# 1. Routing Engine
skill = None
if task.HasField("browser_action"):
skill = self.skills.get("browser")
elif task.task_type == "file":
skill = self.skills.get("file")
else:
# Default to the one that looks like a shell
skill = self.skills.get("shell")
if not skill:
return False, f"Target skill not available for task type: {task.task_type}"
# 2. Execution submission
future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event)
self.active_tasks[task.task_id] = future
# Cleanup hook
future.add_done_callback(lambda f: self._cleanup(task.task_id))
return True, "Accepted"
def cancel(self, task_id):
"""Attempts to cancel an active task through all registered skills."""
with self.lock:
cancelled = any(s.cancel(task_id) for s in self.skills.values())
return cancelled
def get_active_ids(self):
"""Returns the list of currently running task IDs."""
with self.lock:
return list(self.active_tasks.keys())
def _cleanup(self, task_id):
"""Internal callback to release capacity when a task finishes."""
with self.lock:
self.active_tasks.pop(task_id, None)
def shutdown(self):
"""Triggers shutdown for all skills and the worker pool."""
print("[๐ง] Shutting down Skill Manager...")
with self.lock:
# Use set to avoid shutting down the same instance multiple times due to alias mapping
for skill in set(self.skills.values()):
skill.shutdown()
# Shutdown thread pool
self.executor.shutdown(wait=True)