diff --git a/internal/api.go b/internal/api.go
index 2250e2d..dc7d220 100644
--- a/internal/api.go
+++ b/internal/api.go
@@ -7,15 +7,16 @@
 	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"

 	internalapi "envoy-control-plane/internal/api"
+	"envoy-control-plane/internal/snapshot"
 )

 // API holds reference to snapshot manager
 type API struct {
-	Manager *SnapshotManager // SnapshotManager is assumed to be defined elsewhere in internal package
+	Manager *snapshot.SnapshotManager // SnapshotManager now lives in the internal/snapshot package
 }

 // NewAPI returns a new REST API handler
-func NewAPI(sm *SnapshotManager) *API {
+func NewAPI(sm *snapshot.SnapshotManager) *API {
 	return &API{
 		Manager: sm,
 	}
diff --git a/internal/api_handlers.go b/internal/api_handlers.go
index b5c4bcc..b33919a 100644
--- a/internal/api_handlers.go
+++ b/internal/api_handlers.go
@@ -12,6 +12,7 @@
 	"google.golang.org/protobuf/reflect/protoreflect"

 	internalapi "envoy-control-plane/internal/api"
+	"envoy-control-plane/internal/storage"
 )

 // ---------------- Persistence Handlers ----------------
@@ -45,12 +46,12 @@
 	w.Header().Set("Content-Type", "application/json")

 	// Default to DeleteLogical (no physical deletion)
-	deleteStrategy := DeleteLogical // DeleteLogical is assumed to be defined elsewhere
+	deleteStrategy := storage.DeleteLogical // DeleteLogical now comes from internal/storage

 	// Check for 'deleteMissing' query parameter. If "true" or "1", switch to DeleteActual.
 	deleteMissingStr := r.URL.Query().Get("deleteMissing")
 	if deleteMissingStr == "true" || deleteMissingStr == "1" {
-		deleteStrategy = DeleteActual // DeleteActual is assumed to be defined elsewhere
+		deleteStrategy = storage.DeleteActual // DeleteActual now comes from internal/storage
 	}

 	// Use context.Background() since this is a top-level operation
@@ -151,7 +152,7 @@
 		return
 	}
 	// Persist immediately using DeleteLogical (mark as disabled in DB)
-	if err := api.Manager.FlushCacheToDB(context.Background(), DeleteLogical); err != nil {
+	if err := api.Manager.FlushCacheToDB(context.Background(), storage.DeleteLogical); err != nil {
 		http.Error(w, fmt.Sprintf("failed to persist resource to DB: %v", err), http.StatusInternalServerError)
 		return
 	}
@@ -175,7 +176,7 @@
 	}

 	// Use DeleteLogical to remove from cache and mark as disabled in DB
-	if err := api.Manager.RemoveResource(req.Name, typ, DeleteLogical); err != nil {
+	if err := api.Manager.RemoveResource(req.Name, typ, storage.DeleteLogical); err != nil {
 		http.Error(w, fmt.Sprintf("failed to disable resource: %v", err), http.StatusInternalServerError)
 		return
 	}
@@ -235,7 +236,7 @@
 	}

 	// Use DeleteActual strategy for permanent removal from the DB.
-	if err := api.Manager.RemoveResource(req.Name, typ, DeleteActual); err != nil {
+	if err := api.Manager.RemoveResource(req.Name, typ, storage.DeleteActual); err != nil {
 		// NOTE: Exact error string check is brittle but necessary to replicate original logic's status code.
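// Aside on the NOTE above (a sketch, not part of this change): the exact-string
// match could be replaced by a sentinel error compared with errors.Is. Nothing
// in this repository currently exports such a sentinel — the names below are
// hypothetical — and adopting it would mean RemoveResource wraps the sentinel
// with %w instead of formatting a bare message. Requires the standard "errors"
// package.
var errMustDisableFirst = errors.New("resource is enabled and must be disabled before removal") // hypothetical sentinel

func isMustDisableFirst(err error) bool {
	// errors.Is walks the %w wrap chain, so the handler no longer depends on the
	// exact wording produced by RemoveResource.
	return errors.Is(err, errMustDisableFirst)
}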
if err.Error() == fmt.Sprintf("resource %s for type %s is enabled and must be disabled before removal", req.Name, typ) { http.Error(w, fmt.Sprintf("resource '%s' must be disabled first before permanent removal: %v", req.Name, err), http.StatusBadRequest) diff --git a/internal/snapshot.go b/internal/snapshot.go deleted file mode 100644 index 015b221..0000000 --- a/internal/snapshot.go +++ /dev/null @@ -1,666 +0,0 @@ -package internal - -import ( - "context" - "encoding/json" - "fmt" - "os" - "time" - - clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - - // Ensure all standard filters are imported for proto unmarshalling - _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/jwt_authn/v3" - _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3" - _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/oauth2/v3" - _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" - _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" - _ "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" - - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "google.golang.org/protobuf/encoding/protojson" - yaml "gopkg.in/yaml.v3" - - internalapi "envoy-control-plane/internal/api" - internallog "envoy-control-plane/internal/log" -) - -// ResourceNamer is an interface implemented by all xDS resources with a GetName() method. -type ResourceNamer interface { - GetName() string -} - -// SnapshotManager wraps a SnapshotCache and provides file loading/modifying -type SnapshotManager struct { - Cache cachev3.SnapshotCache - NodeID string - DB *Storage // Assuming Storage is defined elsewhere and has RebuildSnapshot/LoadAll* methods -} - -func NewSnapshotManager(cache cachev3.SnapshotCache, nodeID string, db *Storage) *SnapshotManager { - return &SnapshotManager{ - Cache: cache, - NodeID: nodeID, - DB: db, - } -} - -// YamlResources is a helper struct to unmarshal the common Envoy YAML file structure -type YamlResources struct { - Resources []yaml.Node `yaml:"resources"` -} - -// unmarshalYamlNodeToProto takes a single YAML/JSON object (represented as a map[string]interface{}) -// and unmarshals it into the given Protobuf resource pointer using protojson. -func unmarshalYamlNodeToProto(node map[string]interface{}, resource types.Resource) error { - // 1. We must remove the standard Protobuf type marker (if present) before marshaling to JSON, - // as it can interfere with direct Go struct marshaling. - delete(node, "@type") - - // 2. Marshal the generic map into JSON bytes. - jsonBytes, err := json.Marshal(node) - if err != nil { - return fmt.Errorf("failed to marshal resource node to JSON: %w", err) - } - - // 3. Unmarshal the JSON bytes into the target Protobuf struct. - // protojson correctly handles field names (snake_case vs camelCase) and Any fields. 
- if err := protojson.Unmarshal(jsonBytes, resource); err != nil { - return fmt.Errorf("failed to unmarshal into proto: %w", err) - } - return nil -} - -func (sm *SnapshotManager) LoadSnapshotFromFile(context context.Context, filePath string) (map[resourcev3.Type][]types.Resource, error) { - log := internallog.LogFromContext(context) - - // Read the file - data, err := os.ReadFile(filePath) - if err != nil { - return nil, fmt.Errorf("failed to read file: %w", err) - } - - var raw interface{} - if err := yaml.Unmarshal(data, &raw); err != nil { - return nil, fmt.Errorf("failed to unmarshal YAML/JSON file %s: %w", filePath, err) - } - - resources := make(map[resourcev3.Type][]types.Resource) - - var walk func(node interface{}) error - walk = func(node interface{}) error { - switch v := node.(type) { - case map[string]interface{}: - if typStr, ok := v["@type"].(string); ok { - typ := resourcev3.Type(typStr) - - // only process known top-level xDS resources - var resource types.Resource - var newResource bool - - switch typ { - case resourcev3.ClusterType: - resource = &clusterv3.Cluster{} - newResource = true - case resourcev3.ListenerType: - resource = &listenerv3.Listener{} - newResource = true - // ... other types ... - default: - log.Warnf("unsupported resource type: %s", typ) - // Skip nested or unsupported types - } - - if newResource { - // *** REPLACED: json.Marshal and protojson.Unmarshal logic *** - if err := unmarshalYamlNodeToProto(v, resource); err != nil { - return fmt.Errorf("failed to unmarshal %s from file: %w", typ, err) - } - resources[typ] = append(resources[typ], resource) - } - } - - // recurse into children - for _, child := range v { - if err := walk(child); err != nil { - return err - } - } - - case []interface{}: - for _, item := range v { - if err := walk(item); err != nil { - return err - } - } - } - return nil - } - - if err := walk(raw); err != nil { - return nil, err - } - - return resources, nil -} - -// LoadFilterChainFromYAML unmarshals a YAML string representing an Envoy Listener FilterChain -// configuration into a listenerv3.FilterChain protobuf message using protojson pipeline. -func (sm *SnapshotManager) LoadFilterChainFromYAML(ctx context.Context, yamlStr string) (*listenerv3.FilterChain, error) { - log := internallog.LogFromContext(ctx) - - // 1. Unmarshal YAML into a generic Go map - var rawChainMap map[string]interface{} - if err := yaml.Unmarshal([]byte(yamlStr), &rawChainMap); err != nil { - log.Errorf("Failed to unmarshal YAML: %v", err) - return nil, fmt.Errorf("failed to unmarshal YAML into generic map: %w", err) - } - if rawChainMap == nil { - return nil, fmt.Errorf("failed to unmarshal YAML: input was empty or invalid") - } - - // 2. Unmarshal the generic map into the Protobuf struct using the helper - rawChain := &listenerv3.FilterChain{} - if err := unmarshalYamlNodeToProto(rawChainMap, rawChain); err != nil { - return nil, fmt.Errorf("failed to unmarshal YAML into FilterChain using protojson: %w", err) - } - - // Check if the FilterChain contains any filters (optional but good sanity check) - if len(rawChain.Filters) == 0 { - return nil, fmt.Errorf("filter chain loaded but contains no network filters") - } - - // Return the single FilterChain object. - return rawChain, nil -} - -// AppendFilterChainToListener loads the current listener from the cache, appends the provided -// FilterChain to its list of FilterChains, and updates the cache with the new snapshot. 
-func (sm *SnapshotManager) AppendFilterChainToListener(ctx context.Context, listenerName string, newFilterChain *listenerv3.FilterChain) error { - log := internallog.LogFromContext(ctx) - - // 1. Get the current Listener from the cache - resource, err := sm.GetResourceFromCache(listenerName, resourcev3.ListenerType) - if err != nil { - return fmt.Errorf("failed to get listener '%s' from cache: %w", listenerName, err) - } - - listener, ok := resource.(*listenerv3.Listener) - if !ok { - return fmt.Errorf("resource '%s' is not a Listener type", listenerName) - } - - // 2. Append the new FilterChain to the listener's list of filter chains. - // This is the core change: appending a *FilterChain, not just []*Filter. - listener.FilterChains = append(listener.FilterChains, newFilterChain) - log.Infof("Appended new filter chain (match: %v) to listener '%s'", newFilterChain.FilterChainMatch, listenerName) - - // 3. Create a new snapshot with the modified listener (rest of logic remains similar) - snap, err := sm.Cache.GetSnapshot(sm.NodeID) - if err != nil { - return fmt.Errorf("failed to get snapshot for modification: %w", err) - } - - // Get all current resources - resources := sm.getAllResourcesFromSnapshot(snap) - - // Replace the old listener with the modified one - listenerList, ok := resources[resourcev3.ListenerType] - if !ok { - return fmt.Errorf("listener resource type not present in snapshot") - } - - foundAndReplaced := false - for i, res := range listenerList { - if namer, ok := res.(interface{ GetName() string }); ok && namer.GetName() == listenerName { - listenerList[i] = listener - foundAndReplaced = true - break - } - } - - if !foundAndReplaced { - return fmt.Errorf("failed to locate listener '%s' in current resource list for replacement", listenerName) - } - - // Create and set the new snapshot - version := fmt.Sprintf("listener-update-%s-%d", listenerName, time.Now().UnixNano()) - newSnap, err := cachev3.NewSnapshot(version, resources) - if err != nil { - return fmt.Errorf("failed to create new snapshot: %w", err) - } - - if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, newSnap); err != nil { - return fmt.Errorf("failed to set new snapshot: %w", err) - } - - log.Infof("Successfully updated listener '%s' in cache with new filter chain.", listenerName) - - return nil -} - -// SetSnapshotFromConfig sets a snapshot from an aggregated SnapshotConfig -func (sm *SnapshotManager) SetSnapshotFromConfig(ctx context.Context, version string, cfg *SnapshotConfig) error { - if cfg == nil { - return fmt.Errorf("snapshot config is nil") - } - - // Ensure version is not empty - if version == "" { - version = fmt.Sprintf("snap-%d", time.Now().UnixNano()) - } - - // Build the resource map expected by cachev3.NewSnapshot - resources := map[resourcev3.Type][]types.Resource{ - resourcev3.ClusterType: make([]types.Resource, len(cfg.EnabledClusters)), - resourcev3.ListenerType: make([]types.Resource, len(cfg.EnabledListeners)), - // Other types if supported by SnapshotConfig, can be added here - } - - // Populate slices by direct type assertion and conversion - for i, c := range cfg.EnabledClusters { - resources[resourcev3.ClusterType][i] = c - } - for i, l := range cfg.EnabledListeners { - resources[resourcev3.ListenerType][i] = l - } - - // Create the snapshot - snap, err := cachev3.NewSnapshot(version, resources) - if err != nil { - return fmt.Errorf("failed to create snapshot: %w", err) - } - - // Apply snapshot to the cache - if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, snap); err != nil { - return 
fmt.Errorf("failed to set snapshot: %w", err) - } - - return nil -} - -// SnapshotToConfig converts current cache snapshot into SnapshotConfig -func (sm *SnapshotManager) SnapshotToConfig(ctx context.Context, nodeID string) (*SnapshotConfig, error) { - snap, err := sm.Cache.GetSnapshot(nodeID) - if err != nil { - return nil, fmt.Errorf("failed to get snapshot for node %s: %w", nodeID, err) - } - - config := &SnapshotConfig{ - EnabledClusters: []*clusterv3.Cluster{}, - EnabledListeners: []*listenerv3.Listener{}, - // Disabled fields are not populated from the cache, only enabled ones. - } - - // Convert Cluster resources - for _, r := range snap.GetResources(string(resourcev3.ClusterType)) { - if c, ok := r.(*clusterv3.Cluster); ok { - config.EnabledClusters = append(config.EnabledClusters, c) - } - } - - // Convert Listener resources - for _, r := range snap.GetResources(string(resourcev3.ListenerType)) { - if l, ok := r.(*listenerv3.Listener); ok { - config.EnabledListeners = append(config.EnabledListeners, l) - } - } - - return config, nil -} - -// SetSnapshot sets a full snapshot -func (sm *SnapshotManager) SetSnapshot(ctx context.Context, version string, resources map[resourcev3.Type][]types.Resource) error { - snap, err := cachev3.NewSnapshot(version, resources) - if err != nil { - return fmt.Errorf("failed to create snapshot: %w", err) - } - return sm.Cache.SetSnapshot(ctx, sm.NodeID, snap) -} - -// ---------------- Persistence and Sync Methods (Two-Layer Model) ---------------- - -// LoadSnapshotFromDB implements the sync DB data to cache. It rebuilds the snapshot -// from the persistent store and updates the Envoy cache. -func (sm *SnapshotManager) LoadSnapshotFromDB(ctx context.Context) error { - fmt.Println("Loading configuration from main DB...") - - // 1. Try Database (Primary Source) - cfg, err := sm.DB.RebuildSnapshot(ctx) - if err != nil { - return fmt.Errorf("failed to rebuild snapshot from DB: %w", err) - } - - fmt.Println("Loaded configuration from DB. Updating Envoy Cache.") - - // 2. Update Envoy's in-memory cache (This is the 'flash cache' layer) - return sm.SetSnapshotFromConfig(ctx, "db-sync-"+time.Now().Format(time.RFC3339), cfg) -} - -// FlushCacheToDB saves the current in-memory Envoy snapshot (the source of truth) -// to the persistent DB. This implements the "flash cache to db" write-through. -func (sm *SnapshotManager) FlushCacheToDB(ctx context.Context, strategy DeleteStrategy) error { - // 1. Get current Envoy snapshot as SnapshotConfig - cfg, err := sm.SnapshotToConfig(ctx, sm.NodeID) - if err != nil { - return fmt.Errorf("failed to convert snapshot to config: %w", err) - } - - // 2. Save to Persistent DB - // Note: DB.SaveSnapshot handles insert/update logic for all resources - if err := sm.DB.SaveSnapshot(ctx, cfg, strategy); err != nil { - return fmt.Errorf("failed to save config to DB: %w", err) - } - fmt.Println("Successfully saved to Persistent DB.") - - return nil -} - -// EnableResourceFromDB fetches a logically disabled resource from the DB and -// flips its status to enabled, then adds it back to the cache. 
-func (sm *SnapshotManager) EnableResourceFromDB(name string, typ resourcev3.Type) error { - ctx := context.Background() - switch typ { - case resourcev3.ClusterType: - if err := sm.DB.EnableCluster(ctx, name, true); err != nil { - return fmt.Errorf("failed to enable cluster '%s' in DB: %w", name, err) - } - case resourcev3.ListenerType: - if err := sm.DB.EnableListener(ctx, name, true); err != nil { - return fmt.Errorf("failed to enable listener '%s' in DB: %w", name, err) - } - default: - return fmt.Errorf("unsupported resource type for enabling: %s", typ) - } - - // Reload snapshot from DB to update the cache with the newly enabled resource - return sm.LoadSnapshotFromDB(ctx) -} - -// ---------------- Consistency Check ---------------- - -// CheckCacheDBConsistency compares the currently active Envoy cache snapshot -// against the enabled resources in the persistent DB. -func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*internalapi.ConsistencyReport, error) { - report := &internalapi.ConsistencyReport{ - CacheOnly: make(map[resourcev3.Type][]string), - DBOnly: make(map[resourcev3.Type][]string), - } - - // 1. Get current cache snapshot - cacheConfig, err := sm.SnapshotToConfig(ctx, sm.NodeID) - if err != nil { - return nil, fmt.Errorf("failed to get snapshot from cache: %w", err) - } - - // 2. Rebuild snapshot from DB (only fetches *enabled* resources from DB) - dbConfig, err := sm.DB.RebuildSnapshot(ctx) - if err != nil { - return nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err) - } - - // Helper to build a set (map[string]struct{}) of resource names for faster lookups - buildNameSet := func(resources []ResourceNamer) map[string]struct{} { - set := make(map[string]struct{}, len(resources)) - for _, r := range resources { - set[r.GetName()] = struct{}{} - } - return set - } - - // Map of resource types to their lists in SnapshotConfig - typeResourceMaps := []struct { - typ resourcev3.Type - cacheList []ResourceNamer - dbList []ResourceNamer - }{ - {resourcev3.ClusterType, resourcesToNamers(cacheConfig.EnabledClusters), resourcesToNamers(dbConfig.EnabledClusters)}, - {resourcev3.ListenerType, resourcesToNamers(cacheConfig.EnabledListeners), resourcesToNamers(dbConfig.EnabledListeners)}, - } - - for _, m := range typeResourceMaps { - cacheSet := buildNameSet(m.cacheList) - dbSet := buildNameSet(m.dbList) - - // Check for Cache-only resources (present in cacheSet but not in dbSet) - for cacheName := range cacheSet { - if _, existsInDB := dbSet[cacheName]; !existsInDB { - report.CacheOnly[m.typ] = append(report.CacheOnly[m.typ], cacheName) - report.Inconsistent = true - } - } - - // Check for DB-only resources (present in dbSet but not in cacheSet) - for dbName := range dbSet { - if _, existsInCache := cacheSet[dbName]; !existsInCache { - report.DBOnly[m.typ] = append(report.DBOnly[m.typ], dbName) - report.Inconsistent = true - } - } - } - - return report, nil -} - -// ---------------- Generic Add / Remove / List / Get ---------------- - -// AddResourceToSnapshot adds any resource to the snapshot dynamically -func (sm *SnapshotManager) AddResourceToSnapshot(resource types.Resource, typ resourcev3.Type) error { - snap, err := sm.Cache.GetSnapshot(sm.NodeID) - if err != nil { - return fmt.Errorf("failed to get snapshot from cache: %w", err) - } - resources := sm.getAllResourcesFromSnapshot(snap) - - // Append to the appropriate slice - switch typ { - case resourcev3.ClusterType: - resources[resourcev3.ClusterType] = 
append(resources[resourcev3.ClusterType], resource) - case resourcev3.ListenerType: - resources[resourcev3.ListenerType] = append(resources[resourcev3.ListenerType], resource) - case resourcev3.EndpointType, resourcev3.SecretType, resourcev3.RuntimeType: - resources[typ] = append(resources[typ], resource) - default: - return fmt.Errorf("unsupported resource type: %s", typ) - } - - resourceNamer, ok := resource.(interface{ GetName() string }) - if !ok { - return fmt.Errorf("resource of type %s does not implement GetName()", typ) - } - - newSnap, _ := cachev3.NewSnapshot( - "snap-generic-"+resourceNamer.GetName(), - resources, - ) - return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap) -} - -// RemoveResource removes any resource by name dynamically -func (sm *SnapshotManager) RemoveResource(name string, typ resourcev3.Type, strategy DeleteStrategy) error { - snap, _ := sm.Cache.GetSnapshot(sm.NodeID) - resources := sm.getAllResourcesFromSnapshot(snap) - - // Flag to check if resource was found in cache - var resourceFound = false - - // Filter the target type - if targetResources, ok := resources[typ]; ok { - resources[typ], resourceFound = filterAndCheckResourcesByName(targetResources, name) - } - - if strategy == DeleteActual { - if resourceFound { - return fmt.Errorf("actual delete requested but resource %s of type %s still exists in cache", name, typ) - } - if typ == resourcev3.ClusterType { - if err := sm.DB.RemoveCluster(context.TODO(), name); err != nil { - return fmt.Errorf("failed to delete cluster %s from DB: %w", name, err) - } - return nil - } - if typ == resourcev3.ListenerType { - if err := sm.DB.RemoveListener(context.TODO(), name); err != nil { - return fmt.Errorf("failed to delete listener %s from DB: %w", name, err) - } - return nil - } - return fmt.Errorf("actual delete not supported for resource type: %s", typ) - } - - if !resourceFound { - return fmt.Errorf("resource %s of type %s not found in cache", name, typ) - } - - newSnap, _ := cachev3.NewSnapshot( - "snap-remove-generic-"+name, - resources, - ) - - if err := sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap); err != nil { - return fmt.Errorf("failed to set snapshot: %w", err) - } - - if err := sm.FlushCacheToDB(context.TODO(), strategy); err != nil { - return fmt.Errorf("failed to flush cache to DB: %w", err) - } - return nil -} - -// ListResources returns all enabled and disabled resources of a given type from the DB. 
-func (sm *SnapshotManager) ListResources(typ resourcev3.Type) ([]types.Resource, []types.Resource, error) { - snap, err := sm.DB.RebuildSnapshot(context.Background()) - if err != nil { - return nil, nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err) - } - - var enabled, disabled []types.Resource - var namerEnabled, namerDisabled []ResourceNamer - - switch typ { - case resourcev3.ClusterType: - namerEnabled = resourcesToNamers(snap.EnabledClusters) - namerDisabled = resourcesToNamers(snap.DisabledClusters) - case resourcev3.ListenerType: - namerEnabled = resourcesToNamers(snap.EnabledListeners) - namerDisabled = resourcesToNamers(snap.DisabledListeners) - default: - return nil, nil, fmt.Errorf("unsupported resource type: %s", typ) - } - - // Convert ResourceNamer slices back to types.Resource slices - enabled = make([]types.Resource, len(namerEnabled)) - for i, r := range namerEnabled { - enabled[i] = r.(types.Resource) - } - - disabled = make([]types.Resource, len(namerDisabled)) - for i, r := range namerDisabled { - disabled[i] = r.(types.Resource) - } - - return enabled, disabled, nil -} - -// GetResourceFromCache retrieves a resource by name and type from the cache. -func (sm *SnapshotManager) GetResourceFromCache(name string, typ resourcev3.Type) (types.Resource, error) { - snap, err := sm.Cache.GetSnapshot(sm.NodeID) - if err != nil { - return nil, err - } - r, ok := snap.GetResources(string(typ))[name] - if !ok { - return nil, fmt.Errorf("%s resource %s not found in cache", typ, name) - } - - // We rely on the type given to be correct, as all xDS resources implement GetName(). - return r, nil -} - -// ---------------- Save ---------------- - -func (sm *SnapshotManager) SaveSnapshotToFile(filePath string) error { - snap, err := sm.Cache.GetSnapshot(sm.NodeID) - if err != nil { - return err - } - - out := make(map[string][]interface{}) - - // Iterate over all known types - clusterTypeResources := snap.GetResources(resourcev3.ClusterType) - for _, r := range clusterTypeResources { - out[resourcev3.ClusterType] = append(out[resourcev3.ClusterType], r) - } - listenerTypeResources := snap.GetResources(resourcev3.ListenerType) - for _, r := range listenerTypeResources { - out[resourcev3.ListenerType] = append(out[resourcev3.ListenerType], r) - } - - data, err := json.MarshalIndent(out, "", " ") - if err != nil { - return err - } - - return os.WriteFile(filePath, data, 0644) -} - -// ---------------- Helpers ---------------- - -// mapToSlice converts a map of named resources to a slice of resources. -func mapToSlice(m map[string]types.Resource) []types.Resource { - out := make([]types.Resource, 0, len(m)) - for _, r := range m { - out = append(out, r) - } - return out -} - -// filterAndCheckResourcesByName filters a slice of resources by name, -// returning the filtered slice and a boolean indicating if the named resource was found. -func filterAndCheckResourcesByName(resources []types.Resource, name string) ([]types.Resource, bool) { - filtered := []types.Resource{} - var found = false - for _, r := range resources { - if namer, ok := r.(interface{ GetName() string }); ok { - if namer.GetName() != name { - filtered = append(filtered, r) - } else { - found = true - } - } else { - // fallback, include unknown type - filtered = append(filtered, r) - } - } - return filtered, found -} - -// getAllResourcesFromSnapshot retrieves all known resource types from a snapshot as a map. 
-func (sm *SnapshotManager) getAllResourcesFromSnapshot(snap cachev3.ResourceSnapshot) map[resourcev3.Type][]types.Resource { - // Only include types that might be manipulated by the generic functions - resources := map[resourcev3.Type][]types.Resource{ - resourcev3.ClusterType: mapToSlice(snap.GetResources(string(resourcev3.ClusterType))), - resourcev3.ListenerType: mapToSlice(snap.GetResources(string(resourcev3.ListenerType))), - // resourcev3.EndpointType: mapToSlice(snap.GetResources(string(resourcev3.EndpointType))), - // resourcev3.SecretType: mapToSlice(snap.GetResources(string(resourcev3.SecretType))), - // resourcev3.RuntimeType: mapToSlice(snap.GetResources(string(resourcev3.RuntimeType))), - // Include other types as needed - } - return resources -} - -// resourcesToNamers converts a slice of proto-generated resource pointers -// (like []*clusterv3.Cluster) to a slice of the generic ResourceNamer interface. -// This is necessary because structs like *clusterv3.Cluster don't explicitly -// implement types.Resource, but are compatible with it and ResourceNamer. -func resourcesToNamers[T ResourceNamer](list []T) []ResourceNamer { - out := make([]ResourceNamer, len(list)) - for i, item := range list { - out[i] = item - } - return out -} diff --git a/internal/snapshot/manager.go b/internal/snapshot/manager.go new file mode 100644 index 0000000..6c71a30 --- /dev/null +++ b/internal/snapshot/manager.go @@ -0,0 +1,50 @@ +package snapshot + +import ( + "context" + "envoy-control-plane/internal/storage" + "fmt" + + // Ensure all standard filters are imported for proto unmarshalling + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/jwt_authn/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/oauth2/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" + + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" +) + +// ResourceNamer is an interface implemented by all xDS resources with a GetName() method. +type ResourceNamer interface { + GetName() string +} + +// SnapshotManager wraps a SnapshotCache and provides file loading/modifying +// and DB synchronization functionality for Envoy xDS resources. +type SnapshotManager struct { + Cache cachev3.SnapshotCache + NodeID string + DB *storage.Storage // Assuming Storage is defined elsewhere (DB persistence layer) +} + +// NewSnapshotManager creates a new instance of SnapshotManager. +func NewSnapshotManager(cache cachev3.SnapshotCache, nodeID string, db *storage.Storage) *SnapshotManager { + return &SnapshotManager{ + Cache: cache, + NodeID: nodeID, + DB: db, + } +} + +// SetSnapshot sets a full snapshot (utility method used by others). 
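// Construction sketch (an aside, not part of this diff): how a caller would
// typically wire the manager to go-control-plane's cache after the refactor.
// cachev3.NewSnapshotCache and cachev3.IDHash are real go-control-plane APIs;
// how *storage.Storage is built is not shown in this change, so it is taken as
// a ready-made dependency, and "envoy-node-1" is a placeholder node ID.
func newDefaultManager(db *storage.Storage) *SnapshotManager {
	// ADS disabled, nodes keyed by their ID, no custom logger.
	cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, nil)
	return NewSnapshotManager(cache, "envoy-node-1", db)
}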
+func (sm *SnapshotManager) SetSnapshot(ctx context.Context, version string, resources map[resourcev3.Type][]types.Resource) error { + snap, err := cachev3.NewSnapshot(version, resources) + if err != nil { + return fmt.Errorf("failed to create snapshot: %w", err) + } + return sm.Cache.SetSnapshot(ctx, sm.NodeID, snap) +} diff --git a/internal/snapshot/resource_config.go b/internal/snapshot/resource_config.go new file mode 100644 index 0000000..2779415 --- /dev/null +++ b/internal/snapshot/resource_config.go @@ -0,0 +1,253 @@ +package snapshot + +import ( + "context" + "fmt" + "time" + + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + + internalapi "envoy-control-plane/internal/api" + "envoy-control-plane/internal/storage" +) + +// SetSnapshotFromConfig sets a snapshot from an aggregated SnapshotConfig +func (sm *SnapshotManager) SetSnapshotFromConfig(ctx context.Context, version string, cfg *storage.SnapshotConfig) error { + if cfg == nil { + return fmt.Errorf("snapshot config is nil") + } + + // Ensure version is not empty + if version == "" { + version = fmt.Sprintf("snap-%d", time.Now().UnixNano()) + } + + // Build the resource map expected by cachev3.NewSnapshot + resources := map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: make([]types.Resource, len(cfg.EnabledClusters)), + resourcev3.ListenerType: make([]types.Resource, len(cfg.EnabledListeners)), + // Other types if supported by SnapshotConfig, can be added here + } + + // Populate slices by direct type assertion and conversion + for i, c := range cfg.EnabledClusters { + resources[resourcev3.ClusterType][i] = c + } + for i, l := range cfg.EnabledListeners { + resources[resourcev3.ListenerType][i] = l + } + + // Create the snapshot + snap, err := cachev3.NewSnapshot(version, resources) + if err != nil { + return fmt.Errorf("failed to create snapshot: %w", err) + } + + // Apply snapshot to the cache + if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, snap); err != nil { + return fmt.Errorf("failed to set snapshot: %w", err) + } + + return nil +} + +// SnapshotToConfig converts current cache snapshot into SnapshotConfig +func (sm *SnapshotManager) SnapshotToConfig(ctx context.Context, nodeID string) (*storage.SnapshotConfig, error) { + snap, err := sm.Cache.GetSnapshot(nodeID) + if err != nil { + return nil, fmt.Errorf("failed to get snapshot for node %s: %w", nodeID, err) + } + + config := &storage.SnapshotConfig{ + EnabledClusters: []*clusterv3.Cluster{}, + EnabledListeners: []*listenerv3.Listener{}, + // Disabled fields are not populated from the cache, only enabled ones. + } + + // Convert Cluster resources + for _, r := range snap.GetResources(string(resourcev3.ClusterType)) { + if c, ok := r.(*clusterv3.Cluster); ok { + config.EnabledClusters = append(config.EnabledClusters, c) + } + } + + // Convert Listener resources + for _, r := range snap.GetResources(string(resourcev3.ListenerType)) { + if l, ok := r.(*listenerv3.Listener); ok { + config.EnabledListeners = append(config.EnabledListeners, l) + } + } + + return config, nil +} + +// LoadSnapshotFromDB implements the sync DB data to cache. It rebuilds the snapshot +// from the persistent store and updates the Envoy cache. 
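// Usage sketch for the two-layer model described above (an aside, not part of
// this diff): hydrate the cache from the DB at startup, mutate the in-memory
// snapshot, then write it back with the logical-delete policy. The mutation
// step is elided.
func syncRoundTrip(ctx context.Context, sm *SnapshotManager) error {
	// DB -> cache: rebuild the snapshot from the persisted, enabled resources.
	if err := sm.LoadSnapshotFromDB(ctx); err != nil {
		return err
	}

	// ... apply in-memory changes via AddResourceToSnapshot,
	// AppendFilterChainToListener, etc. ...

	// Cache -> DB: write-through; resources absent from the cache are treated
	// as logically deleted (disabled) rather than having their rows removed.
	return sm.FlushCacheToDB(ctx, storage.DeleteLogical)
}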
+func (sm *SnapshotManager) LoadSnapshotFromDB(ctx context.Context) error { + fmt.Println("Loading configuration from main DB...") + + // 1. Try Database (Primary Source) + cfg, err := sm.DB.RebuildSnapshot(ctx) + if err != nil { + return fmt.Errorf("failed to rebuild snapshot from DB: %w", err) + } + + fmt.Println("Loaded configuration from DB. Updating Envoy Cache.") + + // 2. Update Envoy's in-memory cache (This is the 'flash cache' layer) + return sm.SetSnapshotFromConfig(ctx, "db-sync-"+time.Now().Format(time.RFC3339), cfg) +} + +// FlushCacheToDB saves the current in-memory Envoy snapshot (the source of truth) +// to the persistent DB. This implements the "flash cache to db" write-through. +func (sm *SnapshotManager) FlushCacheToDB(ctx context.Context, strategy storage.DeleteStrategy) error { + // 1. Get current Envoy snapshot as SnapshotConfig + cfg, err := sm.SnapshotToConfig(ctx, sm.NodeID) + if err != nil { + return fmt.Errorf("failed to convert snapshot to config: %w", err) + } + + // 2. Save to Persistent DB + // Note: DB.SaveSnapshot handles insert/update logic for all resources + if err := sm.DB.SaveSnapshot(ctx, cfg, strategy); err != nil { + return fmt.Errorf("failed to save config to DB: %w", err) + } + fmt.Println("Successfully saved to Persistent DB.") + + return nil +} + +// EnableResourceFromDB fetches a logically disabled resource from the DB and +// flips its status to enabled, then adds it back to the cache. +func (sm *SnapshotManager) EnableResourceFromDB(name string, typ resourcev3.Type) error { + ctx := context.Background() + switch typ { + case resourcev3.ClusterType: + if err := sm.DB.EnableCluster(ctx, name, true); err != nil { + return fmt.Errorf("failed to enable cluster '%s' in DB: %w", name, err) + } + case resourcev3.ListenerType: + if err := sm.DB.EnableListener(ctx, name, true); err != nil { + return fmt.Errorf("failed to enable listener '%s' in DB: %w", name, err) + } + default: + return fmt.Errorf("unsupported resource type for enabling: %s", typ) + } + + // Reload snapshot from DB to update the cache with the newly enabled resource + return sm.LoadSnapshotFromDB(ctx) +} + +// CheckCacheDBConsistency compares the currently active Envoy cache snapshot +// against the enabled resources in the persistent DB. +func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*internalapi.ConsistencyReport, error) { + report := &internalapi.ConsistencyReport{ + CacheOnly: make(map[resourcev3.Type][]string), + DBOnly: make(map[resourcev3.Type][]string), + } + + // 1. Get current cache snapshot + cacheConfig, err := sm.SnapshotToConfig(ctx, sm.NodeID) + if err != nil { + return nil, fmt.Errorf("failed to get snapshot from cache: %w", err) + } + + // 2. 
Rebuild snapshot from DB (only fetches *enabled* resources from DB) + dbConfig, err := sm.DB.RebuildSnapshot(ctx) + if err != nil { + return nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err) + } + + // Helper to build a set (map[string]struct{}) of resource names for faster lookups + buildNameSet := func(resources []ResourceNamer) map[string]struct{} { + set := make(map[string]struct{}, len(resources)) + for _, r := range resources { + set[r.GetName()] = struct{}{} + } + return set + } + + // Map of resource types to their lists in SnapshotConfig + typeResourceMaps := []struct { + typ resourcev3.Type + cacheList []ResourceNamer + dbList []ResourceNamer + }{ + {resourcev3.ClusterType, resourcesToNamers(cacheConfig.EnabledClusters), resourcesToNamers(dbConfig.EnabledClusters)}, + {resourcev3.ListenerType, resourcesToNamers(cacheConfig.EnabledListeners), resourcesToNamers(dbConfig.EnabledListeners)}, + } + + for _, m := range typeResourceMaps { + cacheSet := buildNameSet(m.cacheList) + dbSet := buildNameSet(m.dbList) + + // Check for Cache-only resources (present in cacheSet but not in dbSet) + for cacheName := range cacheSet { + if _, existsInDB := dbSet[cacheName]; !existsInDB { + report.CacheOnly[m.typ] = append(report.CacheOnly[m.typ], cacheName) + report.Inconsistent = true + } + } + + // Check for DB-only resources (present in dbSet but not in cacheSet) + for dbName := range dbSet { + if _, existsInCache := cacheSet[dbName]; !existsInCache { + report.DBOnly[m.typ] = append(report.DBOnly[m.typ], dbName) + report.Inconsistent = true + } + } + } + + return report, nil +} + +// ListResources returns all enabled and disabled resources of a given type from the DB. +func (sm *SnapshotManager) ListResources(typ resourcev3.Type) ([]types.Resource, []types.Resource, error) { + snap, err := sm.DB.RebuildSnapshot(context.Background()) + if err != nil { + return nil, nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err) + } + + var enabled, disabled []types.Resource + var namerEnabled, namerDisabled []ResourceNamer + + switch typ { + case resourcev3.ClusterType: + namerEnabled = resourcesToNamers(snap.EnabledClusters) + namerDisabled = resourcesToNamers(snap.DisabledClusters) + case resourcev3.ListenerType: + namerEnabled = resourcesToNamers(snap.EnabledListeners) + namerDisabled = resourcesToNamers(snap.DisabledListeners) + default: + return nil, nil, fmt.Errorf("unsupported resource type: %s", typ) + } + + // Convert ResourceNamer slices back to types.Resource slices + enabled = make([]types.Resource, len(namerEnabled)) + for i, r := range namerEnabled { + enabled[i] = r.(types.Resource) + } + + disabled = make([]types.Resource, len(namerDisabled)) + for i, r := range namerDisabled { + disabled[i] = r.(types.Resource) + } + + return enabled, disabled, nil +} + +// resourcesToNamers converts a slice of proto-generated resource pointers +// (like []*clusterv3.Cluster) to a slice of the generic ResourceNamer interface. +// This is necessary because structs like *clusterv3.Cluster don't explicitly +// implement types.Resource, but are compatible with it and ResourceNamer. 
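// Consumer sketch for CheckCacheDBConsistency above (an aside, not part of this
// diff): surface any drift between the live cache and the DB. log.Printf is a
// stand-in for the project's internal/log package and would need the standard
// "log" import.
func reportDrift(ctx context.Context, sm *SnapshotManager) error {
	report, err := sm.CheckCacheDBConsistency(ctx)
	if err != nil {
		return err
	}
	if !report.Inconsistent {
		return nil
	}
	for typ, names := range report.CacheOnly {
		log.Printf("in cache but not enabled in DB (%s): %v", typ, names)
	}
	for typ, names := range report.DBOnly {
		log.Printf("enabled in DB but missing from cache (%s): %v", typ, names)
	}
	return nil
}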
+func resourcesToNamers[T ResourceNamer](list []T) []ResourceNamer { + out := make([]ResourceNamer, len(list)) + for i, item := range list { + out[i] = item + } + return out +} diff --git a/internal/snapshot/resource_crud.go b/internal/snapshot/resource_crud.go new file mode 100644 index 0000000..642c5d9 --- /dev/null +++ b/internal/snapshot/resource_crud.go @@ -0,0 +1,220 @@ +package snapshot + +import ( + "context" + "fmt" + "time" + + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + + internallog "envoy-control-plane/internal/log" + "envoy-control-plane/internal/storage" +) + +// AppendFilterChainToListener loads the current listener from the cache, appends the provided +// FilterChain to its list of FilterChains, and updates the cache with the new snapshot. +func (sm *SnapshotManager) AppendFilterChainToListener(ctx context.Context, listenerName string, newFilterChain *listenerv3.FilterChain) error { + log := internallog.LogFromContext(ctx) + + // 1. Get the current Listener from the cache + resource, err := sm.GetResourceFromCache(listenerName, resourcev3.ListenerType) + if err != nil { + return fmt.Errorf("failed to get listener '%s' from cache: %w", listenerName, err) + } + + listener, ok := resource.(*listenerv3.Listener) + if !ok { + return fmt.Errorf("resource '%s' is not a Listener type", listenerName) + } + + // 2. Append the new FilterChain to the listener's list of filter chains. + listener.FilterChains = append(listener.FilterChains, newFilterChain) + log.Infof("Appended new filter chain (match: %v) to listener '%s'", newFilterChain.FilterChainMatch, listenerName) + + // 3. 
Create a new snapshot with the modified listener (rest of logic remains similar) + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + if err != nil { + return fmt.Errorf("failed to get snapshot for modification: %w", err) + } + + // Get all current resources + resources := sm.getAllResourcesFromSnapshot(snap) + + // Replace the old listener with the modified one + listenerList, ok := resources[resourcev3.ListenerType] + if !ok { + return fmt.Errorf("listener resource type not present in snapshot") + } + + foundAndReplaced := false + for i, res := range listenerList { + if namer, ok := res.(interface{ GetName() string }); ok && namer.GetName() == listenerName { + listenerList[i] = listener + foundAndReplaced = true + break + } + } + + if !foundAndReplaced { + return fmt.Errorf("failed to locate listener '%s' in current resource list for replacement", listenerName) + } + + // Create and set the new snapshot + version := fmt.Sprintf("listener-update-%s-%d", listenerName, time.Now().UnixNano()) + newSnap, err := cachev3.NewSnapshot(version, resources) + if err != nil { + return fmt.Errorf("failed to create new snapshot: %w", err) + } + + if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, newSnap); err != nil { + return fmt.Errorf("failed to set new snapshot: %w", err) + } + + log.Infof("Successfully updated listener '%s' in cache with new filter chain.", listenerName) + + return nil +} + +// AddResourceToSnapshot adds any resource to the snapshot dynamically +func (sm *SnapshotManager) AddResourceToSnapshot(resource types.Resource, typ resourcev3.Type) error { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + if err != nil { + return fmt.Errorf("failed to get snapshot from cache: %w", err) + } + resources := sm.getAllResourcesFromSnapshot(snap) + + // Append to the appropriate slice + switch typ { + case resourcev3.ClusterType: + resources[resourcev3.ClusterType] = append(resources[resourcev3.ClusterType], resource) + case resourcev3.ListenerType: + resources[resourcev3.ListenerType] = append(resources[resourcev3.ListenerType], resource) + case resourcev3.EndpointType, resourcev3.SecretType, resourcev3.RuntimeType: + resources[typ] = append(resources[typ], resource) + default: + return fmt.Errorf("unsupported resource type: %s", typ) + } + + resourceNamer, ok := resource.(interface{ GetName() string }) + if !ok { + return fmt.Errorf("resource of type %s does not implement GetName()", typ) + } + + newSnap, _ := cachev3.NewSnapshot( + "snap-generic-"+resourceNamer.GetName(), + resources, + ) + return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap) +} + +// RemoveResource removes any resource by name dynamically +func (sm *SnapshotManager) RemoveResource(name string, typ resourcev3.Type, strategy storage.DeleteStrategy) error { + snap, _ := sm.Cache.GetSnapshot(sm.NodeID) + resources := sm.getAllResourcesFromSnapshot(snap) + + // Flag to check if resource was found in cache + var resourceFound = false + + // Filter the target type + if targetResources, ok := resources[typ]; ok { + resources[typ], resourceFound = filterAndCheckResourcesByName(targetResources, name) + } + + if strategy == storage.DeleteActual { + if resourceFound { + return fmt.Errorf("actual delete requested but resource %s of type %s still exists in cache", name, typ) + } + if typ == resourcev3.ClusterType { + if err := sm.DB.RemoveCluster(context.TODO(), name); err != nil { + return fmt.Errorf("failed to delete cluster %s from DB: %w", name, err) + } + return nil + } + if typ == resourcev3.ListenerType { + if err := 
sm.DB.RemoveListener(context.TODO(), name); err != nil { + return fmt.Errorf("failed to delete listener %s from DB: %w", name, err) + } + return nil + } + return fmt.Errorf("actual delete not supported for resource type: %s", typ) + } + + if !resourceFound { + return fmt.Errorf("resource %s of type %s not found in cache", name, typ) + } + + newSnap, _ := cachev3.NewSnapshot( + "snap-remove-generic-"+name, + resources, + ) + + if err := sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap); err != nil { + return fmt.Errorf("failed to set snapshot: %w", err) + } + + if err := sm.FlushCacheToDB(context.TODO(), strategy); err != nil { + return fmt.Errorf("failed to flush cache to DB: %w", err) + } + return nil +} + +// GetResourceFromCache retrieves a resource by name and type from the cache. +func (sm *SnapshotManager) GetResourceFromCache(name string, typ resourcev3.Type) (types.Resource, error) { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + if err != nil { + return nil, err + } + r, ok := snap.GetResources(string(typ))[name] + if !ok { + return nil, fmt.Errorf("%s resource %s not found in cache", typ, name) + } + + // We rely on the type given to be correct, as all xDS resources implement GetName(). + return r, nil +} + +// getAllResourcesFromSnapshot retrieves all known resource types from a snapshot as a map. +func (sm *SnapshotManager) getAllResourcesFromSnapshot(snap cachev3.ResourceSnapshot) map[resourcev3.Type][]types.Resource { + // Only include types that might be manipulated by the generic functions + resources := map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: mapToSlice(snap.GetResources(string(resourcev3.ClusterType))), + resourcev3.ListenerType: mapToSlice(snap.GetResources(string(resourcev3.ListenerType))), + // resourcev3.EndpointType: mapToSlice(snap.GetResources(string(resourcev3.EndpointType))), + // resourcev3.SecretType: mapToSlice(snap.GetResources(string(resourcev3.SecretType))), + // resourcev3.RuntimeType: mapToSlice(snap.GetResources(string(resourcev3.RuntimeType))), + // Include other types as needed + } + return resources +} + +// mapToSlice converts a map of named resources to a slice of resources. +func mapToSlice(m map[string]types.Resource) []types.Resource { + out := make([]types.Resource, 0, len(m)) + for _, r := range m { + out = append(out, r) + } + return out +} + +// filterAndCheckResourcesByName filters a slice of resources by name, +// returning the filtered slice and a boolean indicating if the named resource was found. 
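// Lifecycle sketch (an aside, not part of this diff) tying the two strategies
// together: a resource is first removed logically — dropped from the cache and
// marked disabled in the DB — and only then, once absent from the cache, can it
// be permanently deleted with DeleteActual, mirroring the disable-before-delete
// rule enforced by the API handlers. The function name is illustrative only.
func retireCluster(sm *SnapshotManager, name string, purge bool) error {
	// Soft-delete: drop from the live snapshot and mark disabled in the DB.
	if err := sm.RemoveResource(name, resourcev3.ClusterType, storage.DeleteLogical); err != nil {
		return err
	}
	if purge {
		// Hard-delete is only accepted once the resource is gone from the cache.
		return sm.RemoveResource(name, resourcev3.ClusterType, storage.DeleteActual)
	}
	// Otherwise the row stays disabled and can later be restored with
	// sm.EnableResourceFromDB(name, resourcev3.ClusterType).
	return nil
}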
+func filterAndCheckResourcesByName(resources []types.Resource, name string) ([]types.Resource, bool) { + filtered := []types.Resource{} + var found = false + for _, r := range resources { + if namer, ok := r.(interface{ GetName() string }); ok { + if namer.GetName() != name { + filtered = append(filtered, r) + } else { + found = true + } + } else { + // fallback, include unknown type + filtered = append(filtered, r) + } + } + return filtered, found +} diff --git a/internal/snapshot/resource_io.go b/internal/snapshot/resource_io.go new file mode 100644 index 0000000..73e1fef --- /dev/null +++ b/internal/snapshot/resource_io.go @@ -0,0 +1,171 @@ +package snapshot + +import ( + "context" + "encoding/json" + "fmt" + "os" + + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "google.golang.org/protobuf/encoding/protojson" + yaml "gopkg.in/yaml.v3" + + internallog "envoy-control-plane/internal/log" +) + +// YamlResources is a helper struct to unmarshal the common Envoy YAML file structure +type YamlResources struct { + Resources []yaml.Node `yaml:"resources"` +} + +// unmarshalYamlNodeToProto takes a generic map representation of a YAML/JSON object +// and unmarshals it into the given Protobuf resource pointer using protojson. +func unmarshalYamlNodeToProto(node map[string]interface{}, resource types.Resource) error { + // 1. Remove the standard Protobuf type marker (if present) before marshaling to JSON. + delete(node, "@type") + + // 2. Marshal the generic map into JSON bytes. + jsonBytes, err := json.Marshal(node) + if err != nil { + return fmt.Errorf("failed to marshal resource node to JSON: %w", err) + } + + // 3. Unmarshal the JSON bytes into the target Protobuf struct. + if err := protojson.Unmarshal(jsonBytes, resource); err != nil { + return fmt.Errorf("failed to unmarshal into proto: %w", err) + } + return nil +} + +// LoadSnapshotFromFile reads a YAML/JSON file, parses it, and returns a map of xDS resources. +func (sm *SnapshotManager) LoadSnapshotFromFile(context context.Context, filePath string) (map[resourcev3.Type][]types.Resource, error) { + log := internallog.LogFromContext(context) + + // Read the file + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read file: %w", err) + } + + var raw interface{} + if err := yaml.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("failed to unmarshal YAML/JSON file %s: %w", filePath, err) + } + + resources := make(map[resourcev3.Type][]types.Resource) + + var walk func(node interface{}) error + walk = func(node interface{}) error { + switch v := node.(type) { + case map[string]interface{}: + if typStr, ok := v["@type"].(string); ok { + typ := resourcev3.Type(typStr) + + // only process known top-level xDS resources + var resource types.Resource + var newResource bool + + switch typ { + case resourcev3.ClusterType: + resource = &clusterv3.Cluster{} + newResource = true + case resourcev3.ListenerType: + resource = &listenerv3.Listener{} + newResource = true + // ... other types ... 
+ default: + log.Warnf("unsupported resource type: %s", typ) + // Skip nested or unsupported types + } + + if newResource { + if err := unmarshalYamlNodeToProto(v, resource); err != nil { + return fmt.Errorf("failed to unmarshal %s from file: %w", typ, err) + } + resources[typ] = append(resources[typ], resource) + } + } + + // recurse into children + for _, child := range v { + if err := walk(child); err != nil { + return err + } + } + + case []interface{}: + for _, item := range v { + if err := walk(item); err != nil { + return err + } + } + } + return nil + } + + if err := walk(raw); err != nil { + return nil, err + } + + return resources, nil +} + +// LoadFilterChainFromYAML unmarshals a YAML string representing an Envoy Listener FilterChain +// configuration into a listenerv3.FilterChain protobuf message using protojson pipeline. +func (sm *SnapshotManager) LoadFilterChainFromYAML(ctx context.Context, yamlStr string) (*listenerv3.FilterChain, error) { + log := internallog.LogFromContext(ctx) + + // 1. Unmarshal YAML into a generic Go map + var rawChainMap map[string]interface{} + if err := yaml.Unmarshal([]byte(yamlStr), &rawChainMap); err != nil { + log.Errorf("Failed to unmarshal YAML: %v", err) + return nil, fmt.Errorf("failed to unmarshal YAML into generic map: %w", err) + } + if rawChainMap == nil { + return nil, fmt.Errorf("failed to unmarshal YAML: input was empty or invalid") + } + + // 2. Unmarshal the generic map into the Protobuf struct using the helper + rawChain := &listenerv3.FilterChain{} + if err := unmarshalYamlNodeToProto(rawChainMap, rawChain); err != nil { + return nil, fmt.Errorf("failed to unmarshal YAML into FilterChain using protojson: %w", err) + } + + // Check if the FilterChain contains any filters (optional but good sanity check) + if len(rawChain.Filters) == 0 { + return nil, fmt.Errorf("filter chain loaded but contains no network filters") + } + + // Return the single FilterChain object. + return rawChain, nil +} + +// SaveSnapshotToFile marshals the current cache snapshot to JSON and writes it to a file. +func (sm *SnapshotManager) SaveSnapshotToFile(filePath string) error { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + if err != nil { + return err + } + + out := make(map[string][]interface{}) + + // Iterate over all known types + clusterTypeResources := snap.GetResources(resourcev3.ClusterType) + for _, r := range clusterTypeResources { + out[resourcev3.ClusterType] = append(out[resourcev3.ClusterType], r) + } + listenerTypeResources := snap.GetResources(resourcev3.ListenerType) + for _, r := range listenerTypeResources { + out[resourcev3.ListenerType] = append(out[resourcev3.ListenerType], r) + } + + data, err := json.MarshalIndent(out, "", " ") + if err != nil { + return err + } + + return os.WriteFile(filePath, data, 0644) +} diff --git a/internal/snapshot/resource_io_test.go b/internal/snapshot/resource_io_test.go new file mode 100644 index 0000000..829b66b --- /dev/null +++ b/internal/snapshot/resource_io_test.go @@ -0,0 +1,199 @@ +package snapshot + +import ( + "context" + "testing" +) + +// NOTE: Assume MockLogger and SnapshotManager are defined for the test to run. +// The actual implementation of LoadFilterChainFromYAML is assumed to be available +// to the test file. 
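// Usage sketch (an aside, not part of this diff) for LoadFilterChainFromYAML,
// which the test below exercises, combined with AppendFilterChainToListener
// from resource_crud.go: parse a single FilterChain from YAML, then splice it
// into a listener already present in the cache. The listener name and YAML
// body are caller-supplied placeholders.
func addChainFromYAML(ctx context.Context, sm *SnapshotManager, listenerName, chainYAML string) error {
	chain, err := sm.LoadFilterChainFromYAML(ctx, chainYAML)
	if err != nil {
		return err
	}
	// Appends the chain and publishes a new snapshot version for the node.
	return sm.AppendFilterChainToListener(ctx, listenerName, chain)
}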
+ +// TestLoadFilterChainFromYAML_ComplexInput tests the functionality of LoadFilterChainFromYAML +func TestLoadFilterChainFromYAML_ComplexInput(t *testing.T) { + sm := &SnapshotManager{} + ctx := context.Background() + + // The user's provided, valid YAML for a single FilterChain object + validComplexYAML := ` + filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + codec_type: AUTO + upgrade_configs: + - upgrade_type: websocket + stream_idle_timeout: 0s + normalize_path: true + merge_slashes: true + route_config: + virtual_hosts: + - name: printer_service + domains: ["printer.jerxie.com"] + routes: + - match: { prefix: "/webcam" } + route: { prefix_rewrite: "/", cluster: "_3d_printer_camera", max_stream_duration: {grpc_timeout_header_max: 0s} } + - match: { prefix: "/" } + route: { cluster: "_3d_printer_console"} + http_filters: + - name: envoy.filters.http.oauth2 + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.oauth2.v3.OAuth2 + config: + token_endpoint: + cluster: _auth_server + uri: auth.jerxie.com/token + timeout: 3s + authorization_endpoint: https://auth.jerxie.com/auth + redirect_uri: "%REQ(x-forwarded-proto)%://%REQ(:authority)%/callback" + redirect_path_matcher: + path: + exact: /callback + signout_path: + path: + exact: /signout + forward_bearer_token: true + credentials: + client_id: octoprint-portal + token_secret: + name: token + sds_config: + path: "/etc/envoy/token-secret.yaml" + hmac_secret: + name: hmac + sds_config: + path: "/etc/envoy/hmac-secret.yaml" + auth_scopes: + - openid + - email + - name: envoy.filters.http.jwt_authn + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.jwt_authn.v3.JwtAuthentication + providers: + provider1: + remote_jwks: + http_uri: + uri: "https://auth.jerxie.com/keys" + cluster: _auth_server + timeout: 5s + cache_duration: 600s + from_headers: + - name: Authorization + value_prefix: "Bearer " + payload_in_metadata: jwt_payload + rules: + - match: + prefix: / + requires: + provider_name: provider1 + - name: envoy.filters.http.lua + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua + inline_code: | + email = "" + function envoy_on_request(request_handle) + email = "" + local meta = request_handle:streamInfo():dynamicMetadata() + for key, value in pairs(meta:get("envoy.filters.http.jwt_authn")) do + if key == "jwt_payload" then + for k, v in pairs(value) do + if k == "email" then + print("login octoprint: "..v) + email = v + request_handle:headers():add("ENVOY_AUTHENTICATED_USER", v) + end + end + end + end + end + + function envoy_on_response(response_handle) + if email ~="" and email ~= "axieyangb@gmail.com" then + response_handle:logInfo("Got unauthorized user, return 403 for user " ..email) + response_handle:headers():add("set-cookie", "BearerToken=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") + response_handle:headers():add("set-cookie", "OauthHMAC=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") + response_handle:headers():add("set-cookie", "IdToken=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") + response_handle:headers():add("set-cookie", "OauthExpires=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") + end + email = "" + end + - name: envoy.filters.http.router + typed_config: + "@type": 
type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + filter_chain_match: + server_names: ["printer.jerxie.com", "printer.local"] + transport_socket: + name: envoy.transport_sockets.tls + typed_config: + "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext + common_tls_context: + tls_certificates: + - certificate_chain: { filename: "/etc/certs/downstream/printer.jerxie.com/fullchain.pem" } + private_key: { filename: "/etc/certs/downstream/printer.jerxie.com/privkey.pem" } +` + + tests := []struct { + name string + yamlStr string + expectError bool + expectedLen int // Expected number of network filters (top-level filters array) + }{ + { + name: "Success_ComplexSingleFilterChain", + yamlStr: validComplexYAML, + expectError: false, + expectedLen: 1, // Only one top-level network filter: http_connection_manager + }, + // Re-include sanity checks for robust testing + { + name: "Error_NoFiltersInChain", + yamlStr: `filter_chain_match: { server_names: ["empty"] }`, + expectError: true, + expectedLen: 0, + }, + { + name: "Error_InputIsAList", + yamlStr: `- filters: []`, + expectError: true, // Should fail unmarshaling a list into a single struct + expectedLen: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + chain, err := sm.LoadFilterChainFromYAML(ctx, tt.yamlStr) + + if tt.expectError { + if err == nil { + t.Errorf("Expected an error but got nil") + } + if chain != nil { + t.Errorf("Expected nil chain on error, but got non-nil") + } + } else { + if err != nil { + t.Fatalf("Expected no error but got: %v", err) + } + if chain == nil { + t.Fatal("Expected non-nil filter chain, but got nil") + } + + // 1. Check top-level filter count + if len(chain.Filters) != tt.expectedLen { + t.Errorf("Top-level filter count mismatch. Got %d, want %d", len(chain.Filters), tt.expectedLen) + } + + // 2. Check a deeply nested value to ensure complex unmarshaling worked + if len(chain.FilterChainMatch.ServerNames) == 0 || chain.FilterChainMatch.ServerNames[0] != "printer.jerxie.com" { + t.Errorf("FilterChainMatch assertion failed. Expected server name 'printer.jerxie.com'") + } + + // 3. Check the name of the top-level filter + if chain.Filters[0].Name != "envoy.filters.network.http_connection_manager" { + t.Errorf("Top-level filter name mismatch. Got %s", chain.Filters[0].Name) + } + } + }) + } +} diff --git a/internal/snapshot_test.go b/internal/snapshot_test.go deleted file mode 100644 index 2c35f96..0000000 --- a/internal/snapshot_test.go +++ /dev/null @@ -1,199 +0,0 @@ -package internal - -import ( - "context" - "testing" -) - -// NOTE: Assume MockLogger and SnapshotManager are defined for the test to run. -// The actual implementation of LoadFilterChainFromYAML is assumed to be available -// to the test file. 
- -// TestLoadFilterChainFromYAML_ComplexInput tests the functionality of LoadFilterChainFromYAML -func TestLoadFilterChainFromYAML_ComplexInput(t *testing.T) { - sm := &SnapshotManager{} - ctx := context.Background() - - // The user's provided, valid YAML for a single FilterChain object - validComplexYAML := ` - filters: - - name: envoy.filters.network.http_connection_manager - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - stat_prefix: ingress_http - codec_type: AUTO - upgrade_configs: - - upgrade_type: websocket - stream_idle_timeout: 0s - normalize_path: true - merge_slashes: true - route_config: - virtual_hosts: - - name: printer_service - domains: ["printer.jerxie.com"] - routes: - - match: { prefix: "/webcam" } - route: { prefix_rewrite: "/", cluster: "_3d_printer_camera", max_stream_duration: {grpc_timeout_header_max: 0s} } - - match: { prefix: "/" } - route: { cluster: "_3d_printer_console"} - http_filters: - - name: envoy.filters.http.oauth2 - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.oauth2.v3.OAuth2 - config: - token_endpoint: - cluster: _auth_server - uri: auth.jerxie.com/token - timeout: 3s - authorization_endpoint: https://auth.jerxie.com/auth - redirect_uri: "%REQ(x-forwarded-proto)%://%REQ(:authority)%/callback" - redirect_path_matcher: - path: - exact: /callback - signout_path: - path: - exact: /signout - forward_bearer_token: true - credentials: - client_id: octoprint-portal - token_secret: - name: token - sds_config: - path: "/etc/envoy/token-secret.yaml" - hmac_secret: - name: hmac - sds_config: - path: "/etc/envoy/hmac-secret.yaml" - auth_scopes: - - openid - - email - - name: envoy.filters.http.jwt_authn - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.jwt_authn.v3.JwtAuthentication - providers: - provider1: - remote_jwks: - http_uri: - uri: "https://auth.jerxie.com/keys" - cluster: _auth_server - timeout: 5s - cache_duration: 600s - from_headers: - - name: Authorization - value_prefix: "Bearer " - payload_in_metadata: jwt_payload - rules: - - match: - prefix: / - requires: - provider_name: provider1 - - name: envoy.filters.http.lua - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua - inline_code: | - email = "" - function envoy_on_request(request_handle) - email = "" - local meta = request_handle:streamInfo():dynamicMetadata() - for key, value in pairs(meta:get("envoy.filters.http.jwt_authn")) do - if key == "jwt_payload" then - for k, v in pairs(value) do - if k == "email" then - print("login octoprint: "..v) - email = v - request_handle:headers():add("ENVOY_AUTHENTICATED_USER", v) - end - end - end - end - end - - function envoy_on_response(response_handle) - if email ~="" and email ~= "axieyangb@gmail.com" then - response_handle:logInfo("Got unauthorized user, return 403 for user " ..email) - response_handle:headers():add("set-cookie", "BearerToken=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") - response_handle:headers():add("set-cookie", "OauthHMAC=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") - response_handle:headers():add("set-cookie", "IdToken=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") - response_handle:headers():add("set-cookie", "OauthExpires=deleted; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT") - end - email = "" - end - - name: envoy.filters.http.router - typed_config: - "@type": 
type.googleapis.com/envoy.extensions.filters.http.router.v3.Router - filter_chain_match: - server_names: ["printer.jerxie.com", "printer.local"] - transport_socket: - name: envoy.transport_sockets.tls - typed_config: - "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext - common_tls_context: - tls_certificates: - - certificate_chain: { filename: "/etc/certs/downstream/printer.jerxie.com/fullchain.pem" } - private_key: { filename: "/etc/certs/downstream/printer.jerxie.com/privkey.pem" } -` - - tests := []struct { - name string - yamlStr string - expectError bool - expectedLen int // Expected number of network filters (top-level filters array) - }{ - { - name: "Success_ComplexSingleFilterChain", - yamlStr: validComplexYAML, - expectError: false, - expectedLen: 1, // Only one top-level network filter: http_connection_manager - }, - // Re-include sanity checks for robust testing - { - name: "Error_NoFiltersInChain", - yamlStr: `filter_chain_match: { server_names: ["empty"] }`, - expectError: true, - expectedLen: 0, - }, - { - name: "Error_InputIsAList", - yamlStr: `- filters: []`, - expectError: true, // Should fail unmarshaling a list into a single struct - expectedLen: 0, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - chain, err := sm.LoadFilterChainFromYAML(ctx, tt.yamlStr) - - if tt.expectError { - if err == nil { - t.Errorf("Expected an error but got nil") - } - if chain != nil { - t.Errorf("Expected nil chain on error, but got non-nil") - } - } else { - if err != nil { - t.Fatalf("Expected no error but got: %v", err) - } - if chain == nil { - t.Fatal("Expected non-nil filter chain, but got nil") - } - - // 1. Check top-level filter count - if len(chain.Filters) != tt.expectedLen { - t.Errorf("Top-level filter count mismatch. Got %d, want %d", len(chain.Filters), tt.expectedLen) - } - - // 2. Check a deeply nested value to ensure complex unmarshaling worked - if len(chain.FilterChainMatch.ServerNames) == 0 || chain.FilterChainMatch.ServerNames[0] != "printer.jerxie.com" { - t.Errorf("FilterChainMatch assertion failed. Expected server name 'printer.jerxie.com'") - } - - // 3. Check the name of the top-level filter - if chain.Filters[0].Name != "envoy.filters.network.http_connection_manager" { - t.Errorf("Top-level filter name mismatch. 
Got %s", chain.Filters[0].Name) - } - } - }) - } -} diff --git a/internal/storage.go b/internal/storage.go deleted file mode 100644 index ff8139a..0000000 --- a/internal/storage.go +++ /dev/null @@ -1,604 +0,0 @@ -package internal - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - "strings" - - clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - - // routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" // REMOVED - - "google.golang.org/protobuf/encoding/protojson" -) - -// Storage abstracts database persistence -type Storage struct { - db *sql.DB - driver string -} - -// DeleteStrategy defines the action to take on missing resources -type DeleteStrategy int - -const ( - // DeleteNone performs only UPSERT for items in the list (default behavior) - DeleteNone DeleteStrategy = iota - // DeleteLogical marks missing resources as disabled (now applicable to clusters and listeners) - DeleteLogical - // DeleteActual removes missing resources physically from the database - DeleteActual -) - -// NewStorage initializes a Storage instance -func NewStorage(db *sql.DB, driver string) *Storage { - return &Storage{db: db, driver: driver} -} - -// placeholder returns correct SQL placeholder based on driver -func (s *Storage) placeholder(n int) string { - if s.driver == "postgres" { - return fmt.Sprintf("$%d", n) - } - return "?" -} - -// InitSchema ensures required tables exist -func (s *Storage) InitSchema(ctx context.Context) error { - var schema string - switch s.driver { - case "postgres": - schema = ` - CREATE TABLE IF NOT EXISTS clusters ( - id SERIAL PRIMARY KEY, - name TEXT UNIQUE NOT NULL, - data JSONB NOT NULL, - enabled BOOLEAN DEFAULT true, - updated_at TIMESTAMP DEFAULT now() - ); - -- REMOVED routes table - CREATE TABLE IF NOT EXISTS listeners ( - id SERIAL PRIMARY KEY, - name TEXT UNIQUE NOT NULL, - data JSONB NOT NULL, - enabled BOOLEAN DEFAULT true, - updated_at TIMESTAMP DEFAULT now() - );` - default: // SQLite - schema = ` - CREATE TABLE IF NOT EXISTS clusters ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE NOT NULL, - data TEXT NOT NULL, - enabled BOOLEAN DEFAULT 1, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ); - -- REMOVED routes table - CREATE TABLE IF NOT EXISTS listeners ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE NOT NULL, - data TEXT NOT NULL, - enabled BOOLEAN DEFAULT 1, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - );` - } - _, err := s.db.ExecContext(ctx, schema) - return err -} - -// SaveCluster inserts or updates a cluster -func (s *Storage) SaveCluster(ctx context.Context, cluster *clusterv3.Cluster) error { - data, err := protojson.Marshal(cluster) - if err != nil { - return err - } - - var query string - switch s.driver { - case "postgres": - // Explicitly set enabled=true on update to re-enable a logically deleted cluster - query = fmt.Sprintf(` - INSERT INTO clusters (name, data, enabled, updated_at) - VALUES (%s, %s, true, now()) - ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`, - s.placeholder(1), s.placeholder(2), s.placeholder(2)) - default: // SQLite - // Explicitly set enabled=1 on update to re-enable a logically deleted cluster - query = ` - INSERT INTO clusters (name, data, enabled, updated_at) - VALUES (?, ?, 1, CURRENT_TIMESTAMP) - ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP` - } - - _, err = 
s.db.ExecContext(ctx, query, cluster.GetName(), string(data)) - return err -} - -// SaveRoute inserts or updates a route // REMOVED -// func (s *Storage) SaveRoute(ctx context.Context, route *routev3.RouteConfiguration) error { -// // ... (route logic removed) -// } - -// SaveListener inserts or updates a listener -func (s *Storage) SaveListener(ctx context.Context, listener *listenerv3.Listener) error { - data, err := protojson.Marshal(listener) - if err != nil { - return err - } - - var query string - switch s.driver { - case "postgres": - // Explicitly set enabled=true on update to re-enable a logically deleted listener - query = fmt.Sprintf(` - INSERT INTO listeners (name, data, enabled, updated_at) - VALUES (%s, %s, true, now()) - ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`, - s.placeholder(1), s.placeholder(2), s.placeholder(2)) - default: // SQLite - // Explicitly set enabled=1 on update to re-enable a logically deleted listener - query = ` - INSERT INTO listeners (name, data, enabled, updated_at) - VALUES (?, ?, 1, CURRENT_TIMESTAMP) - ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP` - } - - _, err = s.db.ExecContext(ctx, query, listener.GetName(), string(data)) - return err -} - -// LoadEnabledClusters retrieves all enabled clusters -func (s *Storage) LoadEnabledClusters(ctx context.Context) ([]*clusterv3.Cluster, error) { - query := `SELECT data FROM clusters` - if s.driver == "postgres" { - query += ` WHERE enabled = true` - } else { - query += ` WHERE enabled = 1` - } - - rows, err := s.db.QueryContext(ctx, query) - if err != nil { - return nil, err - } - defer rows.Close() - - var clusters []*clusterv3.Cluster - for rows.Next() { - var raw json.RawMessage - // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) - if s.driver != "postgres" { - var dataStr string - if err := rows.Scan(&dataStr); err != nil { - return nil, err - } - raw = json.RawMessage(dataStr) // Convert string to json.RawMessage - } else { - if err := rows.Scan(&raw); err != nil { - return nil, err - } - } - - var cluster clusterv3.Cluster - if err := protojson.Unmarshal(raw, &cluster); err != nil { - return nil, err - } - clusters = append(clusters, &cluster) - } - return clusters, nil -} - -// LoadAllClusters retrieves all clusters, regardless of their enabled status -func (s *Storage) LoadAllClusters(ctx context.Context) ([]*clusterv3.Cluster, error) { - rows, err := s.db.QueryContext(ctx, `SELECT data FROM clusters`) - if err != nil { - return nil, err - } - defer rows.Close() - - var clusters []*clusterv3.Cluster - for rows.Next() { - var raw json.RawMessage - // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) - if s.driver != "postgres" { - var dataStr string - if err := rows.Scan(&dataStr); err != nil { - return nil, err - } - raw = json.RawMessage(dataStr) // Convert string to json.RawMessage - } else { - if err := rows.Scan(&raw); err != nil { - return nil, err - } - } - - var cluster clusterv3.Cluster - if err := protojson.Unmarshal(raw, &cluster); err != nil { - return nil, err - } - clusters = append(clusters, &cluster) - } - return clusters, nil -} - -// LoadEnabledRoutes retrieves all enabled routes // REMOVED -// func (s *Storage) LoadEnabledRoutes(ctx context.Context) ([]*routev3.RouteConfiguration, error) { -// // ... 
(route logic removed) -// } - -// LoadAllRoutes retrieves all routes, regardless of their enabled status // REMOVED -// func (s *Storage) LoadAllRoutes(ctx context.Context) ([]*routev3.RouteConfiguration, error) { -// // ... (route logic removed) -// } - -// LoadEnabledListeners retrieves all enabled listeners -func (s *Storage) LoadEnabledListeners(ctx context.Context) ([]*listenerv3.Listener, error) { - query := `SELECT data FROM listeners` - if s.driver == "postgres" { - query += ` WHERE enabled = true` - } else { - query += ` WHERE enabled = 1` - } - - rows, err := s.db.QueryContext(ctx, query) - if err != nil { - return nil, err - } - defer rows.Close() - - var listeners []*listenerv3.Listener - for rows.Next() { - var raw json.RawMessage - // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) - if s.driver != "postgres" { - var dataStr string - if err := rows.Scan(&dataStr); err != nil { - return nil, err - } - raw = json.RawMessage(dataStr) // Convert string to json.RawMessage - } else { - if err := rows.Scan(&raw); err != nil { - return nil, err - } - } - - var l listenerv3.Listener - if err := protojson.Unmarshal(raw, &l); err != nil { - return nil, err - } - listeners = append(listeners, &l) - } - return listeners, nil -} - -// LoadAllListeners retrieves all listeners, regardless of their enabled status -func (s *Storage) LoadAllListeners(ctx context.Context) ([]*listenerv3.Listener, error) { - rows, err := s.db.QueryContext(ctx, `SELECT data FROM listeners`) - if err != nil { - return nil, err - } - defer rows.Close() - - var listeners []*listenerv3.Listener - for rows.Next() { - var raw json.RawMessage - // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) - if s.driver != "postgres" { - var dataStr string - if err := rows.Scan(&dataStr); err != nil { - return nil, err - } - raw = json.RawMessage(dataStr) // Convert string to json.RawMessage - } else { - if err := rows.Scan(&raw); err != nil { - return nil, err - } - } - - var l listenerv3.Listener - if err := protojson.Unmarshal(raw, &l); err != nil { - return nil, err - } - listeners = append(listeners, &l) - } - return listeners, nil -} - -// RebuildSnapshot rebuilds full snapshot from DB -func (s *Storage) RebuildSnapshot(ctx context.Context) (*SnapshotConfig, error) { - // 1. Load Enabled Resources (for xDS serving) - enabledClusters, err := s.LoadEnabledClusters(ctx) - if err != nil { - return nil, err - } - // enabledRoutes, err := s.LoadEnabledRoutes(ctx) // REMOVED - // if err != nil { - // return nil, err - // } - enabledListeners, err := s.LoadEnabledListeners(ctx) - if err != nil { - return nil, err - } - - // 2. Load ALL Resources (for comparison and disabled set) - allClusters, err := s.LoadAllClusters(ctx) - if err != nil { - return nil, err - } - // allRoutes, err := s.LoadAllRoutes(ctx) // REMOVED - // if err != nil { - // return nil, err - // } - allListeners, err := s.LoadAllListeners(ctx) - if err != nil { - return nil, err - } - - // 3. 
Separate Disabled Resources - - // Clusters - enabledClusterNames := make(map[string]struct{}, len(enabledClusters)) - for _, c := range enabledClusters { - enabledClusterNames[c.GetName()] = struct{}{} - } - var disabledClusters []*clusterv3.Cluster - for _, c := range allClusters { - if _, found := enabledClusterNames[c.GetName()]; !found { - disabledClusters = append(disabledClusters, c) - } - } - - // Routes // REMOVED - // enabledRouteNames := make(map[string]struct{}, 0) - // var disabledRoutes []*routev3.RouteConfiguration - // for _, r := range allRoutes { - // if _, found := enabledRouteNames[r.GetName()]; !found { - // disabledRoutes = append(disabledRoutes, r) - // } - // } - - // Listeners - enabledListenerNames := make(map[string]struct{}, len(enabledListeners)) - for _, l := range enabledListeners { - enabledListenerNames[l.GetName()] = struct{}{} - } - var disabledListeners []*listenerv3.Listener - for _, l := range allListeners { - if _, found := enabledListenerNames[l.GetName()]; !found { - disabledListeners = append(disabledListeners, l) - } - } - - return &SnapshotConfig{ - EnabledClusters: enabledClusters, - // EnabledRoutes: nil, // REMOVED - EnabledListeners: enabledListeners, - DisabledClusters: disabledClusters, - // DisabledRoutes: nil, // REMOVED - DisabledListeners: disabledListeners, - }, nil -} - -// SnapshotConfig aggregates xDS resources -type SnapshotConfig struct { - // Enabled resources (for xDS serving) - EnabledClusters []*clusterv3.Cluster - // EnabledRoutes []*routev3.RouteConfiguration // REMOVED - EnabledListeners []*listenerv3.Listener - - // Disabled resources (for UI display) - DisabledClusters []*clusterv3.Cluster - // DisabledRoutes []*routev3.RouteConfiguration // REMOVED - DisabledListeners []*listenerv3.Listener -} - -// EnableCluster toggles a cluster -func (s *Storage) EnableCluster(ctx context.Context, name string, enabled bool) error { - query := `UPDATE clusters SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?` - if s.driver == "postgres" { - query = `UPDATE clusters SET enabled = $1, updated_at = now() WHERE name = $2` - } - _, err := s.db.ExecContext(ctx, query, enabled, name) - return err -} - -// EnableRoute toggles a route // REMOVED -// func (s *Storage) EnableRoute(ctx context.Context, name string, enabled bool) error { -// // ... (route logic removed) -// } - -// EnableListener toggles a listener -func (s *Storage) EnableListener(ctx context.Context, name string, enabled bool) error { - query := `UPDATE listeners SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?` - if s.driver == "postgres" { - query = `UPDATE listeners SET enabled = $1, updated_at = now() WHERE name = $2` - } - _, err := s.db.ExecContext(ctx, query, enabled, name) - return err -} - -// disableMissingResources updates the 'enabled' status for resources in 'table' -// whose 'name' is NOT in 'inputNames'. -func (s *Storage) disableMissingResources(ctx context.Context, table string, inputNames []string) error { - if table != "clusters" && table != "listeners" { // CHECK UPDATED - return fmt.Errorf("logical delete (disable) is only supported for tables with an 'enabled' column (clusters, listeners)") - } - - // 1. Build placeholders and args - placeholders := make([]string, len(inputNames)) - args := make([]interface{}, len(inputNames)) - for i, name := range inputNames { - if s.driver == "postgres" { - placeholders[i] = fmt.Sprintf("$%d", i+1) - } else { - placeholders[i] = "?" 
- } - args[i] = name - } - - disabledValue := "false" - if s.driver != "postgres" { - disabledValue = "0" - } - - var updateTime string - if s.driver == "postgres" { - updateTime = "now()" - } else { - updateTime = "CURRENT_TIMESTAMP" - } - - // If no names are provided, disable ALL currently enabled resources - whereClause := "" - if len(inputNames) > 0 { - whereClause = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(placeholders, ", ")) - } - - // 2. Construct and execute the UPDATE query - query := fmt.Sprintf(` - UPDATE %s - SET enabled = %s, updated_at = %s - %s`, - table, disabledValue, updateTime, whereClause) - - _, err := s.db.ExecContext(ctx, query, args...) - return err -} - -// deleteMissingResources physically deletes resources from 'table' whose 'name' is NOT in 'inputNames'. -func (s *Storage) deleteMissingResources(ctx context.Context, table string, inputNames []string) error { - if table != "clusters" && table != "listeners" { // CHECK UPDATED - return fmt.Errorf("physical delete is only supported for tables: clusters, listeners") - } - - // 1. Build placeholders and args - placeholders := make([]string, len(inputNames)) - args := make([]interface{}, len(inputNames)) - for i, name := range inputNames { - if s.driver == "postgres" { - placeholders[i] = fmt.Sprintf("$%d", i+1) - } else { - placeholders[i] = "?" - } - args[i] = name - } - - // If no names are provided, delete ALL resources - whereClause := "" - if len(inputNames) > 0 { - whereClause = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(placeholders, ", ")) - } - - // 2. Construct and execute the DELETE query - query := fmt.Sprintf(` - DELETE FROM %s - %s`, - table, whereClause) - - _, err := s.db.ExecContext(ctx, query, args...) - return err -} - -func (s *Storage) SaveSnapshot(ctx context.Context, cfg *SnapshotConfig, strategy DeleteStrategy) error { - if cfg == nil { - return fmt.Errorf("SnapshotConfig is nil") - } - - // Use a transaction for atomicity - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { - if err != nil { - tx.Rollback() - return - } - err = tx.Commit() - }() - - // Note: Only Enabledxxx resources are UPSERTED. Disabledxxx resources are - // left alone unless the deletion strategy removes them. - - // --- 1. Save/Upsert Clusters and Collect Names --- - clusterNames := make([]string, 0, len(cfg.EnabledClusters)) - for _, c := range cfg.EnabledClusters { - if err = s.SaveCluster(ctx, c); err != nil { - return fmt.Errorf("failed to save cluster %s: %w", c.GetName(), err) - } - clusterNames = append(clusterNames, c.GetName()) - } - - // --- 2. Save/Upsert Routes and Collect Names --- // REMOVED - // routeNames := make([]string, 0, len(cfg.EnabledRoutes)) - // for _, r := range cfg.EnabledRoutes { - // if err = s.SaveRoute(ctx, r); err != nil { - // return fmt.Errorf("failed to save route %s: %w", r.GetName(), err) - // } - // routeNames = append(routeNames, r.GetName()) - // } - - // --- 3. Save/Upsert Listeners and Collect Names --- - listenerNames := make([]string, 0, len(cfg.EnabledListeners)) - for _, l := range cfg.EnabledListeners { - if err = s.SaveListener(ctx, l); err != nil { - return fmt.Errorf("failed to save listener %s: %w", l.GetName(), err) - } - listenerNames = append(listenerNames, l.GetName()) - } - - // --- 4. 
Apply Deletion Strategy --- - switch strategy { - case DeleteLogical: - // Logical Delete (Disable) for all resource types: marks resources NOT in the current enabled list as disabled - if err = s.disableMissingResources(ctx, "clusters", clusterNames); err != nil { - return fmt.Errorf("failed to logically delete missing clusters: %w", err) - } - // if err = s.disableMissingResources(ctx, "routes", routeNames); err != nil { // REMOVED - // return fmt.Errorf("failed to logically delete missing routes: %w", err) - // } - if err = s.disableMissingResources(ctx, "listeners", listenerNames); err != nil { - return fmt.Errorf("failed to logically delete missing listeners: %w", err) - } - - case DeleteActual: - // Actual Delete (Physical Removal) for all resources: removes resources NOT in the current enabled list - if err = s.deleteMissingResources(ctx, "clusters", clusterNames); err != nil { - return fmt.Errorf("failed to physically delete missing clusters: %w", err) - } - // if err = s.deleteMissingResources(ctx, "routes", routeNames); err != nil { // REMOVED - // return fmt.Errorf("failed to physically delete missing routes: %w", err) - // } - if err = s.deleteMissingResources(ctx, "listeners", listenerNames); err != nil { - return fmt.Errorf("failed to physically delete missing listeners: %w", err) - } - - case DeleteNone: - // Do nothing for missing resources - return nil - } - - return err -} - -// RemoveListener deletes a listener by name -func (s *Storage) RemoveListener(ctx context.Context, name string) error { - query := `DELETE FROM listeners WHERE name = ?` - if s.driver == "postgres" { - query = `DELETE FROM listeners WHERE name = $1` - } - _, err := s.db.ExecContext(ctx, query, name) - return err -} - -// RemoveCluster deletes a cluster by name -func (s *Storage) RemoveCluster(ctx context.Context, name string) error { - query := `DELETE FROM clusters WHERE name = ?` - if s.driver == "postgres" { - query = `DELETE FROM clusters WHERE name = $1` - } - _, err := s.db.ExecContext(ctx, query, name) - return err -} diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 0000000..4d44d4e --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,604 @@ +package storage + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "strings" + + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + + // routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" // REMOVED + + "google.golang.org/protobuf/encoding/protojson" +) + +// Storage abstracts database persistence +type Storage struct { + db *sql.DB + driver string +} + +// DeleteStrategy defines the action to take on missing resources +type DeleteStrategy int + +const ( + // DeleteNone performs only UPSERT for items in the list (default behavior) + DeleteNone DeleteStrategy = iota + // DeleteLogical marks missing resources as disabled (now applicable to clusters and listeners) + DeleteLogical + // DeleteActual removes missing resources physically from the database + DeleteActual +) + +// NewStorage initializes a Storage instance +func NewStorage(db *sql.DB, driver string) *Storage { + return &Storage{db: db, driver: driver} +} + +// placeholder returns correct SQL placeholder based on driver +func (s *Storage) placeholder(n int) string { + if s.driver == "postgres" { + return fmt.Sprintf("$%d", n) + } + return "?" 
+} + +// InitSchema ensures required tables exist +func (s *Storage) InitSchema(ctx context.Context) error { + var schema string + switch s.driver { + case "postgres": + schema = ` + CREATE TABLE IF NOT EXISTS clusters ( + id SERIAL PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + data JSONB NOT NULL, + enabled BOOLEAN DEFAULT true, + updated_at TIMESTAMP DEFAULT now() + ); + -- REMOVED routes table + CREATE TABLE IF NOT EXISTS listeners ( + id SERIAL PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + data JSONB NOT NULL, + enabled BOOLEAN DEFAULT true, + updated_at TIMESTAMP DEFAULT now() + );` + default: // SQLite + schema = ` + CREATE TABLE IF NOT EXISTS clusters ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE NOT NULL, + data TEXT NOT NULL, + enabled BOOLEAN DEFAULT 1, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + -- REMOVED routes table + CREATE TABLE IF NOT EXISTS listeners ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT UNIQUE NOT NULL, + data TEXT NOT NULL, + enabled BOOLEAN DEFAULT 1, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + );` + } + _, err := s.db.ExecContext(ctx, schema) + return err +} + +// SaveCluster inserts or updates a cluster +func (s *Storage) SaveCluster(ctx context.Context, cluster *clusterv3.Cluster) error { + data, err := protojson.Marshal(cluster) + if err != nil { + return err + } + + var query string + switch s.driver { + case "postgres": + // Explicitly set enabled=true on update to re-enable a logically deleted cluster + query = fmt.Sprintf(` + INSERT INTO clusters (name, data, enabled, updated_at) + VALUES (%s, %s, true, now()) + ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`, + s.placeholder(1), s.placeholder(2), s.placeholder(2)) + default: // SQLite + // Explicitly set enabled=1 on update to re-enable a logically deleted cluster + query = ` + INSERT INTO clusters (name, data, enabled, updated_at) + VALUES (?, ?, 1, CURRENT_TIMESTAMP) + ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP` + } + + _, err = s.db.ExecContext(ctx, query, cluster.GetName(), string(data)) + return err +} + +// SaveRoute inserts or updates a route // REMOVED +// func (s *Storage) SaveRoute(ctx context.Context, route *routev3.RouteConfiguration) error { +// // ... 
(route logic removed) +// } + +// SaveListener inserts or updates a listener +func (s *Storage) SaveListener(ctx context.Context, listener *listenerv3.Listener) error { + data, err := protojson.Marshal(listener) + if err != nil { + return err + } + + var query string + switch s.driver { + case "postgres": + // Explicitly set enabled=true on update to re-enable a logically deleted listener + query = fmt.Sprintf(` + INSERT INTO listeners (name, data, enabled, updated_at) + VALUES (%s, %s, true, now()) + ON CONFLICT (name) DO UPDATE SET data = %s, enabled = true, updated_at = now()`, + s.placeholder(1), s.placeholder(2), s.placeholder(2)) + default: // SQLite + // Explicitly set enabled=1 on update to re-enable a logically deleted listener + query = ` + INSERT INTO listeners (name, data, enabled, updated_at) + VALUES (?, ?, 1, CURRENT_TIMESTAMP) + ON CONFLICT(name) DO UPDATE SET data=excluded.data, enabled=1, updated_at=CURRENT_TIMESTAMP` + } + + _, err = s.db.ExecContext(ctx, query, listener.GetName(), string(data)) + return err +} + +// LoadEnabledClusters retrieves all enabled clusters +func (s *Storage) LoadEnabledClusters(ctx context.Context) ([]*clusterv3.Cluster, error) { + query := `SELECT data FROM clusters` + if s.driver == "postgres" { + query += ` WHERE enabled = true` + } else { + query += ` WHERE enabled = 1` + } + + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + var clusters []*clusterv3.Cluster + for rows.Next() { + var raw json.RawMessage + // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) + if s.driver != "postgres" { + var dataStr string + if err := rows.Scan(&dataStr); err != nil { + return nil, err + } + raw = json.RawMessage(dataStr) // Convert string to json.RawMessage + } else { + if err := rows.Scan(&raw); err != nil { + return nil, err + } + } + + var cluster clusterv3.Cluster + if err := protojson.Unmarshal(raw, &cluster); err != nil { + return nil, err + } + clusters = append(clusters, &cluster) + } + return clusters, nil +} + +// LoadAllClusters retrieves all clusters, regardless of their enabled status +func (s *Storage) LoadAllClusters(ctx context.Context) ([]*clusterv3.Cluster, error) { + rows, err := s.db.QueryContext(ctx, `SELECT data FROM clusters`) + if err != nil { + return nil, err + } + defer rows.Close() + + var clusters []*clusterv3.Cluster + for rows.Next() { + var raw json.RawMessage + // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) + if s.driver != "postgres" { + var dataStr string + if err := rows.Scan(&dataStr); err != nil { + return nil, err + } + raw = json.RawMessage(dataStr) // Convert string to json.RawMessage + } else { + if err := rows.Scan(&raw); err != nil { + return nil, err + } + } + + var cluster clusterv3.Cluster + if err := protojson.Unmarshal(raw, &cluster); err != nil { + return nil, err + } + clusters = append(clusters, &cluster) + } + return clusters, nil +} + +// LoadEnabledRoutes retrieves all enabled routes // REMOVED +// func (s *Storage) LoadEnabledRoutes(ctx context.Context) ([]*routev3.RouteConfiguration, error) { +// // ... (route logic removed) +// } + +// LoadAllRoutes retrieves all routes, regardless of their enabled status // REMOVED +// func (s *Storage) LoadAllRoutes(ctx context.Context) ([]*routev3.RouteConfiguration, error) { +// // ... 
(route logic removed) +// } + +// LoadEnabledListeners retrieves all enabled listeners +func (s *Storage) LoadEnabledListeners(ctx context.Context) ([]*listenerv3.Listener, error) { + query := `SELECT data FROM listeners` + if s.driver == "postgres" { + query += ` WHERE enabled = true` + } else { + query += ` WHERE enabled = 1` + } + + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + var listeners []*listenerv3.Listener + for rows.Next() { + var raw json.RawMessage + // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) + if s.driver != "postgres" { + var dataStr string + if err := rows.Scan(&dataStr); err != nil { + return nil, err + } + raw = json.RawMessage(dataStr) // Convert string to json.RawMessage + } else { + if err := rows.Scan(&raw); err != nil { + return nil, err + } + } + + var l listenerv3.Listener + if err := protojson.Unmarshal(raw, &l); err != nil { + return nil, err + } + listeners = append(listeners, &l) + } + return listeners, nil +} + +// LoadAllListeners retrieves all listeners, regardless of their enabled status +func (s *Storage) LoadAllListeners(ctx context.Context) ([]*listenerv3.Listener, error) { + rows, err := s.db.QueryContext(ctx, `SELECT data FROM listeners`) + if err != nil { + return nil, err + } + defer rows.Close() + + var listeners []*listenerv3.Listener + for rows.Next() { + var raw json.RawMessage + // FIX: Handle type difference between Postgres (JSONB) and SQLite (TEXT) + if s.driver != "postgres" { + var dataStr string + if err := rows.Scan(&dataStr); err != nil { + return nil, err + } + raw = json.RawMessage(dataStr) // Convert string to json.RawMessage + } else { + if err := rows.Scan(&raw); err != nil { + return nil, err + } + } + + var l listenerv3.Listener + if err := protojson.Unmarshal(raw, &l); err != nil { + return nil, err + } + listeners = append(listeners, &l) + } + return listeners, nil +} + +// RebuildSnapshot rebuilds full snapshot from DB +func (s *Storage) RebuildSnapshot(ctx context.Context) (*SnapshotConfig, error) { + // 1. Load Enabled Resources (for xDS serving) + enabledClusters, err := s.LoadEnabledClusters(ctx) + if err != nil { + return nil, err + } + // enabledRoutes, err := s.LoadEnabledRoutes(ctx) // REMOVED + // if err != nil { + // return nil, err + // } + enabledListeners, err := s.LoadEnabledListeners(ctx) + if err != nil { + return nil, err + } + + // 2. Load ALL Resources (for comparison and disabled set) + allClusters, err := s.LoadAllClusters(ctx) + if err != nil { + return nil, err + } + // allRoutes, err := s.LoadAllRoutes(ctx) // REMOVED + // if err != nil { + // return nil, err + // } + allListeners, err := s.LoadAllListeners(ctx) + if err != nil { + return nil, err + } + + // 3. 
Separate Disabled Resources + + // Clusters + enabledClusterNames := make(map[string]struct{}, len(enabledClusters)) + for _, c := range enabledClusters { + enabledClusterNames[c.GetName()] = struct{}{} + } + var disabledClusters []*clusterv3.Cluster + for _, c := range allClusters { + if _, found := enabledClusterNames[c.GetName()]; !found { + disabledClusters = append(disabledClusters, c) + } + } + + // Routes // REMOVED + // enabledRouteNames := make(map[string]struct{}, 0) + // var disabledRoutes []*routev3.RouteConfiguration + // for _, r := range allRoutes { + // if _, found := enabledRouteNames[r.GetName()]; !found { + // disabledRoutes = append(disabledRoutes, r) + // } + // } + + // Listeners + enabledListenerNames := make(map[string]struct{}, len(enabledListeners)) + for _, l := range enabledListeners { + enabledListenerNames[l.GetName()] = struct{}{} + } + var disabledListeners []*listenerv3.Listener + for _, l := range allListeners { + if _, found := enabledListenerNames[l.GetName()]; !found { + disabledListeners = append(disabledListeners, l) + } + } + + return &SnapshotConfig{ + EnabledClusters: enabledClusters, + // EnabledRoutes: nil, // REMOVED + EnabledListeners: enabledListeners, + DisabledClusters: disabledClusters, + // DisabledRoutes: nil, // REMOVED + DisabledListeners: disabledListeners, + }, nil +} + +// SnapshotConfig aggregates xDS resources +type SnapshotConfig struct { + // Enabled resources (for xDS serving) + EnabledClusters []*clusterv3.Cluster + // EnabledRoutes []*routev3.RouteConfiguration // REMOVED + EnabledListeners []*listenerv3.Listener + + // Disabled resources (for UI display) + DisabledClusters []*clusterv3.Cluster + // DisabledRoutes []*routev3.RouteConfiguration // REMOVED + DisabledListeners []*listenerv3.Listener +} + +// EnableCluster toggles a cluster +func (s *Storage) EnableCluster(ctx context.Context, name string, enabled bool) error { + query := `UPDATE clusters SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?` + if s.driver == "postgres" { + query = `UPDATE clusters SET enabled = $1, updated_at = now() WHERE name = $2` + } + _, err := s.db.ExecContext(ctx, query, enabled, name) + return err +} + +// EnableRoute toggles a route // REMOVED +// func (s *Storage) EnableRoute(ctx context.Context, name string, enabled bool) error { +// // ... (route logic removed) +// } + +// EnableListener toggles a listener +func (s *Storage) EnableListener(ctx context.Context, name string, enabled bool) error { + query := `UPDATE listeners SET enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ?` + if s.driver == "postgres" { + query = `UPDATE listeners SET enabled = $1, updated_at = now() WHERE name = $2` + } + _, err := s.db.ExecContext(ctx, query, enabled, name) + return err +} + +// disableMissingResources updates the 'enabled' status for resources in 'table' +// whose 'name' is NOT in 'inputNames'. +func (s *Storage) disableMissingResources(ctx context.Context, table string, inputNames []string) error { + if table != "clusters" && table != "listeners" { // CHECK UPDATED + return fmt.Errorf("logical delete (disable) is only supported for tables with an 'enabled' column (clusters, listeners)") + } + + // 1. Build placeholders and args + placeholders := make([]string, len(inputNames)) + args := make([]interface{}, len(inputNames)) + for i, name := range inputNames { + if s.driver == "postgres" { + placeholders[i] = fmt.Sprintf("$%d", i+1) + } else { + placeholders[i] = "?" 
+ } + args[i] = name + } + + disabledValue := "false" + if s.driver != "postgres" { + disabledValue = "0" + } + + var updateTime string + if s.driver == "postgres" { + updateTime = "now()" + } else { + updateTime = "CURRENT_TIMESTAMP" + } + + // If no names are provided, disable ALL currently enabled resources + whereClause := "" + if len(inputNames) > 0 { + whereClause = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(placeholders, ", ")) + } + + // 2. Construct and execute the UPDATE query + query := fmt.Sprintf(` + UPDATE %s + SET enabled = %s, updated_at = %s + %s`, + table, disabledValue, updateTime, whereClause) + + _, err := s.db.ExecContext(ctx, query, args...) + return err +} + +// deleteMissingResources physically deletes resources from 'table' whose 'name' is NOT in 'inputNames'. +func (s *Storage) deleteMissingResources(ctx context.Context, table string, inputNames []string) error { + if table != "clusters" && table != "listeners" { // CHECK UPDATED + return fmt.Errorf("physical delete is only supported for tables: clusters, listeners") + } + + // 1. Build placeholders and args + placeholders := make([]string, len(inputNames)) + args := make([]interface{}, len(inputNames)) + for i, name := range inputNames { + if s.driver == "postgres" { + placeholders[i] = fmt.Sprintf("$%d", i+1) + } else { + placeholders[i] = "?" + } + args[i] = name + } + + // If no names are provided, delete ALL resources + whereClause := "" + if len(inputNames) > 0 { + whereClause = fmt.Sprintf("WHERE name NOT IN (%s)", strings.Join(placeholders, ", ")) + } + + // 2. Construct and execute the DELETE query + query := fmt.Sprintf(` + DELETE FROM %s + %s`, + table, whereClause) + + _, err := s.db.ExecContext(ctx, query, args...) + return err +} + +func (s *Storage) SaveSnapshot(ctx context.Context, cfg *SnapshotConfig, strategy DeleteStrategy) error { + if cfg == nil { + return fmt.Errorf("SnapshotConfig is nil") + } + + // Use a transaction for atomicity + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer func() { + if err != nil { + tx.Rollback() + return + } + err = tx.Commit() + }() + + // Note: Only Enabledxxx resources are UPSERTED. Disabledxxx resources are + // left alone unless the deletion strategy removes them. + + // --- 1. Save/Upsert Clusters and Collect Names --- + clusterNames := make([]string, 0, len(cfg.EnabledClusters)) + for _, c := range cfg.EnabledClusters { + if err = s.SaveCluster(ctx, c); err != nil { + return fmt.Errorf("failed to save cluster %s: %w", c.GetName(), err) + } + clusterNames = append(clusterNames, c.GetName()) + } + + // --- 2. Save/Upsert Routes and Collect Names --- // REMOVED + // routeNames := make([]string, 0, len(cfg.EnabledRoutes)) + // for _, r := range cfg.EnabledRoutes { + // if err = s.SaveRoute(ctx, r); err != nil { + // return fmt.Errorf("failed to save route %s: %w", r.GetName(), err) + // } + // routeNames = append(routeNames, r.GetName()) + // } + + // --- 3. Save/Upsert Listeners and Collect Names --- + listenerNames := make([]string, 0, len(cfg.EnabledListeners)) + for _, l := range cfg.EnabledListeners { + if err = s.SaveListener(ctx, l); err != nil { + return fmt.Errorf("failed to save listener %s: %w", l.GetName(), err) + } + listenerNames = append(listenerNames, l.GetName()) + } + + // --- 4. 
Apply Deletion Strategy --- + switch strategy { + case DeleteLogical: + // Logical Delete (Disable) for all resource types: marks resources NOT in the current enabled list as disabled + if err = s.disableMissingResources(ctx, "clusters", clusterNames); err != nil { + return fmt.Errorf("failed to logically delete missing clusters: %w", err) + } + // if err = s.disableMissingResources(ctx, "routes", routeNames); err != nil { // REMOVED + // return fmt.Errorf("failed to logically delete missing routes: %w", err) + // } + if err = s.disableMissingResources(ctx, "listeners", listenerNames); err != nil { + return fmt.Errorf("failed to logically delete missing listeners: %w", err) + } + + case DeleteActual: + // Actual Delete (Physical Removal) for all resources: removes resources NOT in the current enabled list + if err = s.deleteMissingResources(ctx, "clusters", clusterNames); err != nil { + return fmt.Errorf("failed to physically delete missing clusters: %w", err) + } + // if err = s.deleteMissingResources(ctx, "routes", routeNames); err != nil { // REMOVED + // return fmt.Errorf("failed to physically delete missing routes: %w", err) + // } + if err = s.deleteMissingResources(ctx, "listeners", listenerNames); err != nil { + return fmt.Errorf("failed to physically delete missing listeners: %w", err) + } + + case DeleteNone: + // Do nothing for missing resources + return nil + } + + return err +} + +// RemoveListener deletes a listener by name +func (s *Storage) RemoveListener(ctx context.Context, name string) error { + query := `DELETE FROM listeners WHERE name = ?` + if s.driver == "postgres" { + query = `DELETE FROM listeners WHERE name = $1` + } + _, err := s.db.ExecContext(ctx, query, name) + return err +} + +// RemoveCluster deletes a cluster by name +func (s *Storage) RemoveCluster(ctx context.Context, name string) error { + query := `DELETE FROM clusters WHERE name = ?` + if s.driver == "postgres" { + query = `DELETE FROM clusters WHERE name = $1` + } + _, err := s.db.ExecContext(ctx, query, name) + return err +} diff --git a/main.go b/main.go index dc2453d..e50591e 100644 --- a/main.go +++ b/main.go @@ -22,6 +22,8 @@ "envoy-control-plane/internal" internallog "envoy-control-plane/internal/log" + "envoy-control-plane/internal/snapshot" + internalstorage "envoy-control-plane/internal/storage" ) var ( @@ -59,7 +61,7 @@ } // loadConfigFiles now accepts and uses a context -func loadConfigFiles(ctx context.Context, manager *internal.SnapshotManager, dir string) error { +func loadConfigFiles(ctx context.Context, manager *snapshot.SnapshotManager, dir string) error { log := internallog.LogFromContext(ctx) // Use the logger from context log.Infof("loading configuration files from directory: %s", dir) @@ -145,7 +147,7 @@ // internal.NewStorage likely needs to be updated to accept a logger as well // if its methods don't accept context, but we will pass context to its methods below. - storage := internal.NewStorage(db, dbDriver) + storage := internalstorage.NewStorage(db, dbDriver) // Pass the context with the logger down if err := storage.InitSchema(ctx); err != nil { log.Errorf("failed to initialize DB schema: %v", err) @@ -159,7 +161,7 @@ // logger behavior you previously set up) as a bridge, since it was initialized // to log.NewDefaultLogger(). 
cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, logger) - manager := internal.NewSnapshotManager(cache, nodeID, storage) + manager := snapshot.NewSnapshotManager(cache, nodeID, storage) loadedConfigs := false @@ -210,7 +212,7 @@ os.Exit(1) } // Pass the context with the logger down - if err := storage.SaveSnapshot(ctx, snapCfg, internal.DeleteLogical); err != nil { + if err := storage.SaveSnapshot(ctx, snapCfg, internalstorage.DeleteLogical); err != nil { log.Errorf("failed to save initial snapshot into DB: %v", err) os.Exit(1) }
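
As a sanity check on the new package boundaries, here is a minimal wiring sketch based only on the constructors and methods visible in this diff (storage.NewStorage, InitSchema, RebuildSnapshot, SaveSnapshot with a DeleteStrategy, and snapshot.NewSnapshotManager). The SQLite driver, DSN, and node ID below are placeholders rather than values taken from main.go, and error handling is abbreviated; treat it as an illustrative sketch, not a copy of the real main.go.

package main

import (
	"context"
	"database/sql"
	"log"

	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	envoylog "github.com/envoyproxy/go-control-plane/pkg/log"
	_ "github.com/mattn/go-sqlite3" // placeholder driver choice; any database/sql driver works

	"envoy-control-plane/internal/snapshot"
	internalstorage "envoy-control-plane/internal/storage"
)

func main() {
	ctx := context.Background()

	// Placeholder DSN and driver name; the real values come from flags in main.go.
	db, err := sql.Open("sqlite3", "control-plane.db")
	if err != nil {
		log.Fatalf("open db: %v", err)
	}
	defer db.Close()

	// Persistence layer now lives in internal/storage.
	store := internalstorage.NewStorage(db, "sqlite3")
	if err := store.InitSchema(ctx); err != nil {
		log.Fatalf("init schema: %v", err)
	}

	// Snapshot management now lives in internal/snapshot and takes the storage handle.
	cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, envoylog.NewDefaultLogger())
	manager := snapshot.NewSnapshotManager(cache, "example-node", store) // "example-node" is a placeholder node ID
	_ = manager                                                          // the manager would go on to populate and serve the xDS cache

	// Rebuild the persisted configuration and write it back, marking resources
	// that are no longer in the enabled set as disabled (DeleteLogical).
	cfg, err := store.RebuildSnapshot(ctx)
	if err != nil {
		log.Fatalf("rebuild snapshot: %v", err)
	}
	if err := store.SaveSnapshot(ctx, cfg, internalstorage.DeleteLogical); err != nil {
		log.Fatalf("save snapshot: %v", err)
	}
}

Passing internalstorage.DeleteActual instead would physically remove rows missing from the enabled set, while internalstorage.DeleteNone only upserts the enabled resources and leaves everything else untouched.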