package internal import ( "context" "encoding/json" "fmt" "os" "time" clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" // Ensure all standard filters are imported for proto unmarshalling _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/jwt_authn/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/oauth2/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" _ "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "google.golang.org/protobuf/encoding/protojson" yaml "gopkg.in/yaml.v3" internalapi "envoy-control-plane/internal/api" internallog "envoy-control-plane/internal/log" ) // ResourceNamer is an interface implemented by all xDS resources with a GetName() method. type ResourceNamer interface { GetName() string } // SnapshotManager wraps a SnapshotCache and provides file loading/modifying type SnapshotManager struct { Cache cachev3.SnapshotCache NodeID string DB *Storage // Assuming Storage is defined elsewhere and has RebuildSnapshot/LoadAll* methods } func NewSnapshotManager(cache cachev3.SnapshotCache, nodeID string, db *Storage) *SnapshotManager { return &SnapshotManager{ Cache: cache, NodeID: nodeID, DB: db, } } // YamlResources is a helper struct to unmarshal the common Envoy YAML file structure type YamlResources struct { Resources []yaml.Node `yaml:"resources"` } // unmarshalYamlNodeToProto takes a single YAML/JSON object (represented as a map[string]interface{}) // and unmarshals it into the given Protobuf resource pointer using protojson. func unmarshalYamlNodeToProto(node map[string]interface{}, resource types.Resource) error { // 1. We must remove the standard Protobuf type marker (if present) before marshaling to JSON, // as it can interfere with direct Go struct marshaling. delete(node, "@type") // 2. Marshal the generic map into JSON bytes. jsonBytes, err := json.Marshal(node) if err != nil { return fmt.Errorf("failed to marshal resource node to JSON: %w", err) } // 3. Unmarshal the JSON bytes into the target Protobuf struct. // protojson correctly handles field names (snake_case vs camelCase) and Any fields. if err := protojson.Unmarshal(jsonBytes, resource); err != nil { return fmt.Errorf("failed to unmarshal into proto: %w", err) } return nil } func (sm *SnapshotManager) LoadSnapshotFromFile(context context.Context, filePath string) (map[resourcev3.Type][]types.Resource, error) { log := internallog.LogFromContext(context) // Read the file data, err := os.ReadFile(filePath) if err != nil { return nil, fmt.Errorf("failed to read file: %w", err) } var raw interface{} if err := yaml.Unmarshal(data, &raw); err != nil { return nil, fmt.Errorf("failed to unmarshal YAML/JSON file %s: %w", filePath, err) } resources := make(map[resourcev3.Type][]types.Resource) var walk func(node interface{}) error walk = func(node interface{}) error { switch v := node.(type) { case map[string]interface{}: if typStr, ok := v["@type"].(string); ok { typ := resourcev3.Type(typStr) // only process known top-level xDS resources var resource types.Resource var newResource bool switch typ { case resourcev3.ClusterType: resource = &clusterv3.Cluster{} newResource = true case resourcev3.ListenerType: resource = &listenerv3.Listener{} newResource = true // ... other types ... default: log.Warnf("unsupported resource type: %s", typ) // Skip nested or unsupported types } if newResource { // *** REPLACED: json.Marshal and protojson.Unmarshal logic *** if err := unmarshalYamlNodeToProto(v, resource); err != nil { return fmt.Errorf("failed to unmarshal %s from file: %w", typ, err) } resources[typ] = append(resources[typ], resource) } } // recurse into children for _, child := range v { if err := walk(child); err != nil { return err } } case []interface{}: for _, item := range v { if err := walk(item); err != nil { return err } } } return nil } if err := walk(raw); err != nil { return nil, err } return resources, nil } // LoadFilterChainFromYAML unmarshals a YAML string representing an Envoy Listener FilterChain // configuration into a listenerv3.FilterChain protobuf message using protojson pipeline. func (sm *SnapshotManager) LoadFilterChainFromYAML(ctx context.Context, yamlStr string) (*listenerv3.FilterChain, error) { log := internallog.LogFromContext(ctx) // 1. Unmarshal YAML into a generic Go map var rawChainMap map[string]interface{} if err := yaml.Unmarshal([]byte(yamlStr), &rawChainMap); err != nil { log.Errorf("Failed to unmarshal YAML: %v", err) return nil, fmt.Errorf("failed to unmarshal YAML into generic map: %w", err) } if rawChainMap == nil { return nil, fmt.Errorf("failed to unmarshal YAML: input was empty or invalid") } // 2. Unmarshal the generic map into the Protobuf struct using the helper rawChain := &listenerv3.FilterChain{} if err := unmarshalYamlNodeToProto(rawChainMap, rawChain); err != nil { return nil, fmt.Errorf("failed to unmarshal YAML into FilterChain using protojson: %w", err) } // Check if the FilterChain contains any filters (optional but good sanity check) if len(rawChain.Filters) == 0 { return nil, fmt.Errorf("filter chain loaded but contains no network filters") } // Return the single FilterChain object. return rawChain, nil } // AppendFilterChainToListener loads the current listener from the cache, appends the provided // FilterChain to its list of FilterChains, and updates the cache with the new snapshot. func (sm *SnapshotManager) AppendFilterChainToListener(ctx context.Context, listenerName string, newFilterChain *listenerv3.FilterChain) error { log := internallog.LogFromContext(ctx) // 1. Get the current Listener from the cache resource, err := sm.GetResourceFromCache(listenerName, resourcev3.ListenerType) if err != nil { return fmt.Errorf("failed to get listener '%s' from cache: %w", listenerName, err) } listener, ok := resource.(*listenerv3.Listener) if !ok { return fmt.Errorf("resource '%s' is not a Listener type", listenerName) } // 2. Append the new FilterChain to the listener's list of filter chains. // This is the core change: appending a *FilterChain, not just []*Filter. listener.FilterChains = append(listener.FilterChains, newFilterChain) log.Infof("Appended new filter chain (match: %v) to listener '%s'", newFilterChain.FilterChainMatch, listenerName) // 3. Create a new snapshot with the modified listener (rest of logic remains similar) snap, err := sm.Cache.GetSnapshot(sm.NodeID) if err != nil { return fmt.Errorf("failed to get snapshot for modification: %w", err) } // Get all current resources resources := sm.getAllResourcesFromSnapshot(snap) // Replace the old listener with the modified one listenerList, ok := resources[resourcev3.ListenerType] if !ok { return fmt.Errorf("listener resource type not present in snapshot") } foundAndReplaced := false for i, res := range listenerList { if namer, ok := res.(interface{ GetName() string }); ok && namer.GetName() == listenerName { listenerList[i] = listener foundAndReplaced = true break } } if !foundAndReplaced { return fmt.Errorf("failed to locate listener '%s' in current resource list for replacement", listenerName) } // Create and set the new snapshot version := fmt.Sprintf("listener-update-%s-%d", listenerName, time.Now().UnixNano()) newSnap, err := cachev3.NewSnapshot(version, resources) if err != nil { return fmt.Errorf("failed to create new snapshot: %w", err) } if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, newSnap); err != nil { return fmt.Errorf("failed to set new snapshot: %w", err) } log.Infof("Successfully updated listener '%s' in cache with new filter chain.", listenerName) return nil } // SetSnapshotFromConfig sets a snapshot from an aggregated SnapshotConfig func (sm *SnapshotManager) SetSnapshotFromConfig(ctx context.Context, version string, cfg *SnapshotConfig) error { if cfg == nil { return fmt.Errorf("snapshot config is nil") } // Ensure version is not empty if version == "" { version = fmt.Sprintf("snap-%d", time.Now().UnixNano()) } // Build the resource map expected by cachev3.NewSnapshot resources := map[resourcev3.Type][]types.Resource{ resourcev3.ClusterType: make([]types.Resource, len(cfg.EnabledClusters)), resourcev3.ListenerType: make([]types.Resource, len(cfg.EnabledListeners)), // Other types if supported by SnapshotConfig, can be added here } // Populate slices by direct type assertion and conversion for i, c := range cfg.EnabledClusters { resources[resourcev3.ClusterType][i] = c } for i, l := range cfg.EnabledListeners { resources[resourcev3.ListenerType][i] = l } // Create the snapshot snap, err := cachev3.NewSnapshot(version, resources) if err != nil { return fmt.Errorf("failed to create snapshot: %w", err) } // Apply snapshot to the cache if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, snap); err != nil { return fmt.Errorf("failed to set snapshot: %w", err) } return nil } // SnapshotToConfig converts current cache snapshot into SnapshotConfig func (sm *SnapshotManager) SnapshotToConfig(ctx context.Context, nodeID string) (*SnapshotConfig, error) { snap, err := sm.Cache.GetSnapshot(nodeID) if err != nil { return nil, fmt.Errorf("failed to get snapshot for node %s: %w", nodeID, err) } config := &SnapshotConfig{ EnabledClusters: []*clusterv3.Cluster{}, EnabledListeners: []*listenerv3.Listener{}, // Disabled fields are not populated from the cache, only enabled ones. } // Convert Cluster resources for _, r := range snap.GetResources(string(resourcev3.ClusterType)) { if c, ok := r.(*clusterv3.Cluster); ok { config.EnabledClusters = append(config.EnabledClusters, c) } } // Convert Listener resources for _, r := range snap.GetResources(string(resourcev3.ListenerType)) { if l, ok := r.(*listenerv3.Listener); ok { config.EnabledListeners = append(config.EnabledListeners, l) } } return config, nil } // SetSnapshot sets a full snapshot func (sm *SnapshotManager) SetSnapshot(ctx context.Context, version string, resources map[resourcev3.Type][]types.Resource) error { snap, err := cachev3.NewSnapshot(version, resources) if err != nil { return fmt.Errorf("failed to create snapshot: %w", err) } return sm.Cache.SetSnapshot(ctx, sm.NodeID, snap) } // ---------------- Persistence and Sync Methods (Two-Layer Model) ---------------- // LoadSnapshotFromDB implements the sync DB data to cache. It rebuilds the snapshot // from the persistent store and updates the Envoy cache. func (sm *SnapshotManager) LoadSnapshotFromDB(ctx context.Context) error { fmt.Println("Loading configuration from main DB...") // 1. Try Database (Primary Source) cfg, err := sm.DB.RebuildSnapshot(ctx) if err != nil { return fmt.Errorf("failed to rebuild snapshot from DB: %w", err) } fmt.Println("Loaded configuration from DB. Updating Envoy Cache.") // 2. Update Envoy's in-memory cache (This is the 'flash cache' layer) return sm.SetSnapshotFromConfig(ctx, "db-sync-"+time.Now().Format(time.RFC3339), cfg) } // FlushCacheToDB saves the current in-memory Envoy snapshot (the source of truth) // to the persistent DB. This implements the "flash cache to db" write-through. func (sm *SnapshotManager) FlushCacheToDB(ctx context.Context, strategy DeleteStrategy) error { // 1. Get current Envoy snapshot as SnapshotConfig cfg, err := sm.SnapshotToConfig(ctx, sm.NodeID) if err != nil { return fmt.Errorf("failed to convert snapshot to config: %w", err) } // 2. Save to Persistent DB // Note: DB.SaveSnapshot handles insert/update logic for all resources if err := sm.DB.SaveSnapshot(ctx, cfg, strategy); err != nil { return fmt.Errorf("failed to save config to DB: %w", err) } fmt.Println("Successfully saved to Persistent DB.") return nil } // EnableResourceFromDB fetches a logically disabled resource from the DB and // flips its status to enabled, then adds it back to the cache. func (sm *SnapshotManager) EnableResourceFromDB(name string, typ resourcev3.Type) error { ctx := context.Background() switch typ { case resourcev3.ClusterType: if err := sm.DB.EnableCluster(ctx, name, true); err != nil { return fmt.Errorf("failed to enable cluster '%s' in DB: %w", name, err) } case resourcev3.ListenerType: if err := sm.DB.EnableListener(ctx, name, true); err != nil { return fmt.Errorf("failed to enable listener '%s' in DB: %w", name, err) } default: return fmt.Errorf("unsupported resource type for enabling: %s", typ) } // Reload snapshot from DB to update the cache with the newly enabled resource return sm.LoadSnapshotFromDB(ctx) } // ---------------- Consistency Check ---------------- // CheckCacheDBConsistency compares the currently active Envoy cache snapshot // against the enabled resources in the persistent DB. func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*internalapi.ConsistencyReport, error) { report := &internalapi.ConsistencyReport{ CacheOnly: make(map[resourcev3.Type][]string), DBOnly: make(map[resourcev3.Type][]string), } // 1. Get current cache snapshot cacheConfig, err := sm.SnapshotToConfig(ctx, sm.NodeID) if err != nil { return nil, fmt.Errorf("failed to get snapshot from cache: %w", err) } // 2. Rebuild snapshot from DB (only fetches *enabled* resources from DB) dbConfig, err := sm.DB.RebuildSnapshot(ctx) if err != nil { return nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err) } // Helper to build a set (map[string]struct{}) of resource names for faster lookups buildNameSet := func(resources []ResourceNamer) map[string]struct{} { set := make(map[string]struct{}, len(resources)) for _, r := range resources { set[r.GetName()] = struct{}{} } return set } // Map of resource types to their lists in SnapshotConfig typeResourceMaps := []struct { typ resourcev3.Type cacheList []ResourceNamer dbList []ResourceNamer }{ {resourcev3.ClusterType, resourcesToNamers(cacheConfig.EnabledClusters), resourcesToNamers(dbConfig.EnabledClusters)}, {resourcev3.ListenerType, resourcesToNamers(cacheConfig.EnabledListeners), resourcesToNamers(dbConfig.EnabledListeners)}, } for _, m := range typeResourceMaps { cacheSet := buildNameSet(m.cacheList) dbSet := buildNameSet(m.dbList) // Check for Cache-only resources (present in cacheSet but not in dbSet) for cacheName := range cacheSet { if _, existsInDB := dbSet[cacheName]; !existsInDB { report.CacheOnly[m.typ] = append(report.CacheOnly[m.typ], cacheName) report.Inconsistent = true } } // Check for DB-only resources (present in dbSet but not in cacheSet) for dbName := range dbSet { if _, existsInCache := cacheSet[dbName]; !existsInCache { report.DBOnly[m.typ] = append(report.DBOnly[m.typ], dbName) report.Inconsistent = true } } } return report, nil } // ---------------- Generic Add / Remove / List / Get ---------------- // AddResourceToSnapshot adds any resource to the snapshot dynamically func (sm *SnapshotManager) AddResourceToSnapshot(resource types.Resource, typ resourcev3.Type) error { snap, err := sm.Cache.GetSnapshot(sm.NodeID) if err != nil { return fmt.Errorf("failed to get snapshot from cache: %w", err) } resources := sm.getAllResourcesFromSnapshot(snap) // Append to the appropriate slice switch typ { case resourcev3.ClusterType: resources[resourcev3.ClusterType] = append(resources[resourcev3.ClusterType], resource) case resourcev3.ListenerType: resources[resourcev3.ListenerType] = append(resources[resourcev3.ListenerType], resource) case resourcev3.EndpointType, resourcev3.SecretType, resourcev3.RuntimeType: resources[typ] = append(resources[typ], resource) default: return fmt.Errorf("unsupported resource type: %s", typ) } resourceNamer, ok := resource.(interface{ GetName() string }) if !ok { return fmt.Errorf("resource of type %s does not implement GetName()", typ) } newSnap, _ := cachev3.NewSnapshot( "snap-generic-"+resourceNamer.GetName(), resources, ) return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap) } // RemoveResource removes any resource by name dynamically func (sm *SnapshotManager) RemoveResource(name string, typ resourcev3.Type, strategy DeleteStrategy) error { snap, _ := sm.Cache.GetSnapshot(sm.NodeID) resources := sm.getAllResourcesFromSnapshot(snap) // Flag to check if resource was found in cache var resourceFound = false // Filter the target type if targetResources, ok := resources[typ]; ok { resources[typ], resourceFound = filterAndCheckResourcesByName(targetResources, name) } if strategy == DeleteActual { if resourceFound { return fmt.Errorf("actual delete requested but resource %s of type %s still exists in cache", name, typ) } if typ == resourcev3.ClusterType { if err := sm.DB.RemoveCluster(context.TODO(), name); err != nil { return fmt.Errorf("failed to delete cluster %s from DB: %w", name, err) } return nil } if typ == resourcev3.ListenerType { if err := sm.DB.RemoveListener(context.TODO(), name); err != nil { return fmt.Errorf("failed to delete listener %s from DB: %w", name, err) } return nil } return fmt.Errorf("actual delete not supported for resource type: %s", typ) } if !resourceFound { return fmt.Errorf("resource %s of type %s not found in cache", name, typ) } newSnap, _ := cachev3.NewSnapshot( "snap-remove-generic-"+name, resources, ) if err := sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap); err != nil { return fmt.Errorf("failed to set snapshot: %w", err) } if err := sm.FlushCacheToDB(context.TODO(), strategy); err != nil { return fmt.Errorf("failed to flush cache to DB: %w", err) } return nil } // ListResources returns all enabled and disabled resources of a given type from the DB. func (sm *SnapshotManager) ListResources(typ resourcev3.Type) ([]types.Resource, []types.Resource, error) { snap, err := sm.DB.RebuildSnapshot(context.Background()) if err != nil { return nil, nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err) } var enabled, disabled []types.Resource var namerEnabled, namerDisabled []ResourceNamer switch typ { case resourcev3.ClusterType: namerEnabled = resourcesToNamers(snap.EnabledClusters) namerDisabled = resourcesToNamers(snap.DisabledClusters) case resourcev3.ListenerType: namerEnabled = resourcesToNamers(snap.EnabledListeners) namerDisabled = resourcesToNamers(snap.DisabledListeners) default: return nil, nil, fmt.Errorf("unsupported resource type: %s", typ) } // Convert ResourceNamer slices back to types.Resource slices enabled = make([]types.Resource, len(namerEnabled)) for i, r := range namerEnabled { enabled[i] = r.(types.Resource) } disabled = make([]types.Resource, len(namerDisabled)) for i, r := range namerDisabled { disabled[i] = r.(types.Resource) } return enabled, disabled, nil } // GetResourceFromCache retrieves a resource by name and type from the cache. func (sm *SnapshotManager) GetResourceFromCache(name string, typ resourcev3.Type) (types.Resource, error) { snap, err := sm.Cache.GetSnapshot(sm.NodeID) if err != nil { return nil, err } r, ok := snap.GetResources(string(typ))[name] if !ok { return nil, fmt.Errorf("%s resource %s not found in cache", typ, name) } // We rely on the type given to be correct, as all xDS resources implement GetName(). return r, nil } // ---------------- Save ---------------- func (sm *SnapshotManager) SaveSnapshotToFile(filePath string) error { snap, err := sm.Cache.GetSnapshot(sm.NodeID) if err != nil { return err } out := make(map[string][]interface{}) // Iterate over all known types clusterTypeResources := snap.GetResources(resourcev3.ClusterType) for _, r := range clusterTypeResources { out[resourcev3.ClusterType] = append(out[resourcev3.ClusterType], r) } listenerTypeResources := snap.GetResources(resourcev3.ListenerType) for _, r := range listenerTypeResources { out[resourcev3.ListenerType] = append(out[resourcev3.ListenerType], r) } data, err := json.MarshalIndent(out, "", " ") if err != nil { return err } return os.WriteFile(filePath, data, 0644) } // ---------------- Helpers ---------------- // mapToSlice converts a map of named resources to a slice of resources. func mapToSlice(m map[string]types.Resource) []types.Resource { out := make([]types.Resource, 0, len(m)) for _, r := range m { out = append(out, r) } return out } // filterAndCheckResourcesByName filters a slice of resources by name, // returning the filtered slice and a boolean indicating if the named resource was found. func filterAndCheckResourcesByName(resources []types.Resource, name string) ([]types.Resource, bool) { filtered := []types.Resource{} var found = false for _, r := range resources { if namer, ok := r.(interface{ GetName() string }); ok { if namer.GetName() != name { filtered = append(filtered, r) } else { found = true } } else { // fallback, include unknown type filtered = append(filtered, r) } } return filtered, found } // getAllResourcesFromSnapshot retrieves all known resource types from a snapshot as a map. func (sm *SnapshotManager) getAllResourcesFromSnapshot(snap cachev3.ResourceSnapshot) map[resourcev3.Type][]types.Resource { // Only include types that might be manipulated by the generic functions resources := map[resourcev3.Type][]types.Resource{ resourcev3.ClusterType: mapToSlice(snap.GetResources(string(resourcev3.ClusterType))), resourcev3.ListenerType: mapToSlice(snap.GetResources(string(resourcev3.ListenerType))), // resourcev3.EndpointType: mapToSlice(snap.GetResources(string(resourcev3.EndpointType))), // resourcev3.SecretType: mapToSlice(snap.GetResources(string(resourcev3.SecretType))), // resourcev3.RuntimeType: mapToSlice(snap.GetResources(string(resourcev3.RuntimeType))), // Include other types as needed } return resources } // resourcesToNamers converts a slice of proto-generated resource pointers // (like []*clusterv3.Cluster) to a slice of the generic ResourceNamer interface. // This is necessary because structs like *clusterv3.Cluster don't explicitly // implement types.Resource, but are compatible with it and ResourceNamer. func resourcesToNamers[T ResourceNamer](list []T) []ResourceNamer { out := make([]ResourceNamer, len(list)) for i, item := range list { out[i] = item } return out }