#!/usr/bin/env python3
"""
cortex_mcp_auth.py — gcloud-style browser login for Cortex Hub MCP config generation.
Usage:
python cortex_mcp_auth.py # uses https://ai.jerxie.com
python cortex_mcp_auth.py --hub https://my-hub.com
python cortex_mcp_auth.py --hub http://localhost:8000 --gemini-project antigravity
What it does:
1. Checks the hub's auth config (OIDC vs local password)
2. OIDC path: opens a browser login URL, waits for the hub to redirect back
3. Local path: prompts for email + password, POSTs to /users/login/local
4. Writes ~/.gemini/<project>/mcp_config.json (Gemini CLI)
5. Registers the server with Claude Code (via `claude mcp add`, falling back to ~/.claude.json) — skip with --no-claude
"""
import argparse
import getpass
import json
import os
import sys
import socket
import threading
import time
import urllib.parse
import urllib.request
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
# ── ANSI colours (auto-disable when stdout is not a tty) ─────────────────────
_USE_COLOR = sys.stdout.isatty()


def _c(code, text, enabled=None):
    """Wrap *text* in the ANSI SGR sequence *code* when colour is enabled.

    ``enabled`` defaults to the module-wide ``_USE_COLOR`` flag (whether
    stdout is a tty). Pass an explicit bool when the text is headed for a
    different stream.
    """
    if enabled is None:
        enabled = _USE_COLOR
    return f"\033[{code}m{text}\033[0m" if enabled else text


def ok(msg):
    """Print *msg* to stdout, prefixed with a green tick."""
    print(_c("32", "✓ ") + msg)


def err(msg):
    """Print *msg* to stderr, prefixed with a red cross."""
    # Colour is decided by stderr itself, not stdout: otherwise redirecting
    # stderr to a file while stdout is a terminal writes raw escape codes.
    print(_c("31", "✗ ", sys.stderr.isatty()) + msg, file=sys.stderr)


def info(msg):
    """Print *msg* to stdout behind a (coloured) blank prefix."""
    print(_c("36", " ") + msg)


def bold(msg):
    """Return *msg* wrapped in the bold escape sequence (if colour is on)."""
    return _c("1", msg)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _free_port() -> int:
    """Ask the OS for a currently unused TCP port and return its number.

    The probe socket is closed before returning, so the port is only
    *likely* to still be free by the time the caller binds it.
    """
    probe = socket.socket()
    try:
        probe.bind(("", 0))  # port 0 lets the kernel choose
        _host, port = probe.getsockname()
    finally:
        probe.close()
    return port
def _hub_get(hub_url: str, path: str, params: dict = None) -> dict:
    """GET ``<hub_url><path>`` and return the response body parsed as JSON.

    A trailing slash on *hub_url* is dropped before *path* is appended.
    When *params* is non-empty it is URL-encoded into the query string.
    The request uses a 10-second timeout.
    """
    base = hub_url.rstrip("/")
    target = f"{base}{path}"
    if params:
        target = target + "?" + urllib.parse.urlencode(params)
    with urllib.request.urlopen(target, timeout=10) as response:
        raw = response.read()
        return json.loads(raw)
def _hub_post(hub_url: str, path: str, payload: dict, token: str = None) -> dict:
    """POST *payload* as JSON to ``<hub_url><path>`` and return the JSON reply.

    A trailing slash on *hub_url* is dropped before *path* is appended.
    When *token* is truthy it is sent as an ``Authorization: Bearer`` header.
    The request uses a 10-second timeout.
    """
    body = json.dumps(payload).encode()
    base = hub_url.rstrip("/")
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(
        f"{base}{path}",
        data=body,
        headers=headers,
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        raw = response.read()
        return json.loads(raw)
# ── One-shot local HTTP server that catches the OAuth redirect ────────────────
class _CallbackResult:
    """Hand-off slot between the redirect handler and the waiting caller.

    The handler stores either a token/email pair or an error message and
    then signals an event; the caller blocks in :meth:`wait` until then.
    """

    def __init__(self):
        self._event = threading.Event()
        self.error = None
        self.email = None
        self.token = None

    def set(self, token, email):
        """Record a successful login and wake up the waiter."""
        self.token = token
        self.email = email
        self._event.set()

    def set_error(self, msg):
        """Record a failure message and wake up the waiter."""
        self.error = msg
        self._event.set()

    def wait(self, timeout=300):
        """Block up to *timeout* seconds; return True if an outcome arrived."""
        signalled = self._event.wait(timeout)
        return signalled
def _make_handler(result: "_CallbackResult"):
    """Build a request-handler class that reports the login redirect to *result*.

    The returned ``BaseHTTPRequestHandler`` subclass reads ``token``,
    ``email`` and ``error`` from the query string of each GET request:

    * ``error`` present  -> ``result.set_error(error)``
    * ``token`` present  -> ``result.set(token, email)``
    * neither            -> ``result.set_error("no_token")``

    Only the first outcome is recorded. Browsers routinely issue extra
    requests (most commonly ``/favicon.ico``) against the callback server;
    those must neither fail the login up front nor overwrite a token that
    has already been captured.
    """

    class Handler(BaseHTTPRequestHandler):
        def _reply(self, status, body):
            """Send a minimal HTML response with the given status and body."""
            self.send_response(status)
            self.send_header("Content-Type", "text/html")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self):
            parsed = urllib.parse.urlparse(self.path)
            if parsed.path == "/favicon.ico":
                # Automatic browser request; never part of the auth redirect.
                self._reply(404, b"")
                return

            qs = urllib.parse.parse_qs(parsed.query)
            token = (qs.get("token") or [None])[0]
            email = (qs.get("email") or [""])[0]
            error = (qs.get("error") or [None])[0]

            # First outcome wins: later requests still get a page, but they
            # cannot change what the waiting caller will see.
            undecided = result.token is None and result.error is None

            if error:
                if undecided:
                    result.set_error(error)
                body = b"<h2>Authentication failed. You can close this tab.</h2>"
            elif token:
                if undecided:
                    result.set(token, email)
                body = b"<h2>Authenticated! You can close this tab and return to the terminal.</h2>"
            else:
                if undecided:
                    result.set_error("no_token")
                body = b"<h2>No token received. Please try again.</h2>"
            self._reply(200, body)

        def log_message(self, *_):
            pass  # silence request logs

    return Handler
def _oidc_flow(hub_url: str) -> tuple[str, str]:
    """Run the browser-based login and return ``(token, email)``.

    Starts a one-shot HTTP server on the loopback interface, asks the hub
    for a login URL that redirects back to that server, opens the URL in the
    user's browser and waits up to five minutes for the redirect.

    Raises:
        RuntimeError: the hub could not be reached, returned no ``auth_url``,
            or the redirect reported an error.
        TimeoutError: no redirect arrived within five minutes.
    """
    result = _CallbackResult()
    # Bind to port 0 so the kernel assigns the port atomically; probing for a
    # free port first and binding it afterwards leaves a window in which
    # another process can take it.
    server = HTTPServer(("127.0.0.1", 0), _make_handler(result))
    port = server.server_port
    worker = threading.Thread(target=server.serve_forever, daemon=True)
    worker.start()
    try:
        try:
            init = _hub_get(hub_url, "/api/v1/users/auth/cli-init", {"port": port})
        except Exception as e:
            raise RuntimeError(f"Failed to reach hub at {hub_url}: {e}") from e
        auth_url = init.get("auth_url") if isinstance(init, dict) else None
        if not auth_url:
            raise RuntimeError("Hub returned no auth URL. Is OIDC configured?")
        print()
        print(bold("Opening browser for login…"))
        info(f"If the browser does not open, visit:\n {auth_url}")
        print()
        webbrowser.open(auth_url)
        if not result.wait(timeout=300):
            raise TimeoutError("Authentication timed out (5 min). Please try again.")
    finally:
        # Stop the serve loop on every exit path, then release the listening
        # socket (shutdown() alone leaves it open).
        server.shutdown()
        server.server_close()
    if result.error:
        raise RuntimeError(f"Authentication failed: {result.error}")
    return result.token, result.email
def _local_auth_flow(hub_url: str) -> tuple[str, str]:
    """Prompt for email and password, log in, and return ``(token, email)``.

    Credentials are POSTed to ``/api/v1/users/login/local``. If the reply
    carries no ``email`` field, the address the user typed is returned.

    Raises:
        RuntimeError: the hub rejected the credentials, could not be
            reached, or sent a reply without a usable token.
    """
    # Imported here so the exception classes are explicitly in scope; the
    # file-level imports only pull in urllib.parse and urllib.request.
    import urllib.error

    print()
    info("This hub uses local password authentication.")
    email = input(" Email: ").strip()
    password = getpass.getpass(" Password: ")
    try:
        resp = _hub_post(hub_url, "/api/v1/users/login/local", {"email": email, "password": password})
    except urllib.error.HTTPError as e:
        body = e.read().decode(errors="replace")
        detail = body
        try:
            parsed = json.loads(body)
        except ValueError:
            parsed = None  # error body is not JSON; report it verbatim
        if isinstance(parsed, dict):
            detail = parsed.get("detail", body)
        raise RuntimeError(f"Login failed: {detail}") from e
    except (OSError, ValueError) as e:
        # OSError covers URLError (connection refused, DNS failure, timeout);
        # ValueError covers a success response whose body is not valid JSON.
        raise RuntimeError(f"Login failed: no valid response from {hub_url}: {e}") from e

    token = resp.get("token") if isinstance(resp, dict) else None
    if not token:
        raise RuntimeError("Login failed: hub response did not include a token")
    return token, resp.get("email") or email
# ── Config writers ─────────────────────────────────────────────────────────────
def _sse_url(hub_url: str, token: str) -> str:
    """Return the hub's MCP SSE endpoint URL carrying *token* in the query.

    A trailing slash on *hub_url* is dropped, and the token is
    percent-encoded with no characters treated as safe.
    """
    root = hub_url.rstrip("/")
    encoded_token = urllib.parse.quote(token, safe="")
    return root + "/api/v1/mcp/sse?token=" + encoded_token
def _write_gemini_config(hub_url: str, token: str, project: str, output: str = None):
    """Add or replace the ``cortex-hub`` entry in the Gemini MCP config file.

    The file is *output* (with ``~`` expanded) when given, otherwise
    ``~/.gemini/<project>/mcp_config.json``. Other keys and other servers in
    an existing file are preserved. An existing file that cannot be read or
    is not a JSON object is replaced rather than aborting the run.
    """
    if output:
        config_path = Path(output).expanduser()
    else:
        config_path = Path.home() / ".gemini" / project / "mcp_config.json"
    config_path.parent.mkdir(parents=True, exist_ok=True)

    existing = {}
    if config_path.exists():
        try:
            loaded = json.loads(config_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            loaded = None  # unreadable or corrupt: start from a clean config
        if isinstance(loaded, dict):
            existing = loaded

    servers = existing.get("mcpServers")
    if not isinstance(servers, dict):
        servers = {}
    servers["cortex-hub"] = {
        "serverURL": _sse_url(hub_url, token),
    }
    existing["mcpServers"] = servers

    # The URL embeds a bearer token, so keep the file owner-only. Best effort:
    # some filesystems/platforms do not support POSIX permission bits.
    try:
        config_path.touch(mode=0o600, exist_ok=True)
        config_path.chmod(0o600)
    except OSError:
        pass
    config_path.write_text(json.dumps(existing, indent=2), encoding="utf-8")
    ok(f"Gemini MCP config written → {config_path}")
def _write_claude_config(hub_url: str, token: str):
    """Register the ``cortex-hub`` SSE server with Claude Code.

    When the ``claude`` executable is on PATH, ``claude mcp add`` is tried
    first with ``--scope user`` and then without a scope flag. If the CLI is
    missing or both attempts fail, the entry is written directly into the
    top-level ``mcpServers`` object of ``~/.claude.json``, preserving
    whatever else that file contains.
    """
    import shutil
    import subprocess

    url = _sse_url(hub_url, token)
    if shutil.which("claude"):
        base_cmd = ["claude", "mcp", "add", "--transport", "sse"]
        attempts = (
            # Preferred: user scope, so the server is available everywhere.
            (["--scope", "user"],
             "Claude Code MCP server registered (user scope) via `claude mcp add`"),
            # The scope flag may be rejected (e.g. by older CLI versions);
            # retry and let the CLI use its default scope.
            ([],
             "Claude Code MCP server registered (default scope) via `claude mcp add`"),
        )
        for scope_args, message in attempts:
            try:
                r = subprocess.run(
                    base_cmd + scope_args + ["cortex-hub", url],
                    capture_output=True, text=True, timeout=30,
                )
            except (OSError, subprocess.TimeoutExpired):
                continue  # CLI unusable or hung: try the next strategy
            if r.returncode == 0:
                ok(message)
                return

    # Fallback: write directly into ~/.claude.json at the top-level mcpServers key
    config_path = Path.home() / ".claude.json"
    existing = {}
    if config_path.exists():
        try:
            loaded = json.loads(config_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            loaded = None  # unreadable or corrupt: start from a clean config
        if isinstance(loaded, dict):
            existing = loaded
    servers = existing.get("mcpServers")
    if not isinstance(servers, dict):
        servers = {}
        existing["mcpServers"] = servers
    # Claude Code's server entries name the endpoint with the "url" key.
    servers["cortex-hub"] = {"type": "sse", "url": url}
    config_path.write_text(json.dumps(existing, indent=2), encoding="utf-8")
    ok(f"Claude Code MCP config written → {config_path}")
# ── Main ──────────────────────────────────────────────────────────────────────
def main(argv=None):
    """Command-line entry point: authenticate with the hub, then write MCP configs.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Exits with status 1 on any reported failure and 130 when the user
    interrupts the login with Ctrl-C.
    """
    parser = argparse.ArgumentParser(
        description="Authenticate with Cortex Hub and generate MCP config files."
    )
    parser.add_argument(
        "--hub",
        default="https://ai.jerxie.com",
        metavar="URL",
        help="Cortex Hub base URL (default: https://ai.jerxie.com)",
    )
    parser.add_argument(
        "--gemini-project",
        default="antigravity",
        metavar="NAME",
        help="Gemini CLI project name (default: antigravity)",
    )
    parser.add_argument(
        "--output",
        default=None,
        metavar="PATH",
        help="Override output path for Gemini mcp_config.json",
    )
    parser.add_argument(
        "--no-claude",
        action="store_true",
        help="Skip registering the server with Claude Code",
    )
    args = parser.parse_args(argv)
    hub_url = args.hub.rstrip("/")

    print()
    print(bold("Cortex Hub MCP Auth"))
    info(f"Hub: {hub_url}")
    print()

    # 1. Determine auth method
    try:
        auth_config = _hub_get(hub_url, "/api/v1/users/config")
    except Exception as e:
        err(f"Cannot reach hub: {e}")
        sys.exit(1)
    if not isinstance(auth_config, dict):
        err("Cannot reach hub: unexpected response from the auth config endpoint")
        sys.exit(1)
    oidc_enabled = auth_config.get("oidc_configured", False)

    # 2. Authenticate
    try:
        if oidc_enabled:
            token, email = _oidc_flow(hub_url)
        else:
            token, email = _local_auth_flow(hub_url)
    except KeyboardInterrupt:
        print()
        err("Cancelled.")
        sys.exit(130)
    except (RuntimeError, TimeoutError, OSError, EOFError) as e:
        # OSError also covers urllib's URLError (network failures); EOFError
        # is raised by the prompts when stdin is closed.
        err(str(e) or type(e).__name__)
        sys.exit(1)

    print()
    # The browser flow may deliver an empty email; avoid a dangling "as".
    ok(f"Authenticated as {bold(email)}" if email else "Authenticated")

    # 3. Write configs
    print()
    try:
        _write_gemini_config(hub_url, token, args.gemini_project, args.output)
        if not args.no_claude:
            _write_claude_config(hub_url, token)
    except OSError as e:
        err(f"Could not write MCP config: {e}")
        sys.exit(1)

    print()
    info("Token is valid for 24 hours. Re-run this script to refresh.")
    print()


if __name__ == "__main__":
    main()