Newer
Older
EnvoyControlPlane / internal / snapshot.go
package internal

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"time"

	clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	secretv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"

	// Ensure all standard filters are imported for proto unmarshalling
	_ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/jwt_authn/v3"
	_ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3"
	_ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/oauth2/v3"
	_ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
	_ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3"
	_ "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"

	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
	"google.golang.org/protobuf/encoding/protojson"
	yaml "gopkg.in/yaml.v3"

	internalapi "envoy-control-plane/internal/api"
)

// ResourceNamer is satisfied by any value that exposes GetName() string, such
// as the generated cluster and listener protos handled in this file. It lets
// the name-based helpers (set building, filtering) work across resource types.
type ResourceNamer interface{ GetName() string }

// SnapshotManager wraps a SnapshotCache and provides file loading/modifying,
// generic add/remove/list helpers, and synchronisation with a persistent
// Storage. Storage is declared elsewhere in the package; this file calls
// RebuildSnapshot, SaveSnapshot, EnableCluster, EnableListener, RemoveCluster
// and RemoveListener on it.
type SnapshotManager struct {
	Cache  cachev3.SnapshotCache // in-memory xDS snapshot cache read and written by the methods
	NodeID string                // node whose snapshot most methods operate on
	DB     *Storage              // persistent store (see type comment for the methods used)
}

// NewSnapshotManager returns a SnapshotManager bound to cache, the node
// identified by nodeID, and the persistent store db. The arguments are stored
// as given; no validation is performed.
func NewSnapshotManager(cache cachev3.SnapshotCache, nodeID string, db *Storage) *SnapshotManager {
	sm := new(SnapshotManager)
	sm.Cache, sm.NodeID, sm.DB = cache, nodeID, db
	return sm
}

// YamlResources is a helper struct to unmarshal the common Envoy YAML file
// structure: a document whose top-level "resources" key holds a list of
// resource nodes. LoadSnapshotFromFile does not use it; that function decodes
// into a generic value and walks it for "@type" keys instead.
type YamlResources struct {
	Resources []yaml.Node `yaml:"resources"`
}

// LoadSnapshotFromFile reads a YAML (or JSON, which the YAML decoder accepts)
// file and collects every Cluster, Listener, ClusterLoadAssignment and Secret
// object found anywhere in the document, grouped by xDS type URL.
//
// An object is recognised by its "@type" key. For recognised objects that key
// is removed before decoding with protojson, because protojson would otherwise
// reject it as an unknown field of the concrete message. Objects carrying any
// other "@type" (including Runtime, which is not handled yet) are skipped. The
// walk descends into every map and list either way, so resources nested at any
// depth are found. Since Go map iteration order is random, relative order is
// not guaranteed for resources that sit in the same map.
//
// The first read, parse or decode failure aborts the load. Nothing is written
// to the cache; the receiver is not used.
func (sm *SnapshotManager) LoadSnapshotFromFile(filePath string) (map[resourcev3.Type][]types.Resource, error) {
	data, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}

	var doc interface{}
	if err = yaml.Unmarshal(data, &doc); err != nil {
		return nil, fmt.Errorf("failed to unmarshal YAML/JSON file %s: %w", filePath, err)
	}

	// newMessage returns an empty message for the top-level xDS types this
	// loader understands, and nil for everything else (e.g. typed_config
	// payloads of filters, or Runtime).
	newMessage := func(typ resourcev3.Type) types.Resource {
		switch typ {
		case resourcev3.ClusterType:
			return &clusterv3.Cluster{}
		case resourcev3.ListenerType:
			return &listenerv3.Listener{}
		case resourcev3.EndpointType:
			return &endpointv3.ClusterLoadAssignment{}
		case resourcev3.SecretType:
			return &secretv3.Secret{}
		}
		return nil
	}

	found := make(map[resourcev3.Type][]types.Resource)

	var visit func(node interface{}) error
	visit = func(node interface{}) error {
		switch n := node.(type) {
		case []interface{}:
			for _, elem := range n {
				if err := visit(elem); err != nil {
					return err
				}
			}

		case map[string]interface{}:
			if typStr, hasType := n["@type"].(string); hasType {
				typ := resourcev3.Type(typStr)
				if msg := newMessage(typ); msg != nil {
					delete(n, "@type")

					payload, err := json.Marshal(n)
					if err != nil {
						return fmt.Errorf("failed to marshal resource node to JSON: %w", err)
					}
					if err := protojson.Unmarshal(payload, msg); err != nil {
						return fmt.Errorf("failed to unmarshal %s: %w", typ, err)
					}
					found[typ] = append(found[typ], msg)
				}
			}

			for _, child := range n {
				if err := visit(child); err != nil {
					return err
				}
			}
		}
		return nil
	}

	if err := visit(doc); err != nil {
		return nil, err
	}
	return found, nil
}

// SetSnapshotFromConfig publishes the enabled clusters and listeners held in
// cfg as a new snapshot for sm.NodeID. When version is empty a
// "snap-<unix-nanoseconds>" version is generated. Only the Enabled* fields of
// cfg are used.
//
// It returns an error if cfg is nil, if the snapshot cannot be built, or if
// the cache rejects it.
func (sm *SnapshotManager) SetSnapshotFromConfig(ctx context.Context, version string, cfg *SnapshotConfig) error {
	if cfg == nil {
		return fmt.Errorf("snapshot config is nil")
	}
	if len(version) == 0 {
		version = fmt.Sprintf("snap-%d", time.Now().UnixNano())
	}

	clusters := make([]types.Resource, 0, len(cfg.EnabledClusters))
	for _, c := range cfg.EnabledClusters {
		clusters = append(clusters, c)
	}

	listeners := make([]types.Resource, 0, len(cfg.EnabledListeners))
	for _, l := range cfg.EnabledListeners {
		listeners = append(listeners, l)
	}

	snap, err := cachev3.NewSnapshot(version, map[resourcev3.Type][]types.Resource{
		resourcev3.ClusterType:  clusters,
		resourcev3.ListenerType: listeners,
	})
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}

	if err = sm.Cache.SetSnapshot(ctx, sm.NodeID, snap); err != nil {
		return fmt.Errorf("failed to set snapshot: %w", err)
	}
	return nil
}

// SnapshotToConfig reads the snapshot cached for nodeID and copies its
// clusters and listeners into the Enabled* fields of a new SnapshotConfig.
// Entries that are not *clusterv3.Cluster / *listenerv3.Listener are skipped,
// and the Disabled* fields are left unset because the cache only holds live
// resources. The snapshot hands resources back as a map, so the order of the
// returned slices is not deterministic. ctx is currently unused.
func (sm *SnapshotManager) SnapshotToConfig(ctx context.Context, nodeID string) (*SnapshotConfig, error) {
	snap, err := sm.Cache.GetSnapshot(nodeID)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot for node %s: %w", nodeID, err)
	}

	clusters := []*clusterv3.Cluster{}
	for _, res := range snap.GetResources(string(resourcev3.ClusterType)) {
		c, isCluster := res.(*clusterv3.Cluster)
		if !isCluster {
			continue
		}
		clusters = append(clusters, c)
	}

	listeners := []*listenerv3.Listener{}
	for _, res := range snap.GetResources(string(resourcev3.ListenerType)) {
		l, isListener := res.(*listenerv3.Listener)
		if !isListener {
			continue
		}
		listeners = append(listeners, l)
	}

	return &SnapshotConfig{
		EnabledClusters:  clusters,
		EnabledListeners: listeners,
	}, nil
}

// SetSnapshot builds a snapshot from resources (keyed by xDS type URL) at the
// given version and installs it in the cache for sm.NodeID.
//
// Mirroring SetSnapshotFromConfig, an empty version is replaced with a
// "snap-<unix-nanoseconds>" version. Errors from building the snapshot or from
// the cache are wrapped with context (the originals remain reachable via
// errors.Is / errors.As).
func (sm *SnapshotManager) SetSnapshot(ctx context.Context, version string, resources map[resourcev3.Type][]types.Resource) error {
	if version == "" {
		version = fmt.Sprintf("snap-%d", time.Now().UnixNano())
	}

	snap, err := cachev3.NewSnapshot(version, resources)
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}
	if err := sm.Cache.SetSnapshot(ctx, sm.NodeID, snap); err != nil {
		return fmt.Errorf("failed to set snapshot: %w", err)
	}
	return nil
}

// ---------------- Persistence and Sync Methods (Two-Layer Model) ----------------

// LoadSnapshotFromDB syncs DB data into the cache. It rebuilds the
// configuration from the persistent store via sm.DB.RebuildSnapshot and
// publishes it to the Envoy cache with SetSnapshotFromConfig, using a
// "db-sync-<timestamp>" version. Progress messages are printed to stdout.
func (sm *SnapshotManager) LoadSnapshotFromDB(ctx context.Context) error {
	fmt.Println("Loading configuration from main DB...")

	// 1. Database is the primary source.
	cfg, err := sm.DB.RebuildSnapshot(ctx)
	if err != nil {
		return fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}

	fmt.Println("Loaded configuration from DB. Updating Envoy Cache.")

	// 2. Update Envoy's in-memory cache (the 'flash cache' layer).
	// RFC3339Nano rather than RFC3339: with second granularity, two syncs in the
	// same second (e.g. back-to-back EnableResourceFromDB calls) would reuse the
	// same version, and a snapshot whose version did not change may not be
	// pushed to connected Envoys.
	version := "db-sync-" + time.Now().Format(time.RFC3339Nano)
	return sm.SetSnapshotFromConfig(ctx, version, cfg)
}

// FlushCacheToDB writes the snapshot currently cached for sm.NodeID (treated
// as the source of truth) through to the persistent DB. It converts the
// snapshot with SnapshotToConfig and passes the result, together with
// strategy, to sm.DB.SaveSnapshot. A success message is printed to stdout once
// the save completes.
func (sm *SnapshotManager) FlushCacheToDB(ctx context.Context, strategy DeleteStrategy) error {
	snapshotCfg, convErr := sm.SnapshotToConfig(ctx, sm.NodeID)
	if convErr != nil {
		return fmt.Errorf("failed to convert snapshot to config: %w", convErr)
	}

	// strategy is forwarded untouched; presumably SaveSnapshot uses it to decide
	// how DB rows absent from the cache are treated (confirm in Storage).
	saveErr := sm.DB.SaveSnapshot(ctx, snapshotCfg, strategy)
	if saveErr != nil {
		return fmt.Errorf("failed to save config to DB: %w", saveErr)
	}

	fmt.Println("Successfully saved to Persistent DB.")
	return nil
}

// EnableResourceFromDB flips a logically disabled cluster or listener back to
// enabled in the DB (EnableCluster / EnableListener, passing true) and then
// reloads the whole snapshot from the DB with LoadSnapshotFromDB so the cache
// picks it up. Other resource types are rejected before the DB is touched.
func (sm *SnapshotManager) EnableResourceFromDB(name string, typ resourcev3.Type) error {
	ctx := context.Background()

	if typ != resourcev3.ClusterType && typ != resourcev3.ListenerType {
		return fmt.Errorf("unsupported resource type for enabling: %s", typ)
	}

	if typ == resourcev3.ClusterType {
		if err := sm.DB.EnableCluster(ctx, name, true); err != nil {
			return fmt.Errorf("failed to enable cluster '%s' in DB: %w", name, err)
		}
	} else if err := sm.DB.EnableListener(ctx, name, true); err != nil {
		return fmt.Errorf("failed to enable listener '%s' in DB: %w", name, err)
	}

	return sm.LoadSnapshotFromDB(ctx)
}

// ---------------- Consistency Check ----------------

// CheckCacheDBConsistency compares the cluster and listener names in the
// snapshot currently cached for sm.NodeID against the enabled resources
// rebuilt from the persistent DB.
//
// Names present only in the cache are listed under report.CacheOnly, names
// present only in the DB under report.DBOnly, and report.Inconsistent is set
// when either list gains an entry. Each list is sorted so that repeated checks
// against the same state yield identical reports (the names are gathered from
// map iteration, whose order is random).
func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*internalapi.ConsistencyReport, error) {
	report := &internalapi.ConsistencyReport{
		CacheOnly: make(map[resourcev3.Type][]string),
		DBOnly:    make(map[resourcev3.Type][]string),
	}

	// 1. Get current cache snapshot.
	cacheConfig, err := sm.SnapshotToConfig(ctx, sm.NodeID)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot from cache: %w", err)
	}

	// 2. Rebuild snapshot from DB; only its Enabled* fields are compared.
	dbConfig, err := sm.DB.RebuildSnapshot(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}

	// buildNameSet turns a resource list into a name set for O(1) lookups.
	buildNameSet := func(resources []ResourceNamer) map[string]struct{} {
		set := make(map[string]struct{}, len(resources))
		for _, r := range resources {
			set[r.GetName()] = struct{}{}
		}
		return set
	}

	// Pair each resource type with its cache-side and DB-side lists.
	typeResourceMaps := []struct {
		typ       resourcev3.Type
		cacheList []ResourceNamer
		dbList    []ResourceNamer
	}{
		{resourcev3.ClusterType, resourcesToNamers(cacheConfig.EnabledClusters), resourcesToNamers(dbConfig.EnabledClusters)},
		{resourcev3.ListenerType, resourcesToNamers(cacheConfig.EnabledListeners), resourcesToNamers(dbConfig.EnabledListeners)},
	}

	for _, m := range typeResourceMaps {
		cacheSet := buildNameSet(m.cacheList)
		dbSet := buildNameSet(m.dbList)

		// Cache-only: present in cacheSet but not in dbSet.
		for cacheName := range cacheSet {
			if _, existsInDB := dbSet[cacheName]; !existsInDB {
				report.CacheOnly[m.typ] = append(report.CacheOnly[m.typ], cacheName)
				report.Inconsistent = true
			}
		}

		// DB-only: present in dbSet but not in cacheSet.
		for dbName := range dbSet {
			if _, existsInCache := cacheSet[dbName]; !existsInCache {
				report.DBOnly[m.typ] = append(report.DBOnly[m.typ], dbName)
				report.Inconsistent = true
			}
		}

		// Sorting a nil slice is a no-op, so types with no differences are fine.
		sort.Strings(report.CacheOnly[m.typ])
		sort.Strings(report.DBOnly[m.typ])
	}

	return report, nil
}

// ---------------- Generic Add / Remove / List / Get ----------------

// AddResourceToSnapshot inserts resource (of xDS type typ) into the snapshot
// currently cached for sm.NodeID and publishes the result as a new snapshot.
//
// Supported types are Cluster, Listener, Endpoint, Secret and Runtime. The
// resource must expose GetName(). Any existing resource of the same type and
// name is replaced rather than duplicated. An error is returned if no snapshot
// is cached for the node yet, the type is unsupported, the resource has no
// GetName(), the snapshot cannot be built, or the cache rejects it.
func (sm *SnapshotManager) AddResourceToSnapshot(resource types.Resource, typ resourcev3.Type) error {
	snap, err := sm.Cache.GetSnapshot(sm.NodeID)
	if err != nil {
		return fmt.Errorf("failed to get snapshot from cache: %w", err)
	}

	switch typ {
	case resourcev3.ClusterType, resourcev3.ListenerType,
		resourcev3.EndpointType, resourcev3.SecretType, resourcev3.RuntimeType:
		// supported
	default:
		return fmt.Errorf("unsupported resource type: %s", typ)
	}

	resourceNamer, ok := resource.(interface{ GetName() string })
	if !ok {
		return fmt.Errorf("resource of type %s does not implement GetName()", typ)
	}
	name := resourceNamer.GetName()

	resources := sm.getAllResourcesFromSnapshot(snap)
	// Upsert: drop any same-named entry first so the slice never holds two
	// resources with one name.
	resources[typ], _ = filterAndCheckResourcesByName(resources[typ], name)
	resources[typ] = append(resources[typ], resource)

	// The nanosecond suffix keeps versions unique. Without it, re-adding or
	// updating a resource with the same name would reuse the same version, and
	// the cache may then not push the change to Envoy.
	version := fmt.Sprintf("snap-generic-%s-%d", name, time.Now().UnixNano())
	newSnap, err := cachev3.NewSnapshot(version, resources)
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}
	return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap)
}

// RemoveResource removes any resource by name dynamically
func (sm *SnapshotManager) RemoveResource(name string, typ resourcev3.Type, strategy DeleteStrategy) error {
	snap, _ := sm.Cache.GetSnapshot(sm.NodeID)
	resources := sm.getAllResourcesFromSnapshot(snap)

	// Flag to check if resource was found in cache
	var resourceFound = false

	// Filter the target type
	if targetResources, ok := resources[typ]; ok {
		resources[typ], resourceFound = filterAndCheckResourcesByName(targetResources, name)
	}

	if strategy == DeleteActual {
		if resourceFound {
			return fmt.Errorf("actual delete requested but resource %s of type %s still exists in cache", name, typ)
		}
		if typ == resourcev3.ClusterType {
			if err := sm.DB.RemoveCluster(context.TODO(), name); err != nil {
				return fmt.Errorf("failed to delete cluster %s from DB: %w", name, err)
			}
			return nil
		}
		if typ == resourcev3.ListenerType {
			if err := sm.DB.RemoveListener(context.TODO(), name); err != nil {
				return fmt.Errorf("failed to delete listener %s from DB: %w", name, err)
			}
			return nil
		}
		return fmt.Errorf("actual delete not supported for resource type: %s", typ)
	}

	if !resourceFound {
		return fmt.Errorf("resource %s of type %s not found in cache", name, typ)
	}

	newSnap, _ := cachev3.NewSnapshot(
		"snap-remove-generic-"+name,
		resources,
	)

	if err := sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap); err != nil {
		return fmt.Errorf("failed to set snapshot: %w", err)
	}

	if err := sm.FlushCacheToDB(context.TODO(), strategy); err != nil {
		return fmt.Errorf("failed to flush cache to DB: %w", err)
	}
	return nil
}

// ListResources returns all enabled and disabled resources of a given type from the DB.
func (sm *SnapshotManager) ListResources(typ resourcev3.Type) ([]types.Resource, []types.Resource, error) {
	snap, err := sm.DB.RebuildSnapshot(context.Background())
	if err != nil {
		return nil, nil, fmt.Errorf("failed to rebuild snapshot from DB: %w", err)
	}

	var enabled, disabled []types.Resource
	var namerEnabled, namerDisabled []ResourceNamer

	switch typ {
	case resourcev3.ClusterType:
		namerEnabled = resourcesToNamers(snap.EnabledClusters)
		namerDisabled = resourcesToNamers(snap.DisabledClusters)
	case resourcev3.ListenerType:
		namerEnabled = resourcesToNamers(snap.EnabledListeners)
		namerDisabled = resourcesToNamers(snap.DisabledListeners)
	default:
		return nil, nil, fmt.Errorf("unsupported resource type: %s", typ)
	}

	// Convert ResourceNamer slices back to types.Resource slices
	enabled = make([]types.Resource, len(namerEnabled))
	for i, r := range namerEnabled {
		enabled[i] = r.(types.Resource)
	}

	disabled = make([]types.Resource, len(namerDisabled))
	for i, r := range namerDisabled {
		disabled[i] = r.(types.Resource)
	}

	return enabled, disabled, nil
}

// GetResourceFromCache looks up the resource called name among the resources
// of type typ in the snapshot cached for sm.NodeID. The error from GetSnapshot
// is returned unwrapped; a missing name yields a "not found in cache" error.
// The result is returned as stored, without any type check against typ.
func (sm *SnapshotManager) GetResourceFromCache(name string, typ resourcev3.Type) (types.Resource, error) {
	snap, err := sm.Cache.GetSnapshot(sm.NodeID)
	if err != nil {
		return nil, err
	}

	byName := snap.GetResources(string(typ))
	if res, found := byName[name]; found {
		return res, nil
	}
	return nil, fmt.Errorf("%s resource %s not found in cache", typ, name)
}

// ---------------- Save ----------------

func (sm *SnapshotManager) SaveSnapshotToFile(filePath string) error {
	snap, err := sm.Cache.GetSnapshot(sm.NodeID)
	if err != nil {
		return err
	}

	out := make(map[string][]interface{})

	// Iterate over all known types
	clusterTypeResources := snap.GetResources(resourcev3.ClusterType)
	for _, r := range clusterTypeResources {
		out[resourcev3.ClusterType] = append(out[resourcev3.ClusterType], r)
	}
	listenerTypeResources := snap.GetResources(resourcev3.ListenerType)
	for _, r := range listenerTypeResources {
		out[resourcev3.ListenerType] = append(out[resourcev3.ListenerType], r)
	}

	data, err := json.MarshalIndent(out, "", "  ")
	if err != nil {
		return err
	}

	return os.WriteFile(filePath, data, 0644)
}

// ---------------- Helpers ----------------

// mapToSlice returns the values of m as a newly allocated slice of length
// len(m) (non-nil even when m is empty or nil). The order follows Go map
// iteration and is therefore unspecified.
func mapToSlice(m map[string]types.Resource) []types.Resource {
	values := make([]types.Resource, len(m))
	i := 0
	for _, v := range m {
		values[i] = v
		i++
	}
	return values
}

// filterAndCheckResourcesByName returns a new, non-nil slice containing every
// element of resources except those whose GetName() equals name, together with
// a flag reporting whether at least one element was dropped. Elements that do
// not expose GetName() are always kept. The input slice is not modified and
// the relative order of kept elements is preserved.
func filterAndCheckResourcesByName(resources []types.Resource, name string) ([]types.Resource, bool) {
	kept := make([]types.Resource, 0, len(resources))
	removed := false
	for _, res := range resources {
		namer, hasName := res.(interface{ GetName() string })
		if hasName && namer.GetName() == name {
			removed = true
			continue
		}
		kept = append(kept, res)
	}
	return kept, removed
}

// getAllResourcesFromSnapshot copies the resources held by snap into a
// type-URL-keyed map of slices, ready to be edited and passed to
// cachev3.NewSnapshot.
//
// Cluster and Listener entries are always present (possibly empty).
// Endpoint, Route, Secret and Runtime entries are added only when snap holds
// at least one resource of that type. Without them, rebuilding a snapshot in
// AddResourceToSnapshot / RemoveResource would silently drop those resources.
// A nil snap yields empty Cluster and Listener entries instead of panicking.
func (sm *SnapshotManager) getAllResourcesFromSnapshot(snap cachev3.ResourceSnapshot) map[resourcev3.Type][]types.Resource {
	if snap == nil {
		return map[resourcev3.Type][]types.Resource{
			resourcev3.ClusterType:  {},
			resourcev3.ListenerType: {},
		}
	}

	resources := map[resourcev3.Type][]types.Resource{
		resourcev3.ClusterType:  mapToSlice(snap.GetResources(string(resourcev3.ClusterType))),
		resourcev3.ListenerType: mapToSlice(snap.GetResources(string(resourcev3.ListenerType))),
	}

	// Carry over the other types only when populated, so types the snapshot
	// never contained stay absent from the rebuilt snapshot.
	for _, typ := range []resourcev3.Type{
		resourcev3.EndpointType,
		resourcev3.RouteType,
		resourcev3.SecretType,
		resourcev3.RuntimeType,
	} {
		if existing := snap.GetResources(string(typ)); len(existing) > 0 {
			resources[typ] = mapToSlice(existing)
		}
	}
	return resources
}

// resourcesToNamers copies a slice of concrete resource values (for example
// []*clusterv3.Cluster) into a new []ResourceNamer of the same length and
// order. The copy is needed because Go slices are not covariant: a []*T is
// never assignable to []ResourceNamer, even when *T satisfies the interface.
// The result is non-nil even for an empty or nil input.
func resourcesToNamers[T ResourceNamer](list []T) []ResourceNamer {
	namers := make([]ResourceNamer, 0, len(list))
	for _, item := range list {
		namers = append(namers, item)
	}
	return namers
}