// Source file: EnvoyControlPlane/internal/rest_api.go

package internal

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"

	clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/reflect/protoreflect"
)

// API is the REST front end of the control plane. Every HTTP handler in this
// file is a method on API and performs its work through Manager.
type API struct {
	// Manager is the snapshot manager that the handlers delegate to.
	Manager *SnapshotManager
}

// AddClusterRequest is the JSON body accepted by /add-cluster.
// addResourceHandler decodes the request body into this struct with
// encoding/json and passes a pointer to Cluster on to the snapshot manager.
//
// NOTE(review): Cluster is a generated protobuf message but is decoded with
// encoding/json rather than protojson; confirm that oneof fields and
// well-known types (e.g. durations) decode the way API clients expect.
type AddClusterRequest struct {
	// Name is decoded from "name". The handlers in this file do not read it;
	// the name reported back to the client comes from Cluster.GetName().
	Name    string            `json:"name"`
	// Cluster is the Envoy cluster definition to add.
	Cluster clusterv3.Cluster `json:"cluster"`
}

// RemoveClusterRequest defines a payload that identifies a cluster to remove
// by name.
//
// NOTE(review): no handler in this file references this type;
// removeResourceHandler decodes RemoveResourceRequest instead. Check for
// users elsewhere in the package before relying on or removing it.
type RemoveClusterRequest struct {
	// Name is the cluster name, decoded from the "name" JSON key.
	Name string `json:"name"`
}

// EnableResourceRequest is the JSON body decoded by enableResourceHandler,
// which serves both /enable-cluster and /enable-listener.
type EnableResourceRequest struct {
	// Name is the resource to enable. The handler rejects an empty name with
	// 400 Bad Request.
	Name string `json:"name"`
}

// RemoveResourceRequest is the JSON body decoded by removeResourceHandler,
// which serves both /remove-cluster and /remove-listener.
type RemoveResourceRequest struct {
	// Name is the resource to remove. The handler rejects an empty name with
	// 400 Bad Request.
	Name string `json:"name"`
}

// SnapshotFileRequest is the JSON body decoded by loadSnapshotFromFile and
// saveSnapshotToFile (/load-from-file and /save-to-file).
type SnapshotFileRequest struct {
	// Path is the file to read the snapshot from or write it to. Both
	// handlers reject an empty path with 400 Bad Request and otherwise pass
	// the value to the snapshot manager unchanged.
	Path string `json:"path"`
}

// AddListenerRequest is the JSON body accepted by /add-listener.
// addResourceHandler decodes the request body into this struct with
// encoding/json and passes a pointer to Listener on to the snapshot manager.
//
// NOTE(review): Listener is a generated protobuf message but is decoded with
// encoding/json rather than protojson; confirm that oneof fields and
// well-known types decode the way API clients expect.
type AddListenerRequest struct {
	// Name is decoded from "name". The handlers in this file do not read it;
	// the name reported back to the client comes from Listener.GetName().
	Name     string              `json:"name"`
	// Listener is the Envoy listener definition to add.
	Listener listenerv3.Listener `json:"listener"`
}

// RemoveListenerRequest defines a payload that identifies a listener to
// remove by name.
//
// NOTE(review): no handler in this file references this type;
// removeResourceHandler decodes RemoveResourceRequest instead. Check for
// users elsewhere in the package before relying on or removing it.
type RemoveListenerRequest struct {
	// Name is the listener name, decoded from the "name" JSON key.
	Name string `json:"name"`
}

// NewAPI builds the REST API handler set bound to the given snapshot manager.
// The pointer is stored as-is; no validation is performed here.
func NewAPI(sm *SnapshotManager) *API {
	api := new(API)
	api.Manager = sm
	return api
}

// RegisterRoutes attaches every REST endpoint served by api to mux.
//
// The endpoints fall into four groups:
//   - management:  /add-*, /disable-*, /enable-* and /remove-* for clusters
//     and listeners
//   - query:       /list-clusters, /get-cluster, /list-listeners, /get-listener
//   - persistence: /load-from-db, /flush-to-db, /load-from-file, /save-to-file
//   - consistency: /is-consistent
//
// HTTP method checks are performed inside the individual handlers, not here.
func (api *API) RegisterRoutes(mux *http.ServeMux) {
	// typedHandler is the signature shared by the generic handlers that are
	// parameterised on an xDS resource type.
	type typedHandler func(http.ResponseWriter, *http.Request, resourcev3.Type)

	// forType binds a generic handler to one resource type.
	forType := func(h typedHandler, typ resourcev3.Type) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			h(w, r, typ)
		}
	}

	// adder binds addResourceHandler to one resource type together with the
	// function that pulls the resource out of the decoded request body.
	adder := func(typ resourcev3.Type, extract func(interface{}) types.Resource) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			api.addResourceHandler(w, r, typ, extract)
		}
	}

	clusterOf := func(req interface{}) types.Resource {
		return &req.(*AddClusterRequest).Cluster
	}
	listenerOf := func(req interface{}) types.Resource {
		return &req.(*AddListenerRequest).Listener
	}

	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		// Cluster management.
		{"/add-cluster", adder(resourcev3.ClusterType, clusterOf)},
		{"/disable-cluster", forType(api.disableResourceHandler, resourcev3.ClusterType)},
		{"/enable-cluster", forType(api.enableResourceHandler, resourcev3.ClusterType)},
		{"/remove-cluster", forType(api.removeResourceHandler, resourcev3.ClusterType)},

		// Listener management.
		{"/add-listener", adder(resourcev3.ListenerType, listenerOf)},
		{"/disable-listener", forType(api.disableResourceHandler, resourcev3.ListenerType)},
		{"/enable-listener", forType(api.enableResourceHandler, resourcev3.ListenerType)},
		{"/remove-listener", forType(api.removeResourceHandler, resourcev3.ListenerType)},

		// Query / list.
		{"/list-clusters", forType(api.listResourceHandler, resourcev3.ClusterType)},
		{"/get-cluster", forType(api.getResourceHandler, resourcev3.ClusterType)},
		{"/list-listeners", forType(api.listResourceHandler, resourcev3.ListenerType)},
		{"/get-listener", forType(api.getResourceHandler, resourcev3.ListenerType)},

		// Persistence.
		{"/load-from-db", api.loadSnapshotFromDB},
		{"/flush-to-db", api.flushCacheToDB},
		{"/load-from-file", api.loadSnapshotFromFile},
		{"/save-to-file", api.saveSnapshotToFile},

		// Consistency.
		{"/is-consistent", api.isConsistentHandler},
	}

	for _, rt := range routes {
		mux.HandleFunc(rt.pattern, rt.handler)
	}
}

// ---------------- Persistence Handlers ----------------

// loadSnapshotFromDB handles POST /load-from-db. It asks the snapshot manager
// to load the configuration from the database and replies with a small JSON
// status object on success, or a plain-text 500 on failure.
func (api *API) loadSnapshotFromDB(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	// The load runs on a background context, so it is not tied to the
	// lifetime of the incoming request.
	loadErr := api.Manager.LoadSnapshotFromDB(context.Background())
	if loadErr != nil {
		http.Error(w, fmt.Sprintf("failed to load from DB: %v", loadErr), http.StatusInternalServerError)
		return
	}

	reply := map[string]string{
		"status":  "ok",
		"message": "Configuration loaded from DB and applied to cache.",
	}
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(reply)
}

// flushCacheToDB handles POST /flush-to-db by asking the snapshot manager to
// write the current cache contents to the database.
//
// The delete strategy defaults to DeleteLogical. The query parameter
// deleteMissing=true (or deleteMissing=1) selects DeleteActual instead; any
// other value leaves the default in place.
func (api *API) flushCacheToDB(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	strategy := DeleteLogical
	switch r.URL.Query().Get("deleteMissing") {
	case "true", "1":
		strategy = DeleteActual
	}

	// The flush runs on a background context, so it is not tied to the
	// lifetime of the incoming request.
	flushErr := api.Manager.FlushCacheToDB(context.Background(), strategy)
	if flushErr != nil {
		http.Error(w, fmt.Sprintf("failed to flush to DB: %v", flushErr), http.StatusInternalServerError)
		return
	}

	reply := map[string]string{
		"status":  "ok",
		"message": "Configuration saved from cache to DB.",
	}
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(reply)
}

// loadSnapshotFromFile handles POST /load-from-file. The JSON body must carry
// a non-empty "path"; the snapshot manager reads resources from that file and
// they are then installed via SetSnapshot, with the path passed as
// SetSnapshot's second argument.
//
// A body that fails to decode and a body with an empty path both produce the
// same 400 response.
//
// NOTE(review): the path comes straight from the request and is handed to the
// manager unchanged; confirm the manager (or deployment) restricts which files
// can be read.
func (api *API) loadSnapshotFromFile(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	var body SnapshotFileRequest
	decodeErr := json.NewDecoder(r.Body).Decode(&body)
	if decodeErr != nil || body.Path == "" {
		http.Error(w, "path required in request body", http.StatusBadRequest)
		return
	}
	path := body.Path

	loaded, err := api.Manager.LoadSnapshotFromFile(path)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to load snapshot from file: %v", err), http.StatusInternalServerError)
		return
	}

	// Applied on a background context, independent of the request lifetime.
	err = api.Manager.SetSnapshot(context.Background(), path, loaded)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to set snapshot: %v", err), http.StatusInternalServerError)
		return
	}

	reply := map[string]string{
		"status":  "ok",
		"message": fmt.Sprintf("Snapshot loaded from %s and applied.", path),
	}
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(reply)
}

// saveSnapshotToFile handles POST /save-to-file. The JSON body must carry a
// non-empty "path"; the snapshot manager is asked to save the snapshot there.
//
// A body that fails to decode and a body with an empty path both produce the
// same 400 response.
//
// NOTE(review): the path comes straight from the request and is handed to the
// manager unchanged; confirm the manager (or deployment) restricts which files
// can be written.
func (api *API) saveSnapshotToFile(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	var body SnapshotFileRequest
	decodeErr := json.NewDecoder(r.Body).Decode(&body)
	if decodeErr != nil || body.Path == "" {
		http.Error(w, "path required in request body", http.StatusBadRequest)
		return
	}
	path := body.Path

	saveErr := api.Manager.SaveSnapshotToFile(path)
	if saveErr != nil {
		http.Error(w, fmt.Sprintf("failed to save snapshot to file: %v", saveErr), http.StatusInternalServerError)
		return
	}

	reply := map[string]string{
		"status":  "ok",
		"message": fmt.Sprintf("Snapshot saved to %s.", path),
	}
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(reply)
}

// ---------------- Generic REST Handlers ----------------

// addResourceHandler is the shared implementation behind /add-cluster and
// /add-listener.
//
// It accepts POST only, decodes the JSON body into the request struct that
// matches typ (AddClusterRequest or AddListenerRequest), uses createFn to
// obtain the resource from that decoded struct, adds it to the snapshot, and
// then flushes the cache to the database with the DeleteLogical strategy.
// On success it replies 201 Created with {"name": <resource name>}.
//
// createFn receives the pointer to the decoded request struct and returns the
// types.Resource to add. Resource types other than cluster and listener are
// rejected with 400.
//
// NOTE(review): if the flush fails after the resource was added to the
// snapshot, the handler returns 500 without undoing the add.
func (api *API) addResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type, createFn func(interface{}) types.Resource) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	// Pick the concrete payload struct for the requested resource type.
	var payload interface{}
	if typ == resourcev3.ClusterType {
		payload = new(AddClusterRequest)
	} else if typ == resourcev3.ListenerType {
		payload = new(AddListenerRequest)
	} else {
		http.Error(w, "unsupported type", http.StatusBadRequest)
		return
	}

	decodeErr := json.NewDecoder(r.Body).Decode(payload)
	if decodeErr != nil {
		http.Error(w, "invalid request", http.StatusBadRequest)
		return
	}

	resource := createFn(payload)

	addErr := api.Manager.AddResourceToSnapshot(resource, typ)
	if addErr != nil {
		http.Error(w, fmt.Sprintf("failed to add resource: %v", addErr), http.StatusInternalServerError)
		return
	}

	flushErr := api.Manager.FlushCacheToDB(context.Background(), DeleteLogical)
	if flushErr != nil {
		http.Error(w, fmt.Sprintf("failed to persist resource to DB: %v", flushErr), http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusCreated)
	// The resource is expected to expose GetName; the assertion panics if it
	// does not.
	named := resource.(interface{ GetName() string })
	json.NewEncoder(w).Encode(map[string]string{"name": named.GetName()})
}

// disableResourceHandler is the shared implementation behind /disable-cluster
// and /disable-listener.
//
// It accepts POST only and expects a JSON body with a non-empty "name". The
// named resource of the given type is passed to the snapshot manager's
// RemoveResource with the DeleteLogical strategy.
//
// Responses:
//   - 405 if the method is not POST
//   - 400 if the body cannot be decoded or the name is empty
//   - 500 if the manager reports an error
//   - 200 with a JSON {"status","message"} object on success, matching the
//     shape returned by the enable and remove handlers
func (api *API) disableResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	var req struct {
		Name string `json:"name"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" {
		http.Error(w, "name required", http.StatusBadRequest)
		return
	}

	if err := api.Manager.RemoveResource(req.Name, typ, DeleteLogical); err != nil {
		http.Error(w, fmt.Sprintf("failed to remove resource: %v", err), http.StatusInternalServerError)
		return
	}

	// The response is declared as application/json, so send an actual JSON
	// document instead of an empty body.
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(map[string]string{"status": "ok", "message": fmt.Sprintf("Resource '%s' disabled.", req.Name)})
}

// enableResourceHandler is the shared implementation behind /enable-cluster
// and /enable-listener.
//
// It accepts POST only and expects a JSON body with a non-empty "name". The
// snapshot manager is asked to enable the named resource from the database;
// afterwards the whole snapshot is reloaded from the database.
//
// Responses:
//   - 405 if the method is not POST
//   - 400 if the body cannot be decoded or the name is empty
//   - 404 if the manager's error text equals the expected "not found" message
//   - 500 for any other manager error or if the reload fails
//   - 200 with a JSON status object on success
//
// NOTE(review): the 404 case is detected by comparing the full error string,
// so it silently degrades to 500 if the manager's wording changes or the
// error is wrapped. A sentinel error checked with errors.Is would be sturdier.
func (api *API) enableResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	var body EnableResourceRequest
	decodeErr := json.NewDecoder(r.Body).Decode(&body)
	if decodeErr != nil || body.Name == "" {
		http.Error(w, "name required", http.StatusBadRequest)
		return
	}
	name := body.Name

	if err := api.Manager.EnableResourceFromDB(name, typ); err != nil {
		switch err.Error() {
		case fmt.Sprintf("disabled resource %s not found in DB for type %s", name, typ):
			http.Error(w, fmt.Sprintf("disabled resource '%s' not found or already enabled: %v", name, err), http.StatusNotFound)
		default:
			http.Error(w, fmt.Sprintf("failed to enable resource '%s' from DB: %v", name, err), http.StatusInternalServerError)
		}
		return
	}

	// Rebuild the cache from the database after the resource was enabled.
	reloadErr := api.Manager.LoadSnapshotFromDB(context.Background())
	if reloadErr != nil {
		http.Error(w, fmt.Sprintf("failed to reload snapshot from DB after enabling resource: %v", reloadErr), http.StatusInternalServerError)
		return
	}

	reply := map[string]string{
		"status":  "ok",
		"message": fmt.Sprintf("Resource '%s' enabled and applied to cache.", name),
	}
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(reply)
}

// ---------------- NEW: Generic Remove Handler ----------------

// removeResourceHandler is the shared implementation behind /remove-cluster
// and /remove-listener.
//
// It accepts POST only and expects a JSON body with a non-empty "name". The
// named resource is passed to the snapshot manager's RemoveResource with the
// DeleteActual strategy. This handler does not itself check whether the
// resource is disabled; it relies on the manager to refuse removal of an
// enabled resource.
//
// Responses:
//   - 405 if the method is not POST
//   - 400 if the body cannot be decoded, the name is empty, or the manager's
//     error text equals the expected "must be disabled" message
//   - 500 for any other manager error
//   - 200 with a JSON status object on success
//
// NOTE(review): the "still enabled" case is detected by comparing the full
// error string, so it silently degrades to 500 if the manager's wording
// changes or the error is wrapped. A sentinel error checked with errors.Is
// would be sturdier.
func (api *API) removeResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.Header().Set("Content-Type", "application/json")

	var body RemoveResourceRequest
	decodeErr := json.NewDecoder(r.Body).Decode(&body)
	if decodeErr != nil || body.Name == "" {
		http.Error(w, "name required", http.StatusBadRequest)
		return
	}
	name := body.Name

	if err := api.Manager.RemoveResource(name, typ, DeleteActual); err != nil {
		switch err.Error() {
		case fmt.Sprintf("resource %s for type %s is enabled and must be disabled before removal", name, typ):
			http.Error(w, fmt.Sprintf("resource '%s' must be disabled first before permanent removal: %v", name, err), http.StatusBadRequest)
		default:
			http.Error(w, fmt.Sprintf("failed to permanently remove resource: %v", err), http.StatusInternalServerError)
		}
		return
	}

	reply := map[string]string{
		"status":  "ok",
		"message": fmt.Sprintf("Resource '%s' permanently removed.", name),
	}
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(reply)
}

// ---------------- Query / List Handlers ----------------

// listResourceHandler is the shared implementation behind /list-clusters and
// /list-listeners.
//
// It accepts GET only, asks the snapshot manager for the enabled and disabled
// resources of the given type, and replies with a JSON object of the form
// {"enabled": [...], "disabled": [...]}.
//
// Responses:
//   - 405 if the method is not GET
//   - 500 if the manager reports an error or the response cannot be encoded
//   - 200 with the JSON document on success
//
// NOTE(review): the resources are serialised with encoding/json, whereas
// getResourceHandler uses protojson; confirm the differing field naming and
// oneof representation is what clients of the list endpoints expect.
func (api *API) listResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	enabledResources, disabledResources, err := api.Manager.ListResources(typ)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to list resources: %v", err), http.StatusInternalServerError)
		return
	}

	response := struct {
		Enabled  []types.Resource `json:"enabled"`
		Disabled []types.Resource `json:"disabled"`
	}{
		Enabled:  enabledResources,
		Disabled: disabledResources,
	}

	// Serialise before touching the response: once the 200 status line has
	// been written, an encoding failure can no longer be turned into a 500.
	body, err := json.Marshal(response)
	if err != nil {
		http.Error(w, "failed to encode response", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// Trailing newline keeps the payload identical to what json.Encoder emits.
	// A write error here means the client went away; nothing more can be done.
	_, _ = w.Write(append(body, '\n'))
}

// getResourceHandler is the shared implementation behind /get-cluster and
// /get-listener.
//
// It accepts GET only. The "name" query parameter is required and selects the
// resource to fetch from the cache. The optional "format" query parameter
// selects the output encoding: "yaml" produces YAML via ConvertProtoToYAML;
// any other value, including an absent one, produces JSON via protojson.
//
// Responses:
//   - 405 if the method is not GET
//   - 400 if name is missing
//   - 404 if the manager cannot return the resource
//   - 500 if the resource is not a protobuf message or conversion fails
//   - 200 with the encoded resource on success
func (api *API) getResourceHandler(w http.ResponseWriter, r *http.Request, typ resourcev3.Type) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	query := r.URL.Query()

	name := query.Get("name")
	if name == "" {
		http.Error(w, "name query parameter required", http.StatusBadRequest)
		return
	}

	// Only "yaml" selects YAML; every other value falls through to JSON.
	wantYAML := query.Get("format") == "yaml"

	res, err := api.Manager.GetResourceFromCache(name, typ)
	if err != nil {
		http.Error(w, fmt.Sprintf("resource not found: %v", err), http.StatusNotFound)
		return
	}

	msg, isProto := res.(interface{ ProtoReflect() protoreflect.Message })
	if !isProto {
		http.Error(w, "resource is not a protobuf message", http.StatusInternalServerError)
		return
	}

	var (
		contentType string
		payload     []byte
	)
	if wantYAML {
		text, convErr := ConvertProtoToYAML(msg)
		if convErr != nil {
			http.Error(w, fmt.Sprintf("failed to convert resource to YAML: %v", convErr), http.StatusInternalServerError)
			return
		}
		contentType, payload = "application/x-yaml", []byte(text)
	} else {
		raw, marshalErr := protojson.Marshal(msg)
		if marshalErr != nil {
			http.Error(w, fmt.Sprintf("failed to marshal protobuf to JSON: %v", marshalErr), http.StatusInternalServerError)
			return
		}
		contentType, payload = "application/json", raw
	}

	w.Header().Set("Content-Type", contentType)
	w.WriteHeader(http.StatusOK)
	w.Write(payload)
}

// ---------------- Consistency Handler ----------------

// isConsistentHandler handles GET /is-consistent. It asks the snapshot manager
// to compare the cache with the database and returns the resulting report as
// JSON under the "consistent" key.
//
// The check runs on the request's context, so it can be cancelled when the
// client disconnects (provided the manager honours the context).
//
// Responses:
//   - 405 if the method is not GET
//   - 500 if the check fails or the report cannot be encoded
//   - 200 with {"consistent": <report>} on success
func (api *API) isConsistentHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	consistent, err := api.Manager.CheckCacheDBConsistency(r.Context())
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to check consistency: %v", err), http.StatusInternalServerError)
		return
	}

	response := struct {
		Consistent *ConsistencyReport `json:"consistent"`
	}{
		Consistent: consistent,
	}

	// Serialise before touching the response: once the 200 status line has
	// been written, an encoding failure can no longer be turned into a 500.
	body, err := json.Marshal(response)
	if err != nil {
		http.Error(w, "failed to encode response", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// Trailing newline keeps the payload identical to what json.Encoder emits.
	// A write error here means the client went away; nothing more can be done.
	_, _ = w.Write(append(body, '\n'))
}