import jwt
import datetime
import hmac
import hashlib
from protos import agent_pb2
from agent_node.config import SECRET_KEY
def create_auth_token(node_id: str) -> str:
"""Creates a JWT for node authentication."""
payload = {
"sub": node_id,
"iat": datetime.datetime.utcnow(),
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_task_signature(task, secret=SECRET_KEY) -> bool:
"""Verifies HMAC signature for shell or browser tasks."""
if task.HasField("browser_action"):
a = task.browser_action
# Aligned with orchestrator's sign_browser_action using the string Name
kind = agent_pb2.BrowserAction.ActionType.Name(a.action)
sign_base = f"{kind}:{a.url}:{a.session_id}"
else:
sign_base = task.payload_json
expected_sig = hmac.new(secret.encode(), sign_base.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(task.signature, expected_sig)