Newer
Older
EnvoyControlPlane / internal / pkg / snapshot / resource_config.go
package snapshot

import (
	"context"
	"fmt"
	"sort"
	"time"

	clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	secretv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" // ADDED: SDS Secret Import
	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"

	internalapi "envoy-control-plane/internal/api"
	"envoy-control-plane/internal/pkg/storage"
)

// SetSnapshotFromConfig builds a cache snapshot from the enabled clusters,
// listeners and secrets in cfg and installs it in sm.Cache for sm.NodeID.
//
// An empty version is replaced by "snap-<unix nanoseconds>". An error is
// returned when cfg is nil, when the snapshot cannot be created, or when the
// cache refuses the snapshot.
func (sm *SnapshotManager) SetSnapshotFromConfig(ctx context.Context, version string, cfg *storage.SnapshotConfig) error {
	if cfg == nil {
		return fmt.Errorf("snapshot config is nil")
	}

	// Fall back to a timestamp-derived version when the caller supplied none.
	if version == "" {
		version = fmt.Sprintf("snap-%d", time.Now().UnixNano())
	}

	// Widen each typed slice to []types.Resource, element by element.
	clusters := make([]types.Resource, 0, len(cfg.EnabledClusters))
	for _, cluster := range cfg.EnabledClusters {
		clusters = append(clusters, cluster)
	}

	listeners := make([]types.Resource, 0, len(cfg.EnabledListeners))
	for _, listener := range cfg.EnabledListeners {
		listeners = append(listeners, listener)
	}

	secrets := make([]types.Resource, 0, len(cfg.EnabledSecrets))
	for _, secret := range cfg.EnabledSecrets {
		secrets = append(secrets, secret)
	}

	// Assemble the per-type resource map and turn it into a snapshot.
	snapshot, err := cachev3.NewSnapshot(version, map[resourcev3.Type][]types.Resource{
		resourcev3.ClusterType:  clusters,
		resourcev3.ListenerType: listeners,
		resourcev3.SecretType:   secrets,
	})
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}

	// Publish the snapshot under this manager's node ID.
	if err = sm.Cache.SetSnapshot(ctx, sm.NodeID, snapshot); err != nil {
		return fmt.Errorf("failed to set snapshot: %w", err)
	}

	return nil
}

// SnapshotToConfig reads the snapshot stored in sm.Cache for nodeID and
// returns its clusters, listeners and secrets as a storage.SnapshotConfig.
//
// Only the Enabled* fields are filled in; the cache holds no disabled
// resources. Resources whose concrete type does not match the expected
// proto type are skipped. ctx is currently unused.
func (sm *SnapshotManager) SnapshotToConfig(ctx context.Context, nodeID string) (*storage.SnapshotConfig, error) {
	current, err := sm.Cache.GetSnapshot(nodeID)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot for node %s: %w", nodeID, err)
	}

	// Start from empty (non-nil) slices so the result never carries nil lists.
	clusters := make([]*clusterv3.Cluster, 0)
	for _, res := range current.GetResources(string(resourcev3.ClusterType)) {
		cluster, isCluster := res.(*clusterv3.Cluster)
		if !isCluster {
			continue
		}
		clusters = append(clusters, cluster)
	}

	listeners := make([]*listenerv3.Listener, 0)
	for _, res := range current.GetResources(string(resourcev3.ListenerType)) {
		listener, isListener := res.(*listenerv3.Listener)
		if !isListener {
			continue
		}
		listeners = append(listeners, listener)
	}

	secrets := make([]*secretv3.Secret, 0)
	for _, res := range current.GetResources(string(resourcev3.SecretType)) {
		secret, isSecret := res.(*secretv3.Secret)
		if !isSecret {
			continue
		}
		secrets = append(secrets, secret)
	}

	return &storage.SnapshotConfig{
		EnabledClusters:  clusters,
		EnabledListeners: listeners,
		EnabledSecrets:   secrets,
	}, nil
}

// LoadSnapshotFromDB rebuilds the snapshot configuration from the database
// (sm.DB.RebuildSnapshot) and installs it in the Envoy cache through
// SetSnapshotFromConfig.
//
// The snapshot version is "db-sync-" followed by the current time formatted
// with nanosecond precision. Second-level precision is not sufficient: two
// reloads within the same second would produce the same version string, and
// the xDS snapshot cache relies on the version to decide whether a client is
// already up to date, so the later update could be withheld from Envoy.
//
// It returns an error when the rebuild fails or when the snapshot cannot be
// applied to the cache.
func (sm *SnapshotManager) LoadSnapshotFromDB(ctx context.Context) error {
	fmt.Println("Loading configuration from main DB...")

	// 1. Rebuild the configuration from the database.
	cfg, err := sm.DB.RebuildSnapshot(ctx)
	if err != nil {
		return fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}

	fmt.Println("Loaded configuration from DB. Updating Envoy Cache.")

	// 2. Push it into the in-memory cache under a version that is unique per
	// call (nanosecond timestamp), so back-to-back reloads are distinguishable.
	version := "db-sync-" + time.Now().Format(time.RFC3339Nano)
	return sm.SetSnapshotFromConfig(ctx, version, cfg)
}

// FlushCacheToDB persists the snapshot currently held in the cache for
// sm.NodeID to the database.
//
// The cache contents are first converted with SnapshotToConfig and then
// handed to sm.DB.SaveSnapshot together with the given delete strategy.
// Either step failing aborts the flush and returns a wrapped error.
func (sm *SnapshotManager) FlushCacheToDB(ctx context.Context, strategy storage.DeleteStrategy) error {
	// Read the live cache state as a SnapshotConfig.
	current, err := sm.SnapshotToConfig(ctx, sm.NodeID)
	if err != nil {
		return fmt.Errorf("failed to convert snapshot to config: %w", err)
	}

	// Hand the configuration to the persistent store.
	err = sm.DB.SaveSnapshot(ctx, current, strategy)
	if err != nil {
		return fmt.Errorf("failed to save config to DB: %w", err)
	}

	fmt.Println("Successfully saved to Persistent DB.")
	return nil
}

// EnableResourceFromDB marks the named resource of type typ as enabled in
// the database and then reloads the cache from the database so the resource
// is served again.
//
// Clusters, listeners and secrets are supported; any other type yields an
// error. The method takes no context, so all work runs under
// context.Background().
func (sm *SnapshotManager) EnableResourceFromDB(name string, typ resourcev3.Type) error {
	ctx := context.Background()

	// Flip the enabled flag through the type-specific DB call.
	switch typ {
	case resourcev3.ClusterType:
		enableErr := sm.DB.EnableCluster(ctx, name, true)
		if enableErr != nil {
			return fmt.Errorf("failed to enable cluster '%s' in DB: %w", name, enableErr)
		}

	case resourcev3.ListenerType:
		enableErr := sm.DB.EnableListener(ctx, name, true)
		if enableErr != nil {
			return fmt.Errorf("failed to enable listener '%s' in DB: %w", name, enableErr)
		}

	case resourcev3.SecretType:
		enableErr := sm.DB.EnableSecret(ctx, name, true)
		if enableErr != nil {
			return fmt.Errorf("failed to enable secret '%s' in DB: %w", name, enableErr)
		}

	default:
		return fmt.Errorf("unsupported resource type for enabling: %s", typ)
	}

	// Rebuild the cache from the DB so it picks up the re-enabled resource.
	return sm.LoadSnapshotFromDB(ctx)
}

// CheckCacheDBConsistency compares the resource names in the cache snapshot
// for sm.NodeID with the enabled resource names rebuilt from the database.
//
// For each of clusters, listeners and secrets, names found only in the cache
// are recorded under report.CacheOnly and names found only in the DB under
// report.DBOnly. Each list is sorted so the report is stable across calls
// (the comparison uses maps, whose iteration order is random).
// report.Inconsistent is set when any difference is found. Only names are
// compared, not resource contents.
//
// An error is returned when the cache snapshot cannot be read, when the DB
// rebuild fails, or when the DB rebuild yields a nil configuration.
func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*internalapi.ConsistencyReport, error) {
	report := &internalapi.ConsistencyReport{
		CacheOnly: make(map[resourcev3.Type][]string),
		DBOnly:    make(map[resourcev3.Type][]string),
	}

	// 1. Get current cache snapshot
	cacheConfig, err := sm.SnapshotToConfig(ctx, sm.NodeID)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot from cache: %w", err)
	}

	// 2. Rebuild snapshot from DB; only its Enabled* lists are compared.
	dbConfig, err := sm.DB.RebuildSnapshot(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}
	if dbConfig == nil {
		// Guard the dereferences below; mirrors the nil check in SetSnapshotFromConfig.
		return nil, fmt.Errorf("failed to rebuild snapshot from DB: snapshot config is nil")
	}

	// buildNameSet collects resource names into a set for O(1) membership tests.
	buildNameSet := func(resources []ResourceNamer) map[string]struct{} {
		set := make(map[string]struct{}, len(resources))
		for _, r := range resources {
			set[r.GetName()] = struct{}{}
		}
		return set
	}

	// missingFrom returns, in sorted order, the names in have that other lacks.
	missingFrom := func(have, other map[string]struct{}) []string {
		var missing []string
		for name := range have {
			if _, found := other[name]; !found {
				missing = append(missing, name)
			}
		}
		sort.Strings(missing)
		return missing
	}

	// Resource types paired with their cache-side and DB-side lists.
	typeResourceMaps := []struct {
		typ       resourcev3.Type
		cacheList []ResourceNamer
		dbList    []ResourceNamer
	}{
		{resourcev3.ClusterType, resourcesToNamers(cacheConfig.EnabledClusters), resourcesToNamers(dbConfig.EnabledClusters)},
		{resourcev3.ListenerType, resourcesToNamers(cacheConfig.EnabledListeners), resourcesToNamers(dbConfig.EnabledListeners)},
		{resourcev3.SecretType, resourcesToNamers(cacheConfig.EnabledSecrets), resourcesToNamers(dbConfig.EnabledSecrets)},
	}

	for _, m := range typeResourceMaps {
		cacheSet := buildNameSet(m.cacheList)
		dbSet := buildNameSet(m.dbList)

		// Present in the cache but not in the DB. The map key is only created
		// when there is at least one such name.
		if cacheOnly := missingFrom(cacheSet, dbSet); len(cacheOnly) > 0 {
			report.CacheOnly[m.typ] = cacheOnly
			report.Inconsistent = true
		}

		// Present in the DB but not in the cache.
		if dbOnly := missingFrom(dbSet, cacheSet); len(dbOnly) > 0 {
			report.DBOnly[m.typ] = dbOnly
			report.Inconsistent = true
		}
	}

	return report, nil
}

// ListResources rebuilds the snapshot configuration from the database and
// returns the enabled and the disabled resources of the given type, in that
// order.
//
// Clusters, listeners and secrets are supported; any other type yields an
// error. An error is also returned when the DB rebuild fails, when it yields
// a nil configuration, or when a resource cannot be represented as a
// types.Resource. The method takes no context, so the DB call runs under
// context.Background().
func (sm *SnapshotManager) ListResources(typ resourcev3.Type) ([]types.Resource, []types.Resource, error) {
	snap, err := sm.DB.RebuildSnapshot(context.Background())
	if err != nil {
		return nil, nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}
	if snap == nil {
		// Guard the field accesses below; mirrors the nil check in SetSnapshotFromConfig.
		return nil, nil, fmt.Errorf("failed to rebuild snapshot from DB: snapshot config is nil")
	}

	var namerEnabled, namerDisabled []ResourceNamer

	switch typ {
	case resourcev3.ClusterType:
		namerEnabled = resourcesToNamers(snap.EnabledClusters)
		namerDisabled = resourcesToNamers(snap.DisabledClusters)
	case resourcev3.ListenerType:
		namerEnabled = resourcesToNamers(snap.EnabledListeners)
		namerDisabled = resourcesToNamers(snap.DisabledListeners)
	case resourcev3.SecretType:
		namerEnabled = resourcesToNamers(snap.EnabledSecrets)
		namerDisabled = resourcesToNamers(snap.DisabledSecrets)
	default:
		return nil, nil, fmt.Errorf("unsupported resource type: %s", typ)
	}

	// toResources converts namers back to types.Resource values. The assertion
	// is checked so an unexpected element type surfaces as an error, not a panic.
	toResources := func(namers []ResourceNamer) ([]types.Resource, error) {
		out := make([]types.Resource, len(namers))
		for i, n := range namers {
			res, ok := n.(types.Resource)
			if !ok {
				return nil, fmt.Errorf("%s resource at index %d has type %T, which is not a types.Resource", typ, i, n)
			}
			out[i] = res
		}
		return out, nil
	}

	enabled, err := toResources(namerEnabled)
	if err != nil {
		return nil, nil, err
	}

	disabled, err := toResources(namerDisabled)
	if err != nil {
		return nil, nil, err
	}

	return enabled, disabled, nil
}

// resourcesToNamers widens a typed slice such as []*clusterv3.Cluster into a
// []ResourceNamer holding the same elements in the same order.
//
// Go does not convert []T to []ResourceNamer implicitly even when T satisfies
// ResourceNamer, so the elements are copied one by one. The result is always
// a non-nil slice, including for a nil or empty input.
func resourcesToNamers[T ResourceNamer](list []T) []ResourceNamer {
	namers := make([]ResourceNamer, 0, len(list))
	for _, element := range list {
		namers = append(namers, element)
	}
	return namers
}