import jwt
import datetime
import hmac
import hashlib
from protos import agent_pb2
from agent_node.config import SECRET_KEY
def create_auth_token(node_id: str) -> str:
"""Creates a JWT for node authentication."""
payload = {
"sub": node_id,
"iat": datetime.datetime.utcnow(),
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_task_signature(task, secret=SECRET_KEY) -> bool:
"""Verifies HMAC signature for task payloads."""
sign_base = task.payload_json
expected_sig = hmac.new(secret.encode(), sign_base.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(task.signature, expected_sig)
def verify_server_message_signature(msg: agent_pb2.ServerTaskMessage, secret=SECRET_KEY) -> bool:
"""Verifies HMAC signature for ServerTaskMessage."""
sig = msg.signature
msg.signature = ""
msg_bytes = msg.SerializeToString()
msg.signature = sig # Restore it
expected_sig = hmac.new(secret.encode(), msg_bytes, hashlib.sha256).hexdigest()
return hmac.compare_digest(sig, expected_sig)