Newer
Older
cortex-hub / poc-grpc-agent / agent_node / utils / auth.py
import jwt
import datetime
import hmac
import hashlib
from protos import agent_pb2
from agent_node.config import SECRET_KEY

def create_auth_token(node_id: str) -> str:
    """Create a short-lived HS256 JWT used for node authentication.

    Args:
        node_id: Identifier stored in the token's ``sub`` claim.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY``. It carries an ``iat``
        claim (issue time) and an ``exp`` claim ten minutes later.
    """
    # Read the clock once so ``exp`` is exactly ten minutes after ``iat``.
    # A timezone-aware ``now()`` replaces ``utcnow()``, which is deprecated
    # since Python 3.12 and returns a naive datetime.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        "sub": node_id,
        "iat": issued_at,
        "exp": issued_at + datetime.timedelta(minutes=10),
    }
    token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    # NOTE(review): assumes ``jwt`` is PyJWT. PyJWT 1.x returns bytes while
    # 2.x returns str; normalise so the ``-> str`` contract always holds.
    return token.decode("ascii") if isinstance(token, bytes) else token

def verify_task_signature(task, secret=SECRET_KEY) -> bool:
    """Verify the HMAC-SHA256 signature carried by a shell or browser task.

    For a task with its ``browser_action`` field set, the signed string is
    ``"<action name>:<url>:<session_id>"``; for any other task it is
    ``task.payload_json``.

    Args:
        task: Task message exposing ``HasField``, ``signature`` and either
            ``browser_action`` or ``payload_json``.
        secret: Shared secret string used as the HMAC key.

    Returns:
        True if ``task.signature`` equals the hex digest computed here,
        False otherwise (including when the task cannot be verified).
    """
    if task.HasField("browser_action"):
        action = task.browser_action
        try:
            # Aligned with orchestrator's sign_browser_action using the string Name
            kind = agent_pb2.BrowserAction.ActionType.Name(action.action)
        except ValueError:
            # The enum number is not defined in this node's schema, so the
            # signed string cannot be rebuilt: fail closed instead of raising.
            return False
        # NOTE(review): ':' may also appear inside ``url``, so distinct
        # field combinations can produce the same signed string. Changing
        # the format needs a matching change on the orchestrator side.
        sign_base = f"{kind}:{action.url}:{action.session_id}"
    else:
        sign_base = task.payload_json

    expected_sig = hmac.new(
        secret.encode(), sign_base.encode(), hashlib.sha256
    ).hexdigest()
    # Compare bytes rather than str: hmac.compare_digest raises TypeError for
    # str arguments containing non-ASCII characters, and the signature comes
    # from the incoming task. Encoded, such a signature simply fails to match.
    return hmac.compare_digest(task.signature.encode(), expected_sig.encode())