import jwt
import datetime
import hmac
import hashlib
from protos import agent_pb2
from agent_node import config
def create_auth_token(node_id: str) -> str:
    """Create a short-lived HS256 JWT used for node authentication.

    The token carries three claims: ``sub`` (the given ``node_id``), ``iat``
    (the issue time) and ``exp`` (the issue time plus 10 minutes). It is
    signed with ``config.SECRET_KEY``.

    Args:
        node_id: Identifier of the node, stored in the ``sub`` claim.

    Returns:
        The encoded JWT as a string.
    """
    # Take a single timezone-aware reading: iat and exp are then exactly ten
    # minutes apart, and we avoid datetime.utcnow(), which is deprecated since
    # Python 3.12 and yields a naive datetime.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        "sub": node_id,
        "iat": issued_at,
        "exp": issued_at + datetime.timedelta(minutes=10),
    }
    token = jwt.encode(payload, config.SECRET_KEY, algorithm="HS256")
    # PyJWT releases before 2.0 return bytes from encode(); normalise so the
    # declared ``str`` return type holds on either version.
    if isinstance(token, bytes):
        return token.decode("ascii")
    return token
def verify_task_signature(task, secret=None) -> bool:
    """Verify the HMAC-SHA256 signature attached to a task payload.

    The expected signature is the hex digest of HMAC-SHA256 over
    ``task.payload_json`` keyed with ``secret``; it is compared against
    ``task.signature`` using ``hmac.compare_digest``.

    Args:
        task: Object exposing ``payload_json`` (str) and ``signature``.
        secret: Signing key as ``str`` or ``bytes``. Defaults to
            ``config.SECRET_KEY`` when ``None``.

    Returns:
        ``True`` if the signature matches, ``False`` otherwise, including
        when the supplied signature is malformed (wrong type or non-ASCII).
    """
    if secret is None:
        secret = config.SECRET_KEY
    key = secret.encode() if isinstance(secret, str) else secret

    expected_sig = hmac.new(
        key, task.payload_json.encode(), hashlib.sha256
    ).hexdigest()

    try:
        return hmac.compare_digest(task.signature, expected_sig)
    except TypeError:
        # compare_digest rejects non-ASCII str, None and str/bytes mixes.
        # Such a signature can never be a valid hex digest, so treat it as a
        # failed verification rather than letting bad input crash the caller.
        return False
def verify_server_message_signature(msg: agent_pb2.ServerTaskMessage, secret=None) -> bool:
    """Verify the HMAC-SHA256 signature carried by a ServerTaskMessage.

    The expected signature is the hex digest of HMAC-SHA256, keyed with
    ``secret``, over the deterministic serialization of ``msg`` with its
    ``signature`` field set to the empty string. ``msg.signature`` is blanked
    only for the duration of the serialization and is always restored.

    Args:
        msg: The message to verify; its ``signature`` field holds the
            signature to check.
        secret: Signing key as ``str`` or ``bytes``. Defaults to
            ``config.SECRET_KEY`` when ``None``.

    Returns:
        ``True`` if the signature matches, ``False`` otherwise, including
        when the supplied signature is malformed (e.g. non-ASCII).
    """
    if secret is None:
        secret = config.SECRET_KEY
    key = secret.encode() if isinstance(secret, str) else secret

    provided_sig = msg.signature
    msg.signature = ""
    try:
        # NOTE(review): protobuf documents deterministic serialization as not
        # canonical across library versions/languages; confirm the signer
        # produces the same bytes.
        unsigned_bytes = msg.SerializeToString(deterministic=True)
    finally:
        # Restore even if serialization raises, so the caller's message is
        # never left with its signature erased.
        msg.signature = provided_sig

    expected_sig = hmac.new(key, unsigned_bytes, hashlib.sha256).hexdigest()
    try:
        return hmac.compare_digest(provided_sig, expected_sig)
    except TypeError:
        # compare_digest rejects non-ASCII strings; such a value cannot be a
        # valid hex digest, so report a failed verification.
        return False