import time
import json
from orchestrator.utils.crypto import sign_payload, sign_browser_action
from protos import agent_pb2
class TaskAssistant:
    """The 'Brain' of the Orchestrator: High-Level AI API for Dispatching Tasks.

    Each dispatch looks the target node up in the registry, registers a
    task id with the journal, pushes a signed request onto the node's
    queue, and blocks until the journal's event fires or the timeout
    elapses.
    """

    def __init__(self, registry, journal, pool):
        """Store the collaborators used by the dispatch methods.

        Args:
            registry: Object exposing ``get_node(node_id)``; a falsy return
                is treated as "node offline".
            journal: Object exposing ``register(tid, node_id)``,
                ``get_result(tid)`` and ``pop(tid)``.
            pool: Stored on the instance; not used by the methods below.
        """
        self.registry = registry
        self.journal = journal
        self.pool = pool

    def dispatch_single(self, node_id, cmd, timeout=30):
        """Dispatches a shell command to a specific node.

        Args:
            node_id: Identifier passed to ``registry.get_node``.
            cmd: Command payload; it is signed and sent as ``payload_json``.
            timeout: Value handed to ``event.wait`` while waiting for the
                result.

        Returns:
            Whatever ``journal.get_result(tid)`` returns when the event
            fires in time, ``{"error": "Timeout"}`` when it does not, or
            ``{"error": "Node <node_id> Offline"}`` when the registry has no
            such node.
        """
        node = self.registry.get_node(node_id)
        if not node:
            return {"error": f"Node {node_id} Offline"}

        # NOTE(review): ids have millisecond resolution, so two dispatches in
        # the same millisecond would share a tid -- confirm whether callers
        # can dispatch concurrently.
        tid = f"task-{int(time.time()*1000)}"

        def build_request():
            # The signature is computed over the raw command payload.
            sig = sign_payload(cmd)
            return agent_pb2.ServerTaskMessage(
                task_request=agent_pb2.TaskRequest(
                    task_id=tid, payload_json=cmd, signature=sig))

        return self._send_and_wait(
            node, node_id, tid, build_request,
            f"[📤] Dispatching shell {tid} to {node_id}", timeout)

    def dispatch_browser(self, node_id, action, timeout=60):
        """Dispatches a browser action to a directed session node.

        Args:
            node_id: Identifier passed to ``registry.get_node``.
            action: Browser action message; its ``action``, ``url`` and
                ``session_id`` attributes are read for signing, and the
                object itself is sent as ``browser_action``.
            timeout: Value handed to ``event.wait`` while waiting for the
                result.

        Returns:
            Whatever ``journal.get_result(tid)`` returns when the event
            fires in time, ``{"error": "Timeout"}`` when it does not, or
            ``{"error": "Node <node_id> Offline"}`` when the registry has no
            such node.
        """
        node = self.registry.get_node(node_id)
        if not node:
            return {"error": f"Node {node_id} Offline"}

        # NOTE(review): same millisecond-resolution caveat as dispatch_single.
        tid = f"br-{int(time.time()*1000)}"

        def build_request():
            # The signature covers the action type's enum name, the URL and
            # the session id.
            sig = sign_browser_action(
                agent_pb2.BrowserAction.ActionType.Name(action.action),
                action.url,
                action.session_id,
            )
            return agent_pb2.ServerTaskMessage(
                task_request=agent_pb2.TaskRequest(
                    task_id=tid, browser_action=action, signature=sig))

        return self._send_and_wait(
            node, node_id, tid, build_request,
            f"[🌐📤] Dispatching browser {tid} to {node_id}", timeout)

    def _send_and_wait(self, node, node_id, tid, build_request, banner, timeout):
        """Register ``tid``, enqueue the request, and wait for its result.

        Args:
            node: Registry entry for the target; its ``"queue"`` item
                receives the request via ``put``.
            node_id: Identifier recorded with the journal registration.
            tid: Task id under which the journal tracks this dispatch.
            build_request: Zero-argument callable that signs and builds the
                message; invoked after registration, as before.
            banner: Line printed just before the request is enqueued.
            timeout: Value handed to ``event.wait``.

        Returns:
            ``journal.get_result(tid)`` when the event fires in time,
            otherwise ``{"error": "Timeout"}``.
        """
        event = self.journal.register(tid, node_id)
        try:
            req = build_request()
            print(banner)
            node["queue"].put(req)
            if event.wait(timeout):
                return self.journal.get_result(tid)
            return {"error": "Timeout"}
        finally:
            # Always drop the journal entry -- including when signing,
            # enqueueing or result retrieval raises -- so it cannot leak.
            self.journal.pop(tid)