from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from app.db import models
from app.core.skills.base import BaseSkill
import logging
import time
import os
from app.core.tools.registry import tool_registry
import time
logger = logging.getLogger(__name__)
class ToolService:
"""
Orchestrates AI tools (Skills) available to users.
Handles discovery, permission checks, and execution routing.
"""
def __init__(self, services: Any = None, local_skills: List[BaseSkill] = []):
self._services = services
self._local_skills = {s.name: s for s in local_skills}
tool_registry.load_plugins()
def get_available_tools(self, db: Session, user_id: str, feature: str = None) -> List[Dict[str, Any]]:
"""
Retrieves all tools the user is authorized to use, optionally filtered by feature.
"""
# 1. Fetch system/local skills and filter by feature if requested
local_skills = self._local_skills.values()
if feature:
local_skills = [s for s in local_skills if feature in getattr(s, "features", ["chat"])]
tools = [s.to_tool_definition() for s in local_skills]
# 2. Add FS-defined skills (System skills or user-owned)
from app.core.skills.fs_loader import fs_loader
all_fs_skills = fs_loader.get_all_skills()
class _DictObj:
def __init__(self, d):
for k, v in d.items():
setattr(self, k, v)
db_skills = []
for fs_skill in all_fs_skills:
if fs_skill.get("is_enabled", True) and (fs_skill.get("is_system") or fs_skill.get("owner_id") == user_id):
if feature and feature not in fs_skill.get("features", ["chat"]):
continue
# Map virtual files array to object arrays for the legacy parsing logic
fs_skill["files"] = [_DictObj(f) for f in fs_skill.get("files", [])]
db_skills.append(_DictObj(fs_skill))
import litellm
max_md_len = 1000
try:
# Attempt to resolve the active user's model configuration dynamically to get exact context sizes
user = db.query(models.User).filter(models.User.id == user_id).first() if db else None
m_name = "gemini-2.5-pro"
if user and user.preferences:
m_name = user.preferences.get("llm_model", m_name)
model_info = litellm.get_model_info(m_name)
if model_info:
max_tokens = model_info.get("max_input_tokens", 8192)
# Cap a single skill's instruction block at 5% of the total context window to leave room
# for chat history and other plugins, with an absolute roof of 40k chars. (1 token ~= 4 chars)
max_md_len = max(min(int(max_tokens * 4 * 0.05), 40000), 1000)
except Exception as e:
logger.warning(f"Dynamic tool schema truncation failed to query model size: {e}")
for ds in db_skills:
# Prevent duplicates if name overlaps with local
if any(t["function"]["name"] == ds.name for t in tools):
continue
# --- Lazy-Loading VFS Pattern (Phase 3 - Skills as Folders) ---
# Extract parameters from SKILL.md frontmatter instead of legacy DB config column
description = ds.description or ""
parameters = {}
skill_md_file = next((f for f in ds.files if f.file_path == "SKILL.md"), None) if ds.files else None
if skill_md_file and skill_md_file.content:
exec_file = "<executable>"
for f in ds.files:
if f.file_path.endswith(".sh") or f.file_path.endswith(".py") or "run." in f.file_path:
exec_file = f.file_path
break
exec_cmd = f"bash .skills/{ds.name}/{exec_file}" if exec_file.endswith(".sh") else f"python3 .skills/{ds.name}/{exec_file}" if exec_file.endswith(".py") else f".skills/{ds.name}/{exec_file}"
description += (
f"\n\n[Native VFS Skill - Execute via: `{exec_cmd}`]\n"
f"{skill_md_file.content}"
)
# Parse YAML frontmatter to get the tool schema parameters
if skill_md_file.content.startswith("---"):
try:
import yaml
parts = skill_md_file.content.split("---", 2)
if len(parts) >= 3:
fm = yaml.safe_load(parts[1])
parameters = fm.get("config", {}).get("parameters", {})
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Error parsing SKILL.md frontmatter for {ds.name}: {e}")
# If no parameters found in frontmatter, try parsing markdown directly
if not parameters:
try:
import re
# Parse legacy migrated json configs
mig_match = re.search(r"### Tool Config JSON\s+```(?:yaml|json)\s+(.+?)\s+```", skill_md_file.content, re.DOTALL | re.IGNORECASE)
if mig_match:
try:
import json
parameters = json.loads(mig_match.group(1).strip())
except:
pass
if not parameters:
# Parse Description override (optional)
desc_match = re.search(r"\*\*Description:\*\*\s*(.*?)(?=\n\n|\n#|$)", skill_md_file.content, re.DOTALL | re.IGNORECASE)
if desc_match:
extracted_desc = desc_match.group(1).strip()
description = (
f"{extracted_desc}\n\n[Native VFS Skill - Execute via: `{exec_cmd}`]\n"
f"{skill_md_file.content}"
)
# Parse Parameters Table
table_pattern = r"\|\s*Name\s*\|\s*Type\s*\|\s*Description\s*\|\s*Required\s*\|\n(?:\|[-:\s]+\|[-:\s]+\|[-:\s]+\|[-:\s]+\|\n)(.*?)(?=\n\n|\n#|$)"
param_table_match = re.search(table_pattern, skill_md_file.content, re.DOTALL | re.IGNORECASE)
if param_table_match:
parameters = {"type": "object", "properties": {}, "required": []}
rows = param_table_match.group(1).strip().split('\n')
for row in rows:
if not row.strip() or '|' not in row: continue
cols = [c.strip() for c in row.split('|')][1:-1]
if len(cols) >= 4:
p_name = cols[0].replace('`', '').strip()
p_type = cols[1].strip()
p_desc = cols[2].strip()
p_req = cols[3].strip().lower() in ['yes', 'true', '1', 'y']
parameters["properties"][p_name] = {
"type": p_type,
"description": p_desc
}
if p_req:
parameters["required"].append(p_name)
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Error parsing SKILL.md markdown for {ds.name}: {e}")
# Automatically inject logical node parameters into the schema for all tools
if not parameters: parameters = {"type": "object", "properties": {}, "required": []}
if "properties" not in parameters: parameters["properties"] = {}
if "node_id" not in parameters["properties"]:
parameters["properties"]["node_id"] = {
"type": "string",
"description": "Optional specific mesh node ID to execute this on. Leave empty to auto-use the session's first attached node."
}
tools.append({
"type": "function",
"function": {
"name": ds.name,
"description": description,
"parameters": parameters
}
})
return tools
async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, session_id: str = None, session_db_id: int = None, on_event = None) -> Any:
"""
Executes a registered skill.
"""
# 1. Try local/native skill first
if tool_name in self._local_skills:
skill = self._local_skills[tool_name]
result = await skill.execute(**arguments)
return result.dict()
# 2. Check the tool_registry for lightweight plugins (e.g. read_skill_artifact)
# These are system agents that DO NOT need a full SubAgent loop — their prepare_task
# returns a synchronous function we can call directly.
plugin = tool_registry.get_plugin(tool_name)
if plugin:
orchestrator = getattr(self._services, "orchestrator", None)
context = {
"db": db,
"user_id": user_id,
"session_id": session_id,
"node_id": arguments.get("node_id"),
"node_ids": arguments.get("node_ids"),
"services": self._services,
"orchestrator": orchestrator,
"assistant": orchestrator.assistant if orchestrator else None,
"on_event": on_event
}
task_fn, task_args = plugin.prepare_task(arguments, context)
if not task_fn:
return task_args # error dict
try:
import asyncio
if asyncio.iscoroutinefunction(task_fn):
res = await task_fn(**task_args)
else:
res = task_fn(**task_args)
# M6: Post-processing for Binary Artifacts (Screenshots, etc.)
if isinstance(res, dict):
res = self._process_binary_artifacts(tool_name, res, session_id, arguments, orchestrator.assistant if orchestrator else None)
return res
except Exception as e:
logger.exception(f"Plugin '{tool_name}' execution failed: {e}")
return {"success": False, "error": str(e)}
# 3. Handle System / FS Skills (full SubAgent or Bash)
from app.core.skills.fs_loader import fs_loader
all_fs_skills = fs_loader.get_all_skills()
for fs_skill in all_fs_skills:
if fs_skill.get("name") == tool_name:
class _DictObj:
def __init__(self, d):
for k, v in d.items():
setattr(self, k, v)
fs_skill["files"] = [_DictObj(f) for f in fs_skill.get("files", [])]
db_skill_mock = _DictObj(fs_skill)
return await self._execute_system_skill(db_skill_mock, arguments, user_id=user_id, db=db, session_id=session_id, session_db_id=session_db_id, on_event=on_event)
logger.error(f"Tool '{tool_name}' not found or handled yet.")
return {"success": False, "error": "Tool not found"}
    async def _execute_system_skill(self, skill: Any, args: Dict[str, Any], user_id: str = None, db: Session = None, session_id: str = None, session_db_id: int = None, on_event = None) -> Any:
        """Routes FS skill execution to a stateful SubAgent or Dynamic Plugin.

        Args:
            skill: Object exposing at least ``name`` and (optionally) ``files``
                attributes describing the filesystem-defined skill.
            args: Tool-call arguments from the model; may carry node_id /
                node_ids / session_id / timeout alongside skill parameters.
            user_id: Invoking user's id; used to resolve LLM preferences.
            db: Optional SQLAlchemy session for the attached-node security
                check and preference lookups.
            session_id: Logical session identifier (string) for workspaces.
            session_db_id: DB primary key of the session row.
            on_event: Optional progress callback forwarded to the SubAgent.

        Returns:
            The SubAgent's result (dict, or a string report passed through),
            or a ``{"success": False, "error": ...}`` dict on denial/failure.
        """
        from app.core.services.sub_agent import SubAgent
        from app.core.providers.factory import get_llm_provider
        # --- Programmatic Access Control (M3/M6) ---
        # If targeting a mesh node, we MUST ensure it's actually attached to this session in the DB.
        # This prevents AI from 'guessing' node IDs and executing on unauthorized infrastructure.
        node_id = args.get("node_id")
        node_ids = args.get("node_ids")
        if db and session_db_id:
            session = db.query(models.Session).filter(models.Session.id == session_db_id).first()
            if session:
                attached = session.attached_node_ids or []
                # Implicit fallback to first attached node if no target was specified
                if not node_id and not node_ids and attached:
                    node_id = attached[0]
                    args["node_id"] = node_id
                # Allow virtual node IDs for system maintenance
                allowed_ids = attached + ["hub", "server", "local"]
                # Check single node target
                if node_id and node_id not in allowed_ids:
                    logger.warning(f"[Security] AI attempted to access unattached node '{node_id}' in session {session_db_id}")
                    return {"success": False, "error": f"Node '{node_id}' is NOT attached to this session. Access denied."}
                # Check swarm target
                if node_ids:
                    illegal = [nid for nid in node_ids if nid not in allowed_ids]
                    if illegal:
                        logger.warning(f"[Security] AI attempted to access unattached nodes {illegal} in swarm call")
                        return {"success": False, "error": f"Nodes {illegal} are NOT attached to this session. Access denied."}
        # --- Standard Preparation ---
        llm_provider = None
        orchestrator = getattr(self._services, "orchestrator", None)
        if not orchestrator:
            return {"success": False, "error": "Orchestrator not available"}
        assistant = orchestrator.assistant
        # M3: Resolve session_id from either arguments OR the passed session_id context
        # (AI might use placeholders like 'current' which we resolve here)
        session_id_arg = args.get("session_id")
        if not session_id_arg or session_id_arg == "current":
            resolved_sid = session_id or "__fs_explorer__"
        else:
            resolved_sid = session_id_arg
        logger.info(f"[ToolService] Executing {skill.name} on {node_id or 'swarm'} (Resolved Session: {resolved_sid})")
        # Resolve an LLM provider for the (optional) AI-powered sub-agent loop,
        # preferring the user's own keys and falling back to system defaults.
        if db and user_id:
            user = db.query(models.User).filter(models.User.id == user_id).first()
            if user:
                # Use user's preferred model, or fallback to system default
                p_name = user.preferences.get("llm_provider", "gemini")
                m_name = user.preferences.get("llm_model", "")
                # Fetch provider-specific keys from user or system defaults
                llm_prefs = user.preferences.get("llm", {}).get("providers", {}).get(p_name, {})
                user_service = getattr(self._services, "user_service", None)
                # A masked key (contains '*') or a missing key triggers the
                # system-level fallback; user-set values still win in the merge.
                if (not llm_prefs or not llm_prefs.get("api_key") or "*" in str(llm_prefs.get("api_key"))) and user_service:
                    system_prefs = user_service.get_system_settings(db)
                    system_provider_prefs = system_prefs.get("llm", {}).get("providers", {}).get(p_name, {})
                    if system_provider_prefs:
                        merged = system_provider_prefs.copy()
                        if llm_prefs: merged.update({k: v for k, v in llm_prefs.items() if v})
                        llm_prefs = merged
                api_key_override = llm_prefs.get("api_key")
                actual_m_name = m_name or llm_prefs.get("model", "")
                # Everything except api_key/model is passed through as provider kwargs.
                kwargs = {k: v for k, v in llm_prefs.items() if k not in ["api_key", "model"]}
                try:
                    llm_provider = get_llm_provider(p_name, model_name=actual_m_name, api_key_override=api_key_override, **kwargs)
                    logger.info(f"[ToolService] AI Sub-Agent enabled using {p_name}/{actual_m_name}")
                except Exception as e:
                    # Non-fatal: the sub-agent simply runs without an LLM.
                    logger.warning(f"[ToolService] Could not init LLM for sub-agent: {e}")
        # Define the task function and arguments for the SubAgent
        task_fn = None
        task_args = {}
        plugin = tool_registry.get_plugin(skill.name)
        if not plugin:
            # Check if this is a Dynamic Bash Skill
            bash_logic = None
            skill_md_file = next((f for f in skill.files if f.file_path == "SKILL.md"), None) if getattr(skill, "files", None) else None
            if skill_md_file and skill_md_file.content:
                import re
                # Broadened regex: Allows 3-6 hashes, any title containing "Execution Logic", and handles files ending without newlines.
                bash_match = re.search(r"#{3,6}\s*Execution Logic.*?```bash\s*\n(.*?)(?:```|\Z)", skill_md_file.content, re.DOTALL | re.IGNORECASE)
                if bash_match:
                    bash_logic = bash_match.group(1).strip()
            if not bash_logic:
                # No inline bash template: synthesize an exec command from the
                # first plausible entrypoint file in the skill folder.
                exec_file = "<executable>"
                for f in getattr(skill, "files", []):
                    if f.file_path.endswith(".sh") or f.file_path.endswith(".py") or "run." in f.file_path:
                        exec_file = f.file_path
                        break
                exec_cmd = f"bash .skills/{skill.name}/{exec_file}" if exec_file.endswith(".sh") else f"python3 .skills/{skill.name}/{exec_file}" if exec_file.endswith(".py") else f".skills/{skill.name}/{exec_file}"
            # Closure-based ad-hoc plugin: captures bash_logic / exec_cmd /
            # skill from this scope, mimicking the registry plugin interface
            # (name, retries, prepare_task).
            class DynamicBashPlugin:
                name = skill.name
                retries = 0
                def prepare_task(self, invoke_args, invoke_context):
                    import shlex
                    if bash_logic:
                        cmd = bash_logic
                        # NOTE(review): ${var} placeholders are substituted
                        # UNQUOTED into a shell-executed template — a shell
                        # injection vector if invoke_args are untrusted; the
                        # fallback branch below does quote via shlex. Confirm.
                        for k, v in invoke_args.items():
                            cmd = cmd.replace(f"${{{k}}}", str(v))
                    else:
                        # Auto-bridging fallback: construct command with env vars and positional args
                        safe_args = {k: v for k, v in invoke_args.items() if k != "timeout" and k != "node_id" and k != "node_ids"}
                        bash_env = " ".join([f'{k}={shlex.quote(str(v))}' for k, v in safe_args.items()])
                        bash_args_str = " ".join([shlex.quote(str(v)) for v in safe_args.values()])
                        cmd = f"{bash_env} {exec_cmd} {bash_args_str}".strip()
                    timeout = int(invoke_args.get("timeout", 60))
                    node_id = invoke_context.get("node_id", invoke_args.get("node_id"))
                    node_ids = invoke_context.get("node_ids", invoke_args.get("node_ids"))
                    resolved_sid = invoke_context.get("session_id")
                    assistant = invoke_context.get("assistant")
                    services = invoke_context.get("services")
                    # Virtual node IDs execute directly on this host via subprocess.
                    if node_id in ["hub", "server", "local"] or (node_ids and any(nid in ["hub", "server", "local"] for nid in node_ids)):
                        def _hub_command(**kwargs):
                            import subprocess, os
                            cwd = os.getcwd()
                            # Best-effort: run inside the session workspace when resolvable.
                            if kwargs.get("resolved_sid") and getattr(services, "orchestrator", None):
                                try: cwd = services.orchestrator.mirror.get_workspace_path(kwargs.get("resolved_sid"))
                                except: pass
                            try:
                                proc = subprocess.run(kwargs.get("cmd"), shell=True, capture_output=True, text=True, timeout=kwargs.get("timeout"), cwd=cwd)
                                return {"status": "SUCCESS" if proc.returncode == 0 else "FAILED", "stdout": proc.stdout, "stderr": proc.stderr, "exit_code": proc.returncode, "node_id": "hub"}
                            except subprocess.TimeoutExpired as e:
                                return {"status": "TIMEOUT", "stdout": e.stdout or "", "stderr": e.stderr or "", "error": "Command timed out"}
                            except Exception as e:
                                return {"status": "ERROR", "error": str(e)}
                        return _hub_command, {"cmd": cmd, "timeout": timeout, "resolved_sid": resolved_sid}
                    elif node_ids and isinstance(node_ids, list):
                        return assistant.dispatch_swarm, {"node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": False}
                    elif node_id:
                        return assistant.dispatch_single, {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": False}
                    return None, {"success": False, "error": f"Please map a target node_id to execute this tool natively."}
            plugin = DynamicBashPlugin()
        # Shared invocation context handed to either plugin flavor.
        context = {
            "db": db,
            "user_id": user_id,
            "session_id": resolved_sid,
            "node_id": node_id,
            "node_ids": node_ids,
            "assistant": assistant,
            "orchestrator": orchestrator,
            "services": self._services,
            "on_event": on_event
        }
        task_fn, task_args = plugin.prepare_task(args, context)
        if not task_fn:
            return task_args  # error dict returned by prepare_task
        try:
            if task_fn:
                # Create and run the SubAgent (potentially AI-powered)
                sub_agent = SubAgent(
                    name=f"{skill.name}_{node_id or 'swarm'}",
                    task_fn=task_fn,
                    args=task_args,
                    retries=plugin.retries,
                    llm_provider=llm_provider,
                    assistant=assistant,
                    on_event=on_event
                )
                res = await sub_agent.run()
                # Standardize output for AI:
                # If it's a string (our new Intelligence Report), pass it through directly.
                # If it's a dict, only wrap as failure if a non-None error exists.
                if isinstance(res, dict) and res.get("error"):
                    return {"success": False, "error": res["error"], "sub_agent_status": sub_agent.status}
                # M6: Post-processing for Binary Artifacts (Screenshots, etc.)
                if isinstance(res, dict):
                    res = self._process_binary_artifacts(skill.name, res, resolved_sid, args, assistant)
                logger.info(f"[ToolService] System skill '{skill.name}' completed (Status: {sub_agent.status}).")
                return res
        except Exception as e:
            logger.exception(f"System skill execution failed: {e}")
            return {"success": False, "error": str(e)}
        return {"success": False, "error": "Skill execution logic not found"}
def _process_binary_artifacts(self, skill_name: str, res: Dict[str, Any], resolved_sid: str, args: Dict[str, Any], assistant: Any) -> Dict[str, Any]:
"""Post-processing for Binary Artifacts (Screenshots, etc.)"""
# Unconditionally prevent binary data from leaking into the JSON serializer
screenshot_bits = res.pop("_screenshot_bytes", None)
if skill_name == "browser_automation_agent" and assistant:
# Organise browser data by session for better UX
if resolved_sid and resolved_sid != "__fs_explorer__":
try:
abs_workspace = assistant.mirror.get_workspace_path(resolved_sid)
# M6: Use .browser_data (ignored from node sync)
base_dir = os.path.join(abs_workspace, ".browser_data")
os.makedirs(base_dir, exist_ok=True)
timestamp = int(time.time())
action = args.get("action", "unknown").lower()
# Clean filename for the image: {timestamp}_{action}.png
ss_filename = f"{timestamp}_{action}.png"
# Save Screenshot if available
if screenshot_bits:
ss_path = os.path.join(base_dir, ss_filename)
with open(ss_path, "wb") as f:
f.write(screenshot_bits)
res["screenshot_url"] = f"/.browser_data/{resolved_sid}/{ss_filename}"
res["_visual_feedback"] = f"Action screenshot captured: {res['screenshot_url']}"
# Save Metadata/A11y into a hidden or specific sub-folder if needed,
# but keep images in the root of the session for quick gallery view.
action_dir = os.path.join(base_dir, ".metadata", f"{timestamp}_{action}")
os.makedirs(action_dir, exist_ok=True)
# Save Metadata/Result for easy debugging in file explorer
import json
meta = {
"timestamp": timestamp,
"action": action,
"url": res.get("url"),
"title": res.get("title"),
"success": res.get("success"),
"error": res.get("error"),
"eval_result": res.get("eval_result")
}
with open(os.path.join(action_dir, "metadata.json"), "w") as f:
json.dump(meta, f, indent=2)
# Optional: Save A11y summary for quick viewing
if "a11y_summary" in res:
with open(os.path.join(action_dir, "a11y_summary.txt"), "w") as f:
f.write(res["a11y_summary"])
logger.info(f"[ToolService] Browser artifacts saved to: {action_dir}")
except Exception as sse:
logger.warning(f"Failed to persist browser data to workspace: {sse}")
return res