import threading
from concurrent import futures
from agent_node.skills.shell import ShellSkill
from agent_node.skills.browser import BrowserSkill
from agent_node.config import MAX_SKILL_WORKERS
class SkillManager:
"""Orchestrates multiple modular skills and manages the task worker pool."""
def __init__(self, max_workers=MAX_SKILL_WORKERS):
self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker")
self.active_tasks = {} # task_id -> future
self.skills = {
"shell": ShellSkill(),
"browser": BrowserSkill()
}
self.max_workers = max_workers
self.lock = threading.Lock()
def submit(self, task, sandbox, on_complete, on_event=None):
"""Routes a task to the appropriate skill and submits it to the thread pool."""
with self.lock:
if len(self.active_tasks) >= self.max_workers:
return False, "Node Capacity Reached"
# 1. Routing Engine
if task.HasField("browser_action"):
skill = self.skills["browser"]
else:
skill = self.skills["shell"]
# 2. Execution submission
future = self.executor.submit(skill.execute, task, sandbox, on_complete, on_event)
self.active_tasks[task.task_id] = future
# Cleanup hook
future.add_done_callback(lambda f: self._cleanup(task.task_id))
return True, "Accepted"
def cancel(self, task_id):
"""Attempts to cancel an active task through all registered skills."""
with self.lock:
cancelled = any(s.cancel(task_id) for s in self.skills.values())
return cancelled
def get_active_ids(self):
"""Returns the list of currently running task IDs."""
with self.lock:
return list(self.active_tasks.keys())
def _cleanup(self, task_id):
"""Internal callback to release capacity when a task finishes."""
with self.lock:
self.active_tasks.pop(task_id, None)
def shutdown(self):
"""Triggers shutdown for all skills and the worker pool."""
print("[🔧] Shutting down Skill Manager...")
with self.lock:
for name, skill in self.skills.items():
print(f" [🔧] Shutting down skill: {name}")
skill.shutdown()
# Shutdown thread pool
self.executor.shutdown(wait=True)