diff --git a/go.mod b/go.mod index c302257..22021cb 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,12 @@ require ( github.com/envoyproxy/go-control-plane v0.13.4 github.com/envoyproxy/go-control-plane/envoy v1.35.0 - github.com/google/uuid v1.6.0 + github.com/lib/pq v1.10.9 + github.com/mattn/go-sqlite3 v1.14.32 google.golang.org/grpc v1.75.1 google.golang.org/protobuf v1.36.10 + gopkg.in/yaml.v3 v3.0.1 + k8s.io/klog/v2 v2.130.1 ) require ( @@ -16,14 +19,11 @@ github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/go-logr/logr v1.4.3 // indirect - github.com/lib/pq v1.10.9 // indirect - github.com/mattn/go-sqlite3 v1.14.32 // indirect + github.com/kr/text v0.2.0 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect golang.org/x/net v0.41.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.26.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/klog/v2 v2.130.1 // indirect ) diff --git a/go.sum b/go.sum index 793cc5a..1bb4e6c 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,7 @@ cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls= github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M= @@ -22,6 +23,10 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= @@ -30,6 +35,8 @@ github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -63,6 +70,8 @@ google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= diff --git a/internal/api.go b/internal/api.go new file mode 100644 index 0000000..2250e2d --- /dev/null +++ b/internal/api.go @@ -0,0 +1,85 @@ +package internal + +import ( + "net/http" + + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + + internalapi "envoy-control-plane/internal/api" +) + +// API holds reference to snapshot manager +type API struct { + Manager *SnapshotManager // SnapshotManager is assumed to be defined elsewhere in internal package +} + +// NewAPI returns a new REST API handler +func NewAPI(sm *SnapshotManager) *API { + return &API{ + Manager: sm, + } +} + +// RegisterRoutes mounts REST handlers +func (api *API) RegisterRoutes(mux *http.ServeMux) { + // Management Handlers (Add / Remove / Enable / Disable) + + // Cluster Handlers + mux.HandleFunc("/add-cluster", func(w http.ResponseWriter, r *http.Request) { + api.addResourceHandler(w, r, resourcev3.ClusterType, func(req interface{}) types.Resource { + cr := req.(*internalapi.AddClusterRequest) + return &cr.Cluster + }) + }) + mux.HandleFunc("/disable-cluster", func(w http.ResponseWriter, r *http.Request) { + api.disableResourceHandler(w, r, resourcev3.ClusterType) + }) + mux.HandleFunc("/enable-cluster", func(w http.ResponseWriter, r *http.Request) { + api.enableResourceHandler(w, r, resourcev3.ClusterType) + }) + mux.HandleFunc("/remove-cluster", func(w http.ResponseWriter, r *http.Request) { + api.removeResourceHandler(w, r, resourcev3.ClusterType) + }) + + // Listener Handlers + mux.HandleFunc("/add-listener", func(w http.ResponseWriter, r *http.Request) { + api.addResourceHandler(w, r, resourcev3.ListenerType, func(req interface{}) types.Resource { + lr := req.(*internalapi.AddListenerRequest) + return &lr.Listener + }) + }) + mux.HandleFunc("/disable-listener", func(w http.ResponseWriter, r *http.Request) { + api.disableResourceHandler(w, r, resourcev3.ListenerType) + }) + mux.HandleFunc("/enable-listener", func(w http.ResponseWriter, r *http.Request) { + api.enableResourceHandler(w, r, resourcev3.ListenerType) + }) + mux.HandleFunc("/remove-listener", func(w http.ResponseWriter, r *http.Request) { + api.removeResourceHandler(w, r, resourcev3.ListenerType) + }) + + // Query / List Handlers + mux.HandleFunc("/list-clusters", func(w http.ResponseWriter, r *http.Request) { + api.listResourceHandler(w, r, resourcev3.ClusterType) + }) + mux.HandleFunc("/get-cluster", func(w http.ResponseWriter, r *http.Request) { + api.getResourceHandler(w, r, resourcev3.ClusterType) + }) + + mux.HandleFunc("/list-listeners", func(w http.ResponseWriter, r *http.Request) { + api.listResourceHandler(w, r, resourcev3.ListenerType) + }) + mux.HandleFunc("/get-listener", func(w http.ResponseWriter, r *http.Request) { + api.getResourceHandler(w, r, resourcev3.ListenerType) + }) + + // Persistence Handlers + mux.HandleFunc("/load-from-db", api.loadSnapshotFromDB) + mux.HandleFunc("/flush-to-db", api.flushCacheToDB) + mux.HandleFunc("/load-from-file", api.loadSnapshotFromFile) + mux.HandleFunc("/save-to-file", api.saveSnapshotToFile) + + // Consistency Handler + mux.HandleFunc("/is-consistent", api.isConsistentHandler) +} diff --git a/internal/api/types.go b/internal/api/types.go new file mode 100644 index 0000000..ddb1bff --- /dev/null +++ b/internal/api/types.go @@ -0,0 +1,53 @@ +package internal + +import ( + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" +) + +// AddClusterRequest defines payload to add a cluster +type AddClusterRequest struct { + Name string `json:"name"` + Cluster clusterv3.Cluster `json:"cluster"` +} + +// RemoveClusterRequest defines payload to remove a cluster (Not explicitly used in handlers, but included for completeness) +type RemoveClusterRequest struct { + Name string `json:"name"` +} + +// EnableResourceRequest defines payload to enable a resource +// This will be used for both enable-cluster and enable-listener +type EnableResourceRequest struct { + Name string `json:"name"` +} + +// RemoveResourceRequest defines payload to remove a resource. +// Used for both /remove-cluster and /remove-listener. +type RemoveResourceRequest struct { + Name string `json:"name"` +} + +// SnapshotFileRequest defines payload to load/save snapshot from/to file +type SnapshotFileRequest struct { + Path string `json:"path"` +} + +// AddListenerRequest defines payload to add a listener +type AddListenerRequest struct { + Name string `json:"name"` + Listener listenerv3.Listener `json:"listener"` +} + +// RemoveListenerRequest defines payload to remove a listener (Not explicitly used in handlers, but included for completeness) +type RemoveListenerRequest struct { + Name string `json:"name"` +} + +// ConsistencyReport holds the results of the cache/DB consistency check. +type ConsistencyReport struct { + CacheOnly map[resourcev3.Type][]string `json:"cache-only"` // Resources present in cache but not enabled in DB + DBOnly map[resourcev3.Type][]string `json:"db-only"` // Resources enabled in DB but not present in cache + Inconsistent bool `json:"inconsistent"` +} diff --git a/internal/api_handlers.go b/internal/api_handlers.go new file mode 100644 index 0000000..0569963 --- /dev/null +++ b/internal/api_handlers.go @@ -0,0 +1,365 @@ +package internal + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/reflect/protoreflect" + + internalapi "envoy-control-plane/internal/api" +) + +// ---------------- Persistence Handlers ---------------- + +// loadSnapshotFromDB loads the full configuration from the persistent database +// into the SnapshotManager's Envoy Cache. +func (api *API) loadSnapshotFromDB(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + // Use context.Background() since this is a top-level operation + if err := api.Manager.LoadSnapshotFromDB(context.Background()); err != nil { + http.Error(w, fmt.Sprintf("failed to load from DB: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": "Configuration loaded from DB and applied to cache."}) +} + +// flushCacheToDB saves the current configuration from the Envoy Cache (source of truth) +// to the persistent database. +func (api *API) flushCacheToDB(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + // Default to DeleteLogical (no physical deletion) + deleteStrategy := DeleteLogical // DeleteLogical is assumed to be defined elsewhere + + // Check for 'deleteMissing' query parameter. If "true" or "1", switch to DeleteActual. + deleteMissingStr := r.URL.Query().Get("deleteMissing") + if deleteMissingStr == "true" || deleteMissingStr == "1" { + deleteStrategy = DeleteActual // DeleteActual is assumed to be defined elsewhere + } + + // Use context.Background() since this is a top-level operation + // Pass the determined DeleteStrategy + if err := api.Manager.FlushCacheToDB(context.Background(), deleteStrategy); err != nil { + http.Error(w, fmt.Sprintf("failed to flush to DB: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": "Configuration saved from cache to DB."}) +} + +// loadSnapshotFromFile loads a snapshot from a local file and applies it to the cache. +func (api *API) loadSnapshotFromFile(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + var req internalapi.SnapshotFileRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Path == "" { + http.Error(w, "path required in request body", http.StatusBadRequest) + return + } + + resources, err := api.Manager.LoadSnapshotFromFile(req.Path) + if err != nil { + http.Error(w, fmt.Sprintf("failed to load snapshot from file: %v", err), http.StatusInternalServerError) + return + } + + // Use context.Background() + if err := api.Manager.SetSnapshot(context.Background(), req.Path, resources); err != nil { + http.Error(w, fmt.Sprintf("failed to set snapshot: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Snapshot loaded from %s and applied.", req.Path)}) +} + +// saveSnapshotToFile saves the current cache snapshot to a local file. +func (api *API) saveSnapshotToFile(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + var req internalapi.SnapshotFileRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Path == "" { + http.Error(w, "path required in request body", http.StatusBadRequest) + return + } + + if err := api.Manager.SaveSnapshotToFile(req.Path); err != nil { + http.Error(w, fmt.Sprintf("failed to save snapshot to file: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Snapshot saved to %s.", req.Path)}) +} + +// ---------------- Generic REST Handlers ---------------- + +// addResourceHandler handles adding a resource (Cluster, Listener) to the cache and persisting it. +func (api *API) addResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type, createFn func(interface{}) types.Resource) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + var req interface{} + switch typ { + case resourcev3.ClusterType: + req = &internalapi.AddClusterRequest{} + // case resourcev3.RouteType: + // req = &AddRouteRequest{} + case resourcev3.ListenerType: + req = &internalapi.AddListenerRequest{} + default: + http.Error(w, "unsupported type", http.StatusBadRequest) + return + } + + if err := json.NewDecoder(r.Body).Decode(req); err != nil { + http.Error(w, "invalid request", http.StatusBadRequest) + return + } + + res := createFn(req) + if err := api.Manager.AddResourceToSnapshot(res, typ); err != nil { + http.Error(w, fmt.Sprintf("failed to add resource: %v", err), http.StatusInternalServerError) + return + } + // Persist immediately using DeleteLogical (mark as disabled in DB) + if err := api.Manager.FlushCacheToDB(context.Background(), DeleteLogical); err != nil { + http.Error(w, fmt.Sprintf("failed to persist resource to DB: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]string{"name": res.(interface{ GetName() string }).GetName()}) +} + +// disableResourceHandler handles disabling a resource (logical removal from cache/DB). +func (api *API) disableResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + var req struct{ Name string } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { + http.Error(w, "name required", http.StatusBadRequest) + return + } + + // Use DeleteLogical to remove from cache and mark as disabled in DB + if err := api.Manager.RemoveResource(req.Name, typ, DeleteLogical); err != nil { + http.Error(w, fmt.Sprintf("failed to disable resource: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Resource '%s' disabled.", req.Name)}) +} + +// enableResourceHandler fetches a disabled resource from the DB and enables it (adds to cache). +func (api *API) enableResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + var req internalapi.EnableResourceRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { + http.Error(w, "name required", http.StatusBadRequest) + return + } + + // Call the Manager function to pull the resource from DB, enable it, and add to the cache. + if err := api.Manager.EnableResourceFromDB(req.Name, typ); err != nil { + // NOTE: Exact error string check is brittle but necessary to replicate original logic's status code. + if err.Error() == fmt.Sprintf("disabled resource %s not found in DB for type %s", req.Name, typ) { + http.Error(w, fmt.Sprintf("disabled resource '%s' not found or already enabled: %v", req.Name, err), http.StatusNotFound) + } else { + http.Error(w, fmt.Sprintf("failed to enable resource '%s' from DB: %v", req.Name, err), http.StatusInternalServerError) + } + return + } + + // Reload the cache again from DB to ensure consistency. + if err := api.Manager.LoadSnapshotFromDB(context.Background()); err != nil { + http.Error(w, fmt.Sprintf("failed to reload snapshot from DB after enabling resource: %v", err), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Resource '%s' enabled and applied to cache.", req.Name)}) +} + +// removeResourceHandler removes a resource completely from the DB (DeleteActual). +// It requires the resource to be disabled (not in the cache) before actual deletion. +func (api *API) removeResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + var req internalapi.RemoveResourceRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { + http.Error(w, "name required", http.StatusBadRequest) + return + } + + // Use DeleteActual strategy for permanent removal from the DB. + if err := api.Manager.RemoveResource(req.Name, typ, DeleteActual); err != nil { + // NOTE: Exact error string check is brittle but necessary to replicate original logic's status code. + if err.Error() == fmt.Sprintf("resource %s for type %s is enabled and must be disabled before removal", req.Name, typ) { + http.Error(w, fmt.Sprintf("resource '%s' must be disabled first before permanent removal: %v", req.Name, err), http.StatusBadRequest) + } else { + http.Error(w, fmt.Sprintf("failed to permanently remove resource: %v", err), http.StatusInternalServerError) + } + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Resource '%s' permanently removed.", req.Name)}) +} + +// ---------------- Query / List Handlers ---------------- + +// listResourceHandler returns a list of enabled and disabled resources of a given type. +func (api *API) listResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + enabledResources, disabledResources, err := api.Manager.ListResources(typ) + if err != nil { + http.Error(w, fmt.Sprintf("failed to list resources: %v", err), http.StatusInternalServerError) + return + } + + // Create the final response object structure + response := struct { + Enabled []types.Resource `json:"enabled"` + Disabled []types.Resource `json:"disabled"` + }{ + Enabled: enabledResources, + Disabled: disabledResources, + } + + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, "failed to encode response", http.StatusInternalServerError) + } +} + +// getResourceHandler returns a single resource by name, allowing for different output formats. +func (api *API) getResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + name := r.URL.Query().Get("name") + if name == "" { + http.Error(w, "name query parameter required", http.StatusBadRequest) + return + } + + format := r.URL.Query().Get("format") + if format == "" { + format = "json" // default format + } + + res, err := api.Manager.GetResourceFromCache(name, typ) + if err != nil { + http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound) + return + } + + pb, ok := res.(interface{ ProtoReflect() protoreflect.Message }) + if !ok { + http.Error(w, "resource is not a protobuf message", http.StatusInternalServerError) + return + } + + var output []byte + switch format { + case "yaml": + // ConvertProtoToYAML is assumed to be defined elsewhere + yamlStr, err := ConvertProtoToYAML(pb) + if err != nil { + http.Error(w, fmt.Sprintf("failed to convert resource to YAML: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/x-yaml") + output = []byte(yamlStr) + default: // json + data, err := protojson.Marshal(pb) + if err != nil { + http.Error(w, fmt.Sprintf("failed to marshal protobuf to JSON: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + output = data + } + + w.WriteHeader(http.StatusOK) + w.Write(output) +} + +// ---------------- Consistency Handler ---------------- + +// isConsistentHandler checks whether the current in-memory cache is consistent with the database. +func (api *API) isConsistentHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + + // ConsistencyReport is assumed to be defined elsewhere + consistent, err := api.Manager.CheckCacheDBConsistency(context.TODO()) + if err != nil { + http.Error(w, fmt.Sprintf("failed to check consistency: %v", err), http.StatusInternalServerError) + return + } + + response := struct { + Consistent *internalapi.ConsistencyReport `json:"consistent"` // ConsistencyReport is assumed to be defined elsewhere + }{ + Consistent: consistent, + } + + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, "failed to encode response", http.StatusInternalServerError) + } +} diff --git a/internal/log.go b/internal/log.go deleted file mode 100644 index 50a57de..0000000 --- a/internal/log.go +++ /dev/null @@ -1,41 +0,0 @@ -package internal - -import ( - "k8s.io/klog/v2" // Import klog -) - -// DefaultLogger is enabled when no consuming clients provide -// a logger to the server/cache subsystem. -type DefaultLogger struct { -} - -// NewDefaultLogger creates a DefaultLogger. -func NewDefaultLogger() *DefaultLogger { - // klog is globally initialized. You might call klog.InitFlags(nil) - // and flag.Parse() earlier in your main function to configure it. - // We don't do it here as it would conflict with other flag parsing. - return &DefaultLogger{} -} - -// Debugf logs a message at level debug. -// klog's standard Verbosity (V) is used for debugging/info levels. -// V(0) is typically equivalent to Infof, V(1) or higher is for debugging. -func (l *DefaultLogger) Debugf(format string, args ...interface{}) { - // Using V(2) for typical debug output - klog.V(2).Infof(format, args...) -} - -// Infof logs a message at level info. -func (l *DefaultLogger) Infof(format string, args ...interface{}) { - klog.Infof(format, args...) -} - -// Warnf logs a message at level warn. -func (l *DefaultLogger) Warnf(format string, args ...interface{}) { - klog.Warningf(format, args...) -} - -// Errorf logs a message at level error. -func (l *DefaultLogger) Errorf(format string, args ...interface{}) { - klog.Errorf(format, args...) -} diff --git a/internal/log/log.go b/internal/log/log.go new file mode 100644 index 0000000..50a57de --- /dev/null +++ b/internal/log/log.go @@ -0,0 +1,41 @@ +package internal + +import ( + "k8s.io/klog/v2" // Import klog +) + +// DefaultLogger is enabled when no consuming clients provide +// a logger to the server/cache subsystem. +type DefaultLogger struct { +} + +// NewDefaultLogger creates a DefaultLogger. +func NewDefaultLogger() *DefaultLogger { + // klog is globally initialized. You might call klog.InitFlags(nil) + // and flag.Parse() earlier in your main function to configure it. + // We don't do it here as it would conflict with other flag parsing. + return &DefaultLogger{} +} + +// Debugf logs a message at level debug. +// klog's standard Verbosity (V) is used for debugging/info levels. +// V(0) is typically equivalent to Infof, V(1) or higher is for debugging. +func (l *DefaultLogger) Debugf(format string, args ...interface{}) { + // Using V(2) for typical debug output + klog.V(2).Infof(format, args...) +} + +// Infof logs a message at level info. +func (l *DefaultLogger) Infof(format string, args ...interface{}) { + klog.Infof(format, args...) +} + +// Warnf logs a message at level warn. +func (l *DefaultLogger) Warnf(format string, args ...interface{}) { + klog.Warningf(format, args...) +} + +// Errorf logs a message at level error. +func (l *DefaultLogger) Errorf(format string, args ...interface{}) { + klog.Errorf(format, args...) +} diff --git a/internal/rest_api.go b/internal/rest_api.go deleted file mode 100644 index 933d07d..0000000 --- a/internal/rest_api.go +++ /dev/null @@ -1,480 +0,0 @@ -package internal - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - - clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/reflect/protoreflect" -) - -// API holds reference to snapshot manager -type API struct { - Manager *SnapshotManager -} - -// AddClusterRequest defines payload to add a cluster -type AddClusterRequest struct { - Name string `json:"name"` - Cluster clusterv3.Cluster `json:"cluster"` -} - -// RemoveClusterRequest defines payload to remove a cluster -type RemoveClusterRequest struct { - Name string `json:"name"` -} - -// EnableResourceRequest defines payload to enable a resource -// This will be used for both enable-cluster and enable-listener -type EnableResourceRequest struct { - Name string `json:"name"` -} - -// RemoveResourceRequest defines payload to remove a resource. -// Used for both /remove-cluster and /remove-listener. -type RemoveResourceRequest struct { - Name string `json:"name"` -} - -// SnapshotFileRequest defines payload to load/save snapshot from/to file -type SnapshotFileRequest struct { - Path string `json:"path"` -} - -// AddListenerRequest defines payload to add a listener -type AddListenerRequest struct { - Name string `json:"name"` - Listener listenerv3.Listener `json:"listener"` -} - -// RemoveListenerRequest defines payload to remove a listener -type RemoveListenerRequest struct { - Name string `json:"name"` -} - -// NewAPI returns a new REST API handler -func NewAPI(sm *SnapshotManager) *API { - return &API{ - Manager: sm, - } -} - -// RegisterRoutes mounts REST handlers -func (api *API) RegisterRoutes(mux *http.ServeMux) { - // Management Handlers (Add / Remove / Enable / Disable) - - // Cluster Handlers - mux.HandleFunc("/add-cluster", func(w http.ResponseWriter, r *http.Request) { - api.addResourceHandler(w, r, resourcev3.ClusterType, func(req interface{}) types.Resource { - cr := req.(*AddClusterRequest) - return &cr.Cluster - }) - }) - mux.HandleFunc("/disable-cluster", func(w http.ResponseWriter, r *http.Request) { - api.disableResourceHandler(w, r, resourcev3.ClusterType) - }) - mux.HandleFunc("/enable-cluster", func(w http.ResponseWriter, r *http.Request) { - api.enableResourceHandler(w, r, resourcev3.ClusterType) - }) - // NEW: Remove Cluster Handler - mux.HandleFunc("/remove-cluster", func(w http.ResponseWriter, r *http.Request) { - api.removeResourceHandler(w, r, resourcev3.ClusterType) - }) - - // Listener Handlers - mux.HandleFunc("/add-listener", func(w http.ResponseWriter, r *http.Request) { - api.addResourceHandler(w, r, resourcev3.ListenerType, func(req interface{}) types.Resource { - lr := req.(*AddListenerRequest) - return &lr.Listener - }) - }) - mux.HandleFunc("/disable-listener", func(w http.ResponseWriter, r *http.Request) { - api.disableResourceHandler(w, r, resourcev3.ListenerType) - }) - mux.HandleFunc("/enable-listener", func(w http.ResponseWriter, r *http.Request) { - api.enableResourceHandler(w, r, resourcev3.ListenerType) - }) - // NEW: Remove Listener Handler - mux.HandleFunc("/remove-listener", func(w http.ResponseWriter, r *http.Request) { - api.removeResourceHandler(w, r, resourcev3.ListenerType) - }) - - // Query / List Handlers - mux.HandleFunc("/list-clusters", func(w http.ResponseWriter, r *http.Request) { - api.listResourceHandler(w, r, resourcev3.ClusterType) - }) - mux.HandleFunc("/get-cluster", func(w http.ResponseWriter, r *http.Request) { - api.getResourceHandler(w, r, resourcev3.ClusterType) - }) - - mux.HandleFunc("/list-listeners", func(w http.ResponseWriter, r *http.Request) { - api.listResourceHandler(w, r, resourcev3.ListenerType) - }) - mux.HandleFunc("/get-listener", func(w http.ResponseWriter, r *http.Request) { - api.getResourceHandler(w, r, resourcev3.ListenerType) - }) - - // Persistence Handlers (NEW) - mux.HandleFunc("/load-from-db", api.loadSnapshotFromDB) - mux.HandleFunc("/flush-to-db", api.flushCacheToDB) - mux.HandleFunc("/load-from-file", api.loadSnapshotFromFile) - mux.HandleFunc("/save-to-file", api.saveSnapshotToFile) - - // Consistency Handler (NEW) - mux.HandleFunc("/is-consistent", api.isConsistentHandler) -} - -// ---------------- Persistence Handlers ---------------- - -// loadSnapshotFromDB loads the full configuration from the persistent database -// into the SnapshotManager's Envoy Cache. -func (api *API) loadSnapshotFromDB(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - // Use context.Background() since this is a top-level operation - if err := api.Manager.LoadSnapshotFromDB(context.Background()); err != nil { - http.Error(w, fmt.Sprintf("failed to load from DB: %v", err), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": "Configuration loaded from DB and applied to cache."}) -} - -// flushCacheToDB saves the current configuration from the Envoy Cache (source of truth) -// to the persistent database. -func (api *API) flushCacheToDB(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - // Default to DeleteNone (no physical deletion) - deleteStrategy := DeleteLogical - - // Check for 'deleteMissing' query parameter. If "true" or "1", switch to DeleteActual. - deleteMissingStr := r.URL.Query().Get("deleteMissing") - if deleteMissingStr == "true" || deleteMissingStr == "1" { - deleteStrategy = DeleteActual - } - - // Use context.Background() since this is a top-level operation - // Pass the determined DeleteStrategy - if err := api.Manager.FlushCacheToDB(context.Background(), deleteStrategy); err != nil { - http.Error(w, fmt.Sprintf("failed to flush to DB: %v", err), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": "Configuration saved from cache to DB."}) -} - -// loadSnapshotFromFile loads a snapshot from a local file and applies it to the cache. -func (api *API) loadSnapshotFromFile(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - var req SnapshotFileRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Path == "" { - http.Error(w, "path required in request body", http.StatusBadRequest) - return - } - - resources, err := api.Manager.LoadSnapshotFromFile(req.Path) - if err != nil { - http.Error(w, fmt.Sprintf("failed to load snapshot from file: %v", err), http.StatusInternalServerError) - return - } - - // Use context.Background() - if err := api.Manager.SetSnapshot(context.Background(), req.Path, resources); err != nil { - http.Error(w, fmt.Sprintf("failed to set snapshot: %v", err), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Snapshot loaded from %s and applied.", req.Path)}) -} - -// saveSnapshotToFile saves the current cache snapshot to a local file. -func (api *API) saveSnapshotToFile(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - var req SnapshotFileRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Path == "" { - http.Error(w, "path required in request body", http.StatusBadRequest) - return - } - - if err := api.Manager.SaveSnapshotToFile(req.Path); err != nil { - http.Error(w, fmt.Sprintf("failed to save snapshot to file: %v", err), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Snapshot saved to %s.", req.Path)}) -} - -// ---------------- Generic REST Handlers ---------------- - -// createFn returns a types.Resource -func (api *API) addResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type, createFn func(interface{}) types.Resource) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - var req interface{} - switch typ { - case resourcev3.ClusterType: - req = &AddClusterRequest{} - // case resourcev3.RouteType: - // req = &AddRouteRequest{} - case resourcev3.ListenerType: - req = &AddListenerRequest{} - default: - http.Error(w, "unsupported type", http.StatusBadRequest) - return - } - - if err := json.NewDecoder(r.Body).Decode(req); err != nil { - http.Error(w, "invalid request", http.StatusBadRequest) - return - } - - res := createFn(req) - if err := api.Manager.AddResourceToSnapshot(res, typ); err != nil { - http.Error(w, fmt.Sprintf("failed to add resource: %v", err), http.StatusInternalServerError) - return - } - if err := api.Manager.FlushCacheToDB(context.Background(), DeleteLogical); err != nil { - http.Error(w, fmt.Sprintf("failed to persist resource to DB: %v", err), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusCreated) - json.NewEncoder(w).Encode(map[string]string{"name": res.(interface{ GetName() string }).GetName()}) -} - -func (api *API) disableResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - var req struct{ Name string } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { - http.Error(w, "name required", http.StatusBadRequest) - return - } - - if err := api.Manager.RemoveResource(req.Name, typ, DeleteLogical); err != nil { - http.Error(w, fmt.Sprintf("failed to remove resource: %v", err), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) -} - -// enableResourceHandler fetches a disabled resource from the DB and enables it (adds to cache). -func (api *API) enableResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - var req EnableResourceRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { - http.Error(w, "name required", http.StatusBadRequest) - return - } - - // Call the Manager function to pull the resource from DB, enable it, and add to the cache. - // The implementation of EnableResourceFromDB is assumed to exist in SnapshotManager. - if err := api.Manager.EnableResourceFromDB(req.Name, typ); err != nil { - // Use StatusNotFound if the error indicates the resource wasn't found in the disabled list - // NOTE: The exact error string "not found" or similar must match the error returned by Manager.EnableResourceFromDB - if err.Error() == fmt.Sprintf("disabled resource %s not found in DB for type %s", req.Name, typ) { - http.Error(w, fmt.Sprintf("disabled resource '%s' not found or already enabled: %v", req.Name, err), http.StatusNotFound) - } else { - http.Error(w, fmt.Sprintf("failed to enable resource '%s' from DB: %v", req.Name, err), http.StatusInternalServerError) - } - return - } - - // Reload the cache again from DB to ensure consistency. - if err := api.Manager.LoadSnapshotFromDB(context.Background()); err != nil { - http.Error(w, fmt.Sprintf("failed to reload snapshot from DB after enabling resource: %v", err), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Resource '%s' enabled and applied to cache.", req.Name)}) -} - -// ---------------- NEW: Generic Remove Handler ---------------- - -// removeResourceHandler removes a resource completely from the DB (DeleteActual). -// It requires the resource to be disabled (not in the cache) before actual deletion. -func (api *API) removeResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - var req RemoveResourceRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { - http.Error(w, "name required", http.StatusBadRequest) - return - } - - // Use DeleteActual strategy for permanent removal from the DB. - // The Manager implementation must enforce the 'must be disabled' rule. - if err := api.Manager.RemoveResource(req.Name, typ, DeleteActual); err != nil { - // Manager is assumed to return a specific error if the resource is still enabled. - if err.Error() == fmt.Sprintf("resource %s for type %s is enabled and must be disabled before removal", req.Name, typ) { - http.Error(w, fmt.Sprintf("resource '%s' must be disabled first before permanent removal: %v", req.Name, err), http.StatusBadRequest) - } else { - http.Error(w, fmt.Sprintf("failed to permanently remove resource: %v", err), http.StatusInternalServerError) - } - return - } - - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Resource '%s' permanently removed.", req.Name)}) -} - -// ---------------- Query / List Handlers ---------------- - -func (api *API) listResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { - if r.Method != http.MethodGet { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - enabledResources, disabledResources, err := api.Manager.ListResources(typ) - if err != nil { - http.Error(w, fmt.Sprintf("failed to list resources: %v", err), http.StatusInternalServerError) - return - } - - // Create the final response object structure - response := struct { - Enabled []types.Resource `json:"enabled"` - Disabled []types.Resource `json:"disabled"` - }{ - Enabled: enabledResources, - Disabled: disabledResources, - } - - w.WriteHeader(http.StatusOK) - // Encode the new struct instead of the flat 'out' slice - if err := json.NewEncoder(w).Encode(response); err != nil { - // Handle encoding error, although unlikely if marshaling was fine - http.Error(w, "failed to encode response", http.StatusInternalServerError) - } -} - -func (api *API) getResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) { - if r.Method != http.MethodGet { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - - name := r.URL.Query().Get("name") - if name == "" { - http.Error(w, "name query parameter required", http.StatusBadRequest) - return - } - - format := r.URL.Query().Get("format") - if format == "" { - format = "json" // default format - } - - res, err := api.Manager.GetResourceFromCache(name, typ) - if err != nil { - http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound) - return - } - - pb, ok := res.(interface{ ProtoReflect() protoreflect.Message }) - if !ok { - http.Error(w, "resource is not a protobuf message", http.StatusInternalServerError) - return - } - - var output []byte - switch format { - case "yaml": - yamlStr, err := ConvertProtoToYAML(pb) - if err != nil { - http.Error(w, fmt.Sprintf("failed to convert resource to YAML: %v", err), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/x-yaml") - output = []byte(yamlStr) - default: // json - data, err := protojson.Marshal(pb) - if err != nil { - http.Error(w, fmt.Sprintf("failed to marshal protobuf to JSON: %v", err), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - output = data - } - - w.WriteHeader(http.StatusOK) - w.Write(output) -} - -// ---------------- Consistency Handler ---------------- - -// isConsistentHandler checks whether the current in-memory cache is consistent with the database. -func (api *API) isConsistentHandler(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - w.Header().Set("Content-Type", "application/json") - - consistent, err := api.Manager.CheckCacheDBConsistency(context.TODO()) // Assumes IsConsistent method exists on SnapshotManager - if err != nil { - http.Error(w, fmt.Sprintf("failed to check consistency: %v", err), http.StatusInternalServerError) - return - } - - response := struct { - Consistent *ConsistencyReport `json:"consistent"` - }{ - Consistent: consistent, - } - - w.WriteHeader(http.StatusOK) - if err := json.NewEncoder(w).Encode(response); err != nil { - http.Error(w, "failed to encode response", http.StatusInternalServerError) - } -} diff --git a/internal/snapshot.go b/internal/snapshot.go index 5b64822..055d819 100644 --- a/internal/snapshot.go +++ b/internal/snapshot.go @@ -25,6 +25,8 @@ resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "google.golang.org/protobuf/encoding/protojson" yaml "gopkg.in/yaml.v3" + + internalapi "envoy-control-plane/internal/api" ) // ResourceNamer is an interface implemented by all xDS resources with a GetName() method. @@ -276,17 +278,10 @@ // ---------------- Consistency Check ---------------- -// ConsistencyReport holds the results of the cache/DB consistency check. -type ConsistencyReport struct { - CacheOnly map[resourcev3.Type][]string `json:"cache-only"` // Resources present in cache but not enabled in DB - DBOnly map[resourcev3.Type][]string `json:"db-only"` // Resources enabled in DB but not present in cache - Inconsistent bool `json:"inconsistent"` -} - // CheckCacheDBConsistency compares the currently active Envoy cache snapshot // against the enabled resources in the persistent DB. -func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*ConsistencyReport, error) { - report := &ConsistencyReport{ +func (sm *SnapshotManager) CheckCacheDBConsistency(ctx context.Context) (*internalapi.ConsistencyReport, error) { + report := &internalapi.ConsistencyReport{ CacheOnly: make(map[resourcev3.Type][]string), DBOnly: make(map[resourcev3.Type][]string), } diff --git a/main.go b/main.go index 738001d..c280210 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ import ( "context" "database/sql" + "flag" "fmt" "net/http" @@ -10,84 +11,84 @@ "path/filepath" "strings" - _ "github.com/lib/pq" // Postgres driver - _ "github.com/mattn/go-sqlite3" // SQLite driver - "github.com/envoyproxy/go-control-plane/pkg/cache/types" cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/v3" + _ "github.com/lib/pq" // Postgres driver + _ "github.com/mattn/go-sqlite3" // SQLite driver "k8s.io/klog/v2" "envoy-control-plane/internal" ) var ( - logger *internal.DefaultLogger - port uint - nodeID string - restPort uint - snapshotFile string - configDir string - dbConnStr string - dbDriver string + logger *log.DefaultLogger + port uint + nodeID string + restPort uint + snapshotFile string + configDir string + dbConnStr string + dbDriver string ) func init() { - logger = internal.NewDefaultLogger() - klog.InitFlags(nil) + logger = log.NewDefaultLogger() + klog.InitFlags(nil) - flag.UintVar(&port, "port", 18000, "xDS management server port") - flag.StringVar(&nodeID, "nodeID", "test-id", "Node ID") - flag.UintVar(&restPort, "rest-port", 8080, "REST API server port") - flag.StringVar(&snapshotFile, "snapshot-file", "", "Optional initial snapshot JSON/YAML file") - flag.StringVar(&configDir, "config-dir", "data/", "Optional directory containing multiple config files") - flag.StringVar(&dbConnStr, "db", "", "Optional database connection string for config persistence") + flag.UintVar(&port, "port", 18000, "xDS management server port") + flag.StringVar(&nodeID, "nodeID", "test-id", "Node ID") + flag.UintVar(&restPort, "rest-port", 8080, "REST API server port") + flag.StringVar(&snapshotFile, "snapshot-file", "", "Optional initial snapshot JSON/YAML file") + flag.StringVar(&configDir, "config-dir", "data/", "Optional directory containing multiple config files") + flag.StringVar(&dbConnStr, "db", "", "Optional database connection string for config persistence") } // determineDriver returns driver name from connection string func determineDriver(dsn string) string { - if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") { - return "postgres" - } - return "sqlite3" + if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") { + return "postgres" + } + return "sqlite3" } // loadConfigFiles iterates over a directory and loads all .yaml/.json files func loadConfigFiles(manager *internal.SnapshotManager, dir string) error { - logger.Infof("loading configuration files from directory: %s", dir) + logger.Infof("loading configuration files from directory: %s", dir) - files, err := os.ReadDir(dir) - if err != nil { - return fmt.Errorf("failed to read directory %s: %w", dir, err) - } + files, err := os.ReadDir(dir) + if err != nil { + return fmt.Errorf("failed to read directory %s: %w", dir, err) + } - resourceFiles := make(map[string][]types.Resource) - for _, file := range files { - if file.IsDir() { - continue - } - fileName := file.Name() - if strings.HasSuffix(fileName, ".yaml") || strings.HasSuffix(fileName, ".yml") || strings.HasSuffix(fileName, ".json") { - filePath := filepath.Join(dir, fileName) - logger.Infof(" -> loading config file: %s", filePath) + resourceFiles := make(map[string][]types.Resource) + for _, file := range files { + if file.IsDir() { + continue + } + fileName := file.Name() + if strings.HasSuffix(fileName, ".yaml") || strings.HasSuffix(fileName, ".yml") || strings.HasSuffix(fileName, ".json") { + filePath := filepath.Join(dir, fileName) + logger.Infof(" -> loading config file: %s", filePath) - rf, err := manager.LoadSnapshotFromFile(filePath) - if err != nil { - return fmt.Errorf("failed to load snapshot from file %s: %w", filePath, err) - } - for k, v := range rf { - resourceFiles[k] = append(resourceFiles[k], v...) - } - logger.Infof("loaded %d resources from %s", len(rf), filePath) - } - } + rf, err := manager.LoadSnapshotFromFile(filePath) + if err != nil { + return fmt.Errorf("failed to load snapshot from file %s: %w", filePath, err) + } + for k, v := range rf { + resourceFiles[k] = append(resourceFiles[k], v...) + } + logger.Infof("loaded %d resources from %s", len(rf), filePath) + } + } - if err := manager.SetSnapshot(context.TODO(), "snap-from-file", resourceFiles); err != nil { - return fmt.Errorf("failed to set combined snapshot from files: %w", err) - } - return nil + if err := manager.SetSnapshot(context.TODO(), "snap-from-file", resourceFiles); err != nil { + return fmt.Errorf("failed to set combined snapshot from files: %w", err) + } + return nil } // CORS is a middleware that sets the Access-Control-Allow-Origin header to * (all origins). @@ -108,132 +109,131 @@ }) } - func main() { - flag.Parse() - defer klog.Flush() + flag.Parse() + defer klog.Flush() - // Default DB to SQLite file if none provided - if dbConnStr == "" { - defaultDBPath := "data/config.db" - if err := os.MkdirAll(filepath.Dir(defaultDBPath), 0755); err != nil { - fmt.Fprintf(os.Stderr, "failed to create data directory: %v\n", err) - os.Exit(1) - } - dbConnStr = fmt.Sprintf("file:%s?_foreign_keys=on", defaultDBPath) - dbDriver = "sqlite3" - } else { - dbDriver = determineDriver(dbConnStr) - } - // --- Database initialization --- - db, err := sql.Open(dbDriver, dbConnStr) - if err != nil { - logger.Errorf("failed to connect to DB: %v", err) - os.Exit(1) - } - defer db.Close() + // Default DB to SQLite file if none provided + if dbConnStr == "" { + defaultDBPath := "data/config.db" + if err := os.MkdirAll(filepath.Dir(defaultDBPath), 0755); err != nil { + fmt.Fprintf(os.Stderr, "failed to create data directory: %v\n", err) + os.Exit(1) + } + dbConnStr = fmt.Sprintf("file:%s?_foreign_keys=on", defaultDBPath) + dbDriver = "sqlite3" + } else { + dbDriver = determineDriver(dbConnStr) + } + // --- Database initialization --- + db, err := sql.Open(dbDriver, dbConnStr) + if err != nil { + logger.Errorf("failed to connect to DB: %v", err) + os.Exit(1) + } + defer db.Close() - storage := internal.NewStorage(db, dbDriver) - if err := storage.InitSchema(context.Background()); err != nil { - logger.Errorf("failed to initialize DB schema: %v", err) - os.Exit(1) - } + storage := internal.NewStorage(db, dbDriver) + if err := storage.InitSchema(context.Background()); err != nil { + logger.Errorf("failed to initialize DB schema: %v", err) + os.Exit(1) + } - // Create snapshot cache and manager - cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, logger) - manager := internal.NewSnapshotManager(cache, nodeID, storage) + // Create snapshot cache and manager + cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, logger) + manager := internal.NewSnapshotManager(cache, nodeID, storage) - loadedConfigs := false + loadedConfigs := false - // Step 1: Try to load snapshot from DB - snapCfg, err := storage.RebuildSnapshot(context.Background()) - if err == nil && len(snapCfg.EnabledClusters)+len(snapCfg.EnabledListeners) > 0 { - if err := manager.SetSnapshotFromConfig(context.Background(), "snap-from-db", snapCfg); err != nil { - logger.Errorf("failed to set DB snapshot: %v", err) - os.Exit(1) - } - loadedConfigs = true - logger.Infof("loaded snapshot from database") - } + // Step 1: Try to load snapshot from DB + snapCfg, err := storage.RebuildSnapshot(context.Background()) + if err == nil && len(snapCfg.EnabledClusters)+len(snapCfg.EnabledListeners) > 0 { + if err := manager.SetSnapshotFromConfig(context.Background(), "snap-from-db", snapCfg); err != nil { + logger.Errorf("failed to set DB snapshot: %v", err) + os.Exit(1) + } + loadedConfigs = true + logger.Infof("loaded snapshot from database") + } - // Step 2: If DB empty, load from files and persist into DB - if !loadedConfigs { - if configDir != "" { - if err := loadConfigFiles(manager, configDir); err != nil { - logger.Errorf("failed to load configs from directory: %v", err) - os.Exit(1) - } - loadedConfigs = true - } else if snapshotFile != "" { - if _, err := os.Stat(snapshotFile); err == nil { - resources, err := manager.LoadSnapshotFromFile(snapshotFile) - if err != nil { - logger.Errorf("failed to load snapshot from file: %v", err) - os.Exit(1) - } - if err := manager.SetSnapshot(context.TODO(), "snap-from-file", resources); err != nil { - logger.Errorf("failed to set loaded snapshot: %v", err) - os.Exit(1) - } - loadedConfigs = true - } else { - logger.Warnf("snapshot file not found: %s", snapshotFile) - } - } + // Step 2: If DB empty, load from files and persist into DB + if !loadedConfigs { + if configDir != "" { + if err := loadConfigFiles(manager, configDir); err != nil { + logger.Errorf("failed to load configs from directory: %v", err) + os.Exit(1) + } + loadedConfigs = true + } else if snapshotFile != "" { + if _, err := os.Stat(snapshotFile); err == nil { + resources, err := manager.LoadSnapshotFromFile(snapshotFile) + if err != nil { + logger.Errorf("failed to load snapshot from file: %v", err) + os.Exit(1) + } + if err := manager.SetSnapshot(context.TODO(), "snap-from-file", resources); err != nil { + logger.Errorf("failed to set loaded snapshot: %v", err) + os.Exit(1) + } + loadedConfigs = true + } else { + logger.Warnf("snapshot file not found: %s", snapshotFile) + } + } - // Persist loaded snapshot into DB - if loadedConfigs { - snapCfg, err := manager.SnapshotToConfig(context.Background(), nodeID) - if err != nil { - logger.Errorf("failed to convert snapshot to DB config: %v", err) - os.Exit(1) - } - if err := storage.SaveSnapshot(context.Background(), snapCfg, internal.DeleteLogical); err != nil { - logger.Errorf("failed to save initial snapshot into DB: %v", err) - os.Exit(1) - } - logger.Infof("initial snapshot written into database") - } - } + // Persist loaded snapshot into DB + if loadedConfigs { + snapCfg, err := manager.SnapshotToConfig(context.Background(), nodeID) + if err != nil { + logger.Errorf("failed to convert snapshot to DB config: %v", err) + os.Exit(1) + } + if err := storage.SaveSnapshot(context.Background(), snapCfg, internal.DeleteLogical); err != nil { + logger.Errorf("failed to save initial snapshot into DB: %v", err) + os.Exit(1) + } + logger.Infof("initial snapshot written into database") + } + } - // Step 3: Ensure snapshot exists in cache - snap, err := manager.Cache.GetSnapshot(nodeID) - if err != nil || !loadedConfigs { - logger.Warnf("no valid snapshot found, creating empty snapshot") - snap, _ = cachev3.NewSnapshot("snap-init", map[resourcev3.Type][]types.Resource{ - resourcev3.ClusterType: {}, - resourcev3.RouteType: {}, - resourcev3.ListenerType: {}, - }) - if err := cache.SetSnapshot(context.Background(), nodeID, snap); err != nil { - logger.Errorf("failed to set initial snapshot: %v", err) - os.Exit(1) - } - } + // Step 3: Ensure snapshot exists in cache + snap, err := manager.Cache.GetSnapshot(nodeID) + if err != nil || !loadedConfigs { + logger.Warnf("no valid snapshot found, creating empty snapshot") + snap, _ = cachev3.NewSnapshot("snap-init", map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: {}, + resourcev3.RouteType: {}, + resourcev3.ListenerType: {}, + }) + if err := cache.SetSnapshot(context.Background(), nodeID, snap); err != nil { + logger.Errorf("failed to set initial snapshot: %v", err) + os.Exit(1) + } + } - logger.Infof("xDS snapshot ready: version %s", snap.GetVersion(string(resourcev3.ClusterType))) + logger.Infof("xDS snapshot ready: version %s", snap.GetVersion(string(resourcev3.ClusterType))) - // --- Start xDS gRPC server --- - ctx := context.Background() - cb := &test.Callbacks{Debug: true} - srv := server.NewServer(ctx, cache, cb) - go internal.RunServer(srv, port) + // --- Start xDS gRPC server --- + ctx := context.Background() + cb := &test.Callbacks{Debug: true} + srv := server.NewServer(ctx, cache, cb) + go internal.RunServer(srv, port) - // --- Start REST API server --- - api := internal.NewAPI(manager) - mux := http.NewServeMux() - api.RegisterRoutes(mux) + // --- Start REST API server --- + api := internal.NewAPI(manager) + mux := http.NewServeMux() + api.RegisterRoutes(mux) - // Wrap the main multiplexer with the CORS handler - corsHandler := CORS(mux) + // Wrap the main multiplexer with the CORS handler + corsHandler := CORS(mux) - // NEW: Serve the index.html file and any other static assets - mux.Handle("/", http.FileServer(http.Dir("./static"))) // Assuming 'web' is the folder + // NEW: Serve the index.html file and any other static assets + mux.Handle("/", http.FileServer(http.Dir("./static"))) // Assuming 'web' is the folder - restAddr := fmt.Sprintf(":%d", restPort) - logger.Infof("starting REST API server on %s", restAddr) - if err := http.ListenAndServe(restAddr, corsHandler); err != nil { // Use corsHandler - logger.Errorf("REST server error: %v", err) - os.Exit(1) - } -} \ No newline at end of file + restAddr := fmt.Sprintf(":%d", restPort) + logger.Infof("starting REST API server on %s", restAddr) + if err := http.ListenAndServe(restAddr, corsHandler); err != nil { // Use corsHandler + logger.Errorf("REST server error: %v", err) + os.Exit(1) + } +}