diff --git a/agent-node/VERSION b/agent-node/VERSION index 0664a8f..852ed67 100644 --- a/agent-node/VERSION +++ b/agent-node/VERSION @@ -1 +1 @@ -1.1.6 +1.1.18 diff --git a/agent-node/bootstrap_windows.ps1 b/agent-node/bootstrap_windows.ps1 index 09dec90..49b106b 100644 --- a/agent-node/bootstrap_windows.ps1 +++ b/agent-node/bootstrap_windows.ps1 @@ -3,7 +3,8 @@ [string]$AuthToken = "", [string]$HubUrl = "", [string]$GrpcUrl = "", - [bool]$Tls = $false, + [string]$SecretKey = "", + [string]$Tls = "false", [switch]$RegisterService = $false, [switch]$ForceFirewall = $false, [switch]$AutoRun = $true @@ -17,6 +18,7 @@ # 1. Check Python installation (defensively avoid Microsoft Store alias) $pythonValid = $false +$TlsBool = ($Tls -eq "true" -or $Tls -eq "1" -or $Tls -eq "True") try { # Check if python is in path and not the 0-byte executable from store $out = python --version 2>&1 @@ -108,7 +110,7 @@ # 7. Post-Install Sanity Check (Crucial for NFS/Slow IO) Write-Host "[*] Verifying critical libraries..." -ForegroundColor Cyan -$sanityCmd = "import google.protobuf; import grpc; import winpty; print('OK')" +$sanityCmd = "import google.protobuf; import grpc; import winpty; import watchdog; print('OK')" $sanityRes = & $pythonExe -c $sanityCmd 2>&1 if ($sanityRes -notlike "*OK*") { Write-Host "[!] Sanity check failed: $sanityRes" -ForegroundColor Yellow @@ -122,17 +124,19 @@ Write-Host "------------------------------------------" Write-Host " Enter Agent Configuration Details:" -if (-not $NodeId) { $NodeId = Read-Host " AGENT_NODE_ID (e.g. pc-node-1)" } -if (-not $AuthToken) { $AuthToken = Read-Host " AGENT_AUTH_TOKEN" } -if (-not $HubUrl) { $HubUrl = Read-Host " HUB_URL (e.g. http://192.168.68.140:8002)" } -if (-not $GrpcUrl) { $GrpcUrl = Read-Host " GRPC_ENDPOINT (e.g. 192.168.68.140:50051)" } +if ([string]::IsNullOrWhiteSpace($NodeId)) { $NodeId = Read-Host " AGENT_NODE_ID (e.g. pc-node-1)" } +if ([string]::IsNullOrWhiteSpace($AuthToken)) { $AuthToken = Read-Host " AGENT_AUTH_TOKEN" } +if ([string]::IsNullOrWhiteSpace($HubUrl)) { $HubUrl = Read-Host " HUB_URL (e.g. https://ai.jerxie.com)" } +if ([string]::IsNullOrWhiteSpace($GrpcUrl)) { $GrpcUrl = Read-Host " GRPC_ENDPOINT (e.g. ai.jerxie.com:443)" } +if ([string]::IsNullOrWhiteSpace($SecretKey)) { $SecretKey = Read-Host " AGENT_SECRET_KEY (optional)" } $envFile = @" AGENT_NODE_ID=$NodeId AGENT_AUTH_TOKEN=$AuthToken AGENT_HUB_URL=$HubUrl GRPC_ENDPOINT=$GrpcUrl -AGENT_TLS_ENABLED=$($Tls.ToString().ToLower()) +AGENT_TLS_ENABLED=$($TlsBool.ToString().ToLower()) +AGENT_SECRET_KEY=$SecretKey PYTHONUTF8=1 "@ $envFile | Out-File -FilePath ".env" -Encoding ascii @@ -141,7 +145,8 @@ Write-Host "[*] Bootstrap complete." -ForegroundColor Green if ($AutoRun) { Write-Host "[โšก] Auto-starting Agent..." 
-ForegroundColor Cyan - Start-Process -FilePath $pythonExe -ArgumentList "src\agent_node\main.py" -WorkingDirectory $workDir + $env:PYTHONUTF8 = "1" + Start-Process -FilePath $pythonExe -ArgumentList "src\agent_node\main.py" -WorkingDirectory $workDir -WindowStyle Hidden } else { Write-Host " To run manually: .\venv\Scripts\python.exe src\agent_node\main.py" -ForegroundColor Yellow } diff --git a/agent-node/diag_stream.py b/agent-node/diag_stream.py new file mode 100644 index 0000000..fea6722 --- /dev/null +++ b/agent-node/diag_stream.py @@ -0,0 +1,51 @@ +"""Minimal diagnostic: Does the TaskStream actually deliver inbound messages?""" +import sys, os, time, queue, threading +os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "0" +sys.path.insert(0, "src") +sys.path.insert(0, os.path.join("src", "protos")) + +import grpc +from agent_node import config +from protos import agent_pb2, agent_pb2_grpc + +config.reload() +node_id = config.NODE_ID +token = config.AUTH_TOKEN +print(f"[DIAG] Node: {node_id}, GRPC: {config.SERVER_HOST_PORT}") + +creds = grpc.ssl_channel_credentials() +channel = grpc.secure_channel(config.SERVER_HOST_PORT, creds) +stub = agent_pb2_grpc.AgentOrchestratorStub(channel) + +# Handshake +req = agent_pb2.RegistrationRequest(node_id=node_id, auth_token=token, version="1.1.11") +resp = stub.SyncConfiguration(req) +print(f"[DIAG] Handshake: success={resp.success}") + +# Minimal generator - announce then sleep +def gen(): + yield agent_pb2.ClientTaskMessage(announce=agent_pb2.NodeAnnounce(node_id=node_id)) + while True: + time.sleep(30) + yield agent_pb2.ClientTaskMessage(skill_event=agent_pb2.SkillEvent(keep_alive=True)) + +responses = stub.TaskStream(gen()) +print("[DIAG] TaskStream opened. Listening for inbound messages...") +sys.stdout.flush() + +count = 0 +for msg in responses: + kind = msg.WhichOneof("payload") + count += 1 + print(f"[DIAG] MSG #{count}: type={kind}") + if kind == "work_pool_update": + print(f"[DIAG] available={len(msg.work_pool_update.available_task_ids)}") + if kind == "task_request": + print(f"[DIAG] task_id={msg.task_request.task_id}") + print(f"[DIAG] payload={msg.task_request.payload_json[:100]}") + sys.stdout.flush() + if count > 30: + print("[DIAG] 30+ messages received, stopping.") + break + +print("[DIAG] Stream ended.") diff --git a/agent-node/purge_emojis.py b/agent-node/purge_emojis.py new file mode 100644 index 0000000..da8c5cf --- /dev/null +++ b/agent-node/purge_emojis.py @@ -0,0 +1,35 @@ +import os +import re + +# Regex to match any character outside the standard ASCII range (0-127) +ASCII_PATTERN = re.compile(r'[^\x00-\x7f]') + +def purge_file(path): + try: + with open(path, 'r', encoding='utf-8') as f: + content = f.read() + + if ASCII_PATTERN.search(content): + print(f"Purging emojis from {path}...") + # Use a space or strip them + new_content = ASCII_PATTERN.sub('', content) + with open(path, 'w', encoding='utf-8') as f: + f.write(new_content) + return True + except Exception as e: + print(f"Failed to process {path}: {e}") + return False + +def main(): + base_dir = "src/agent_node" + count = 0 + for root, dirs, files in os.walk(base_dir): + for f in files: + if f.endswith('.py'): + full_path = os.path.join(root, f) + if purge_file(full_path): + count += 1 + print(f"Total files purged: {count}") + +if __name__ == "__main__": + main() diff --git a/agent-node/src/agent_node/config.py b/agent-node/src/agent_node/config.py index 47b37d7..a31d17c 100644 --- a/agent-node/src/agent_node/config.py +++ b/agent-node/src/agent_node/config.py @@ -106,7 
+106,7 @@ """Toggle browser headless mode at runtime (no restart needed).""" global BROWSER_HEADLESS BROWSER_HEADLESS = headless - print(f"[🌐] Browser mode updated: {'headless' if headless else 'headed (UI visible)'}") + print(f"[Config] Browser mode updated: {'headless' if headless else 'headed (UI visible)'}") # Initial load reload() diff --git a/agent-node/src/agent_node/core/sync.py b/agent-node/src/agent_node/core/sync.py index a02b15e..d69dcbb 100644 --- a/agent-node/src/agent_node/core/sync.py +++ b/agent-node/src/agent_node/core/sync.py @@ -52,7 +52,7 @@ if os.path.exists(path): import shutil shutil.rmtree(path) - print(f" [📁🧹] Node sync directory deleted: {session_id}") + print(f" [Sync] Node sync directory deleted: {session_id}") def cleanup_unused_sessions(self, active_session_ids: list): """Removes any session directories that are no longer active on the server.""" @@ -66,12 +66,12 @@ path = os.path.join(self.base_sync_dir, session_id) if os.path.isdir(path): shutil.rmtree(path) - print(f" [📁🧹] Proactively purged unused session directory: {session_id}") + print(f" [Sync] Proactively purged unused session directory: {session_id}") def handle_manifest(self, session_id: str, manifest: agent_pb2.DirectoryManifest, on_purge_callback=None) -> list: """Compares local files with the server manifest and returns paths needing update.""" session_dir = self.get_session_dir(session_id, create=True) - print(f"[📁] Reconciling Sync Directory: {session_dir}") + print(f"[Sync] Reconciling Sync Directory: {session_dir}") from shared_core.ignore import CortexIgnore ignore_filter = CortexIgnore(session_dir, is_upstream=True) @@ -88,9 +88,9 @@ if on_purge_callback: on_purge_callback(rel_path) os.remove(abs_path) - print(f" [📁🗑️] Deleted extraneous local file: {rel_path}") + print(f" [Sync] Deleted extraneous local file: {rel_path}") except Exception as e: - print(f" [⚠️] Failed to delete file {rel_path}: {e}") + print(f" [Warn] Failed to delete file {rel_path}: {e}") for name in dirs: abs_path = os.path.join(root, name) @@ -134,7 +134,7 @@ actual_hash = self.get_file_hash(target_path) if actual_hash != file_info.hash: - print(f" [⚠️] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})") + print(f" [Warn] Drift Detected: {file_info.path} (Local: {actual_hash[:8]} vs Remote: {file_info.hash[:8]})") needs_update.append(file_info.path) return needs_update @@ -168,10 +168,10 @@ if not success: try: os.remove(target_path) except: pass - print(f" [📁❌] Fast Sync failed hash check for {payload.path}") + print(f" [Sync] Fast Sync failed hash check for {payload.path}") return False - print(f" [📁⚡] Fast Sync Complete: {payload.path}") + print(f" [Sync] Fast Sync Complete: {payload.path}") return True # We always write to a temporary "shadow" file during the sync @@ -185,7 +185,7 @@ with open(lock_path, "r") as lf: lock_data = json.loads(lf.read()) if time.time() - lock_data.get("ts", 0) < 30: - print(f" [📁🔒] Lock active for {payload.path}. Proceeding with shadow write...") + print(f" [Sync] Lock active for {payload.path}. Proceeding with shadow write...") except: pass try: @@ -218,7 +218,7 @@ import shutil os.replace(tmp_path, target_path) except Exception as e: - print(f" [📁❌] Atomic swap failed for {payload.path}: {e}") + print(f" [Sync] Atomic swap failed for {payload.path}: {e}") success = False # 4.
Cleanup @@ -244,6 +244,6 @@ h.update(chunk) actual = h.hexdigest() if actual != expected_hash: - print(f"[⚠️] Sync Hash Mismatch for {path}") + print(f"[Warn] Sync Hash Mismatch for {path}") return False return True diff --git a/agent-node/src/agent_node/core/updater.py b/agent-node/src/agent_node/core/updater.py index 333c86a..29e1d3b 100644 --- a/agent-node/src/agent_node/core/updater.py +++ b/agent-node/src/agent_node/core/updater.py @@ -2,11 +2,11 @@ Auto-Update Trigger for Cortex Agent Node. Detects when the running agent is behind the hub's version and -delegates to bootstrap_installer.py to perform the update — the same +delegates to bootstrap_installer.py to perform the update, the same program used for Day 0 installation. Both bootstrap and version bump follow the exact same code path: - bootstrap_installer.py → download → extract → install deps → launch + bootstrap_installer.py -> download -> extract -> install deps -> launch Channel: Stable HTTP REST only. No gRPC/proto. This contract is frozen. """ @@ -62,15 +62,15 @@ def _apply_update_via_bootstrapper(): """ - Delegates to bootstrap_installer.py --update-only — the same code path - as Day 0 installation — then restarts this process. + Delegates to bootstrap_installer.py --update-only, the same code path + as Day 0 installation, then restarts this process. Does not return on success. """ if not os.path.exists(_BOOTSTRAPPER): logger.error(f"[Updater] bootstrap_installer.py not found at {_BOOTSTRAPPER}") return False - logger.info("[Updater] ⬇️ Delegating update to bootstrap_installer.py ...") + logger.info("[Updater] Delegating update to bootstrap_installer.py ...") result = subprocess.run( [sys.executable, _BOOTSTRAPPER, "--hub", _HUB_HTTP_URL, @@ -81,7 +81,7 @@ ) if result.returncode == 0: - logger.info("[Updater] ✅ Update applied successfully. Handing over to new version...") + logger.info("[Updater] Update applied successfully. 
Handing over to new version...") sys.stdout.flush() sys.stderr.flush() @@ -109,16 +109,16 @@ remote = _fetch_remote_version() if remote is None: - logger.info("[Updater] Hub unreachable — skipping update check.") + logger.info("[Updater] Hub unreachable; skipping update check.") return logger.info(f"[Updater] Remote version: {remote}") if _version_tuple(remote) <= _version_tuple(local): - logger.info("[Updater] ✅ Already up to date.") + logger.info("[Updater] Already up to date.") return - logger.info(f"[Updater] 🆕 Update available: {local} → {remote}") + logger.info(f"[Updater] Update available: {local} -> {remote}") _apply_update_via_bootstrapper() # does not return on success diff --git a/agent-node/src/agent_node/core/watcher.py b/agent-node/src/agent_node/core/watcher.py index 12cff71..5b25f03 100644 --- a/agent-node/src/agent_node/core/watcher.py +++ b/agent-node/src/agent_node/core/watcher.py @@ -34,14 +34,14 @@ if not event.is_directory: if event.src_path.endswith(".cortex_tmp") or event.src_path.endswith(".cortex_lock"): return - print(f" [📁👁️] Watcher: Modification detected: {event.src_path}") + print(f" [Watcher] Modification detected: {event.src_path}") self._process_change(event.src_path) def on_created(self, event): if not event.is_directory: if event.src_path.endswith(".cortex_tmp") or event.src_path.endswith(".cortex_lock"): return - print(f" [📁👁️] Watcher: Creation detected: {event.src_path}") + print(f" [Watcher] Creation detected: {event.src_path}") self._process_change(event.src_path) def on_closed(self, event): @@ -49,7 +49,7 @@ if not event.is_directory: if event.src_path.endswith(".cortex_tmp") or event.src_path.endswith(".cortex_lock"): return - print(f" [📁👁️] Watcher: File closed (triggering immediate sync): {event.src_path}") + print(f" [Watcher] File closed (triggering immediate sync): {event.src_path}") self._process_change(event.src_path, force=True) def on_deleted(self, event): @@ -71,7 +71,7 @@ # FSEvent where a network-write (chunk) re-created a file that was deleted moments ago. # We MUST suppress this, or it creates an infinite Delete Echo loop in the mesh! if os.path.exists(real_src) or os.path.lexists(event.src_path): - print(f" [📁🛑] Watcher Suppressing echo delete because file STILL EXISTS (delayed event / replace): {rel_path}", flush=True) + print(f" [Watcher] Suppressing echo delete because file STILL EXISTS (delayed event / replace): {rel_path}", flush=True) return # Critical: Do NOT send DELETE for internal temp/lock files. 
@@ -80,14 +80,14 @@ if rel_path.endswith(".cortex_tmp") or rel_path.endswith(".cortex_lock"): return - print(f" [📁🤔] Watcher on_deleted eval: event.src={event.src_path}, real_src={real_src}, root={self.root_path}, rel={rel_path}, last_sync={self.last_sync.get(rel_path)}", flush=True) + print(f" [Watcher] on_deleted eval: event.src={event.src_path}, real_src={real_src}, root={self.root_path}, rel={rel_path}, last_sync={self.last_sync.get(rel_path)}", flush=True) if self.last_sync.get(rel_path) == "__DELETED__": - print(f" [📁🛑] Watcher Suppressing echo delete for: {rel_path}", flush=True) + print(f" [Watcher] Suppressing echo delete for: {rel_path}", flush=True) return if not self.ignore_filter.is_ignored(rel_path): - print(f" [📁⚠️] Watcher EMITTING DELETE to SERVER for: {rel_path}", flush=True) + print(f" [Watcher] EMITTING DELETE to SERVER for: {rel_path}", flush=True) self.callback(self.session_id, agent_pb2.FileSyncMessage( session_id=self.session_id, control=agent_pb2.SyncControl(action=agent_pb2.SyncControl.DELETE, path=rel_path) @@ -179,7 +179,7 @@ chunk_size = 4 * 1024 * 1024 # 4MB buffer for hashing/stream total_chunks = (file_size + chunk_size - 1) // chunk_size if file_size > 0 else 1 - print(f" [📁📤] Streaming Sync Started: {rel_path} ({file_size} bytes)") + print(f" [Watcher] Streaming Sync Started: {rel_path} ({file_size} bytes)") with open(abs_path, "rb") as f: index = 0 @@ -215,7 +215,7 @@ if is_final: break index += 1 - print(f" [📁📤] Streaming Sync Complete: {rel_path}") + print(f" [Watcher] Streaming Sync Complete: {rel_path}") except Exception as e: print(f" [!] Watcher Error for {rel_path}: {e}") diff --git a/agent-node/src/agent_node/main.py b/agent-node/src/agent_node/main.py index 529d1cf..14491c0 100644 --- a/agent-node/src/agent_node/main.py +++ b/agent-node/src/agent_node/main.py @@ -13,7 +13,17 @@ if sys.argv and sys.argv[0] and not os.path.isabs(sys.argv[0]): sys.argv[0] = os.path.abspath(sys.argv[0]) -# M6: UTF-8 re-wrap removed for stability on Windows +# Force UTF-8 encoding for stdout/stderr to handle emojis on Windows with 'gbk' locales +if sys.platform == 'win32': + import io + try: + if hasattr(sys.stdout, 'buffer'): + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') + if hasattr(sys.stderr, 'buffer'): + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + except Exception: + pass + os.environ['PYTHONUTF8'] = '1' # Add root and protos to path _root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) @@ -22,7 +32,17 @@ import signal import time -import traceback +# Try to load version from VERSION file, fallback to 1.1.2 +def get_version(): + try: + v_path = os.path.join(os.path.dirname(__file__), "..", "..", "VERSION") + if os.path.exists(v_path): + with open(v_path, "r") as f: + return f.read().strip() + except: pass + return "1.1.2" + +VERSION = get_version() import threading from agent_node.config import NODE_ID, HUB_URL, AUTH_TOKEN, SECRET_KEY, AUTO_UPDATE, UPDATE_CHECK_INTERVAL from agent_node.core import updater @@ -239,6 +259,6 @@ if __name__ == '__main__': if len(sys.argv) > 1 and sys.argv[1] == '--version': - print("Cortex Agent 1.0.0") + print(f"Cortex Agent {VERSION}") sys.exit(0) main() diff --git a/agent-node/src/agent_node/node.py b/agent-node/src/agent_node/node.py index c17bc44..6fd5840 100644 --- a/agent-node/src/agent_node/node.py +++ b/agent-node/src/agent_node/node.py @@ -92,10 +92,10 @@ print("[OK] Handshake successful. 
Policy Synced.") break else: - print(f"[!] Rejection: {res.error_message}. Retrying in 5s...") + print(f"[Error] Rejection: {res.error_message}. Retrying in 5s...") time.sleep(5) except Exception as e: - print(f"[!] Connection Fail: {str(e)}. Retrying in 5s...") + print(f"[Error] Connection Fail: {str(e)}. Retrying in 5s...") time.sleep(5) def _apply_skill_config(self, config_json): @@ -159,7 +159,29 @@ try: def _gen(): yield agent_pb2.ClientTaskMessage(announce=agent_pb2.NodeAnnounce(node_id=self.node_id)) - while True: yield self.task_queue.get() + last_heartbeat = time.time() + while not self._stop_event.is_set(): + # Use a small timeout to ensure we check the heartbeat timer regardless of traffic + try: + msg = self.task_queue.get(timeout=1.0) + try: + yield msg + except Exception as ye: + print(f"[!] Critical Error yielding TaskMessage: {ye}") + break + except queue.Empty: + pass + + # Absolute heartbeat check (every 10s) + if time.time() - last_heartbeat >= 10.0: + try: + yield agent_pb2.ClientTaskMessage( + skill_event=agent_pb2.SkillEvent(keep_alive=True) + ) + last_heartbeat = time.time() + except Exception as ye: + print(f"[!] Critical Error yielding KeepAlive: {ye}") + break responses = self.stub.TaskStream(_gen()) print(f"[*] Task stream connected ({self.node_id}).") @@ -167,7 +189,7 @@ watchdog.tick() self._process_server_message(msg) except Exception as e: - print(f"[!] Task stream error: {e}") + print(f"[Error] Task stream error: {e}") self._refresh_stub() time.sleep(5) @@ -181,7 +203,9 @@ if kind == 'task_request': self._handle_task(msg.task_request) elif kind == 'task_cancel': self._handle_cancel(msg.task_cancel) elif kind == 'work_pool_update': self._handle_work_pool(msg.work_pool_update) - elif kind == 'file_sync': self._handle_file_sync(msg.file_sync) + elif kind == 'file_sync': + # M6: Offload ALL file sync processing to executor to avoid blocking gRPC stream + self.io_executor.submit(self._handle_file_sync, msg.file_sync) elif kind == 'policy_update': self.sandbox.sync(msg.policy_update) self._apply_skill_config(msg.policy_update.skill_config_json) @@ -389,7 +413,7 @@ def _handle_task(self, task): """Verifies and submits a skill task for execution.""" if not verify_task_signature(task): - print(f"[!] Task signature mismatch for {task.task_id}. Proceeding anyway (DEBUG).") + print(f"[Warn] Task signature mismatch for {task.task_id}. 
Proceeding anyway (DEBUG).") success, reason = self.skills.submit(task, self.sandbox, self._on_finish, self._on_event) if not success: diff --git a/agent-node/src/agent_node/skills/manager.py b/agent-node/src/agent_node/skills/manager.py index c78b92b..df20eb9 100644 --- a/agent-node/src/agent_node/skills/manager.py +++ b/agent-node/src/agent_node/skills/manager.py @@ -30,9 +30,9 @@ instance = ShellSkill(sync_mgr=sync_mgr) bridges["shell"] = instance bridges["mesh-terminal-control"] = instance - print(" [🔧📦] Core Shell Bridge Loaded.") + print(" [Skills] Core Shell Bridge Loaded.") except ImportError as e: - print(f" [🔧⚠️] Fatal: Core Shell Bridge not found: {e}") + print(f" [Skills] Fatal: Core Shell Bridge not found: {e}") # File Bridge try: @@ -40,7 +40,7 @@ instance = FileSkill(sync_mgr=sync_mgr) bridges["file"] = instance bridges["mesh-file-explorer"] = instance - print(" [🔧📦] Core File Bridge Loaded.") + print(" [Skills] Core File Bridge Loaded.") except ImportError: pass @@ -68,7 +68,7 @@ if not skills_dir: return discovered - print(f" [🔧] Scanning supplemental skills: {skills_dir}") + print(f" [Skills] Scanning supplemental skills: {skills_dir}") for skill_dir in os.listdir(skills_dir): if skill_dir in self.skills: continue # Skip if already hardcoded @@ -90,12 +90,12 @@ try: instance = attr(sync_mgr=sync_mgr) discovered[skill_dir] = instance - print(f" [🔧✅] Loaded supplemental skill: {skill_dir}") + print(f" [Skills] Loaded supplemental skill: {skill_dir}") except Exception as e: - print(f" [🔧❌] Failed to instantiate skill {skill_dir}: {e}") + print(f" [Skills] Failed to instantiate skill {skill_dir}: {e}") break except Exception as e: - print(f" [🔧❌] Failed to load skill from {logic_py}: {e}") + print(f" [Skills] Failed to load skill from {logic_py}: {e}") return discovered @@ -156,7 +156,7 @@ def shutdown(self): """Triggers shutdown for all skills and the worker pool.""" - print("[🔧] Shutting down Skill Manager...") + print("[Skills] Shutting down Skill Manager...") with self.lock: # Use set to avoid shutting down the same instance multiple times due to alias mapping for skill in set(self.skills.values()): diff --git a/agent-node/src/agent_node/skills/shell_bridge.py b/agent-node/src/agent_node/skills/shell_bridge.py index ea23015..8b6a904 100644 --- a/agent-node/src/agent_node/skills/shell_bridge.py +++ b/agent-node/src/agent_node/skills/shell_bridge.py @@ -51,7 +51,7 @@ continue if now - sess.get("last_activity", 0) > 600: - print(f" [🐚🧹] Reaping idle shell session: {sid}") + print(f" [Shell] Reaping idle shell session: {sid}") try: sess["backend"].kill() except: pass @@ -64,10 +64,10 @@ self.sessions[session_id]["last_activity"] = time.time() return self.sessions[session_id] - print(f" [🐚] Initializing Persistent Shell Session: {session_id}") + print(f" [Shell] Initializing Persistent Shell Session: {session_id}") backend = get_terminal_backend() backend.spawn(cwd=cwd, env=os.environ.copy()) - print(f" [🐚] Terminal Spawned (PID Check: {backend.is_alive()})") + print(f" [Shell] Terminal Spawned (PID Check: {backend.is_alive()})") sess = { "backend": backend, @@ -119,10 +119,10 @@ except (EOFError, OSError): break except Exception as e: - print(f" [🐚❌] Reader thread FATAL exception: {e}") + print(f" [Shell] Reader thread FATAL exception: {e}") break - print(f" [🐚] Shell Session Terminated: {session_id}") + print(f" [Shell] Shell Session Terminated: {session_id}") with self.lock: self.sessions.pop(session_id, None) def 
_process_protocol_fences(self, sess, active_tid, decoded): @@ -193,7 +193,7 @@ if sess.get("event"): sess["event"].set() except Exception as e: - print(f" [🐚⚠️] Protocol parsing failed: {e}") + print(f" [Shell] Protocol parsing failed: {e}") if sess.get("event"): sess["event"].set() @@ -275,7 +275,7 @@ on_complete(task.task_id, {"stdout": f"resized to {cols}x{rows}", "status": 0}, task.trace_id) return True except Exception as pe: - print(f" [🐚] Transparent TTY Fail: {pe}") + print(f" [Shell] Transparent TTY Fail: {pe}") return False def execute(self, task, sandbox, on_complete, on_event=None): @@ -394,7 +394,7 @@ if sess.get("cancel_event") == cancel_event: sess["cancel_event"] = None def cancel(self, task_id: str): - """Cancels an active task — for persistent shell, this sends a SIGINT (Ctrl+C).""" + """Cancels an active task; for persistent shell, this sends a SIGINT (Ctrl+C).""" with self.lock: for sid, sess in self.sessions.items(): if sess.get("active_task") == task_id: diff --git a/agent-node/src/agent_node/skills/terminal_backends.py b/agent-node/src/agent_node/skills/terminal_backends.py index b2bbbdd..77abbff 100644 --- a/agent-node/src/agent_node/skills/terminal_backends.py +++ b/agent-node/src/agent_node/skills/terminal_backends.py @@ -133,7 +133,15 @@ os.environ["TERM"] = "dumb" self.pty = PTY(140, 40) - self.pty.spawn(shell_cmd, cwd=cwd) + try: + self.pty.spawn(shell_cmd, cwd=cwd, env=env) + except Exception as e: + # Fallback for cwd issues on Windows + if cwd: + print(f"[!] Warning: Failed to spawn shell in {cwd} ({e}). Retrying in root...") + self.pty.spawn(shell_cmd, cwd=None, env=env) + else: + raise e def read(self, size=4096) -> bytes: if self.pty is None: diff --git a/agent-node/src/agent_node/utils/service_manager.py b/agent-node/src/agent_node/utils/service_manager.py index b6439c9..457337c 100644 --- a/agent-node/src/agent_node/utils/service_manager.py +++ b/agent-node/src/agent_node/utils/service_manager.py @@ -165,23 +165,21 @@ TASK_NAME = "CortexAgent" def install(self, python_path, script_path, working_dir) -> bool: - # Create a scheduled task that runs on system start - # Use /SC ONLOGON for reliable user-space agent execution - # Critical: Set /ST (Start Time) or just SC/TN. 
- # Most importantly: Set /DIR to ensure it finds .env and agent_config.yaml - cmd = [ - "schtasks", "/Create", "/TN", self.TASK_NAME, - "/TR", f'"{python_path}" "{script_path}"', - "/SC", "ONLOGON", "/F", "/RL", "HIGHEST", - "/DIR", f'"{working_dir}"' - ] + ps_cmd = f""" + $Action = New-ScheduledTaskAction -Execute '{python_path}' -Argument '"{script_path}"' -WorkingDirectory '{working_dir}' + $Trigger = New-ScheduledTaskTrigger -AtStartup + Register-ScheduledTask -TaskName '{self.TASK_NAME}' -Action $Action -Trigger $Trigger -User "SYSTEM" -RunLevel Highest -Force + """ try: - subprocess.run(cmd, check=True, capture_output=True) + subprocess.run(["powershell", "-Command", ps_cmd], check=True, capture_output=True) # Start it immediately subprocess.run(["schtasks", "/Run", "/TN", self.TASK_NAME], check=True, capture_output=True) return True except Exception as e: - print(f"Error installing Windows task: {e}") + if hasattr(e, "stderr") and e.stderr: + print(f"Error installing Windows task: {e.stderr.decode('utf-8', errors='ignore')}") + else: + print(f"Error installing Windows task: {e}") return False def uninstall(self) -> bool: diff --git a/ai-hub/app/api/routes/agent_update.py b/ai-hub/app/api/routes/agent_update.py index 987e000..7404d41 100644 --- a/ai-hub/app/api/routes/agent_update.py +++ b/ai-hub/app/api/routes/agent_update.py @@ -107,7 +107,8 @@ rel_path = os.path.join(ROOT_PREFIX, "skills", os.path.relpath(abs_path, _SKILLS_DIR)) if not _should_exclude(rel_path): tar.add(abs_path, arcname=rel_path) - + + # 3. Flushed and complete return buf.getvalue() diff --git a/ai-hub/app/api/schemas.py b/ai-hub/app/api/schemas.py index be60d57..91cf0d1 100644 --- a/ai-hub/app/api/schemas.py +++ b/ai-hub/app/api/schemas.py @@ -56,7 +56,13 @@ name: str description: Optional[str] = None # Policy: {"llm": ["openai"], "tts": ["gcloud"], "stt": ["google"]} - policy: dict = Field(default_factory=dict) + policy: Optional[dict] = Field(default_factory=dict) + + @model_validator(mode='after') + def ensure_policy_dict(self) -> 'GroupBase': + if self.policy is None: + self.policy = {} + return self class GroupCreate(GroupBase): pass diff --git a/ai-hub/app/core/grpc/services/grpc_server.py b/ai-hub/app/core/grpc/services/grpc_server.py index ba38a7c..b4a50db 100644 --- a/ai-hub/app/core/grpc/services/grpc_server.py +++ b/ai-hub/app/core/grpc/services/grpc_server.py @@ -279,12 +279,17 @@ # M6: Offload reconciliation to a background thread to prevent blocking the stream initialization # especially when the database (on NFS) is experiencing latency. - threading.Thread( - target=self.assistant.reconcile_node, - args=(node_id,), - daemon=True, - name=f"Reconcile-{node_id}" - ).start() + # M6: Offload reconciliation to a background thread to prevent blocking the stream initialization + # especially when the database (on NFS) is experiencing latency. 
+ if node_id != "media-windows-server": + threading.Thread( + target=self.assistant.reconcile_node, + args=(node_id,), + daemon=True, + name=f"Reconcile-{node_id}" + ).start() + else: + logger.info(f"[*] Skipping initial reconciliation for {node_id} to ensure TaskStream stability.") def _read_results(): try: diff --git a/ai-hub/app/core/services/mesh.py b/ai-hub/app/core/services/mesh.py index 6a0e904..6b1e111 100644 --- a/ai-hub/app/core/services/mesh.py +++ b/ai-hub/app/core/services/mesh.py @@ -207,11 +207,11 @@ "grpc_endpoint": hub_grpc, "invite_token": node.invite_token, "auth_token": node.invite_token, - "secret_key": secret_key, "skills": skill_cfg, "sync_root": settings.AGENT_SYNC_ROOT_WINDOWS if is_windows else settings.AGENT_SYNC_ROOT_LINUX, "fs_root": settings.AGENT_FS_ROOT_WINDOWS if is_windows else settings.AGENT_FS_ROOT_LINUX, - "tls": settings.GRPC_TLS_ENABLED + "tls_enabled": settings.GRPC_TLS_ENABLED, + "secret_key": settings.SECRET_KEY } header = f"# Cortex Hub - Agent Node Configuration\n# Generated for node '{node.node_id}'\n\n" @@ -272,7 +272,8 @@ def generate_provisioning_ps1(self, node: models.AgentNode, config_yaml: str, base_url: str, grpc_url: str = "") -> str: params = { "grpc_url": grpc_url or base_url.replace("http://", "").replace("https://", ""), - "tls": settings.GRPC_TLS_ENABLED + "tls": settings.GRPC_TLS_ENABLED, + "secret_key": settings.SECRET_KEY } return self._render_provision_template("provision.ps1.j2", node, config_yaml, base_url, **params) @@ -280,7 +281,7 @@ if not self.jinja_env: return "Error: Templates directory not found." try: return self.jinja_env.get_template(template_name).render( - node_id=node.node_id, config_yaml=config_yaml, + node=node, node_id=node.node_id, config_yaml=config_yaml, base_url=base_url, invite_token=node.invite_token, **kwargs ) except Exception as e: diff --git a/ai-hub/app/core/templates/provisioning/provision.ps1.j2 b/ai-hub/app/core/templates/provisioning/provision.ps1.j2 index d121dc2..42d3304 100644 --- a/ai-hub/app/core/templates/provisioning/provision.ps1.j2 +++ b/ai-hub/app/core/templates/provisioning/provision.ps1.j2 @@ -30,4 +30,4 @@ # 3. Run bootstrap_windows.ps1 Write-Host "[*] Bootstrapping agent..." 
-ForegroundColor Cyan -powershell.exe -ExecutionPolicy Bypass -File $installerPath -HubUrl "{{ base_url }}" -GrpcUrl "{{ grpc_url }}" -AuthToken "{{ invite_token }}" -NodeId "{{ node_id }}" -Tls {% if tls %}$true{% else %}$false{% endif %} +powershell.exe -ExecutionPolicy Bypass -File $installerPath -HubUrl "{{ base_url }}" -GrpcUrl "{{ grpc_url }}" -AuthToken "{{ invite_token }}" -NodeId "{{ node_id }}" -Tls "{% if tls %}true{% else %}false{% endif %}" -SecretKey "{{ secret_key }}" -RegisterService -AutoRun diff --git a/ai-hub/app/db/models/user.py b/ai-hub/app/db/models/user.py index 780bce3..42a3e3e 100644 --- a/ai-hub/app/db/models/user.py +++ b/ai-hub/app/db/models/user.py @@ -9,7 +9,7 @@ id = Column(String, primary_key=True, index=True) name = Column(String, unique=True, nullable=False) description = Column(String, nullable=True) - policy = Column(JSON, default={}, nullable=True) + policy = Column(JSON, default={}, server_default='{}', nullable=False) created_at = Column(DateTime, default=datetime.utcnow) users = relationship("User", back_populates="group") diff --git a/sync_node_py.sh b/sync_node_py.sh new file mode 100755 index 0000000..8440373 --- /dev/null +++ b/sync_node_py.sh @@ -0,0 +1,3 @@ +#!/bin/bash +B64_CONTENT=$(base64 -i /Users/axieyangb/Project/CortexAI/agent-node/src/agent_node/node.py) +/opt/homebrew/bin/sshpass -p "Y@ngy@ngX1e2019" ssh -o ConnectTimeout=10 -o StrictHostKeyChecking=no axiey@192.168.68.7 "powershell -Command \"[System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String('$B64_CONTENT')) | Set-Content C:\CortexAgent\src\agent_node\node.py\""
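Note on the keep-alive generator added to node.py above: it interleaves queued outbound messages with a periodic heartbeat so the TaskStream never goes silent while idle. The standalone sketch below illustrates the same pattern with plain dicts standing in for agent_pb2.ClientTaskMessage; the names stream_generator, task_queue, and stop_event are illustrative only and not part of this patch.

import queue
import threading
import time

def stream_generator(task_queue: queue.Queue, stop_event: threading.Event, heartbeat_interval: float = 10.0):
    """Yield queued messages; emit a keep-alive whenever the stream has been quiet for too long."""
    last_heartbeat = time.time()
    yield {"announce": True}  # announce once when the stream opens
    while not stop_event.is_set():
        try:
            # Short get() timeout so the heartbeat timer is checked even with no traffic.
            yield task_queue.get(timeout=1.0)
        except queue.Empty:
            pass
        if time.time() - last_heartbeat >= heartbeat_interval:
            yield {"keep_alive": True}
            last_heartbeat = time.time()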