diff --git a/ai-hub/app/api/routes/nodes.py b/ai-hub/app/api/routes/nodes.py index d1425b2..4b4e250 100644 --- a/ai-hub/app/api/routes/nodes.py +++ b/ai-hub/app/api/routes/nodes.py @@ -670,45 +670,93 @@ Request a directory listing from a node. Returns a tree-structured list for the File Navigator. """ - res = services.orchestrator.assistant.ls(node_id, path) - if isinstance(res, dict) and "error" in res: - raise HTTPException(status_code=500, detail=res["error"]) - return schemas.DirectoryListing(node_id=node_id, path=path, files=res.get("files", [])) + try: + # Defensive check for orchestrator service injection + try: + orchestrator = services.orchestrator + except AttributeError: + logger.error("[FS] Orchestrator service not found in ServiceContainer.") + raise HTTPException(status_code=500, detail="Agent Orchestrator service is starting or unavailable.") + + res = orchestrator.assistant.ls(node_id, path) + + if not res: + logger.error(f"[FS] Received empty response from node {node_id} for path {path}") + raise HTTPException(status_code=500, detail="Node returned an empty response.") + + if isinstance(res, dict) and "error" in res: + status_code = 404 if res["error"] == "Offline" else 500 + logger.warning(f"[FS] Explorer Error for {node_id}: {res['error']}") + raise HTTPException(status_code=status_code, detail=res["error"]) + + return schemas.DirectoryListing(node_id=node_id, path=path, files=res.get("files", [])) + except HTTPException: + raise + except Exception as e: + logger.error(f"[FS] Unexpected error in fs_ls: {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") @router.get("/{node_id}/fs/cat", summary="Read File Content") def fs_cat(node_id: str, path: str): """ - Read the content of a file from a remote node. + Read the content of a file on a remote node. """ - res = services.orchestrator.assistant.cat(node_id, path) - if isinstance(res, dict) and "error" in res: - raise HTTPException(status_code=500, detail=res["error"]) - return res + try: + orchestrator = services.orchestrator + res = orchestrator.assistant.cat(node_id, path) + if not res: + raise HTTPException(status_code=500, detail="Node returned an empty response.") + if isinstance(res, dict) and "error" in res: + raise HTTPException(status_code=500, detail=res["error"]) + return res # Expecting {"content": "..."} + except AttributeError: + raise HTTPException(status_code=500, detail="Orchestrator unavailable.") + except Exception as e: + logger.error(f"[FS] Cat error: {e}") + raise HTTPException(status_code=500, detail=str(e)) @router.post("/{node_id}/fs/touch", summary="Create File or Directory") def fs_touch(node_id: str, req: schemas.FileWriteRequest): """ Create a new file or directory on the node. """ - res = services.orchestrator.assistant.write( - node_id, - req.path, - req.content.encode('utf-8'), - req.is_dir - ) - if isinstance(res, dict) and "error" in res: - raise HTTPException(status_code=500, detail=res["error"]) - return res + try: + orchestrator = services.orchestrator + res = orchestrator.assistant.write( + node_id, + req.path, + req.content.encode('utf-8'), + req.is_dir + ) + if not res: + raise HTTPException(status_code=500, detail="Node returned an empty response.") + if isinstance(res, dict) and "error" in res: + raise HTTPException(status_code=500, detail=res["error"]) + return res # Expecting {"success": bool, "message": str} + except AttributeError: + raise HTTPException(status_code=500, detail="Orchestrator unavailable.") + except Exception as e: + logger.error(f"[FS] Touch error: {e}") + raise HTTPException(status_code=500, detail=str(e)) - @router.delete("/{node_id}/fs/rm", summary="Delete File or Directory") - def fs_rm(node_id: str, path: str): + @router.delete("/{node_id}/fs/rm", summary="Delete File/Directory") + def fs_rm(node_id: str, req: schemas.FileDeleteRequest): """ - Delete a file or directory from the node. + Delete a file or directory from a remote node. """ - res = services.orchestrator.assistant.rm(node_id, path) - if isinstance(res, dict) and "error" in res: - raise HTTPException(status_code=500, detail=res["error"]) - return res + try: + orchestrator = services.orchestrator + res = orchestrator.assistant.rm(node_id, req.path) + if not res: + raise HTTPException(status_code=500, detail="Node returned an empty response.") + if isinstance(res, dict) and "error" in res: + raise HTTPException(status_code=500, detail=res["error"]) + return res # Expecting {"success": bool, "message": str} + except AttributeError: + raise HTTPException(status_code=500, detail="Orchestrator unavailable.") + except Exception as e: + logger.error(f"[FS] Rm error: {e}") + raise HTTPException(status_code=500, detail=str(e)) return router diff --git a/ai-hub/app/core/grpc/services/assistant.py b/ai-hub/app/core/grpc/services/assistant.py index 8a7058e..55f6ab8 100644 --- a/ai-hub/app/core/grpc/services/assistant.py +++ b/ai-hub/app/core/grpc/services/assistant.py @@ -2,9 +2,12 @@ import json import os import hashlib +import logging from app.core.grpc.utils.crypto import sign_payload, sign_browser_action from app.protos import agent_pb2 +logger = logging.getLogger(__name__) + class TaskAssistant: """The 'Brain' of the Orchestrator: High-Level AI API for Dispatching Tasks.""" def __init__(self, registry, journal, pool, mirror=None): @@ -263,8 +266,8 @@ req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( task_id=tid, payload_json=cmd, signature=sig, session_id=session_id)) - print(f"[📤] Dispatching shell {tid} to {node_id}") - node["queue"].put(req) + logger.info(f"[📤] Dispatching shell {tid} to {node_id}") + node.queue.put(req) if event.wait(timeout): res = self.journal.get_result(tid) @@ -291,8 +294,8 @@ req = agent_pb2.ServerTaskMessage(task_request=agent_pb2.TaskRequest( task_id=tid, browser_action=action, signature=sig, session_id=session_id)) - print(f"[🌐📤] Dispatching browser {tid} to {node_id}") - node["queue"].put(req) + logger.info(f"[🌐📤] Dispatching browser {tid} to {node_id}") + node.queue.put(req) if event.wait(timeout): res = self.journal.get_result(tid)