# cortex-hub / ai-hub / app / core / tools / registry.py
import importlib
import inspect
import pkgutil
import logging
from typing import Dict, Type

from app.core.tools.base import BaseToolPlugin

logger = logging.getLogger(__name__)


class ToolRegistry:
    """
    Auto-discovers and holds all SubAgent Tool definition plugins.
    Allows decoupling monolithic `tool.py` into separate definitions.

    Plugins are stored by their ``name`` attribute. If two different plugin
    classes report the same name, the one loaded last replaces the earlier
    one and a warning is logged.
    """

    def __init__(self):
        # Maps plugin name -> instantiated plugin.
        self._plugins: Dict[str, BaseToolPlugin] = {}

    def load_plugins(self, package_name: str = "app.core.tools.definitions") -> None:
        """
        Import every module directly inside ``package_name`` and register one
        instance of each concrete ``BaseToolPlugin`` subclass found in them.

        Discovery is best-effort: problems are logged rather than raised, and
        a failure in one module or one class does not prevent the remaining
        modules/classes from being registered.

        Args:
            package_name: Dotted path of the package whose direct submodules
                are scanned (sub-packages are not searched recursively).
        """
        try:
            package = importlib.import_module(package_name)
        except ImportError as e:
            logger.warning("Could not import tools package '%s': %s", package_name, e)
            return

        # A plain module (as opposed to a package) has no __path__ to scan.
        search_path = getattr(package, "__path__", None)
        if search_path is None:
            logger.warning(
                "Tools package '%s' is not a package (no __path__); no plugins loaded",
                package_name,
            )
            return

        for _, module_name, _ in pkgutil.iter_modules(search_path):
            full_module_name = f"{package_name}.{module_name}"
            try:
                module = importlib.import_module(full_module_name)
                candidates = inspect.getmembers(module, inspect.isclass)
            except Exception as e:
                logger.exception("Failed to load plugin module '%s': %s", full_module_name, e)
                continue

            for class_name, obj in candidates:
                # Each class is handled in its own try block so that one broken
                # plugin cannot hide the other plugins defined in the module.
                try:
                    if obj is BaseToolPlugin or not issubclass(obj, BaseToolPlugin):
                        continue
                    if inspect.isabstract(obj):
                        # Abstract intermediates cannot be instantiated.
                        logger.debug(
                            "Skipping abstract plugin class '%s' in '%s'",
                            class_name,
                            full_module_name,
                        )
                        continue

                    plugin_instance = obj()
                    plugin_name = plugin_instance.name
                    if not plugin_name:
                        logger.warning(
                            "Skipping plugin class '%s' in '%s': it has no name",
                            class_name,
                            full_module_name,
                        )
                        continue

                    previous = self._plugins.get(plugin_name)
                    if previous is not None and type(previous) is not obj:
                        logger.warning(
                            "Tool plugin name '%s' was already registered by '%s'; "
                            "overriding with '%s' from '%s'",
                            plugin_name,
                            type(previous).__name__,
                            class_name,
                            full_module_name,
                        )

                    self._plugins[plugin_name] = plugin_instance
                    logger.info("Registered dynamic tool plugin: '%s'", plugin_name)
                except Exception as e:
                    logger.exception(
                        "Failed to register plugin class '%s' from module '%s': %s",
                        class_name,
                        full_module_name,
                        e,
                    )

    # The annotation is quoted so the `|` union syntax is never evaluated at
    # runtime and therefore works regardless of the interpreter version.
    def get_plugin(self, name: str) -> "BaseToolPlugin | None":
        """Return the plugin registered under ``name``, or ``None`` if there is none."""
        return self._plugins.get(name)

# Shared module-level registry instance. It is created empty here; it only
# contains plugins after `tool_registry.load_plugins()` has been called
# (intended to happen at application startup).
tool_registry = ToolRegistry()