package storage
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
secretv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" // SDS Import
"google.golang.org/protobuf/encoding/protojson"
)
// Storage wraps a database/sql handle and is the receiver for every
// persistence method in this package.
type Storage struct {
// db is the handle all statements are executed against.
db *sql.DB
// driver selects the SQL dialect: "postgres" gets PostgreSQL syntax and $N
// placeholders; every other value is handled as SQLite.
driver string
}
// DeleteStrategy selects what SaveSnapshot does with stored resources that are
// absent from the snapshot being saved.
type DeleteStrategy int
const (
// DeleteNone only upserts the resources in the snapshot and leaves every other
// row untouched. It is the zero value of DeleteStrategy.
DeleteNone DeleteStrategy = iota
// DeleteLogical sets enabled to false on clusters, listeners, and secrets that
// are missing from the snapshot; the rows stay in the database.
DeleteLogical
// DeleteActual physically deletes clusters, listeners, and secrets that are
// missing from the snapshot.
DeleteActual
)
// NewStorage wraps an already-opened database handle in a Storage.
//
// driver names the SQL dialect to generate: "postgres" selects PostgreSQL
// syntax and every other value is treated as SQLite. Neither argument is
// validated here.
func NewStorage(db *sql.DB, driver string) *Storage {
	st := new(Storage)
	st.db = db
	st.driver = driver
	return st
}
// placeholder returns the bind-parameter marker for the n-th argument of a
// statement: "$n" when the driver is "postgres", "?" for every other driver.
func (s *Storage) placeholder(n int) string {
	switch s.driver {
	case "postgres":
		return fmt.Sprintf("$%d", n)
	default:
		return "?"
	}
}
// CertStorage is the row shape of the certificates table: the data that
// SaveCertificate persists and that LoadCertificate / LoadAllCertificates
// return. It holds what is needed to renew a certificate and, per the original
// note, mirrors what internalcertapi.Certificate used to store (that type is
// not visible in this file).
type CertStorage struct {
Domain string // Certificate domain; primary key of the certificates table.
Email string // ACME account email.
CertPEM []byte // Current certificate (public part + chain); presumably PEM-encoded, going by the name.
KeyPEM []byte // Private key for the domain; presumably PEM-encoded, going by the name.
AccountKey []byte // ACME account private key D-value (used for signing).
AccountURL string // ACME account URI (KID).
IssuerType string // Issuer kind, e.g. "LetsEncrypt"; the column defaults to "".
SecretName string // Name of the SDS secret this certificate is linked to; empty if unlinked/manual.
}
// InitSchema creates the clusters, listeners, secrets, and certificates tables
// when they do not exist yet.
//
// The DDL is chosen by driver ("postgres" or, for anything else, SQLite) and is
// sent to the database as one multi-statement Exec. Every statement is
// CREATE TABLE IF NOT EXISTS, so a table that already exists is left exactly
// as it is. The driver's error, if any, is returned unchanged.
func (s *Storage) InitSchema(ctx context.Context) error {
	var ddl string
	if s.driver == "postgres" {
		ddl = `
CREATE TABLE IF NOT EXISTS clusters (
id SERIAL PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
data JSONB NOT NULL,
enabled BOOLEAN DEFAULT true,
updated_at TIMESTAMP DEFAULT now()
);
CREATE TABLE IF NOT EXISTS listeners (
id SERIAL PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
data JSONB NOT NULL,
enabled BOOLEAN DEFAULT true,
updated_at TIMESTAMP DEFAULT now()
);
-- SDS secrets table for Postgres
CREATE TABLE IF NOT EXISTS secrets (
id SERIAL PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
data JSONB NOT NULL,
enabled BOOLEAN DEFAULT true,
updated_at TIMESTAMP DEFAULT now()
);
-- 👇 UPDATED CERTIFICATE TABLE FOR ACME RENEWAL
CREATE TABLE IF NOT EXISTS certificates (
domain TEXT PRIMARY KEY,
email TEXT NOT NULL,
cert_pem BYTEA NOT NULL,
key_pem BYTEA NOT NULL,
account_key BYTEA NOT NULL,
account_url TEXT NOT NULL,
issuer_type TEXT DEFAULT '',
secret_name TEXT DEFAULT '', -- New field
updated_at TIMESTAMP DEFAULT now()
);`
	} else {
		// Any driver other than "postgres" gets the SQLite dialect.
		ddl = `
CREATE TABLE IF NOT EXISTS clusters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
data TEXT NOT NULL,
enabled BOOLEAN DEFAULT 1,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS listeners (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
data TEXT NOT NULL,
enabled BOOLEAN DEFAULT 1,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- SDS secrets table for SQLite
CREATE TABLE IF NOT EXISTS secrets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
data TEXT NOT NULL,
enabled BOOLEAN DEFAULT 1,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 👇 UPDATED CERTIFICATE TABLE FOR ACME RENEWAL
CREATE TABLE IF NOT EXISTS certificates (
domain TEXT PRIMARY KEY,
email TEXT NOT NULL,
cert_pem BLOB NOT NULL,
key_pem BLOB NOT NULL,
account_key BLOB NOT NULL,
account_url TEXT NOT NULL,
issuer_type TEXT DEFAULT '',
secret_name TEXT DEFAULT '', -- New field
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`
	}

	if _, execErr := s.db.ExecContext(ctx, ddl); execErr != nil {
		return execErr
	}
	return nil
}
// -----------------------------------------------------------------------------
// CERTIFICATE METHODS (UPSERT & LOAD)
// -----------------------------------------------------------------------------
// SaveCertificate inserts the certificate record for cert.Domain or, when a
// record with that domain already exists, overwrites all of its stored columns.
// updated_at is set to the database's current time in both cases.
//
// A nil cert is rejected with an error before any statement is executed.
// Otherwise the driver's error, if any, is returned unchanged.
func (s *Storage) SaveCertificate(ctx context.Context, cert *CertStorage) error {
	if cert == nil {
		return errors.New("cannot save nil certificate")
	}

	var query string
	switch s.driver {
	case "postgres":
		query = `
INSERT INTO certificates (domain, email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, now())
ON CONFLICT (domain) DO UPDATE SET
email = EXCLUDED.email,
cert_pem = EXCLUDED.cert_pem,
key_pem = EXCLUDED.key_pem,
account_key = EXCLUDED.account_key,
account_url = EXCLUDED.account_url,
issuer_type = EXCLUDED.issuer_type,
secret_name = EXCLUDED.secret_name,
updated_at = now()`
	default: // SQLite
		query = `
INSERT INTO certificates (domain, email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(domain) DO UPDATE SET
email = excluded.email,
cert_pem = excluded.cert_pem,
key_pem = excluded.key_pem,
account_key = excluded.account_key,
account_url = excluded.account_url,
issuer_type = excluded.issuer_type,
secret_name = excluded.secret_name,
updated_at = CURRENT_TIMESTAMP`
	}

	// Argument order must match the column list of the INSERT above.
	_, err := s.db.ExecContext(ctx, query,
		cert.Domain,
		cert.Email,
		cert.CertPEM,
		cert.KeyPEM,
		cert.AccountKey,
		cert.AccountURL,
		cert.IssuerType,
		cert.SecretName,
	)
	return err
}
// LoadCertificate returns the stored certificate record for domain.
//
// When no row exists for domain the returned error wraps sql.ErrNoRows, so
// callers can tell "not found" apart from a database failure with
// errors.Is(err, sql.ErrNoRows). Any other failure is returned wrapped with
// the domain for context. The returned record is nil whenever the error is
// non-nil.
func (s *Storage) LoadCertificate(ctx context.Context, domain string) (*CertStorage, error) {
	// The column order here must match the Scan destinations below.
	query := `SELECT email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name FROM certificates WHERE domain = ` + s.placeholder(1)

	cert := &CertStorage{Domain: domain}
	err := s.db.QueryRowContext(ctx, query, domain).Scan(
		&cert.Email,
		&cert.CertPEM,
		&cert.KeyPEM,
		&cert.AccountKey,
		&cert.AccountURL,
		&cert.IssuerType,
		&cert.SecretName,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, fmt.Errorf("certificate for domain %s not found: %w", domain, err)
	case err != nil:
		return nil, fmt.Errorf("failed to scan certificate data for %s: %w", domain, err)
	}
	return cert, nil
}
// LoadAllCertificates returns every row of the certificates table.
//
// The result is nil when the table is empty. A query or row-iteration error is
// returned unchanged; a scan error is wrapped. In all error cases the returned
// slice is nil.
func (s *Storage) LoadAllCertificates(ctx context.Context) ([]*CertStorage, error) {
	// The column order here must match the Scan destinations below.
	const selectAll = `SELECT domain, email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name FROM certificates`

	rows, queryErr := s.db.QueryContext(ctx, selectAll)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	var loaded []*CertStorage
	for rows.Next() {
		var rec CertStorage
		scanErr := rows.Scan(
			&rec.Domain,
			&rec.Email,
			&rec.CertPEM,
			&rec.KeyPEM,
			&rec.AccountKey,
			&rec.AccountURL,
			&rec.IssuerType,
			&rec.SecretName,
		)
		if scanErr != nil {
			return nil, fmt.Errorf("failed to scan all certificate data: %w", scanErr)
		}
		loaded = append(loaded, &rec)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return loaded, nil
}
// -----------------------------------------------------------------------------
// XDS RESOURCE UPSERT METHODS (CLUSTERS, LISTENERS, SECRETS)
// -----------------------------------------------------------------------------
// SaveCluster serialises cluster with protojson and upserts it into the
// clusters table, keyed by cluster.GetName().
//
// Both the insert and the conflict-update path set enabled, so saving a
// cluster that was logically deleted re-enables it. Marshal and driver errors
// are returned unchanged.
func (s *Storage) SaveCluster(ctx context.Context, cluster *clusterv3.Cluster) error {
	payload, marshalErr := protojson.Marshal(cluster)
	if marshalErr != nil {
		return marshalErr
	}

	var stmt string
	if s.driver == "postgres" {
		// The data placeholder is used twice: once for INSERT, once for UPDATE.
		stmt = fmt.Sprintf(`
INSERT INTO clusters (name, data, enabled, updated_at)
VALUES (%s, %s, true, now())
ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`,
			s.placeholder(1), s.placeholder(2), s.placeholder(2))
	} else {
		// Any other driver is treated as SQLite.
		stmt = `
INSERT INTO clusters (name, data, enabled, updated_at)
VALUES (?, ?, 1, CURRENT_TIMESTAMP)
ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP`
	}

	_, execErr := s.db.ExecContext(ctx, stmt, cluster.GetName(), string(payload))
	return execErr
}
// SaveListener serialises listener with protojson and upserts it into the
// listeners table, keyed by listener.GetName().
//
// Both the insert and the conflict-update path set enabled, so saving a
// listener that was logically deleted re-enables it. Marshal and driver errors
// are returned unchanged.
func (s *Storage) SaveListener(ctx context.Context, listener *listenerv3.Listener) error {
	payload, marshalErr := protojson.Marshal(listener)
	if marshalErr != nil {
		return marshalErr
	}

	var stmt string
	if s.driver == "postgres" {
		// The data placeholder is used twice: once for INSERT, once for UPDATE.
		stmt = fmt.Sprintf(`
INSERT INTO listeners (name, data, enabled, updated_at)
VALUES (%s, %s, true, now())
ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`,
			s.placeholder(1), s.placeholder(2), s.placeholder(2))
	} else {
		// Any other driver is treated as SQLite.
		stmt = `
INSERT INTO listeners (name, data, enabled, updated_at)
VALUES (?, ?, 1, CURRENT_TIMESTAMP)
ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP`
	}

	_, execErr := s.db.ExecContext(ctx, stmt, listener.GetName(), string(payload))
	return execErr
}
// SaveSecret serialises secret with protojson and upserts it into the secrets
// table, keyed by secret.GetName().
//
// Both the insert and the conflict-update path set enabled, so saving a secret
// that was logically deleted re-enables it. Marshal and driver errors are
// returned unchanged.
func (s *Storage) SaveSecret(ctx context.Context, secret *secretv3.Secret) error {
	payload, marshalErr := protojson.Marshal(secret)
	if marshalErr != nil {
		return marshalErr
	}

	var stmt string
	if s.driver == "postgres" {
		// The data placeholder is used twice: once for INSERT, once for UPDATE.
		stmt = fmt.Sprintf(`
INSERT INTO secrets (name, data, enabled, updated_at)
VALUES (%s, %s, true, now())
ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`,
			s.placeholder(1), s.placeholder(2), s.placeholder(2))
	} else {
		// Any other driver is treated as SQLite.
		stmt = `
INSERT INTO secrets (name, data, enabled, updated_at)
VALUES (?, ?, 1, CURRENT_TIMESTAMP)
ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP`
	}

	_, execErr := s.db.ExecContext(ctx, stmt, secret.GetName(), string(payload))
	return execErr
}
// -----------------------------------------------------------------------------
// LOAD ENABLED METHODS
// -----------------------------------------------------------------------------
// LoadEnabledClusters returns every cluster whose row has enabled set.
//
// Each row's data column is decoded with protojson. The first query, scan,
// decode, or row-iteration error aborts the load and is returned together
// with a nil slice, so a partially read result is never reported as success.
func (s *Storage) LoadEnabledClusters(ctx context.Context) ([]*clusterv3.Cluster, error) {
	query := `SELECT data FROM clusters WHERE enabled = 1`
	if s.driver == "postgres" {
		query = `SELECT data FROM clusters WHERE enabled = true`
	}

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var clusters []*clusterv3.Cluster
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			// The data column is JSONB here and is scanned into raw directly.
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The data column is TEXT on SQLite: read it as a string, then
			// convert.
			var text string
			if err := rows.Scan(&text); err != nil {
				return nil, err
			}
			raw = json.RawMessage(text)
		}

		cluster := &clusterv3.Cluster{}
		if err := protojson.Unmarshal(raw, cluster); err != nil {
			return nil, err
		}
		clusters = append(clusters, cluster)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return clusters, nil
}
// LoadEnabledListeners returns every listener whose row has enabled set.
//
// Each row's data column is decoded with protojson. The first query, scan,
// decode, or row-iteration error aborts the load and is returned together
// with a nil slice, so a partially read result is never reported as success.
func (s *Storage) LoadEnabledListeners(ctx context.Context) ([]*listenerv3.Listener, error) {
	query := `SELECT data FROM listeners WHERE enabled = 1`
	if s.driver == "postgres" {
		query = `SELECT data FROM listeners WHERE enabled = true`
	}

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var listeners []*listenerv3.Listener
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			// The data column is JSONB here and is scanned into raw directly.
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The data column is TEXT on SQLite: read it as a string, then
			// convert.
			var text string
			if err := rows.Scan(&text); err != nil {
				return nil, err
			}
			raw = json.RawMessage(text)
		}

		l := &listenerv3.Listener{}
		if err := protojson.Unmarshal(raw, l); err != nil {
			return nil, err
		}
		listeners = append(listeners, l)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return listeners, nil
}
// LoadEnabledSecrets returns every secret whose row has enabled set.
//
// Each row's data column is decoded with protojson. The first query, scan,
// decode, or row-iteration error aborts the load and is returned together
// with a nil slice, so a partially read result is never reported as success.
func (s *Storage) LoadEnabledSecrets(ctx context.Context) ([]*secretv3.Secret, error) {
	query := `SELECT data FROM secrets WHERE enabled = 1`
	if s.driver == "postgres" {
		query = `SELECT data FROM secrets WHERE enabled = true`
	}

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var secrets []*secretv3.Secret
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			// The data column is JSONB here and is scanned into raw directly.
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The data column is TEXT on SQLite: read it as a string, then
			// convert.
			var text string
			if err := rows.Scan(&text); err != nil {
				return nil, err
			}
			raw = json.RawMessage(text)
		}

		secret := &secretv3.Secret{}
		if err := protojson.Unmarshal(raw, secret); err != nil {
			return nil, err
		}
		secrets = append(secrets, secret)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return secrets, nil
}
// -----------------------------------------------------------------------------
// LOAD ALL METHODS (ENABLED AND DISABLED)
// -----------------------------------------------------------------------------
// LoadAllClusters returns every stored cluster, whether enabled or not.
//
// Each row's data column is decoded with protojson. The first query, scan,
// decode, or row-iteration error aborts the load and is returned together
// with a nil slice, so a partially read result is never reported as success.
func (s *Storage) LoadAllClusters(ctx context.Context) ([]*clusterv3.Cluster, error) {
	rows, err := s.db.QueryContext(ctx, `SELECT data FROM clusters`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var clusters []*clusterv3.Cluster
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			// The data column is JSONB here and is scanned into raw directly.
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The data column is TEXT on SQLite: read it as a string, then
			// convert.
			var text string
			if err := rows.Scan(&text); err != nil {
				return nil, err
			}
			raw = json.RawMessage(text)
		}

		cluster := &clusterv3.Cluster{}
		if err := protojson.Unmarshal(raw, cluster); err != nil {
			return nil, err
		}
		clusters = append(clusters, cluster)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return clusters, nil
}
// LoadAllListeners returns every stored listener, whether enabled or not.
//
// Each row's data column is decoded with protojson. The first query, scan,
// decode, or row-iteration error aborts the load and is returned together
// with a nil slice, so a partially read result is never reported as success.
func (s *Storage) LoadAllListeners(ctx context.Context) ([]*listenerv3.Listener, error) {
	rows, err := s.db.QueryContext(ctx, `SELECT data FROM listeners`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var listeners []*listenerv3.Listener
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			// The data column is JSONB here and is scanned into raw directly.
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The data column is TEXT on SQLite: read it as a string, then
			// convert.
			var text string
			if err := rows.Scan(&text); err != nil {
				return nil, err
			}
			raw = json.RawMessage(text)
		}

		l := &listenerv3.Listener{}
		if err := protojson.Unmarshal(raw, l); err != nil {
			return nil, err
		}
		listeners = append(listeners, l)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return listeners, nil
}
// LoadAllSecrets returns every stored secret, whether enabled or not.
//
// Each row's data column is decoded with protojson. The first query, scan,
// decode, or row-iteration error aborts the load and is returned together
// with a nil slice, so a partially read result is never reported as success.
func (s *Storage) LoadAllSecrets(ctx context.Context) ([]*secretv3.Secret, error) {
	rows, err := s.db.QueryContext(ctx, `SELECT data FROM secrets`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var secrets []*secretv3.Secret
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			// The data column is JSONB here and is scanned into raw directly.
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The data column is TEXT on SQLite: read it as a string, then
			// convert.
			var text string
			if err := rows.Scan(&text); err != nil {
				return nil, err
			}
			raw = json.RawMessage(text)
		}

		secret := &secretv3.Secret{}
		if err := protojson.Unmarshal(raw, secret); err != nil {
			return nil, err
		}
		secrets = append(secrets, secret)
	}
	// rows.Next returns false both at the end of the result set and when
	// iteration fails; rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return secrets, nil
}
// -----------------------------------------------------------------------------
// SNAPSHOT MANAGEMENT
// -----------------------------------------------------------------------------
// SnapshotConfig groups the stored xDS resources (clusters, listeners, SDS
// secrets) by enabled state. RebuildSnapshot fills all six fields from the
// database; SaveSnapshot reads only the Enabled* fields.
type SnapshotConfig struct {
// Enabled resources (for xDS serving): rows whose enabled column is set.
EnabledClusters []*clusterv3.Cluster
EnabledListeners []*listenerv3.Listener
EnabledSecrets []*secretv3.Secret // SDS secrets
// Disabled resources (for UI display): stored rows whose name is not among
// the enabled ones.
DisabledClusters []*clusterv3.Cluster
DisabledListeners []*listenerv3.Listener
DisabledSecrets []*secretv3.Secret // SDS secrets
}
// RebuildSnapshot reads all clusters, listeners, and secrets from the database
// and returns them split into enabled and disabled sets.
//
// The enabled sets come from the LoadEnabled* queries. The disabled sets are
// the results of the LoadAll* queries minus every resource whose name appears
// in the matching enabled set. The six queries run one after another, not
// inside a transaction. The first error is returned as-is with a nil config.
func (s *Storage) RebuildSnapshot(ctx context.Context) (*SnapshotConfig, error) {
	var (
		snap = &SnapshotConfig{}
		err  error
	)

	// Enabled resources: these are what gets served over xDS.
	if snap.EnabledClusters, err = s.LoadEnabledClusters(ctx); err != nil {
		return nil, err
	}
	if snap.EnabledListeners, err = s.LoadEnabledListeners(ctx); err != nil {
		return nil, err
	}
	if snap.EnabledSecrets, err = s.LoadEnabledSecrets(ctx); err != nil {
		return nil, err
	}

	// Full sets: the source from which the disabled resources are derived.
	everyCluster, err := s.LoadAllClusters(ctx)
	if err != nil {
		return nil, err
	}
	everyListener, err := s.LoadAllListeners(ctx)
	if err != nil {
		return nil, err
	}
	everySecret, err := s.LoadAllSecrets(ctx)
	if err != nil {
		return nil, err
	}

	// Clusters: anything stored but not enabled is disabled.
	activeClusters := make(map[string]struct{}, len(snap.EnabledClusters))
	for i := range snap.EnabledClusters {
		activeClusters[snap.EnabledClusters[i].GetName()] = struct{}{}
	}
	for _, candidate := range everyCluster {
		if _, on := activeClusters[candidate.GetName()]; on {
			continue
		}
		snap.DisabledClusters = append(snap.DisabledClusters, candidate)
	}

	// Listeners.
	activeListeners := make(map[string]struct{}, len(snap.EnabledListeners))
	for i := range snap.EnabledListeners {
		activeListeners[snap.EnabledListeners[i].GetName()] = struct{}{}
	}
	for _, candidate := range everyListener {
		if _, on := activeListeners[candidate.GetName()]; on {
			continue
		}
		snap.DisabledListeners = append(snap.DisabledListeners, candidate)
	}

	// Secrets.
	activeSecrets := make(map[string]struct{}, len(snap.EnabledSecrets))
	for i := range snap.EnabledSecrets {
		activeSecrets[snap.EnabledSecrets[i].GetName()] = struct{}{}
	}
	for _, candidate := range everySecret {
		if _, on := activeSecrets[candidate.GetName()]; on {
			continue
		}
		snap.DisabledSecrets = append(snap.DisabledSecrets, candidate)
	}

	return snap, nil
}
// SaveSnapshot upserts every cluster, listener, and secret listed in the
// Enabled* fields of cfg, then applies strategy to the stored rows that cfg
// does not list:
//
//   - DeleteNone leaves them untouched,
//   - DeleteLogical marks them disabled,
//   - DeleteActual deletes them.
//
// Any other strategy value behaves like DeleteNone. The Disabled* fields of
// cfg are not read. A nil cfg is rejected with an error.
//
// The writes are NOT atomic. Every statement is issued through SaveCluster,
// SaveListener, SaveSecret, disableMissingResources, or
// deleteMissingResources, all of which execute directly on the Storage's
// database handle, so a failure part-way through leaves the earlier writes in
// place. For that reason no transaction is opened here: since none of those
// helpers would run on it, a transaction would add no atomicity, would hold a
// pooled connection for the whole call (on a pool limited to one connection
// the helper calls would then block until ctx ends), and a failed Commit
// would report an error even though the writes had already been persisted.
//
// NOTE(review): real all-or-nothing behaviour requires the helpers to execute
// on a shared *sql.Tx.
func (s *Storage) SaveSnapshot(ctx context.Context, cfg *SnapshotConfig, strategy DeleteStrategy) error {
	if cfg == nil {
		return fmt.Errorf("SnapshotConfig is nil")
	}

	// 1. Upsert clusters, remembering the names to keep.
	clusterNames := make([]string, 0, len(cfg.EnabledClusters))
	for _, c := range cfg.EnabledClusters {
		if err := s.SaveCluster(ctx, c); err != nil {
			return fmt.Errorf("failed to save cluster %s: %w", c.GetName(), err)
		}
		clusterNames = append(clusterNames, c.GetName())
	}

	// 2. Upsert listeners.
	listenerNames := make([]string, 0, len(cfg.EnabledListeners))
	for _, l := range cfg.EnabledListeners {
		if err := s.SaveListener(ctx, l); err != nil {
			return fmt.Errorf("failed to save listener %s: %w", l.GetName(), err)
		}
		listenerNames = append(listenerNames, l.GetName())
	}

	// 3. Upsert secrets.
	secretNames := make([]string, 0, len(cfg.EnabledSecrets))
	for _, sec := range cfg.EnabledSecrets {
		if err := s.SaveSecret(ctx, sec); err != nil {
			return fmt.Errorf("failed to save secret %s: %w", sec.GetName(), err)
		}
		secretNames = append(secretNames, sec.GetName())
	}

	// 4. Apply the deletion strategy to rows that were not part of the
	// snapshot, table by table in a fixed order.
	targets := []struct {
		table string
		keep  []string
	}{
		{"clusters", clusterNames},
		{"listeners", listenerNames},
		{"secrets", secretNames},
	}

	switch strategy {
	case DeleteLogical:
		for _, tgt := range targets {
			if err := s.disableMissingResources(ctx, tgt.table, tgt.keep); err != nil {
				return fmt.Errorf("failed to logically delete missing %s: %w", tgt.table, err)
			}
		}
	case DeleteActual:
		for _, tgt := range targets {
			if err := s.deleteMissingResources(ctx, tgt.table, tgt.keep); err != nil {
				return fmt.Errorf("failed to physically delete missing %s: %w", tgt.table, err)
			}
		}
	}
	return nil
}
// -----------------------------------------------------------------------------
// ENABLE/DISABLE & DELETE METHODS
// -----------------------------------------------------------------------------
// EnableCluster sets the enabled flag of the cluster called name to the given
// value and refreshes its updated_at timestamp.
//
// Only the driver's error is returned; the number of affected rows is not
// checked.
func (s *Storage) EnableCluster(ctx context.Context, name string, enabled bool) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `UPDATE clusters SET enabled = $1, updated_at = now() WHERE name = $2`
	} else {
		stmt = `UPDATE clusters SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, enabled, name); execErr != nil {
		return execErr
	}
	return nil
}
// EnableListener sets the enabled flag of the listener called name to the
// given value and refreshes its updated_at timestamp.
//
// Only the driver's error is returned; the number of affected rows is not
// checked.
func (s *Storage) EnableListener(ctx context.Context, name string, enabled bool) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `UPDATE listeners SET enabled = $1, updated_at = now() WHERE name = $2`
	} else {
		stmt = `UPDATE listeners SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, enabled, name); execErr != nil {
		return execErr
	}
	return nil
}
// EnableSecret sets the enabled flag of the secret called name to the given
// value and refreshes its updated_at timestamp.
//
// Only the driver's error is returned; the number of affected rows is not
// checked.
func (s *Storage) EnableSecret(ctx context.Context, name string, enabled bool) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `UPDATE secrets SET enabled = $1, updated_at = now() WHERE name = $2`
	} else {
		stmt = `UPDATE secrets SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, enabled, name); execErr != nil {
		return execErr
	}
	return nil
}
// RemoveListener physically deletes the listener row called name.
//
// Only the driver's error is returned; the number of affected rows is not
// checked.
func (s *Storage) RemoveListener(ctx context.Context, name string) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `DELETE FROM listeners WHERE name = $1`
	} else {
		stmt = `DELETE FROM listeners WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, name); execErr != nil {
		return execErr
	}
	return nil
}
// RemoveCluster physically deletes the cluster row called name.
//
// Only the driver's error is returned; the number of affected rows is not
// checked.
func (s *Storage) RemoveCluster(ctx context.Context, name string) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `DELETE FROM clusters WHERE name = $1`
	} else {
		stmt = `DELETE FROM clusters WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, name); execErr != nil {
		return execErr
	}
	return nil
}
// RemoveSecret physically deletes the secret row called name.
//
// Only the driver's error is returned; the number of affected rows is not
// checked.
func (s *Storage) RemoveSecret(ctx context.Context, name string) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `DELETE FROM secrets WHERE name = $1`
	} else {
		stmt = `DELETE FROM secrets WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, name); execErr != nil {
		return execErr
	}
	return nil
}
// disableMissingResources sets enabled to false (and refreshes updated_at) on
// every row of table whose name is NOT listed in inputNames.
//
// table must be "clusters", "listeners", or "secrets"; any other value is
// rejected before a statement is built, which is what makes it safe to splice
// the table name into the SQL text. The names themselves are passed as bind
// parameters. When inputNames is empty the UPDATE has no WHERE clause and
// therefore touches every row of the table.
func (s *Storage) disableMissingResources(ctx context.Context, table string, inputNames []string) error {
	switch table {
	case "clusters", "listeners", "secrets":
		// Known table with an 'enabled' column.
	default:
		return fmt.Errorf("logical delete (disable) is only supported for tables with an 'enabled' column (clusters, listeners, secrets)")
	}

	// One bind marker and one argument per name to keep.
	marks := make([]string, 0, len(inputNames))
	params := make([]interface{}, 0, len(inputNames))
	for idx := range inputNames {
		marks = append(marks, s.placeholder(idx+1))
		params = append(params, inputNames[idx])
	}

	// Dialect-specific spellings of "false" and "current time".
	offLiteral, nowExpr := "0", "CURRENT_TIMESTAMP"
	if s.driver == "postgres" {
		offLiteral, nowExpr = "false", "now()"
	}

	var filter string
	if len(inputNames) != 0 {
		filter = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(marks, ", "))
	}

	stmt := fmt.Sprintf(`
UPDATE %s
SET enabled = %s, updated_at = %s
%s`,
		table, offLiteral, nowExpr, filter)

	_, execErr := s.db.ExecContext(ctx, stmt, params...)
	return execErr
}
// deleteMissingResources physically deletes every row of table whose name is
// NOT listed in inputNames.
//
// table must be "clusters", "listeners", or "secrets"; any other value is
// rejected before a statement is built, which is what makes it safe to splice
// the table name into the SQL text. The names themselves are passed as bind
// parameters. When inputNames is empty the DELETE has no WHERE clause and
// therefore removes every row of the table.
func (s *Storage) deleteMissingResources(ctx context.Context, table string, inputNames []string) error {
	switch table {
	case "clusters", "listeners", "secrets":
		// Known resource table.
	default:
		return fmt.Errorf("physical delete is only supported for tables: clusters, listeners, secrets")
	}

	// One bind marker and one argument per name to keep.
	marks := make([]string, 0, len(inputNames))
	params := make([]interface{}, 0, len(inputNames))
	for idx := range inputNames {
		marks = append(marks, s.placeholder(idx+1))
		params = append(params, inputNames[idx])
	}

	var filter string
	if len(inputNames) != 0 {
		filter = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(marks, ", "))
	}

	stmt := fmt.Sprintf(`
DELETE FROM %s
%s`,
		table, filter)

	_, execErr := s.db.ExecContext(ctx, stmt, params...)
	return execErr
}