package storage import ( "context" "database/sql" "encoding/json" "fmt" "strings" clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" secretv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" // SDS Import "google.golang.org/protobuf/encoding/protojson" ) // Storage abstracts database persistence type Storage struct { db *sql.DB driver string } // DeleteStrategy defines the action to take on missing resources type DeleteStrategy int const ( // DeleteNone performs only UPSERT for items in the list (default behavior) DeleteNone DeleteStrategy = iota // DeleteLogical marks missing resources as disabled (applicable to clusters, listeners, and secrets) DeleteLogical // DeleteActual removes missing resources physically from the database DeleteActual ) // NewStorage initializes a Storage instance func NewStorage(db *sql.DB, driver string) *Storage { return &Storage{db: db, driver: driver} } // placeholder returns correct SQL placeholder based on driver func (s *Storage) placeholder(n int) string { if s.driver == "postgres" { return fmt.Sprintf("$%d", n) } return "?" } // InitSchema ensures required tables exist func (s *Storage) InitSchema(ctx context.Context) error { var schema string switch s.driver { case "postgres": schema = ` CREATE TABLE IF NOT EXISTS clusters ( id SERIAL PRIMARY KEY, name TEXT UNIQUE NOT NULL, data JSONB NOT NULL, enabled BOOLEAN DEFAULT true, updated_at TIMESTAMP DEFAULT now() ); CREATE TABLE IF NOT EXISTS listeners ( id SERIAL PRIMARY KEY, name TEXT UNIQUE NOT NULL, data JSONB NOT NULL, enabled BOOLEAN DEFAULT true, updated_at TIMESTAMP DEFAULT now() ); -- SDS secrets table for Postgres CREATE TABLE IF NOT EXISTS secrets ( id SERIAL PRIMARY KEY, name TEXT UNIQUE NOT NULL, data JSONB NOT NULL, enabled BOOLEAN DEFAULT true, updated_at TIMESTAMP DEFAULT now() );` default: // SQLite schema = ` CREATE TABLE IF NOT EXISTS clusters ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, data TEXT NOT NULL, enabled BOOLEAN DEFAULT 1, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS listeners ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, data TEXT NOT NULL, enabled BOOLEAN DEFAULT 1, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- SDS secrets table for SQLite CREATE TABLE IF NOT EXISTS secrets ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, data TEXT NOT NULL, enabled BOOLEAN DEFAULT 1, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );` } _, err := s.db.ExecContext(ctx, schema) return err } // ----------------------------------------------------------------------------- // SAVE METHODS (UPSERT) // ----------------------------------------------------------------------------- // SaveCluster inserts or updates a cluster func (s *Storage) SaveCluster(ctx context.Context, cluster *clusterv3.Cluster) error { data, err := protojson.Marshal(cluster) if err != nil { return err } var query string switch s.driver { case "postgres": // Explicitly set enabled=true on update to re-enable a logically deleted cluster query = fmt.Sprintf(` INSERT INTO clusters (name, data, enabled, updated_at) VALUES (%s, %s, true, now()) ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`, s.placeholder(1), s.placeholder(2), s.placeholder(2)) default: // SQLite // Explicitly set enabled=1 on update to re-enable a logically deleted cluster query = ` INSERT INTO clusters (name, data, enabled, updated_at) VALUES (?, ?, 1, CURRENT_TIMESTAMP) ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP` } _, err = s.db.ExecContext(ctx, query, cluster.GetName(), string(data)) return err } // SaveListener inserts or updates a listener func (s *Storage) SaveListener(ctx context.Context, listener *listenerv3.Listener) error { data, err := protojson.Marshal(listener) if err != nil { return err } var query string switch s.driver { case "postgres": // Explicitly set enabled=true on update to re-enable a logically deleted listener query = fmt.Sprintf(` INSERT INTO listeners (name, data, enabled, updated_at) VALUES (%s, %s, true, now()) ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`, s.placeholder(1), s.placeholder(2), s.placeholder(2)) default: // SQLite // Explicitly set enabled=1 on update to re-enable a logically deleted listener query = ` INSERT INTO listeners (name, data, enabled, updated_at) VALUES (?, ?, 1, CURRENT_TIMESTAMP) ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP` } _, err = s.db.ExecContext(ctx, query, listener.GetName(), string(data)) return err } // SaveSecret inserts or updates a Secret func (s *Storage) SaveSecret(ctx context.Context, secret *secretv3.Secret) error { data, err := protojson.Marshal(secret) if err != nil { return err } var query string switch s.driver { case "postgres": // Explicitly set enabled=true on update to re-enable a logically deleted secret query = fmt.Sprintf(` INSERT INTO secrets (name, data, enabled, updated_at) VALUES (%s, %s, true, now()) ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`, s.placeholder(1), s.placeholder(2), s.placeholder(2)) default: // SQLite // Explicitly set enabled=1 on update to re-enable a logically deleted secret query = ` INSERT INTO secrets (name, data, enabled, updated_at) VALUES (?, ?, 1, CURRENT_TIMESTAMP) ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP` } _, err = s.db.ExecContext(ctx, query, secret.GetName(), string(data)) return err } // ----------------------------------------------------------------------------- // LOAD ENABLED METHODS // ----------------------------------------------------------------------------- // LoadEnabledClusters retrieves all enabled clusters func (s *Storage) LoadEnabledClusters(ctx context.Context) ([]*clusterv3.Cluster, error) { query := `SELECT data FROM clusters` if s.driver == "postgres" { query += ` WHERE enabled = true` } else { query += ` WHERE enabled = 1` } rows, err := s.db.QueryContext(ctx, query) if err != nil { return nil, err } defer rows.Close() var clusters []*clusterv3.Cluster for rows.Next() { var raw json.RawMessage if s.driver != "postgres" { var dataStr string if err := rows.Scan(&dataStr); err != nil { return nil, err } raw = json.RawMessage(dataStr) } else { if err := rows.Scan(&raw); err != nil { return nil, err } } var cluster clusterv3.Cluster if err := protojson.Unmarshal(raw, &cluster); err != nil { return nil, err } clusters = append(clusters, &cluster) } return clusters, nil } // LoadEnabledListeners retrieves all enabled listeners func (s *Storage) LoadEnabledListeners(ctx context.Context) ([]*listenerv3.Listener, error) { query := `SELECT data FROM listeners` if s.driver == "postgres" { query += ` WHERE enabled = true` } else { query += ` WHERE enabled = 1` } rows, err := s.db.QueryContext(ctx, query) if err != nil { return nil, err } defer rows.Close() var listeners []*listenerv3.Listener for rows.Next() { var raw json.RawMessage if s.driver != "postgres" { var dataStr string if err := rows.Scan(&dataStr); err != nil { return nil, err } raw = json.RawMessage(dataStr) } else { if err := rows.Scan(&raw); err != nil { return nil, err } } var l listenerv3.Listener if err := protojson.Unmarshal(raw, &l); err != nil { return nil, err } listeners = append(listeners, &l) } return listeners, nil } // LoadEnabledSecrets retrieves all enabled secrets func (s *Storage) LoadEnabledSecrets(ctx context.Context) ([]*secretv3.Secret, error) { query := `SELECT data FROM secrets` if s.driver == "postgres" { query += ` WHERE enabled = true` } else { query += ` WHERE enabled = 1` } rows, err := s.db.QueryContext(ctx, query) if err != nil { return nil, err } defer rows.Close() var secrets []*secretv3.Secret for rows.Next() { var raw json.RawMessage if s.driver != "postgres" { var dataStr string if err := rows.Scan(&dataStr); err != nil { return nil, err } raw = json.RawMessage(dataStr) } else { if err := rows.Scan(&raw); err != nil { return nil, err } } var secret secretv3.Secret if err := protojson.Unmarshal(raw, &secret); err != nil { return nil, err } secrets = append(secrets, &secret) } return secrets, nil } // ----------------------------------------------------------------------------- // LOAD ALL METHODS // ----------------------------------------------------------------------------- // LoadAllClusters retrieves all clusters, regardless of their enabled status func (s *Storage) LoadAllClusters(ctx context.Context) ([]*clusterv3.Cluster, error) { rows, err := s.db.QueryContext(ctx, `SELECT data FROM clusters`) if err != nil { return nil, err } defer rows.Close() var clusters []*clusterv3.Cluster for rows.Next() { var raw json.RawMessage if s.driver != "postgres" { var dataStr string if err := rows.Scan(&dataStr); err != nil { return nil, err } raw = json.RawMessage(dataStr) } else { if err := rows.Scan(&raw); err != nil { return nil, err } } var cluster clusterv3.Cluster if err := protojson.Unmarshal(raw, &cluster); err != nil { return nil, err } clusters = append(clusters, &cluster) } return clusters, nil } // LoadAllListeners retrieves all listeners, regardless of their enabled status func (s *Storage) LoadAllListeners(ctx context.Context) ([]*listenerv3.Listener, error) { rows, err := s.db.QueryContext(ctx, `SELECT data FROM listeners`) if err != nil { return nil, err } defer rows.Close() var listeners []*listenerv3.Listener for rows.Next() { var raw json.RawMessage if s.driver != "postgres" { var dataStr string if err := rows.Scan(&dataStr); err != nil { return nil, err } raw = json.RawMessage(dataStr) } else { if err := rows.Scan(&raw); err != nil { return nil, err } } var l listenerv3.Listener if err := protojson.Unmarshal(raw, &l); err != nil { return nil, err } listeners = append(listeners, &l) } return listeners, nil } // LoadAllSecrets retrieves all secrets, regardless of their enabled status func (s *Storage) LoadAllSecrets(ctx context.Context) ([]*secretv3.Secret, error) { rows, err := s.db.QueryContext(ctx, `SELECT data FROM secrets`) if err != nil { return nil, err } defer rows.Close() var secrets []*secretv3.Secret for rows.Next() { var raw json.RawMessage if s.driver != "postgres" { var dataStr string if err := rows.Scan(&dataStr); err != nil { return nil, err } raw = json.RawMessage(dataStr) } else { if err := rows.Scan(&raw); err != nil { return nil, err } } var secret secretv3.Secret if err := protojson.Unmarshal(raw, &secret); err != nil { return nil, err } secrets = append(secrets, &secret) } return secrets, nil } // ----------------------------------------------------------------------------- // SNAPSHOT MANAGEMENT // ----------------------------------------------------------------------------- // SnapshotConfig aggregates xDS resources type SnapshotConfig struct { // Enabled resources (for xDS serving) EnabledClusters []*clusterv3.Cluster EnabledListeners []*listenerv3.Listener EnabledSecrets []*secretv3.Secret // New SDS resource // Disabled resources (for UI display) DisabledClusters []*clusterv3.Cluster DisabledListeners []*listenerv3.Listener DisabledSecrets []*secretv3.Secret // New SDS resource } // RebuildSnapshot rebuilds full snapshot from DB func (s *Storage) RebuildSnapshot(ctx context.Context) (*SnapshotConfig, error) { // 1. Load Enabled Resources (for xDS serving) enabledClusters, err := s.LoadEnabledClusters(ctx) if err != nil { return nil, err } enabledListeners, err := s.LoadEnabledListeners(ctx) if err != nil { return nil, err } enabledSecrets, err := s.LoadEnabledSecrets(ctx) if err != nil { return nil, err } // 2. Load ALL Resources (for comparison and disabled set) allClusters, err := s.LoadAllClusters(ctx) if err != nil { return nil, err } allListeners, err := s.LoadAllListeners(ctx) if err != nil { return nil, err } allSecrets, err := s.LoadAllSecrets(ctx) if err != nil { return nil, err } // 3. Separate Disabled Resources // Clusters enabledClusterNames := make(map[string]struct{}, len(enabledClusters)) for _, c := range enabledClusters { enabledClusterNames[c.GetName()] = struct{}{} } var disabledClusters []*clusterv3.Cluster for _, c := range allClusters { if _, found := enabledClusterNames[c.GetName()]; !found { disabledClusters = append(disabledClusters, c) } } // Listeners enabledListenerNames := make(map[string]struct{}, len(enabledListeners)) for _, l := range enabledListeners { enabledListenerNames[l.GetName()] = struct{}{} } var disabledListeners []*listenerv3.Listener for _, l := range allListeners { if _, found := enabledListenerNames[l.GetName()]; !found { disabledListeners = append(disabledListeners, l) } } // Secrets enabledSecretNames := make(map[string]struct{}, len(enabledSecrets)) for _, sec := range enabledSecrets { enabledSecretNames[sec.GetName()] = struct{}{} } var disabledSecrets []*secretv3.Secret for _, sec := range allSecrets { if _, found := enabledSecretNames[sec.GetName()]; !found { disabledSecrets = append(disabledSecrets, sec) } } return &SnapshotConfig{ EnabledClusters: enabledClusters, EnabledListeners: enabledListeners, EnabledSecrets: enabledSecrets, DisabledClusters: disabledClusters, DisabledListeners: disabledListeners, DisabledSecrets: disabledSecrets, }, nil } // SaveSnapshot saves the entire snapshot to the DB func (s *Storage) SaveSnapshot(ctx context.Context, cfg *SnapshotConfig, strategy DeleteStrategy) error { if cfg == nil { return fmt.Errorf("SnapshotConfig is nil") } // Use a transaction for atomicity tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer func() { if err != nil { tx.Rollback() return } err = tx.Commit() }() // --- 1. Save/Upsert Clusters and Collect Names --- clusterNames := make([]string, 0, len(cfg.EnabledClusters)) for _, c := range cfg.EnabledClusters { if err = s.SaveCluster(ctx, c); err != nil { return fmt.Errorf("failed to save cluster %s: %w", c.GetName(), err) } clusterNames = append(clusterNames, c.GetName()) } // --- 2. Save/Upsert Listeners and Collect Names --- listenerNames := make([]string, 0, len(cfg.EnabledListeners)) for _, l := range cfg.EnabledListeners { if err = s.SaveListener(ctx, l); err != nil { return fmt.Errorf("failed to save listener %s: %w", l.GetName(), err) } listenerNames = append(listenerNames, l.GetName()) } // --- 3. Save/Upsert Secrets and Collect Names --- secretNames := make([]string, 0, len(cfg.EnabledSecrets)) for _, sec := range cfg.EnabledSecrets { if err = s.SaveSecret(ctx, sec); err != nil { return fmt.Errorf("failed to save secret %s: %w", sec.GetName(), err) } secretNames = append(secretNames, sec.GetName()) } // --- 4. Apply Deletion Strategy --- switch strategy { case DeleteLogical: // Logical Delete (Disable) for all resource types if err = s.disableMissingResources(ctx, "clusters", clusterNames); err != nil { return fmt.Errorf("failed to logically delete missing clusters: %w", err) } if err = s.disableMissingResources(ctx, "listeners", listenerNames); err != nil { return fmt.Errorf("failed to logically delete missing listeners: %w", err) } if err = s.disableMissingResources(ctx, "secrets", secretNames); err != nil { return fmt.Errorf("failed to logically delete missing secrets: %w", err) } case DeleteActual: // Actual Delete (Physical Removal) for all resources if err = s.deleteMissingResources(ctx, "clusters", clusterNames); err != nil { return fmt.Errorf("failed to physically delete missing clusters: %w", err) } if err = s.deleteMissingResources(ctx, "listeners", listenerNames); err != nil { return fmt.Errorf("failed to physically delete missing listeners: %w", err) } if err = s.deleteMissingResources(ctx, "secrets", secretNames); err != nil { return fmt.Errorf("failed to physically delete missing secrets: %w", err) } case DeleteNone: // Do nothing for missing resources return nil } return err } // ----------------------------------------------------------------------------- // ENABLE/DISABLE & DELETE METHODS // ----------------------------------------------------------------------------- // EnableCluster toggles a cluster func (s *Storage) EnableCluster(ctx context.Context, name string, enabled bool) error { query := `UPDATE clusters SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?` if s.driver == "postgres" { query = `UPDATE clusters SET enabled = $1, updated_at = now() WHERE name = $2` } _, err := s.db.ExecContext(ctx, query, enabled, name) return err } // EnableListener toggles a listener func (s *Storage) EnableListener(ctx context.Context, name string, enabled bool) error { query := `UPDATE listeners SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?` if s.driver == "postgres" { query = `UPDATE listeners SET enabled = $1, updated_at = now() WHERE name = $2` } _, err := s.db.ExecContext(ctx, query, enabled, name) return err } // EnableSecret toggles a secret func (s *Storage) EnableSecret(ctx context.Context, name string, enabled bool) error { query := `UPDATE secrets SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?` if s.driver == "postgres" { query = `UPDATE secrets SET enabled = $1, updated_at = now() WHERE name = $2` } _, err := s.db.ExecContext(ctx, query, enabled, name) return err } // RemoveListener deletes a listener by name func (s *Storage) RemoveListener(ctx context.Context, name string) error { query := `DELETE FROM listeners WHERE name = ?` if s.driver == "postgres" { query = `DELETE FROM listeners WHERE name = $1` } _, err := s.db.ExecContext(ctx, query, name) return err } // RemoveCluster deletes a cluster by name func (s *Storage) RemoveCluster(ctx context.Context, name string) error { query := `DELETE FROM clusters WHERE name = ?` if s.driver == "postgres" { query = `DELETE FROM clusters WHERE name = $1` } _, err := s.db.ExecContext(ctx, query, name) return err } // RemoveSecret deletes a secret by name func (s *Storage) RemoveSecret(ctx context.Context, name string) error { query := `DELETE FROM secrets WHERE name = ?` if s.driver == "postgres" { query = `DELETE FROM secrets WHERE name = $1` } _, err := s.db.ExecContext(ctx, query, name) return err } // disableMissingResources updates the 'enabled' status for resources in 'table' // whose 'name' is NOT in 'inputNames'. func (s *Storage) disableMissingResources(ctx context.Context, table string, inputNames []string) error { if table != "clusters" && table != "listeners" && table != "secrets" { return fmt.Errorf("logical delete (disable) is only supported for tables with an 'enabled' column (clusters, listeners, secrets)") } // 1. Build placeholders and args placeholders := make([]string, len(inputNames)) args := make([]interface{}, len(inputNames)) for i, name := range inputNames { if s.driver == "postgres" { placeholders[i] = fmt.Sprintf("$%d", i+1) } else { placeholders[i] = "?" } args[i] = name } disabledValue := "false" if s.driver != "postgres" { disabledValue = "0" } var updateTime string if s.driver == "postgres" { updateTime = "now()" } else { updateTime = "CURRENT_TIMESTAMP" } // If no names are provided, disable ALL currently enabled resources whereClause := "" if len(inputNames) > 0 { whereClause = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(placeholders, ", ")) } // 2. Construct and execute the UPDATE query query := fmt.Sprintf(` UPDATE %s SET enabled = %s, updated_at = %s %s`, table, disabledValue, updateTime, whereClause) _, err := s.db.ExecContext(ctx, query, args...) return err } // deleteMissingResources physically deletes resources from 'table' whose 'name' is NOT in 'inputNames'. func (s *Storage) deleteMissingResources(ctx context.Context, table string, inputNames []string) error { if table != "clusters" && table != "listeners" && table != "secrets" { return fmt.Errorf("physical delete is only supported for tables: clusters, listeners, secrets") } // 1. Build placeholders and args placeholders := make([]string, len(inputNames)) args := make([]interface{}, len(inputNames)) for i, name := range inputNames { if s.driver == "postgres" { placeholders[i] = fmt.Sprintf("$%d", i+1) } else { placeholders[i] = "?" } args[i] = name } // If no names are provided, delete ALL resources whereClause := "" if len(inputNames) > 0 { whereClause = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(placeholders, ", ")) } // 2. Construct and execute the DELETE query query := fmt.Sprintf(` DELETE FROM %s %s`, table, whereClause) _, err := s.db.ExecContext(ctx, query, args...) return err }