"""Minimal diagnostic: Does the TaskStream actually deliver inbound messages?"""
import sys, os, time, queue, threading
os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "0"
sys.path.insert(0, "src")
sys.path.insert(0, os.path.join("src", "protos"))
import grpc
from agent_node import config
from protos import agent_pb2, agent_pb2_grpc
# Refresh the agent configuration before reading any values from it.
# NOTE(review): presumably re-reads env/file settings - confirm in agent_node.config.
config.reload()
node_id, token = config.NODE_ID, config.AUTH_TOKEN
print(f"[DIAG] Node: {node_id}, GRPC: {config.SERVER_HOST_PORT}")

# TLS channel; with no arguments gRPC falls back to its default root certificates.
creds = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(config.SERVER_HOST_PORT, creds)
stub = agent_pb2_grpc.AgentOrchestratorStub(channel)

# Handshake: unary registration call made before the task stream is opened.
req = agent_pb2.RegistrationRequest(
    node_id=node_id,
    auth_token=token,
    version="1.1.11",
)
resp = stub.SyncConfiguration(req)
print(f"[DIAG] Handshake: success={resp.success}")
# Minimal generator - announce then sleep
def gen():
    """Yield the outbound (client -> server) messages for the TaskStream call.

    The first message announces this node using the module-level ``node_id``.
    After that the generator never finishes: it sleeps 30 seconds, emits a
    keep-alive ``SkillEvent``, and repeats, so the request side of the stream
    is never completed by the client.
    """
    announcement = agent_pb2.NodeAnnounce(node_id=node_id)
    yield agent_pb2.ClientTaskMessage(announce=announcement)

    while True:
        time.sleep(30)
        heartbeat = agent_pb2.SkillEvent(keep_alive=True)
        yield agent_pb2.ClientTaskMessage(skill_event=heartbeat)
# Open the bidirectional stream: gen() feeds the outbound side, and the
# returned object is both the inbound message iterator and the RPC call handle.
responses = stub.TaskStream(gen())
print("[DIAG] TaskStream opened. Listening for inbound messages...", flush=True)

count = 0  # stays 0 if the server closes the stream without sending anything
try:
    for count, msg in enumerate(responses, start=1):
        # "payload" is the oneof group; WhichOneof returns the set field's name.
        kind = msg.WhichOneof("payload")
        print(f"[DIAG] MSG #{count}: type={kind}")
        if kind == "work_pool_update":
            print(f"[DIAG] available={len(msg.work_pool_update.available_task_ids)}")
        elif kind == "task_request":
            print(f"[DIAG] task_id={msg.task_request.task_id}")
            # Truncate the payload so one large task cannot flood the log.
            print(f"[DIAG] payload={msg.task_request.payload_json[:100]}")
        # Flush once per message so output appears promptly when piped.
        sys.stdout.flush()
        # Bounded run: stop once more than 30 messages have been observed.
        if count > 30:
            print("[DIAG] 30+ messages received, stopping.")
            break
finally:
    # Explicitly cancel the RPC. Without this, leaving the loop early (or via
    # an exception) leaves the call open while gen() keeps sleeping in gRPC's
    # request-consumer thread. cancel() is harmless if the call already ended.
    responses.cancel()
print("[DIAG] Stream ended.")