package snapshot

import (
	"context"
	"fmt"
	"sort"
	"time"

	clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"

	internalapi "envoy-control-plane/internal/api"
	"envoy-control-plane/internal/storage"
)

// SetSnapshotFromConfig builds a cache snapshot from the enabled clusters and
// listeners in cfg and installs it in sm.Cache under sm.NodeID.
//
// An empty version is replaced by a generated "snap-<unix-nanoseconds>" value.
// An error is returned when cfg is nil, when the snapshot cannot be created,
// or when the cache rejects it.
func (sm *SnapshotManager) SetSnapshotFromConfig(ctx context.Context, version string, cfg *storage.SnapshotConfig) error {
	if cfg == nil {
		return fmt.Errorf("snapshot config is nil")
	}
	if version == "" {
		version = fmt.Sprintf("snap-%d", time.Now().UnixNano())
	}

	// Go does not convert []*T to []types.Resource implicitly, so each typed
	// slice is copied element by element.
	clusters := make([]types.Resource, len(cfg.EnabledClusters))
	for i, c := range cfg.EnabledClusters {
		clusters[i] = c
	}
	listeners := make([]types.Resource, len(cfg.EnabledListeners))
	for i, l := range cfg.EnabledListeners {
		listeners[i] = l
	}

	// Other resource types can be added here once SnapshotConfig carries them.
	snap, err := cachev3.NewSnapshot(version, map[resourcev3.Type][]types.Resource{
		resourcev3.ClusterType:  clusters,
		resourcev3.ListenerType: listeners,
	})
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}

	if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, snap); err != nil {
		return fmt.Errorf("failed to set snapshot: %w", err)
	}
	return nil
}

// SnapshotToConfig converts the cache snapshot stored for nodeID into a
// SnapshotConfig.
//
// Only the Enabled* fields are populated; the Disabled* fields are left at
// their zero value because the cache holds no disabled resources. Resources
// whose concrete type is not the expected cluster/listener type are skipped.
// Each list is sorted by resource name so the result does not depend on the
// iteration order of the snapshot's resource collection.
func (sm *SnapshotManager) SnapshotToConfig(ctx context.Context, nodeID string) (*storage.SnapshotConfig, error) {
	snap, err := sm.Cache.GetSnapshot(nodeID)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot for node %s: %w", nodeID, err)
	}

	// The lists are initialised non-nil, matching the original behaviour.
	config := &storage.SnapshotConfig{
		EnabledClusters:  []*clusterv3.Cluster{},
		EnabledListeners: []*listenerv3.Listener{},
	}

	for _, r := range snap.GetResources(string(resourcev3.ClusterType)) {
		if c, ok := r.(*clusterv3.Cluster); ok {
			config.EnabledClusters = append(config.EnabledClusters, c)
		}
	}
	for _, r := range snap.GetResources(string(resourcev3.ListenerType)) {
		if l, ok := r.(*listenerv3.Listener); ok {
			config.EnabledListeners = append(config.EnabledListeners, l)
		}
	}

	sort.Slice(config.EnabledClusters, func(i, j int) bool {
		return config.EnabledClusters[i].GetName() < config.EnabledClusters[j].GetName()
	})
	sort.Slice(config.EnabledListeners, func(i, j int) bool {
		return config.EnabledListeners[i].GetName() < config.EnabledListeners[j].GetName()
	})

	return config, nil
}

// LoadSnapshotFromDB rebuilds the snapshot configuration from the persistent
// store and installs it in the Envoy cache via SetSnapshotFromConfig.
//
// The snapshot version is "db-sync-" followed by the current time with
// nanosecond resolution. Second resolution is not enough: two syncs within
// the same second would share a version string, and a version that has not
// changed can cause the snapshot cache to skip notifying open watches.
func (sm *SnapshotManager) LoadSnapshotFromDB(ctx context.Context) error {
	fmt.Println("Loading configuration from main DB...")

	cfg, err := sm.DB.RebuildSnapshot(ctx)
	if err != nil {
		return fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}

	fmt.Println("Loaded configuration from DB. Updating Envoy Cache.")

	version := "db-sync-" + time.Now().Format(time.RFC3339Nano)
	return sm.SetSnapshotFromConfig(ctx, version, cfg)
}

// FlushCacheToDB reads the current cache snapshot for sm.NodeID, converts it
// to a SnapshotConfig and passes it to sm.DB.SaveSnapshot together with
// strategy.
//
// NOTE(review): the original author describes SaveSnapshot as handling the
// insert/update logic for all resources; that is not visible from this file.
func (sm *SnapshotManager) FlushCacheToDB(ctx context.Context, strategy storage.DeleteStrategy) error {
	cfg, err := sm.SnapshotToConfig(ctx, sm.NodeID)
	if err != nil {
		return fmt.Errorf("failed to convert snapshot to config: %w", err)
	}

	if err := sm.DB.SaveSnapshot(ctx, cfg, strategy); err != nil {
		return fmt.Errorf("failed to save config to DB: %w", err)
	}

	fmt.Println("Successfully saved to Persistent DB.")
	return nil
}

// EnableResourceFromDB marks the named cluster or listener as enabled in the
// DB and then reloads the cache snapshot from the DB.
//
// typ must be resourcev3.ClusterType or resourcev3.ListenerType; any other
// value yields an error. The signature takes no context, so
// context.Background() is used for the DB and cache calls.
func (sm *SnapshotManager) EnableResourceFromDB(name string, typ resourcev3.Type) error {
	ctx := context.Background()

	switch typ {
	case resourcev3.ClusterType:
		if err := sm.DB.EnableCluster(ctx, name, true); err != nil {
			return fmt.Errorf("failed to enable cluster '%s' in DB: %w", name, err)
		}
	case resourcev3.ListenerType:
		if err := sm.DB.EnableListener(ctx, name, true); err != nil {
			return fmt.Errorf("failed to enable listener '%s' in DB: %w", name, err)
		}
	default:
		return fmt.Errorf("unsupported resource type for enabling: %s", typ)
	}

	// Reload so the cache picks up the newly enabled resource.
	return sm.LoadSnapshotFromDB(ctx)
}

// CheckCacheDBConsistency compares the names of the resources in the cache
// snapshot for sm.NodeID with the names of the enabled resources returned by
// sm.DB.RebuildSnapshot.
//
// For each resource type, names found only in the cache are listed under
// report.CacheOnly and names found only in the DB under report.DBOnly, both
// sorted alphabetically. report.Inconsistent is set when any such name
// exists. Only names are compared, not resource contents.
func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*internalapi.ConsistencyReport, error) {
	report := &internalapi.ConsistencyReport{
		CacheOnly: make(map[resourcev3.Type][]string),
		DBOnly:    make(map[resourcev3.Type][]string),
	}

	cacheConfig, err := sm.SnapshotToConfig(ctx, sm.NodeID)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot from cache: %w", err)
	}

	// Only the Enabled* lists of the rebuilt config are compared below.
	dbConfig, err := sm.DB.RebuildSnapshot(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}
	if dbConfig == nil {
		// SetSnapshotFromConfig treats a nil config as an error as well.
		return nil, fmt.Errorf("snapshot config is nil")
	}

	// buildNameSet collects resource names into a set for O(1) lookups.
	buildNameSet := func(resources []ResourceNamer) map[string]struct{} {
		set := make(map[string]struct{}, len(resources))
		for _, r := range resources {
			set[r.GetName()] = struct{}{}
		}
		return set
	}

	typeResourceMaps := []struct {
		typ       resourcev3.Type
		cacheList []ResourceNamer
		dbList    []ResourceNamer
	}{
		{resourcev3.ClusterType, resourcesToNamers(cacheConfig.EnabledClusters), resourcesToNamers(dbConfig.EnabledClusters)},
		{resourcev3.ListenerType, resourcesToNamers(cacheConfig.EnabledListeners), resourcesToNamers(dbConfig.EnabledListeners)},
	}

	for _, m := range typeResourceMaps {
		cacheSet := buildNameSet(m.cacheList)
		dbSet := buildNameSet(m.dbList)

		for name := range cacheSet {
			if _, inDB := dbSet[name]; !inDB {
				report.CacheOnly[m.typ] = append(report.CacheOnly[m.typ], name)
				report.Inconsistent = true
			}
		}
		for name := range dbSet {
			if _, inCache := cacheSet[name]; !inCache {
				report.DBOnly[m.typ] = append(report.DBOnly[m.typ], name)
				report.Inconsistent = true
			}
		}

		// Set iteration order is random; sort so the report is stable.
		// Reading a missing key yields a nil slice and adds no map entry.
		sort.Strings(report.CacheOnly[m.typ])
		sort.Strings(report.DBOnly[m.typ])
	}

	return report, nil
}

// ListResources returns all enabled and disabled resources of a given type from the DB.
func (sm *SnapshotManager) ListResources(typ resourcev3.Type) ([]types.Resource, []types.Resource, error) { snap, err := sm.DB.RebuildSnapshot(context.Background()) if err != nil { return nil, nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err) } var enabled, disabled []types.Resource var namerEnabled, namerDisabled []ResourceNamer switch typ { case resourcev3.ClusterType: namerEnabled = resourcesToNamers(snap.EnabledClusters) namerDisabled = resourcesToNamers(snap.DisabledClusters) case resourcev3.ListenerType: namerEnabled = resourcesToNamers(snap.EnabledListeners) namerDisabled = resourcesToNamers(snap.DisabledListeners) default: return nil, nil, fmt.Errorf("unsupported resource type: %s", typ) } // Convert ResourceNamer slices back to types.Resource slices enabled = make([]types.Resource, len(namerEnabled)) for i, r := range namerEnabled { enabled[i] = r.(types.Resource) } disabled = make([]types.Resource, len(namerDisabled)) for i, r := range namerDisabled { disabled[i] = r.(types.Resource) } return enabled, disabled, nil } // resourcesToNamers converts a slice of proto-generated resource pointers // (like []*clusterv3.Cluster) to a slice of the generic ResourceNamer interface. // This is necessary because structs like *clusterv3.Cluster don't explicitly // implement types.Resource, but are compatible with it and ResourceNamer. func resourcesToNamers[T ResourceNamer](list []T) []ResourceNamer { out := make([]ResourceNamer, len(list)) for i, item := range list { out[i] = item } return out }