import grpc
import time
import os
import agent_pb2
import agent_pb2_grpc
import threading
import subprocess
import json
import jwt
import datetime
import hmac
import hashlib
import queue
import sys
import platform
from concurrent import futures
from playwright.sync_api import sync_playwright
# Shared secret used both to sign this node's JWTs and to verify task HMACs.
# Taken from the CORTEX_SECRET_KEY environment variable when it is set (and
# non-empty) so deployments need not ship the secret in source; the original
# literal stays as the fallback so existing setups keep working unchanged.
SECRET_KEY = os.environ.get("CORTEX_SECRET_KEY") or "cortex-secret-shared-key"
class BaseSkill:
    """Contract every pluggable node capability follows.

    Concrete skills override ``execute``; ``cancel`` may be overridden by
    skills that are able to abort work already in progress.
    """

    def execute(self, task, sandbox, on_complete):
        """Run ``task`` and report via ``on_complete``; subclasses must override."""
        raise NotImplementedError

    def cancel(self, task_id):
        """Try to abort ``task_id``. The base skill never can, so it answers False."""
        return False
class ShellSkill(BaseSkill):
    """Default Skill: Executing shell commands."""

    def __init__(self):
        self.processes = {}  # task_id -> Popen
        self.lock = threading.Lock()

    def execute(self, task, sandbox, on_complete):
        """Run ``task.payload_json`` as a shell command and report the result.

        The command is first checked with ``sandbox.verify``. Results go to
        ``on_complete(task_id, result_dict, trace_id)`` where ``status`` is 1
        for a zero exit code and 2 otherwise (including sandbox rejection,
        timeout and unexpected errors).
        """
        try:
            cmd = task.payload_json
            allowed, status_msg = sandbox.verify(cmd)
            if not allowed:
                return on_complete(task.task_id, {"stderr": f"SANDBOX_VIOLATION: {status_msg}", "status": 2}, task.trace_id)
            print(f" [🐚] Executing Shell: {cmd}")
            # NOTE: shell=True means the string is interpreted by the system shell,
            # so the sandbox check must account for chained/sub-commands.
            p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            with self.lock:
                self.processes[task.task_id] = p
            timeout = task.timeout_ms / 1000.0 if task.timeout_ms > 0 else None
            try:
                stdout, stderr = p.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                # communicate() does not kill the child when the timeout expires.
                # Kill it and reap it with wait(); draining the pipes instead could
                # block on a grandchild of the shell that still holds them open.
                self.cancel(task.task_id)
                p.wait()
                for stream in (p.stdout, p.stderr):
                    if stream is not None:
                        stream.close()
                return on_complete(task.task_id, {"stderr": "TIMEOUT", "status": 2}, task.trace_id)
            on_complete(task.task_id, {"stdout": stdout, "stderr": stderr, "status": 1 if p.returncode == 0 else 2}, task.trace_id)
        except Exception as e:
            on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)
        finally:
            with self.lock:
                self.processes.pop(task.task_id, None)

    def cancel(self, task_id):
        """Kill the running process for ``task_id``; return True if one was found.

        NOTE(review): with shell=True only the shell process itself is killed;
        commands it spawned may keep running.
        """
        with self.lock:
            p = self.processes.get(task_id)
            if p is not None:
                print(f"[🛑] Killing Shell Process: {task_id}")
                p.kill()
                return True
        return False


class BrowserSkill(BaseSkill):
    """The 'Antigravity Bridge': Persistent Browser Skill.

    Keeps one Playwright browser and a context/page per session id.
    Playwright's sync API is not thread-safe: its objects may only be used
    from the thread that created them. ``execute`` is invoked from arbitrary
    worker threads, so every Playwright call is funnelled through a single
    dedicated thread owned by this skill (browser actions are therefore
    serialized).
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.pw = None
        self.browser = None
        self.sessions = {}  # session_id -> { "context": Context, "page": Page }
        # The only thread that ever touches Playwright objects.
        self._pw_executor = futures.ThreadPoolExecutor(max_workers=1, thread_name_prefix="browser-skill")

    def _ensure_browser(self):
        """Lazily start Playwright and launch headless Chromium (Playwright thread only)."""
        if not self.pw:
            self.pw = sync_playwright().start()
        # Checked separately so a failed launch is retried on the next action
        # instead of leaving self.browser as None forever.
        if self.browser is None:
            self.browser = self.pw.chromium.launch(headless=True)

    def execute(self, task, sandbox, on_complete):
        """Run ``task.browser_action`` on the Playwright thread and report the result.

        Success reports ``{"status": 1, "browser_result": BrowserResponse}``;
        any exception is reported as ``{"stderr": ..., "status": 2}``.
        NOTE(review): ``sandbox`` is not consulted for browser actions.
        """
        try:
            browser_res = self._pw_executor.submit(self._run_action, task).result()
        except Exception as e:
            print(f" [!] Browser Error: {e}")
            on_complete(task.task_id, {"stderr": str(e), "status": 2}, task.trace_id)
            return
        on_complete(task.task_id, {"status": 1, "browser_result": browser_res}, task.trace_id)

    def _run_action(self, task):
        """Perform one BrowserAction and return a BrowserResponse (Playwright thread only)."""
        self._ensure_browser()
        action = task.browser_action
        sid = action.session_id or "default"
        with self.lock:
            sess = self.sessions.get(sid)
            if sess is None:
                context = self.browser.new_context()
                sess = {"context": context, "page": context.new_page()}
                self.sessions[sid] = sess
        page = sess["page"]
        print(f" [🌐] Browser Action: {agent_pb2.BrowserAction.ActionType.Name(action.action)} | Session: {sid}")

        snapshot = b""
        dom_content = ""
        if action.action == agent_pb2.BrowserAction.CLOSE:
            # Read metadata before closing: once the context is closed the page
            # can no longer be queried and page.title() would raise.
            url, title = page.url, page.title()
            with self.lock:
                closed = self.sessions.pop(sid, None)
            if closed:
                closed["context"].close()
        else:
            if action.action == agent_pb2.BrowserAction.NAVIGATE:
                page.goto(action.url, wait_until="domcontentloaded")
            elif action.action == agent_pb2.BrowserAction.CLICK:
                page.click(action.selector)
            elif action.action == agent_pb2.BrowserAction.TYPE:
                page.fill(action.selector, action.text)
            elif action.action == agent_pb2.BrowserAction.SCREENSHOT:
                snapshot = page.screenshot()
            elif action.action == agent_pb2.BrowserAction.GET_DOM:
                dom_content = page.content()
            # Refresh metadata after the action (navigation/clicks may change it).
            url, title = page.url, page.title()

        return agent_pb2.BrowserResponse(
            url=url, title=title,
            snapshot=snapshot,
            dom_content=dom_content,
        )

    def cancel(self, task_id):
        # Playwright sync actions block their thread and cannot be interrupted
        # here; in-flight browser actions are not cancellable.
        return False
class SkillManager:
    """Orchestrates multiple skills and manages the worker thread pool."""

    def __init__(self, max_workers=5):
        self.executor = futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="skill-worker")
        self.active_tasks = {}  # task_id -> Future
        self.skills = {
            "shell": ShellSkill(),
            "browser": BrowserSkill(),
        }
        self.max_workers = max_workers
        self.lock = threading.Lock()

    def submit(self, task, sandbox, on_complete):
        """Dispatch ``task`` to the matching skill on the worker pool.

        Returns ``(True, "Accepted")`` or ``(False, "Node Capacity Reached")``
        when ``max_workers`` tasks are already active.
        """
        with self.lock:
            if len(self.active_tasks) >= self.max_workers:
                return False, "Node Capacity Reached"
            # Decide Skill
            if task.HasField("browser_action") or task.task_type == "browser":
                skill = self.skills["browser"]
            else:
                skill = self.skills["shell"]
            future = self.executor.submit(skill.execute, task, sandbox, on_complete)
            self.active_tasks[task.task_id] = future
        # Register the cleanup hook only after releasing self.lock: if the task has
        # already finished, add_done_callback runs the callback immediately in this
        # thread, and _cleanup would deadlock on the non-reentrant lock.
        task_id = task.task_id
        future.add_done_callback(lambda f: self._cleanup(task_id, f))
        return True, "Accepted"

    def cancel(self, task_id):
        """Ask every skill to cancel ``task_id``; True if any skill did."""
        with self.lock:
            return any(s.cancel(task_id) for s in self.skills.values())

    def get_active_ids(self):
        """Snapshot of the ids of tasks that have not finished yet."""
        with self.lock:
            return list(self.active_tasks.keys())

    def _cleanup(self, task_id, future=None):
        """Forget ``task_id`` once its future completes.

        When ``future`` is given, the entry is removed only if it still maps to
        that future, so a finished task cannot evict a newer one reusing its id.
        """
        with self.lock:
            if future is None or self.active_tasks.get(task_id) is future:
                self.active_tasks.pop(task_id, None)
class SandboxEngine:
    """Local enforcement of the command policy received from the orchestrator.

    Commands are later run with ``shell=True``, so one string can contain many
    commands. ``verify`` therefore checks the first word of *every* command it
    can find, not just the first one. This is not a full shell parser:
    wrappers such as ``sh -c`` / ``env`` / ``xargs`` are not understood, and
    separators inside quotes are treated as real separators (which can only
    cause extra rejections, never extra approvals).
    """

    # Text that starts a new command under a shell: separators, pipes,
    # background / and-or lists, subshells and command substitution.
    _COMMAND_BREAKS = (";", "|", "&", "\n", "`", "(", ")")
    # Redirections containing "&" that do not start a new command (2>&1, &>f, <&3).
    _FD_REDIRECTS = ((">&", ">"), ("&>", ">"), ("<&", "<"))

    def __init__(self):
        self.policy = None

    def sync(self, p):
        """Replace the local policy with the ``SandboxPolicy`` message ``p``."""
        self.policy = {
            "MODE": "STRICT" if p.mode == agent_pb2.SandboxPolicy.STRICT else "PERMISSIVE",
            "ALLOWED": list(p.allowed_commands),
            "DENIED": list(p.denied_commands),
            # NOTE(review): stored but not consulted by verify().
            "SENSITIVE": list(p.sensitive_commands),
        }

    def _command_names(self, command_str):
        """Return the first word of every command contained in ``command_str``."""
        text = command_str or ""
        for old, new in self._FD_REDIRECTS:
            text = text.replace(old, new)
        for brk in self._COMMAND_BREAKS:
            text = text.replace(brk, "\n")
        word_lists = [line.split() for line in text.split("\n")]
        return [words[0] for words in word_lists if words]

    def verify(self, command_str):
        """Check ``command_str`` against the policy.

        Returns ``(allowed, reason)``: ``(False, "No Policy")`` before ``sync``,
        ``(False, "Empty")`` for blank input, ``(False, "Forbidden")`` if any
        command (or its basename, e.g. ``/bin/rm``) is denied, ``(False,
        "Not Whitelisted")`` in STRICT mode if any command is not allowed,
        otherwise ``(True, "OK")``.
        """
        if not self.policy:
            return False, "No Policy"
        names = self._command_names(command_str)
        if not names:
            return False, "Empty"
        denied = set(self.policy["DENIED"])
        if any(n in denied or os.path.basename(n) in denied for n in names):
            return False, "Forbidden"
        if self.policy["MODE"] == "STRICT":
            allowed = set(self.policy["ALLOWED"])
            if any(n not in allowed for n in names):
                return False, "Not Whitelisted"
        return True, "OK"
class AgentNode:
    """Worker node: registers with the orchestrator, reports health, runs tasks.

    Combines a SkillManager (execution), a SandboxEngine (command policy) and a
    TLS gRPC channel that presents a client key/certificate.
    """

    def __init__(self, node_id="agent-node-007", server_addr="localhost:50051", certs_dir="certs"):
        """Load TLS material from ``certs_dir`` and open a channel to ``server_addr``.

        The defaults reproduce the previously hard-coded ``certs/`` directory and
        ``localhost:50051`` address.
        """
        self.node_id = node_id
        self.skills = SkillManager()
        self.sandbox = SandboxEngine()
        # Outbound ClientTaskMessages; drained by the TaskStream request generator.
        self.task_queue = queue.Queue()
        # gRPC Setup: CA root plus client private key and certificate chain.
        with open(os.path.join(certs_dir, "client.key"), "rb") as f:
            pkey = f.read()
        with open(os.path.join(certs_dir, "client.crt"), "rb") as f:
            cert = f.read()
        with open(os.path.join(certs_dir, "ca.crt"), "rb") as f:
            ca = f.read()
        creds = grpc.ssl_channel_credentials(ca, pkey, cert)
        self.channel = grpc.secure_channel(server_addr, creds)
        self.stub = agent_pb2_grpc.AgentOrchestratorStub(self.channel)

    def _create_token(self):
        """Return an HS256 JWT identifying this node, valid for 10 minutes."""
        # Timezone-aware UTC (utcnow() is deprecated); one timestamp keeps iat/exp consistent.
        now = datetime.datetime.now(datetime.timezone.utc)
        return jwt.encode({"sub": self.node_id, "iat": now,
                           "exp": now + datetime.timedelta(minutes=10)},
                          SECRET_KEY, algorithm="HS256")

    def sync_configuration(self):
        """Register with the orchestrator and install the returned sandbox policy.

        Exits the process with status 1 if registration is rejected.
        """
        print(f"[*] Handshake: {self.node_id}")
        reg = agent_pb2.RegistrationRequest(node_id=self.node_id, auth_token=self._create_token(),
                                            node_description="Refactored Stateful Node with Browser Skill",
                                            capabilities={"shell": "v1", "browser": "playwright-1.42"})
        res = self.stub.SyncConfiguration(reg)
        if res.success:
            self.sandbox.sync(res.policy)
            print("[OK] Policy Synced.")
        else:
            print(f"[!] Rejected: {res.error_message}")
            sys.exit(1)

    def start_health_reporting(self, interval_s=10):
        """Start a daemon thread that streams a Heartbeat every ``interval_s`` seconds."""
        def _gen():
            while True:
                ids = self.skills.get_active_ids()
                # NOTE(review): cpu_usage_percent is a hard-coded placeholder value.
                yield agent_pb2.Heartbeat(node_id=self.node_id, cpu_usage_percent=1.0,
                                          active_worker_count=len(ids),
                                          max_worker_capacity=self.skills.max_workers,
                                          running_task_ids=ids)
                time.sleep(interval_s)

        def _consume():
            # Discard replies as they arrive; list(...) over this endless stream
            # would retain every reply in memory for the life of the node.
            for _ in self.stub.ReportHealth(_gen()):
                pass

        threading.Thread(target=_consume, daemon=True).start()

    def run_task_stream(self):
        """Serve the bidirectional TaskStream until the server closes it (blocking)."""
        def _gen():
            while True:
                yield self.task_queue.get()

        for msg in self.stub.TaskStream(_gen()):
            kind = msg.WhichOneof('payload')
            if kind == 'task_request':
                self._handle_task(msg.task_request)
            elif kind == 'task_cancel':
                tid = msg.task_cancel.task_id
                if self.skills.cancel(tid):
                    self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=agent_pb2.TaskResponse(
                        task_id=tid, status=agent_pb2.TaskResponse.CANCELLED)))
            elif kind == 'work_pool_update':
                # Claim at most as many tasks as there are free slots. The active
                # count does not change while claims are queued, so checking it per
                # id (as before) claimed every available task once any slot was free.
                free = self.skills.max_workers - len(self.skills.get_active_ids())
                for tid in list(msg.work_pool_update.available_task_ids)[:max(free, 0)]:
                    self.task_queue.put(agent_pb2.ClientTaskMessage(task_claim=agent_pb2.TaskClaimRequest(task_id=tid, node_id=self.node_id)))

    def _handle_task(self, task):
        """Verify the task's HMAC signature, then hand it to the SkillManager."""
        # Sig Verify logic based on payload type
        if task.HasField("browser_action"):
            a = task.browser_action
            # NOTE(review): selector/text are not part of the signed string, so the
            # signature does not authenticate them; changing this needs a matching
            # change wherever tasks are signed.
            sign_base = f"{a.action}:{a.url}:{a.session_id}".encode()
        else:
            sign_base = task.payload_json.encode()
        expected_sig = hmac.new(SECRET_KEY.encode(), sign_base, hashlib.sha256).hexdigest()
        if not hmac.compare_digest(task.signature, expected_sig):
            print(f"[!] Sig Fail for {task.task_id}")
            return
        accepted, reason = self.skills.submit(task, self.sandbox, self._on_finish)
        if not accepted:
            # Previously ignored: the task was dropped without any response.
            print(f"[!] Rejected {task.task_id}: {reason}")
            self._on_finish(task.task_id, {"stderr": reason, "status": 2}, task.trace_id)

    def _on_finish(self, tid, res, trace):
        """Queue a TaskResponse for ``tid``; ``res['status'] == 1`` means success."""
        print(f"[*] Task {tid} finished.")
        status = agent_pb2.TaskResponse.SUCCESS if res['status'] == 1 else agent_pb2.TaskResponse.ERROR
        tr = agent_pb2.TaskResponse(
            task_id=tid, status=status,
            stdout=res.get('stdout', ''),
            stderr=res.get('stderr', ''),
            trace_id=trace,
            browser_result=res.get("browser_result"),
        )
        self.task_queue.put(agent_pb2.ClientTaskMessage(task_response=tr))
if __name__ == '__main__':
    # Script entry point: register, start heartbeats, then serve tasks.
    agent = AgentNode()
    agent.sync_configuration()      # handshake + policy install (exits on rejection)
    agent.start_health_reporting()  # background heartbeat thread
    agent.run_task_stream()         # blocks while the task stream is open