Newer
Older
EnvoyControlPlane / internal / pkg / storage / storage.go
package storage

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"strings"

	clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	secretv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" // SDS Import

	"google.golang.org/protobuf/encoding/protojson"
)

// Storage abstracts database persistence for xDS resources (clusters,
// listeners, secrets) and stored certificates.
//
// It pairs a *sql.DB with the name of the driver in use. Methods compare the
// driver name against "postgres" to pick PostgreSQL syntax; every other value
// falls back to the SQLite-style statements.
type Storage struct {
	db     *sql.DB
	driver string
}

// DeleteStrategy selects what SaveSnapshot does with stored resources that are
// absent from the snapshot being saved.
type DeleteStrategy int

// Supported strategies. The numeric values are fixed: DeleteNone is the zero
// value and therefore the default.
const (
	// DeleteNone only upserts the supplied resources; rows missing from the
	// snapshot are left untouched.
	DeleteNone DeleteStrategy = 0
	// DeleteLogical marks rows missing from the snapshot as disabled
	// (clusters, listeners and secrets).
	DeleteLogical DeleteStrategy = 1
	// DeleteActual physically removes rows missing from the snapshot.
	DeleteActual DeleteStrategy = 2
)

// NewStorage builds a Storage that runs its queries on db and selects its SQL
// dialect from driver ("postgres" for PostgreSQL syntax, anything else for the
// SQLite-style statements).
func NewStorage(db *sql.DB, driver string) *Storage {
	st := new(Storage)
	st.db = db
	st.driver = driver
	return st
}

// placeholder returns the bind-parameter marker for the n-th argument of a
// statement: "$n" when the driver is "postgres", and the positional "?" for
// every other driver.
func (s *Storage) placeholder(n int) string {
	if s.driver != "postgres" {
		return "?"
	}
	return fmt.Sprintf("$%d", n)
}

// CertStorage represents the persistent data needed for certificate renewal.
// This mirrors the data that was previously stored in the internalcertapi.Certificate.
//
// One value corresponds to one row of the certificates table: Domain is the
// row's primary key and the remaining fields map one-to-one onto its columns.
type CertStorage struct {
	Domain     string // The certificate domain (used as the primary key)
	Email      string // The ACME account email
	CertPEM    []byte // The current certificate (public part + chain)
	KeyPEM     []byte // The domain's private key
	AccountKey []byte // The ACME Account private key D-value (for signing)
	AccountURL string // The ACME Account URI (KID)
	IssuerType string // The type of issuer (e.g., "LetsEncrypt"). Defaults to "".
	SecretName string // The name of the SDS Secret this certificate is associated with. Empty if unlinked/manual.
}

// InitSchema creates the clusters, listeners, secrets and certificates tables
// when they do not exist yet.
//
// The DDL dialect follows s.driver: "postgres" gets the PostgreSQL script and
// any other value gets the SQLite one. The whole script is submitted through a
// single ExecContext call and that call's error is returned unchanged.
func (s *Storage) InitSchema(ctx context.Context) error {
	var ddl string
	if s.driver == "postgres" {
		ddl = `
        CREATE TABLE IF NOT EXISTS clusters (
            id SERIAL PRIMARY KEY,
            name TEXT UNIQUE NOT NULL,
            data JSONB NOT NULL,
            enabled BOOLEAN DEFAULT true,
            updated_at TIMESTAMP DEFAULT now()
        );
        CREATE TABLE IF NOT EXISTS listeners (
            id SERIAL PRIMARY KEY,
            name TEXT UNIQUE NOT NULL,
            data JSONB NOT NULL,
            enabled BOOLEAN DEFAULT true,
            updated_at TIMESTAMP DEFAULT now()
        );
        -- SDS secrets table for Postgres
        CREATE TABLE IF NOT EXISTS secrets (
            id SERIAL PRIMARY KEY,
            name TEXT UNIQUE NOT NULL,
            data JSONB NOT NULL,
            enabled BOOLEAN DEFAULT true,
            updated_at TIMESTAMP DEFAULT now()
        );
        -- 👇 UPDATED CERTIFICATE TABLE FOR ACME RENEWAL
        CREATE TABLE IF NOT EXISTS certificates (
            domain TEXT PRIMARY KEY,
            email TEXT NOT NULL,
            cert_pem BYTEA NOT NULL,
            key_pem BYTEA NOT NULL,
            account_key BYTEA NOT NULL,
            account_url TEXT NOT NULL,
            issuer_type TEXT DEFAULT '',
            secret_name TEXT DEFAULT '', -- New field
            updated_at TIMESTAMP DEFAULT now()
        );`
	} else {
		// Every non-Postgres driver is treated as SQLite.
		ddl = `
        CREATE TABLE IF NOT EXISTS clusters (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            data TEXT NOT NULL,
            enabled BOOLEAN DEFAULT 1,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS listeners (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            data TEXT NOT NULL,
            enabled BOOLEAN DEFAULT 1,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        -- SDS secrets table for SQLite
        CREATE TABLE IF NOT EXISTS secrets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT UNIQUE NOT NULL,
            data TEXT NOT NULL,
            enabled BOOLEAN DEFAULT 1,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        -- 👇 UPDATED CERTIFICATE TABLE FOR ACME RENEWAL
        CREATE TABLE IF NOT EXISTS certificates (
            domain TEXT PRIMARY KEY,
            email TEXT NOT NULL,
            cert_pem BLOB NOT NULL,
            key_pem BLOB NOT NULL,
            account_key BLOB NOT NULL,
            account_url TEXT NOT NULL,
            issuer_type TEXT DEFAULT '',
            secret_name TEXT DEFAULT '', -- New field
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );`
	}

	if _, execErr := s.db.ExecContext(ctx, ddl); execErr != nil {
		return execErr
	}
	return nil
}

// -----------------------------------------------------------------------------
// CERTIFICATE PERSISTENCE (UPSERT & LOAD)
// -----------------------------------------------------------------------------

// SaveCertificate inserts the certificate row for cert.Domain, or overwrites
// every other column of the existing row when the domain is already stored.
// updated_at is refreshed on both paths.
//
// It returns an error when cert is nil (instead of panicking on the first
// field access) and otherwise returns whatever the database call reports.
func (s *Storage) SaveCertificate(ctx context.Context, cert *CertStorage) error {
	if cert == nil {
		return fmt.Errorf("certificate is nil")
	}

	var query string
	switch s.driver {
	case "postgres":
		query = fmt.Sprintf(`
            INSERT INTO certificates (domain, email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name, updated_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, now())
            ON CONFLICT (domain) DO UPDATE SET 
                email = EXCLUDED.email, 
                cert_pem = EXCLUDED.cert_pem, 
                key_pem = EXCLUDED.key_pem, 
                account_key = EXCLUDED.account_key, 
                account_url = EXCLUDED.account_url, 
                issuer_type = EXCLUDED.issuer_type, 
                secret_name = EXCLUDED.secret_name, -- Updated field
                updated_at = now()`,
			s.placeholder(1), s.placeholder(2), s.placeholder(3), s.placeholder(4), s.placeholder(5), s.placeholder(6), s.placeholder(7), s.placeholder(8))
	default: // SQLite
		query = `
            INSERT INTO certificates (domain, email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(domain) DO UPDATE SET 
                email = excluded.email, 
                cert_pem = excluded.cert_pem, 
                key_pem = excluded.key_pem, 
                account_key = excluded.account_key, 
                account_url = excluded.account_url, 
                issuer_type = excluded.issuer_type, 
                secret_name = excluded.secret_name, -- Updated field
                updated_at = CURRENT_TIMESTAMP`
	}

	// Argument order must match the column list of the INSERT above.
	_, err := s.db.ExecContext(ctx, query,
		cert.Domain,
		cert.Email,
		cert.CertPEM,
		cert.KeyPEM,
		cert.AccountKey,
		cert.AccountURL,
		cert.IssuerType,
		cert.SecretName,
	)
	return err
}

// LoadCertificate retrieves the stored certificate for domain.
//
// When no row exists the returned error wraps sql.ErrNoRows, so callers can
// tell "not found" from other failures with errors.Is(err, sql.ErrNoRows).
// Any other query or scan failure is wrapped with the domain for context.
// The returned pointer is nil whenever the error is non-nil.
func (s *Storage) LoadCertificate(ctx context.Context, domain string) (*CertStorage, error) {
	query := `SELECT email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name FROM certificates WHERE domain = $1`
	if s.driver != "postgres" {
		query = `SELECT email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name FROM certificates WHERE domain = ?`
	}

	// Domain is the lookup key, so it is filled in up front rather than selected.
	cert := &CertStorage{Domain: domain}
	err := s.db.QueryRowContext(ctx, query, domain).Scan(
		&cert.Email,
		&cert.CertPEM,
		&cert.KeyPEM,
		&cert.AccountKey,
		&cert.AccountURL,
		&cert.IssuerType,
		&cert.SecretName,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// Keep the sentinel in the chain so the condition stays detectable.
		return nil, fmt.Errorf("certificate for domain %s not found: %w", domain, err)
	case err != nil:
		return nil, fmt.Errorf("failed to scan certificate data for %s: %w", domain, err)
	}

	return cert, nil
}

// LoadAllCertificates returns one CertStorage per row of the certificates
// table. A query error or a row-iteration error is returned unchanged; a scan
// error is wrapped. The slice is nil whenever an error is returned, and also
// when the table holds no rows.
func (s *Storage) LoadAllCertificates(ctx context.Context) ([]*CertStorage, error) {
	const stmt = `SELECT domain, email, cert_pem, key_pem, account_key, account_url, issuer_type, secret_name FROM certificates`

	rs, queryErr := s.db.QueryContext(ctx, stmt)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rs.Close()

	var loaded []*CertStorage
	for rs.Next() {
		var entry CertStorage
		scanErr := rs.Scan(
			&entry.Domain,
			&entry.Email,
			&entry.CertPEM,
			&entry.KeyPEM,
			&entry.AccountKey,
			&entry.AccountURL,
			&entry.IssuerType,
			&entry.SecretName,
		)
		if scanErr != nil {
			return nil, fmt.Errorf("failed to scan all certificate data: %w", scanErr)
		}
		loaded = append(loaded, &entry)
	}

	if iterErr := rs.Err(); iterErr != nil {
		return nil, iterErr
	}
	return loaded, nil
}

// -----------------------------------------------------------------------------
// XDS RESOURCE UPSERTS (CLUSTERS, LISTENERS, SECRETS)
// -----------------------------------------------------------------------------

// SaveCluster stores cluster under its name, serialised with protojson.
// A new row is inserted as enabled; an existing row gets its data replaced and
// is switched back to enabled, which revives a logically deleted cluster.
// Marshal and database errors are returned unchanged.
func (s *Storage) SaveCluster(ctx context.Context, cluster *clusterv3.Cluster) error {
	payload, marshalErr := protojson.Marshal(cluster)
	if marshalErr != nil {
		return marshalErr
	}

	// Start from the SQLite statement and swap it out for Postgres.
	stmt := `
            INSERT INTO clusters (name, data, enabled, updated_at)
            VALUES (?, ?, 1, CURRENT_TIMESTAMP)
            ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP`
	if s.driver == "postgres" {
		nameArg, dataArg := s.placeholder(1), s.placeholder(2)
		// The data placeholder is used twice: once for INSERT, once for UPDATE.
		stmt = fmt.Sprintf(`
            INSERT INTO clusters (name, data, enabled, updated_at)
            VALUES (%s, %s, true, now())
            ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`,
			nameArg, dataArg, dataArg)
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, cluster.GetName(), string(payload)); execErr != nil {
		return execErr
	}
	return nil
}

// SaveListener stores listener under its name, serialised with protojson.
// A new row is inserted as enabled; an existing row gets its data replaced and
// is switched back to enabled, which revives a logically deleted listener.
// Marshal and database errors are returned unchanged.
func (s *Storage) SaveListener(ctx context.Context, listener *listenerv3.Listener) error {
	payload, marshalErr := protojson.Marshal(listener)
	if marshalErr != nil {
		return marshalErr
	}

	// Start from the SQLite statement and swap it out for Postgres.
	stmt := `
            INSERT INTO listeners (name, data, enabled, updated_at)
            VALUES (?, ?, 1, CURRENT_TIMESTAMP)
            ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP`
	if s.driver == "postgres" {
		nameArg, dataArg := s.placeholder(1), s.placeholder(2)
		// The data placeholder is used twice: once for INSERT, once for UPDATE.
		stmt = fmt.Sprintf(`
            INSERT INTO listeners (name, data, enabled, updated_at)
            VALUES (%s, %s, true, now())
            ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`,
			nameArg, dataArg, dataArg)
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, listener.GetName(), string(payload)); execErr != nil {
		return execErr
	}
	return nil
}

// SaveSecret stores secret under its name, serialised with protojson.
// A new row is inserted as enabled; an existing row gets its data replaced and
// is switched back to enabled, which revives a logically deleted secret.
// Marshal and database errors are returned unchanged.
func (s *Storage) SaveSecret(ctx context.Context, secret *secretv3.Secret) error {
	payload, marshalErr := protojson.Marshal(secret)
	if marshalErr != nil {
		return marshalErr
	}

	// Start from the SQLite statement and swap it out for Postgres.
	stmt := `
            INSERT INTO secrets (name, data, enabled, updated_at)
            VALUES (?, ?, 1, CURRENT_TIMESTAMP)
            ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP`
	if s.driver == "postgres" {
		nameArg, dataArg := s.placeholder(1), s.placeholder(2)
		// The data placeholder is used twice: once for INSERT, once for UPDATE.
		stmt = fmt.Sprintf(`
            INSERT INTO secrets (name, data, enabled, updated_at)
            VALUES (%s, %s, true, now())
            ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`,
			nameArg, dataArg, dataArg)
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, secret.GetName(), string(payload)); execErr != nil {
		return execErr
	}
	return nil
}

// -----------------------------------------------------------------------------
// LOAD ENABLED RESOURCES
// -----------------------------------------------------------------------------

// LoadEnabledClusters returns every cluster whose row is flagged enabled.
//
// Each row's JSON payload is decoded with protojson into a clusterv3.Cluster.
// Any query, scan, decode or row-iteration error aborts the load and is
// returned together with a nil slice, so a partial list is never handed back.
func (s *Storage) LoadEnabledClusters(ctx context.Context) ([]*clusterv3.Cluster, error) {
	query := `SELECT data FROM clusters`
	if s.driver == "postgres" {
		query += ` WHERE enabled = true`
	} else {
		query += ` WHERE enabled = 1`
	}

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var clusters []*clusterv3.Cluster
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The non-Postgres schema keeps data in a TEXT column, so it is
			// read as a string and converted afterwards.
			var dataStr string
			if err := rows.Scan(&dataStr); err != nil {
				return nil, err
			}
			raw = json.RawMessage(dataStr)
		}

		var cluster clusterv3.Cluster
		if err := protojson.Unmarshal(raw, &cluster); err != nil {
			return nil, err
		}
		clusters = append(clusters, &cluster)
	}
	// rows.Next also returns false when iteration fails part-way; without this
	// check such a failure would be reported as a shorter, "successful" result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return clusters, nil
}

// LoadEnabledListeners returns every listener whose row is flagged enabled.
//
// Each row's JSON payload is decoded with protojson into a listenerv3.Listener.
// Any query, scan, decode or row-iteration error aborts the load and is
// returned together with a nil slice, so a partial list is never handed back.
func (s *Storage) LoadEnabledListeners(ctx context.Context) ([]*listenerv3.Listener, error) {
	query := `SELECT data FROM listeners`
	if s.driver == "postgres" {
		query += ` WHERE enabled = true`
	} else {
		query += ` WHERE enabled = 1`
	}

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var listeners []*listenerv3.Listener
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The non-Postgres schema keeps data in a TEXT column, so it is
			// read as a string and converted afterwards.
			var dataStr string
			if err := rows.Scan(&dataStr); err != nil {
				return nil, err
			}
			raw = json.RawMessage(dataStr)
		}

		var l listenerv3.Listener
		if err := protojson.Unmarshal(raw, &l); err != nil {
			return nil, err
		}
		listeners = append(listeners, &l)
	}
	// rows.Next also returns false when iteration fails part-way; without this
	// check such a failure would be reported as a shorter, "successful" result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return listeners, nil
}

// LoadEnabledSecrets returns every secret whose row is flagged enabled.
//
// Each row's JSON payload is decoded with protojson into a secretv3.Secret.
// Any query, scan, decode or row-iteration error aborts the load and is
// returned together with a nil slice, so a partial list is never handed back.
func (s *Storage) LoadEnabledSecrets(ctx context.Context) ([]*secretv3.Secret, error) {
	query := `SELECT data FROM secrets`
	if s.driver == "postgres" {
		query += ` WHERE enabled = true`
	} else {
		query += ` WHERE enabled = 1`
	}

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var secrets []*secretv3.Secret
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The non-Postgres schema keeps data in a TEXT column, so it is
			// read as a string and converted afterwards.
			var dataStr string
			if err := rows.Scan(&dataStr); err != nil {
				return nil, err
			}
			raw = json.RawMessage(dataStr)
		}

		var secret secretv3.Secret
		if err := protojson.Unmarshal(raw, &secret); err != nil {
			return nil, err
		}
		secrets = append(secrets, &secret)
	}
	// rows.Next also returns false when iteration fails part-way; without this
	// check such a failure would be reported as a shorter, "successful" result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return secrets, nil
}

// -----------------------------------------------------------------------------
// LOAD ALL RESOURCES (ENABLED AND DISABLED)
// -----------------------------------------------------------------------------

// LoadAllClusters returns every stored cluster, regardless of its enabled flag.
//
// Each row's JSON payload is decoded with protojson into a clusterv3.Cluster.
// Any query, scan, decode or row-iteration error aborts the load and is
// returned together with a nil slice, so a partial list is never handed back.
func (s *Storage) LoadAllClusters(ctx context.Context) ([]*clusterv3.Cluster, error) {
	rows, err := s.db.QueryContext(ctx, `SELECT data FROM clusters`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var clusters []*clusterv3.Cluster
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The non-Postgres schema keeps data in a TEXT column, so it is
			// read as a string and converted afterwards.
			var dataStr string
			if err := rows.Scan(&dataStr); err != nil {
				return nil, err
			}
			raw = json.RawMessage(dataStr)
		}

		var cluster clusterv3.Cluster
		if err := protojson.Unmarshal(raw, &cluster); err != nil {
			return nil, err
		}
		clusters = append(clusters, &cluster)
	}
	// rows.Next also returns false when iteration fails part-way; without this
	// check such a failure would be reported as a shorter, "successful" result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return clusters, nil
}

// LoadAllListeners returns every stored listener, regardless of its enabled flag.
//
// Each row's JSON payload is decoded with protojson into a listenerv3.Listener.
// Any query, scan, decode or row-iteration error aborts the load and is
// returned together with a nil slice, so a partial list is never handed back.
func (s *Storage) LoadAllListeners(ctx context.Context) ([]*listenerv3.Listener, error) {
	rows, err := s.db.QueryContext(ctx, `SELECT data FROM listeners`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var listeners []*listenerv3.Listener
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The non-Postgres schema keeps data in a TEXT column, so it is
			// read as a string and converted afterwards.
			var dataStr string
			if err := rows.Scan(&dataStr); err != nil {
				return nil, err
			}
			raw = json.RawMessage(dataStr)
		}

		var l listenerv3.Listener
		if err := protojson.Unmarshal(raw, &l); err != nil {
			return nil, err
		}
		listeners = append(listeners, &l)
	}
	// rows.Next also returns false when iteration fails part-way; without this
	// check such a failure would be reported as a shorter, "successful" result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return listeners, nil
}

// LoadAllSecrets returns every stored secret, regardless of its enabled flag.
//
// Each row's JSON payload is decoded with protojson into a secretv3.Secret.
// Any query, scan, decode or row-iteration error aborts the load and is
// returned together with a nil slice, so a partial list is never handed back.
func (s *Storage) LoadAllSecrets(ctx context.Context) ([]*secretv3.Secret, error) {
	rows, err := s.db.QueryContext(ctx, `SELECT data FROM secrets`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var secrets []*secretv3.Secret
	for rows.Next() {
		var raw json.RawMessage
		if s.driver == "postgres" {
			if err := rows.Scan(&raw); err != nil {
				return nil, err
			}
		} else {
			// The non-Postgres schema keeps data in a TEXT column, so it is
			// read as a string and converted afterwards.
			var dataStr string
			if err := rows.Scan(&dataStr); err != nil {
				return nil, err
			}
			raw = json.RawMessage(dataStr)
		}

		var secret secretv3.Secret
		if err := protojson.Unmarshal(raw, &secret); err != nil {
			return nil, err
		}
		secrets = append(secrets, &secret)
	}
	// rows.Next also returns false when iteration fails part-way; without this
	// check such a failure would be reported as a shorter, "successful" result.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return secrets, nil
}

// -----------------------------------------------------------------------------
// SNAPSHOT MANAGEMENT
// -----------------------------------------------------------------------------

// SnapshotConfig aggregates xDS resources (clusters, listeners and SDS
// secrets), split into an enabled set and a disabled set per resource type.
// RebuildSnapshot fills all six slices from the database; SaveSnapshot reads
// only the three Enabled* slices.
type SnapshotConfig struct {
	// Enabled resources (for xDS serving)
	EnabledClusters  []*clusterv3.Cluster
	EnabledListeners []*listenerv3.Listener
	EnabledSecrets   []*secretv3.Secret // SDS secrets

	// Disabled resources (for UI display)
	DisabledClusters  []*clusterv3.Cluster
	DisabledListeners []*listenerv3.Listener
	DisabledSecrets   []*secretv3.Secret // SDS secrets
}

// RebuildSnapshot reads the current state from the database and returns it as
// a SnapshotConfig.
//
// For each resource type the enabled rows are loaded first and then all rows;
// a resource counts as disabled when its name is not among the enabled ones.
// The first load error is returned unchanged, together with a nil snapshot.
func (s *Storage) RebuildSnapshot(ctx context.Context) (*SnapshotConfig, error) {
	snap := &SnapshotConfig{}
	var err error

	// Enabled resources: what is served over xDS.
	if snap.EnabledClusters, err = s.LoadEnabledClusters(ctx); err != nil {
		return nil, err
	}
	if snap.EnabledListeners, err = s.LoadEnabledListeners(ctx); err != nil {
		return nil, err
	}
	if snap.EnabledSecrets, err = s.LoadEnabledSecrets(ctx); err != nil {
		return nil, err
	}

	// Full sets: needed to work out which rows are disabled.
	everyCluster, err := s.LoadAllClusters(ctx)
	if err != nil {
		return nil, err
	}
	everyListener, err := s.LoadAllListeners(ctx)
	if err != nil {
		return nil, err
	}
	everySecret, err := s.LoadAllSecrets(ctx)
	if err != nil {
		return nil, err
	}

	// Clusters: everything not present in the enabled set is disabled.
	liveClusters := make(map[string]bool, len(snap.EnabledClusters))
	for _, item := range snap.EnabledClusters {
		liveClusters[item.GetName()] = true
	}
	for _, item := range everyCluster {
		if liveClusters[item.GetName()] {
			continue
		}
		snap.DisabledClusters = append(snap.DisabledClusters, item)
	}

	// Listeners.
	liveListeners := make(map[string]bool, len(snap.EnabledListeners))
	for _, item := range snap.EnabledListeners {
		liveListeners[item.GetName()] = true
	}
	for _, item := range everyListener {
		if liveListeners[item.GetName()] {
			continue
		}
		snap.DisabledListeners = append(snap.DisabledListeners, item)
	}

	// Secrets.
	liveSecrets := make(map[string]bool, len(snap.EnabledSecrets))
	for _, item := range snap.EnabledSecrets {
		liveSecrets[item.GetName()] = true
	}
	for _, item := range everySecret {
		if liveSecrets[item.GetName()] {
			continue
		}
		snap.DisabledSecrets = append(snap.DisabledSecrets, item)
	}

	return snap, nil
}

// SaveSnapshot persists the enabled resources of cfg and then applies strategy
// to stored rows that cfg does not list, all inside one database transaction.
//
// Every statement is executed on the transaction, so either the whole snapshot
// (upserts plus the disable/delete pass) is committed or nothing is. Only the
// Enabled* slices of cfg are written; the Disabled* slices are not read.
//
// Strategy handling:
//   - DeleteNone (and any unrecognised value): upsert only.
//   - DeleteLogical: rows whose name is not in cfg are marked disabled.
//   - DeleteActual: rows whose name is not in cfg are deleted.
//
// For the two deleting strategies an empty Enabled* slice affects every row of
// the corresponding table.
//
// It returns an error when cfg is nil, when the transaction cannot be started
// or committed, or when any statement fails; in the failure cases the
// transaction is rolled back.
func (s *Storage) SaveSnapshot(ctx context.Context, cfg *SnapshotConfig, strategy DeleteStrategy) (err error) {
	if cfg == nil {
		return fmt.Errorf("SnapshotConfig is nil")
	}

	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// err is the named result, so this closure sees the value being returned
	// and can also replace it with a commit failure.
	defer func() {
		if err != nil {
			// Best effort: the original error is the one worth reporting.
			_ = tx.Rollback()
			return
		}
		if commitErr := tx.Commit(); commitErr != nil {
			err = fmt.Errorf("failed to commit snapshot transaction: %w", commitErr)
		}
	}()

	// --- 1. Upsert clusters and collect their names ---
	clusterNames := make([]string, 0, len(cfg.EnabledClusters))
	for _, c := range cfg.EnabledClusters {
		data, marshalErr := protojson.Marshal(c)
		if marshalErr != nil {
			return fmt.Errorf("failed to save cluster %s: %w", c.GetName(), marshalErr)
		}
		if err = s.upsertResourceTx(ctx, tx, "clusters", c.GetName(), data); err != nil {
			return fmt.Errorf("failed to save cluster %s: %w", c.GetName(), err)
		}
		clusterNames = append(clusterNames, c.GetName())
	}

	// --- 2. Upsert listeners and collect their names ---
	listenerNames := make([]string, 0, len(cfg.EnabledListeners))
	for _, l := range cfg.EnabledListeners {
		data, marshalErr := protojson.Marshal(l)
		if marshalErr != nil {
			return fmt.Errorf("failed to save listener %s: %w", l.GetName(), marshalErr)
		}
		if err = s.upsertResourceTx(ctx, tx, "listeners", l.GetName(), data); err != nil {
			return fmt.Errorf("failed to save listener %s: %w", l.GetName(), err)
		}
		listenerNames = append(listenerNames, l.GetName())
	}

	// --- 3. Upsert secrets and collect their names ---
	secretNames := make([]string, 0, len(cfg.EnabledSecrets))
	for _, sec := range cfg.EnabledSecrets {
		data, marshalErr := protojson.Marshal(sec)
		if marshalErr != nil {
			return fmt.Errorf("failed to save secret %s: %w", sec.GetName(), marshalErr)
		}
		if err = s.upsertResourceTx(ctx, tx, "secrets", sec.GetName(), data); err != nil {
			return fmt.Errorf("failed to save secret %s: %w", sec.GetName(), err)
		}
		secretNames = append(secretNames, sec.GetName())
	}

	// --- 4. Apply the deletion strategy ---
	var verb string
	switch strategy {
	case DeleteLogical:
		verb = "logically"
	case DeleteActual:
		verb = "physically"
	default:
		// DeleteNone: rows missing from cfg are left as they are.
		return nil
	}

	targets := []struct {
		table string
		keep  []string
	}{
		{"clusters", clusterNames},
		{"listeners", listenerNames},
		{"secrets", secretNames},
	}
	for _, target := range targets {
		if err = s.pruneMissingResourcesTx(ctx, tx, target.table, target.keep, strategy); err != nil {
			return fmt.Errorf("failed to %s delete missing %s: %w", verb, target.table, err)
		}
	}

	return nil
}

// upsertResourceTx inserts or updates, on tx, the row called name in table with
// the JSON payload data and marks it enabled. table must be one of the fixed
// resource table names; it is interpolated into the statement.
func (s *Storage) upsertResourceTx(ctx context.Context, tx *sql.Tx, table, name string, data []byte) error {
	var query string
	if s.driver == "postgres" {
		query = fmt.Sprintf(`
            INSERT INTO %s (name, data, enabled, updated_at)
            VALUES ($1, $2, true, now())
            ON CONFLICT (name) DO UPDATE SET data = EXCLUDED.data, enabled = true, updated_at = now()`,
			table)
	} else {
		query = fmt.Sprintf(`
            INSERT INTO %s (name, data, enabled, updated_at)
            VALUES (?, ?, 1, CURRENT_TIMESTAMP)
            ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP`,
			table)
	}

	_, err := tx.ExecContext(ctx, query, name, string(data))
	return err
}

// pruneMissingResourcesTx disables (DeleteLogical) or deletes (DeleteActual), on
// tx, every row of table whose name is not in keep. With an empty keep list the
// statement has no WHERE clause and therefore affects every row. Any other
// strategy is a no-op.
func (s *Storage) pruneMissingResourcesTx(ctx context.Context, tx *sql.Tx, table string, keep []string, strategy DeleteStrategy) error {
	var query string
	switch strategy {
	case DeleteLogical:
		if s.driver == "postgres" {
			query = "UPDATE " + table + " SET enabled = false, updated_at = now()"
		} else {
			query = "UPDATE " + table + " SET enabled = 0, updated_at = CURRENT_TIMESTAMP"
		}
	case DeleteActual:
		query = "DELETE FROM " + table
	default:
		return nil
	}

	args := make([]interface{}, len(keep))
	if len(keep) > 0 {
		marks := make([]string, len(keep))
		for i, name := range keep {
			marks[i] = s.placeholder(i + 1)
			args[i] = name
		}
		query += " WHERE name NOT IN (" + strings.Join(marks, ", ") + ")"
	}

	_, err := tx.ExecContext(ctx, query, args...)
	return err
}

// -----------------------------------------------------------------------------
// ENABLE/DISABLE & DELETE METHODS
// -----------------------------------------------------------------------------

// EnableCluster sets the enabled flag of the cluster called name to enabled
// and refreshes its updated_at timestamp. The database error, if any, is
// returned unchanged.
func (s *Storage) EnableCluster(ctx context.Context, name string, enabled bool) error {
	var stmt string
	switch s.driver {
	case "postgres":
		stmt = `UPDATE clusters SET enabled = $1, updated_at = now() WHERE name = $2`
	default:
		stmt = `UPDATE clusters SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, enabled, name); execErr != nil {
		return execErr
	}
	return nil
}

// EnableListener sets the enabled flag of the listener called name to enabled
// and refreshes its updated_at timestamp. The database error, if any, is
// returned unchanged.
func (s *Storage) EnableListener(ctx context.Context, name string, enabled bool) error {
	var stmt string
	switch s.driver {
	case "postgres":
		stmt = `UPDATE listeners SET enabled = $1, updated_at = now() WHERE name = $2`
	default:
		stmt = `UPDATE listeners SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, enabled, name); execErr != nil {
		return execErr
	}
	return nil
}

// EnableSecret sets the enabled flag of the secret called name to enabled and
// refreshes its updated_at timestamp. The database error, if any, is returned
// unchanged.
func (s *Storage) EnableSecret(ctx context.Context, name string, enabled bool) error {
	var stmt string
	switch s.driver {
	case "postgres":
		stmt = `UPDATE secrets SET enabled = $1, updated_at = now() WHERE name = $2`
	default:
		stmt = `UPDATE secrets SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, enabled, name); execErr != nil {
		return execErr
	}
	return nil
}

// RemoveListener physically deletes the listener row called name. The database
// error, if any, is returned unchanged.
func (s *Storage) RemoveListener(ctx context.Context, name string) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `DELETE FROM listeners WHERE name = $1`
	} else {
		stmt = `DELETE FROM listeners WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, name); execErr != nil {
		return execErr
	}
	return nil
}

// RemoveCluster physically deletes the cluster row called name. The database
// error, if any, is returned unchanged.
func (s *Storage) RemoveCluster(ctx context.Context, name string) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `DELETE FROM clusters WHERE name = $1`
	} else {
		stmt = `DELETE FROM clusters WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, name); execErr != nil {
		return execErr
	}
	return nil
}

// RemoveSecret physically deletes the secret row called name. The database
// error, if any, is returned unchanged.
func (s *Storage) RemoveSecret(ctx context.Context, name string) error {
	var stmt string
	if s.driver == "postgres" {
		stmt = `DELETE FROM secrets WHERE name = $1`
	} else {
		stmt = `DELETE FROM secrets WHERE name = ?`
	}

	if _, execErr := s.db.ExecContext(ctx, stmt, name); execErr != nil {
		return execErr
	}
	return nil
}

// disableMissingResources marks as disabled every row of table whose name is
// not listed in inputNames and refreshes updated_at on those rows.
//
// table must be "clusters", "listeners" or "secrets"; any other value yields
// an error before the database is touched. With an empty inputNames the UPDATE
// carries no WHERE clause, so every row of the table is disabled.
func (s *Storage) disableMissingResources(ctx context.Context, table string, inputNames []string) error {
	switch table {
	case "clusters", "listeners", "secrets":
		// Tables that carry an 'enabled' column.
	default:
		return fmt.Errorf("logical delete (disable) is only supported for tables with an 'enabled' column (clusters, listeners, secrets)")
	}

	// One bind marker and one argument per name to keep.
	marks := make([]string, 0, len(inputNames))
	params := make([]interface{}, 0, len(inputNames))
	for idx := range inputNames {
		marks = append(marks, s.placeholder(idx+1))
		params = append(params, inputNames[idx])
	}

	// Dialect-specific literals for "false" and "current time".
	falseLiteral, nowExpr := "0", "CURRENT_TIMESTAMP"
	if s.driver == "postgres" {
		falseLiteral, nowExpr = "false", "now()"
	}

	var filter string
	if len(inputNames) != 0 {
		filter = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(marks, ", "))
	}

	stmt := fmt.Sprintf(`
        UPDATE %s
        SET enabled = %s, updated_at = %s
        %s`,
		table, falseLiteral, nowExpr, filter)

	if _, execErr := s.db.ExecContext(ctx, stmt, params...); execErr != nil {
		return execErr
	}
	return nil
}

// deleteMissingResources physically removes every row of table whose name is
// not listed in inputNames.
//
// table must be "clusters", "listeners" or "secrets"; any other value yields
// an error before the database is touched. With an empty inputNames the DELETE
// carries no WHERE clause, so every row of the table is removed.
func (s *Storage) deleteMissingResources(ctx context.Context, table string, inputNames []string) error {
	switch table {
	case "clusters", "listeners", "secrets":
		// Supported resource tables.
	default:
		return fmt.Errorf("physical delete is only supported for tables: clusters, listeners, secrets")
	}

	// One bind marker and one argument per name to keep.
	marks := make([]string, 0, len(inputNames))
	params := make([]interface{}, 0, len(inputNames))
	for idx := range inputNames {
		marks = append(marks, s.placeholder(idx+1))
		params = append(params, inputNames[idx])
	}

	var filter string
	if len(inputNames) != 0 {
		filter = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(marks, ", "))
	}

	stmt := fmt.Sprintf(`
        DELETE FROM %s
        %s`,
		table, filter)

	if _, execErr := s.db.ExecContext(ctx, stmt, params...); execErr != nil {
		return execErr
	}
	return nil
}