import json
import logging
import os
import time
from typing import List, Dict, Any, Optional

from sqlalchemy.orm import Session

from app.core.skills.base import BaseSkill
from app.db import models
logger = logging.getLogger(__name__)
class ToolService:
    """
    Orchestrates AI tools (Skills) available to users.
    Handles discovery, permission checks, and execution routing.
    """

    def __init__(self, services: Any = None, local_skills: Optional[List[BaseSkill]] = None):
        """
        Args:
            services: Application service registry; optional members such as
                `orchestrator`, `browser_service` and `user_service` are
                accessed defensively via getattr.
            local_skills: Natively implemented skills, indexed here by name.
        """
        self._services = services
        # Build a fresh dict per instance. (The previous signature used the
        # mutable default `local_skills=[]`, shared across all instances.)
        self._local_skills = {s.name: s for s in (local_skills or [])}
def get_available_tools(self, db: Session, user_id: str, feature: str = None) -> List[Dict[str, Any]]:
"""
Retrieves all tools the user is authorized to use, optionally filtered by feature.
"""
# 1. Fetch system/local skills and filter by feature if requested
local_skills = self._local_skills.values()
if feature:
local_skills = [s for s in local_skills if feature in getattr(s, "features", ["chat"])]
tools = [s.to_tool_definition() for s in local_skills]
# 2. Add DB-defined skills (System skills or user-owned)
query = db.query(models.Skill).filter(
(models.Skill.is_system == True) |
(models.Skill.owner_id == user_id)
).filter(models.Skill.is_enabled == True)
if feature:
# SQLAlchemy JSON containment check (SQLite specific or generic enough)
# For simplicity, we filter in Python if the DB driver is tricky
db_skills = query.all()
db_skills = [ds for ds in db_skills if feature in (ds.features or [])]
else:
db_skills = query.all()
for ds in db_skills:
# Prevent duplicates if name overlaps with local
if any(t["function"]["name"] == ds.name for t in tools):
continue
# M3: Use the public description, but append internal AI instructions if available
# This makes the "system prompt" invisible to end users but fully visible to the Orchestrator.
description = ds.description or ""
if ds.system_prompt:
description += f"\n\nInternal Intelligence Protocol:\n{ds.system_prompt}"
tools.append({
"type": "function",
"function": {
"name": ds.name,
"description": description,
"parameters": ds.config.get("parameters", {})
}
})
return tools
async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None, session_id: str = None, session_db_id: int = None, on_event = None) -> Any:
"""
Executes a registered skill.
"""
# 1. Try local/native skill first
if tool_name in self._local_skills:
skill = self._local_skills[tool_name]
result = await skill.execute(**arguments)
return result.dict()
# 2. Handle System / DB Skills
if db:
db_skill = db.query(models.Skill).filter(models.Skill.name == tool_name).first()
if db_skill and db_skill.is_system:
return await self._execute_system_skill(db_skill, arguments, user_id=user_id, db=db, session_id=session_id, session_db_id=session_db_id, on_event=on_event)
logger.error(f"Tool '{tool_name}' not found or handled yet.")
return {"success": False, "error": "Tool not found"}
    async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any], user_id: str = None, db: Session = None, session_id: str = None, session_db_id: int = None, on_event = None) -> Any:
        """Routes system skill execution to a stateful SubAgent.

        Flow:
          1. Access control — node targets must be attached to the session.
          2. Resolve an LLM provider from user/system preferences (optional).
          3. Map skill name + args to a concrete task function and kwargs.
          4. Run the task inside a SubAgent; post-process browser artifacts.

        Returns the raw task result (dict, or a string report passed through
        unchanged), or a {"success": False, "error": ...} dict on failure.
        """
        from app.core.services.sub_agent import SubAgent
        from app.core.providers.factory import get_llm_provider
        # --- Programmatic Access Control (M3/M6) ---
        # If targeting a mesh node, we MUST ensure it's actually attached to this session in the DB.
        # This prevents AI from 'guessing' node IDs and executing on unauthorized infrastructure.
        node_id = args.get("node_id")
        node_ids = args.get("node_ids")
        if db and session_db_id:
            session = db.query(models.Session).filter(models.Session.id == session_db_id).first()
            if session:
                attached = session.attached_node_ids or []
                # Allow virtual node IDs ("hub"/"server"/"local") for system maintenance.
                allowed_ids = attached + ["hub", "server", "local"]
                # Check single node target
                if node_id and node_id not in allowed_ids:
                    logger.warning(f"[Security] AI attempted to access unattached node '{node_id}' in session {session_db_id}")
                    return {"success": False, "error": f"Node '{node_id}' is NOT attached to this session. Access denied."}
                # Check swarm target: every requested node must be attached.
                if node_ids:
                    illegal = [nid for nid in node_ids if nid not in allowed_ids]
                    if illegal:
                        logger.warning(f"[Security] AI attempted to access unattached nodes {illegal} in swarm call")
                        return {"success": False, "error": f"Nodes {illegal} are NOT attached to this session. Access denied."}
        # --- Standard Preparation ---
        llm_provider = None
        orchestrator = getattr(self._services, "orchestrator", None)
        if not orchestrator:
            return {"success": False, "error": "Orchestrator not available"}
        assistant = orchestrator.assistant
        # M3: Resolve session_id from either arguments OR the passed session_id context
        # (AI might use placeholders like 'current' which we resolve here)
        session_id_arg = args.get("session_id")
        if not session_id_arg or session_id_arg == "current":
            resolved_sid = session_id or "__fs_explorer__"
        else:
            resolved_sid = session_id_arg
        logger.info(f"[ToolService] Executing {skill.name} on {node_id or 'swarm'} (Resolved Session: {resolved_sid})")
        if db and user_id:
            user = db.query(models.User).filter(models.User.id == user_id).first()
            if user:
                # Use user's preferred model, or fallback to system default
                p_name = user.preferences.get("llm_provider", "gemini")
                m_name = user.preferences.get("llm_model", "")
                # Fetch provider-specific keys from user or system defaults
                llm_prefs = user.preferences.get("llm", {}).get("providers", {}).get(p_name, {})
                user_service = getattr(self._services, "user_service", None)
                # Fall back to system settings when the user has no usable key
                # (missing, empty, or masked — masked keys contain '*').
                if (not llm_prefs or not llm_prefs.get("api_key") or "*" in str(llm_prefs.get("api_key"))) and user_service:
                    system_prefs = user_service.get_system_settings(db)
                    system_provider_prefs = system_prefs.get("llm", {}).get("providers", {}).get(p_name, {})
                    if system_provider_prefs:
                        # System values as base, overlaid with non-empty user values.
                        merged = system_provider_prefs.copy()
                        if llm_prefs: merged.update({k: v for k, v in llm_prefs.items() if v})
                        llm_prefs = merged
                api_key_override = llm_prefs.get("api_key")
                actual_m_name = m_name or llm_prefs.get("model", "")
                # Remaining prefs (endpoints, tuning knobs, ...) pass through as kwargs.
                kwargs = {k: v for k, v in llm_prefs.items() if k not in ["api_key", "model"]}
                try:
                    llm_provider = get_llm_provider(p_name, model_name=actual_m_name, api_key_override=api_key_override, **kwargs)
                    logger.info(f"[ToolService] AI Sub-Agent enabled using {p_name}/{actual_m_name}")
                except Exception as e:
                    # Non-fatal: the SubAgent simply runs without AI assistance.
                    logger.warning(f"[ToolService] Could not init LLM for sub-agent: {e}")
        # Define the task function and arguments for the SubAgent
        task_fn = None
        task_args = {}
        try:
            if skill.name == "mesh_terminal_control":
                # Shell command dispatch: hub-local, swarm, or single node.
                cmd = args.get("command", "")
                timeout = int(args.get("timeout", 30))
                # NOTE(review): this rebinds the `session_id` parameter; the new
                # value is never read afterwards (`resolved_sid` is used instead).
                session_id = args.get("session_id")
                node_ids = args.get("node_ids")
                no_abort = args.get("no_abort", False)
                if node_id in ["hub", "server", "local"] or (node_ids and any(nid in ["hub", "server", "local"] for nid in node_ids)):
                    # Special Case: Direct Hub/Server Execution for system maintenance (e.g., cleaning up .browser_data)
                    task_fn = self._execute_hub_command
                    task_args = {"cmd": cmd, "timeout": timeout, "resolved_sid": resolved_sid}
                elif node_ids and isinstance(node_ids, list):
                    task_fn = assistant.dispatch_swarm
                    task_args = {"node_ids": node_ids, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort}
                elif node_id:
                    task_fn = assistant.dispatch_single
                    task_args = {"node_id": node_id, "cmd": cmd, "timeout": timeout, "session_id": resolved_sid, "no_abort": no_abort}
                else:
                    return {"success": False, "error": "node_id or node_ids is required"}
            elif skill.name == "mesh_wait_tasks":
                timeout = int(args.get("timeout", 30))
                no_abort = args.get("no_abort", False)
                task_map = args.get("task_map", {})
                if not task_map:
                    return {"success": False, "error": "task_map is required"}
                if len(task_map) == 1:
                    # Exactly one pending task: wait on it directly.
                    nid, tid = next(iter(task_map.items()))
                    task_fn = assistant.wait_for_task
                    task_args = {"node_id": nid, "task_id": tid, "timeout": timeout, "no_abort": no_abort}
                else:
                    task_fn = assistant.wait_for_swarm
                    task_args = {"task_map": task_map, "timeout": timeout, "no_abort": no_abort}
            elif skill.name == "browser_automation_agent":
                browser_service = getattr(self._services, "browser_service", None)
                if not browser_service:
                    return {"success": False, "error": "Browser Service not available"}
                # Dispatch the requested browser action to the matching service call.
                action = args.get("action", "navigate").lower()
                if action == "navigate":
                    task_fn = browser_service.navigate
                    task_args = {"url": args.get("url"), "session_id": resolved_sid, "on_event": on_event}
                elif action == "click":
                    task_fn = browser_service.click
                    task_args = {"selector": args.get("selector"), "session_id": resolved_sid, "x": args.get("x", 0), "y": args.get("y", 0), "on_event": on_event}
                elif action == "type":
                    task_fn = browser_service.type
                    task_args = {"text": args.get("text"), "selector": args.get("selector", ""), "session_id": resolved_sid, "on_event": on_event}
                elif action == "snapshot":
                    task_fn = browser_service.get_snapshot
                    task_args = {"session_id": resolved_sid, "on_event": on_event}
                elif action == "screenshot":
                    task_fn = browser_service.screenshot
                    task_args = {"session_id": resolved_sid, "on_event": on_event}
                elif action == "hover":
                    task_fn = browser_service.hover
                    task_args = {"selector": args.get("selector"), "session_id": resolved_sid, "on_event": on_event}
                elif action == "close":
                    task_fn = browser_service.close
                    task_args = {"session_id": resolved_sid, "on_event": on_event}
                elif action == "eval":
                    task_fn = browser_service.eval
                    task_args = {"script": args.get("script", ""), "session_id": resolved_sid, "on_event": on_event}
                elif action == "scroll":
                    task_fn = browser_service.scroll
                    task_args = {
                        "delta_x": int(args.get("delta_x", 0)),
                        "delta_y": int(args.get("delta_y", 0)),
                        "selector": args.get("selector", ""),
                        "session_id": resolved_sid,
                        "on_event": on_event
                    }
                elif action == "research":
                    task_fn = browser_service.parallel_fetch
                    task_args = {
                        "urls": args.get("urls", []),
                        "session_id": resolved_sid,
                        "max_concurrent": int(args.get("max_concurrent", 5)),
                        "on_event": on_event
                    }
                else:
                    return {"success": False, "error": f"Unsupported browser action: {action}"}
            elif skill.name == "mesh_file_explorer":
                action = args.get("action")
                path = args.get("path")
                if node_id in ["hub", "server", "local"]:
                    # Virtual nodes map to local Hub filesystem operations.
                    task_fn = self._execute_hub_fs
                    task_args = {"action": action, "path": path, "session_id": resolved_sid, "content": args.get("content")}
                elif action == "list":
                    task_fn = assistant.ls
                    task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid}
                elif action == "read":
                    task_fn = assistant.cat
                    task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid}
                elif action == "write":
                    # The remote write API expects a bytes payload.
                    content = args.get("content", "").encode('utf-8')
                    task_fn = assistant.write
                    task_args = {"node_id": node_id, "path": path, "content": content, "session_id": resolved_sid}
                elif action == "delete":
                    task_fn = assistant.rm
                    task_args = {"node_id": node_id, "path": path, "session_id": resolved_sid}
                else:
                    return {"success": False, "error": f"Unsupported action: {action}"}
            elif skill.name == "mesh_sync_control":
                action_str = args.get("action", "start").upper()
                # Normalize mapping user string to assistant enum;
                # anything unrecognized falls back to "START".
                action_map = {
                    "START": "START",
                    "STOP": "STOP",
                    "LOCK": "LOCK",
                    "UNLOCK": "UNLOCK",
                    "RESYNC": "RESYNC"
                }
                internal_action = action_map.get(action_str, "START")
                task_fn = assistant.control_sync
                task_args = {
                    "node_id": node_id,
                    "session_id": resolved_sid,
                    "action": internal_action,
                    "path": args.get("path", ".")
                }
            elif skill.name == "mesh_inspect_drift":
                task_fn = assistant.inspect_drift
                task_args = {
                    "node_id": node_id,
                    "path": args.get("path"),
                    "session_id": resolved_sid
                }
            if task_fn:
                # Create and run the SubAgent (potentially AI-powered)
                sub_agent = SubAgent(
                    name=f"{skill.name}_{node_id or 'swarm'}",
                    task_fn=task_fn,
                    args=task_args,
                    # Terminal/wait skills are not retried (re-running shell
                    # commands is not idempotent); other skills get 2 retries.
                    retries=0 if skill.name in ["mesh_terminal_control", "mesh_wait_tasks"] else 2,
                    llm_provider=llm_provider,
                    assistant=assistant,
                    on_event=on_event
                )
                res = await sub_agent.run()
                # Standardize output for AI:
                # If it's a string (our new Intelligence Report), pass it through directly.
                # If it's a dict, only wrap as failure if a non-None error exists.
                if isinstance(res, dict) and res.get("error"):
                    return {"success": False, "error": res["error"], "sub_agent_status": sub_agent.status}
                # M6: Post-processing for Binary Artifacts (Screenshots, etc.)
                if skill.name == "browser_automation_agent" and isinstance(res, dict):
                    # Organise browser data by session for better UX
                    if resolved_sid and resolved_sid != "__fs_explorer__":
                        try:
                            abs_workspace = assistant.mirror.get_workspace_path(resolved_sid)
                            # M6: Use .browser_data (ignored from node sync)
                            base_dir = os.path.join(abs_workspace, ".browser_data")
                            os.makedirs(base_dir, exist_ok=True)
                            timestamp = int(time.time())
                            action = args.get("action", "unknown").lower()
                            # Clean filename for the image: {timestamp}_{action}.png
                            # This allows better "next/prev" sorting in the parent gallery
                            ss_filename = f"{timestamp}_{action}.png"
                            # Save Screenshot if available
                            if "_screenshot_bytes" in res:
                                # Pop the raw bytes so they never reach the LLM/JSON layer.
                                bits = res.pop("_screenshot_bytes")
                                if bits:
                                    ss_path = os.path.join(base_dir, ss_filename)
                                    with open(ss_path, "wb") as f:
                                        f.write(bits)
                                    res["screenshot_url"] = f"/.browser_data/{resolved_sid}/{ss_filename}"
                                    res["_visual_feedback"] = f"Action screenshot captured: {res['screenshot_url']}"
                            # Save Metadata/A11y into a hidden or specific sub-folder if needed,
                            # but keep images in the root of the session for quick gallery view.
                            action_dir = os.path.join(base_dir, ".metadata", f"{timestamp}_{action}")
                            os.makedirs(action_dir, exist_ok=True)
                            # Save Metadata/Result for easy debugging in file explorer
                            meta = {
                                "timestamp": timestamp,
                                "action": action,
                                "url": res.get("url"),
                                "title": res.get("title"),
                                "success": res.get("success"),
                                "error": res.get("error"),
                                "eval_result": res.get("eval_result")
                            }
                            with open(os.path.join(action_dir, "metadata.json"), "w") as f:
                                json.dump(meta, f, indent=2)
                            # Optional: Save A11y summary for quick viewing
                            if "a11y_summary" in res:
                                with open(os.path.join(action_dir, "a11y_summary.txt"), "w") as f:
                                    f.write(res["a11y_summary"])
                            logger.info(f"[ToolService] Browser artifacts saved to: {action_dir}")
                        except Exception as sse:
                            # Best-effort persistence: never fail the tool call over artifacts.
                            logger.warning(f"Failed to persist browser data to workspace: {sse}")
                logger.info(f"[ToolService] System skill '{skill.name}' completed (Status: {sub_agent.status}).")
                return res
        except Exception as e:
            logger.exception(f"System skill execution failed: {e}")
            return {"success": False, "error": str(e)}
        return {"success": False, "error": "Skill execution logic not found"}
def _format_ls_result(self, res: dict, node_id: str, path: str) -> str:
"""Formats raw directory listing for LLM consumption."""
formatted = f"Directory listing for '{res.get('path', path)}' on node {node_id}:\n"
files = res.get("files")
if not files:
formatted += "(Empty directory or failed to list files)"
else:
files.sort(key=lambda x: (not x.get("is_dir"), x.get("name", "").lower()))
limit = 100
for f in files[:limit]:
icon = "📁" if f.get("is_dir") else "📄"
size_str = f" ({f.get('size')} bytes)" if not f.get("is_dir") else ""
formatted += f"{icon} {f.get('name')}{size_str}\n"
if len(files) > limit:
formatted += f"... and {len(files) - limit} more items."
return formatted
def _execute_hub_command(self, cmd: str, timeout: int = 30, resolved_sid: str = None) -> dict:
"""Executes a command locally on the Hub server within the workspace context."""
import subprocess
cwd = os.getcwd()
if resolved_sid and self._services.orchestrator:
try:
# Use absolute path of the ghost mirror as CWD
cwd = self._services.orchestrator.mirror.get_workspace_path(resolved_sid)
except: pass
try:
logger.info(f"[HubExec] Local command: {cmd} (CWD: {cwd})")
proc = subprocess.run(
cmd, shell=True, capture_output=True, text=True, timeout=timeout, cwd=cwd
)
return {
"status": "SUCCESS" if proc.returncode == 0 else "FAILED",
"stdout": proc.stdout,
"stderr": proc.stderr,
"exit_code": proc.returncode,
"node_id": "hub"
}
except subprocess.TimeoutExpired as e:
return {"status": "TIMEOUT", "stdout": e.stdout or "", "stderr": e.stderr or "", "error": "Command timed out on Hub"}
except Exception as e:
return {"status": "ERROR", "error": str(e)}
def _execute_hub_fs(self, action: str, path: str, session_id: str, content: str = None) -> dict:
"""Performs filesystem actions locally on the Hub server."""
import shutil
orchestrator = getattr(self._services, "orchestrator", None)
if not orchestrator or not orchestrator.mirror:
return {"success": False, "error": "Ghost Mirror not available"}
base = orchestrator.mirror.get_workspace_path(session_id)
# Ensure path is relative and doesn't escape
target = os.path.normpath(os.path.join(base, path.lstrip("/")))
if not target.startswith(base):
return {"success": False, "error": "Path traversal attempt blocked"}
try:
if action == "list":
if not os.path.exists(target): return {"error": "Path not found"}
files = []
for entry in os.scandir(target):
files.append({
"path": os.path.relpath(entry.path, base),
"name": entry.name,
"is_dir": entry.is_dir(),
"size": entry.stat().st_size if entry.is_file() else 0
})
return {"files": files, "path": path}
elif action == "read":
if not os.path.exists(target): return {"error": "File not found"}
with open(target, "r", encoding="utf-8", errors="ignore") as f:
return {"content": f.read(), "path": path}
elif action == "write":
os.makedirs(os.path.dirname(target), exist_ok=True)
with open(target, "w", encoding="utf-8") as f:
f.write(content or "")
return {"success": True}
elif action == "delete":
if os.path.isdir(target):
shutil.rmtree(target)
elif os.path.exists(target):
os.remove(target)
return {"success": True}
except Exception as e:
return {"error": str(e)}
return {"error": f"Unknown action: {action}"}