Newer
Older
cortex-hub / ai-hub / app / core / services / tool.py
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from app.db import models
from app.core.skills.base import BaseSkill
import logging

logger = logging.getLogger(__name__)

class ToolService:
    """
    Orchestrates AI tools (Skills) available to users.
    Handles discovery, permission checks, and execution routing.
    """
    
    def __init__(self, services: Any = None, local_skills: List[BaseSkill] = []):
        self._services = services
        self._local_skills = {s.name: s for s in local_skills}

    def get_available_tools(self, db: Session, user_id: str, feature: str = None) -> List[Dict[str, Any]]:
        """
        Retrieves all tools the user is authorized to use, optionally filtered by feature.
        """
        # 1. Fetch system/local skills and filter by feature if requested
        local_skills = self._local_skills.values()
        if feature:
            local_skills = [s for s in local_skills if feature in getattr(s, "features", ["chat"])]
        
        tools = [s.to_tool_definition() for s in local_skills]
        
        # 2. Add DB-defined skills (System skills or user-owned)
        query = db.query(models.Skill).filter(
            (models.Skill.is_system == True) | 
            (models.Skill.owner_id == user_id)
        ).filter(models.Skill.is_enabled == True)
        
        if feature:
            # SQLAlchemy JSON containment check (SQLite specific or generic enough)
            # For simplicity, we filter in Python if the DB driver is tricky
            db_skills = query.all()
            db_skills = [ds for ds in db_skills if feature in (ds.features or [])]
        else:
            db_skills = query.all()
        
        for ds in db_skills:
            # Prevent duplicates if name overlaps with local
            if any(t["function"]["name"] == ds.name for t in tools):
                continue
            
            tools.append({
                "type": "function",
                "function": {
                    "name": ds.name,
                    "description": ds.description,
                    "parameters": ds.config.get("parameters", {})
                }
            })
            
        return tools

    async def call_tool(self, tool_name: str, arguments: Dict[str, Any], db: Session = None, user_id: str = None) -> Any:
        """
        Executes a registered skill.
        """
        # 1. Try local/native skill first
        if tool_name in self._local_skills:
            skill = self._local_skills[tool_name]
            result = await skill.execute(**arguments)
            return result.dict()
        
        # 2. Handle System / DB Skills
        if db:
            db_skill = db.query(models.Skill).filter(models.Skill.name == tool_name).first()
            if db_skill and db_skill.is_system:
                return await self._execute_system_skill(db_skill, arguments)
        
        logger.error(f"Tool '{tool_name}' not found or handled yet.")
        return {"success": False, "error": "Tool not found"}

    async def _execute_system_skill(self, skill: models.Skill, args: Dict[str, Any]) -> Any:
        """Routes system skill execution to the appropriate internal service."""
        orchestrator = getattr(self._services, "orchestrator", None)
        if not orchestrator:
            return {"success": False, "error": "Orchestrator not available"}
        
        assistant = orchestrator.assistant
        node_id = args.get("node_id")
        
        if not node_id:
            return {"success": False, "error": "node_id is required"}

        try:
            if skill.name == "mesh_terminal_control":
                # Maps to TaskAssistant.dispatch_single
                cmd = args.get("command")
                res = assistant.dispatch_single(node_id, cmd)
                return {"success": True, "output": res}

            elif skill.name == "browser_automation_agent":
                # Maps to TaskAssistant.dispatch_browser
                from app.protos import agent_pb2
                action_str = args.get("action", "navigate").upper()
                action_type = getattr(agent_pb2.BrowserAction, action_str, agent_pb2.BrowserAction.NAVIGATE)
                
                browser_action = agent_pb2.BrowserAction(
                    action=action_type,
                    url=args.get("url", ""),
                )
                res = assistant.dispatch_browser(node_id, browser_action)
                return {"success": True, "output": res}

            elif skill.name == "mesh_file_explorer":
                # Maps to TaskAssistant.ls, cat, write, rm
                action = args.get("action")
                path = args.get("path")
                
                if action == "list":
                    res = assistant.ls(node_id, path)
                elif action == "read":
                    res = assistant.cat(node_id, path)
                elif action == "write":
                    content = args.get("content", "").encode('utf-8')
                    res = assistant.write(node_id, path, content)
                elif action == "delete":
                    res = assistant.rm(node_id, path)
                else:
                    return {"success": False, "error": f"Unsupported action: {action}"}
                
                return {"success": True, "output": res}

        except Exception as e:
            logger.exception(f"System skill execution failed: {e}")
            return {"success": False, "error": str(e)}

        return {"success": False, "error": "Skill execution logic not found"}