from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from app.db import models
from app.core.skills.base import BaseSkill
import logging
logger = logging.getLogger(__name__)
class ToolService:
"""
Orchestrates AI tools (Skills) available to users.
Handles discovery, permission checks, and execution routing.
"""
def __init__(self, services: Any = None, local_skills: List[BaseSkill] = []):
self._services = services
self._local_skills = {s.name: s for s in local_skills}
def get_available_tools(self, db: Session, user_id: str, feature: str = None) -> List[Dict[str, Any]]:
"""
Retrieves all tools the user is authorized to use, optionally filtered by feature.
"""
# 1. Fetch system/local skills and filter by feature if requested
local_skills = self._local_skills.values()
if feature:
local_skills = [s for s in local_skills if feature in getattr(s, "features", ["chat"])]
tools = [s.to_tool_definition() for s in local_skills]
# 2. Add DB-defined skills (System skills or user-owned)
query = db.query(models.Skill).filter(
(models.Skill.is_system == True) |
(models.Skill.owner_id == user_id)
).filter(models.Skill.is_enabled == True)
if feature:
# SQLAlchemy JSON containment check (SQLite specific or generic enough)
# For simplicity, we filter in Python if the DB driver is tricky
db_skills = query.all()
db_skills = [ds for ds in db_skills if feature in (ds.features or [])]
else:
db_skills = query.all()
for ds in db_skills:
# Prevent duplicates if name overlaps with local
if any(t["function"]["name"] == ds.name for t in tools):
continue
tools.append({
"type": "function",
"function": {
"name": ds.name,
"description": ds.description,
"parameters": ds.config.get("parameters", {})
}
})
return tools
async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, on_event = None) -> Any:
"""
Executes a registered skill.
"""
# 1. Try local/native skill first
if tool_name in self._local_skills:
skill = self._local_skills[tool_name]
result = await skill.execute(**arguments)
return result.dict()
# 2. Handle System / DB Skills
if db:
db_skill = db.query(models.Skill).filter(models.Skill.name == tool_name).first()
if db_skill and db_skill.is_system:
return await self._execute_system_skill(db_skill, arguments, user_id=user_id, db=db, on_event=on_event)
logger.error(f"Tool '{tool_name}' not found or handled yet.")
return {"success": False, "error": "Tool not found"}
async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any], user_id: str = None, db: Session = None, on_event = None) -> Any:
"""Routes system skill execution to a stateful SubAgent."""
from app.core.services.sub_agent import SubAgent
from app.core.providers.factory import get_llm_provider
orchestrator = getattr(self._services, "orchestrator", None)
if not orchestrator:
return {"success": False, "error": "Orchestrator not available"}
assistant = orchestrator.assistant
node_id = args.get("node_id")
# node_id requirement is now handled per-skill below to support swarm/plural fields
# --- AI Sub-Agent Setup ---
llm_provider = None
subagent_prompt = skill.config.get("subagent_system_prompt")
if db and user_id and subagent_prompt:
user = db.query(models.User).filter(models.User.id == user_id).first()
if user:
# Use user's preferred model, or fallback to system default
p_name = user.preferences.get("llm_provider", "gemini")
m_name = user.preferences.get("llm_model", "")
try:
llm_provider = get_llm_provider(p_name, m_name)
logger.info(f"[ToolService] AI Sub-Agent enabled using {p_name}/{m_name}")
except Exception as e:
logger.warning(f"[ToolService] Could not init LLM for sub-agent: {e}")
# Define the task function and arguments for the SubAgent
task_fn = None
task_args = {}
try:
if skill.name == "mesh_terminal_control":
# ... same logic ...
cmd = args.get("command", "")
timeout = int(args.get("timeout", 30))
session_id = args.get("session_id")
node_ids = args.get("node_ids")
no_abort = args.get("no_abort", False)
if node_ids and isinstance(node_ids, list):
task_fn = assistant.dispatch_swarm
task_args = {"node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": session_id, "no_abort": no_abort}
elif node_id:
task_fn = assistant.dispatch_single
task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": session_id, "no_abort": no_abort}
else:
return {"success": False, "error": "node_id or node_ids is required"}
elif skill.name == "mesh_wait_tasks":
timeout = int(args.get("timeout", 30))
no_abort = args.get("no_abort", False)
task_map = args.get("task_map", {})
if not task_map:
return {"success": False, "error": "task_map is required"}
if len(task_map) == 1:
nid, tid = next(iter(task_map.items()))
task_fn = assistant.wait_for_task
task_args = {"node_id": nid, "task_id": tid, "timeout": timeout, "no_abort": no_abort}
else:
task_fn = assistant.wait_for_swarm
task_args = {"task_map": task_map, "timeout": timeout, "no_abort": no_abort}
elif skill.name == "browser_automation_agent":
# ... existing logic ...
from app.protos import agent_pb2
action_str = args.get("action", "navigate").upper()
action_type = getattr(agent_pb2.BrowserAction, action_str, agent_pb2.BrowserAction.NAVIGATE)
session_id = args.get("session_id")
browser_action = agent_pb2.BrowserAction(
action=action_type,
url=args.get("url", ""),
session_id=session_id or ""
)
task_fn = assistant.dispatch_browser
task_args = {"node_id": node_id, "action": browser_action, "session_id": session_id}
elif skill.name == "mesh_file_explorer":
# ... existing logic ...
action = args.get("action")
path = args.get("path")
explorer_sid = session_id or "__fs_explorer__"
if action == "list":
task_fn = assistant.ls
task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid}
elif action == "read":
task_fn = assistant.cat
task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid}
elif action == "write":
content = args.get("content", "").encode('utf-8')
task_fn = assistant.write
task_args = {"node_id": node_id, "path": path, "content": content, "session_id": explorer_sid}
elif action == "delete":
task_fn = assistant.rm
task_args = {"node_id": node_id, "path": path, "session_id": explorer_sid}
else:
return {"success": False, "error": f"Unsupported action: {action}"}
if task_fn:
# Create and run the SubAgent (potentially AI-powered)
sub_agent = SubAgent(
name=f"{skill.name}_{node_id or 'swarm'}",
task_fn=task_fn,
args=task_args,
retries=0 if skill.name in ["mesh_terminal_control", "mesh_wait_tasks"] else 2,
llm_provider=llm_provider,
assistant=assistant,
subagent_system_prompt=subagent_prompt,
on_event=on_event
)
res = await sub_agent.run()
# Standardize output for AI
if isinstance(res, dict) and "error" in res:
return {"success": False, "error": res["error"], "sub_agent_status": sub_agent.status}
return {"success": True, "output": res, "sub_agent_status": sub_agent.status}
except Exception as e:
logger.exception(f"System skill execution failed: {e}")
return {"success": False, "error": str(e)}
return {"success": False, "error": "Skill execution logic not found"}
def _format_ls_result(self, res: dict, node_id: str, path: str) -> str:
"""Formats raw directory listing for LLM consumption."""
formatted = f"Directory listing for '{res.get('path', path)}' on node {node_id}:\n"
files = res.get("files")
if not files:
formatted += "(Empty directory or failed to list files)"
else:
files.sort(key=lambda x: (not x.get("is_dir"), x.get("name", "").lower()))
limit = 100
for f in files[:limit]:
icon = "📁" if f.get("is_dir") else "📄"
size_str = f" ({f.get('size')} bytes)" if not f.get("is_dir") else ""
formatted += f"{icon} {f.get('name')}{size_str}\n"
if len(files) > limit:
formatted += f"... and {len(files) - limit} more items."
return formatted