diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6dd29b7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +bin/ \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e44694b --- /dev/null +++ b/Makefile @@ -0,0 +1,52 @@ +# Project settings +APP_NAME := xds-server +PKG := ./... +BIN_DIR := bin +BIN := $(BIN_DIR)/$(APP_NAME) + +# Go build settings +GO ?= go +GOFLAGS ?= +LDFLAGS := -s -w + +# Default target +.PHONY: all +all: build + +## Build binary +.PHONY: build +build: + @echo "==> Building $(APP_NAME)..." + @mkdir -p $(BIN_DIR) + $(GO) build $(GOFLAGS) -ldflags "$(LDFLAGS)" -o $(BIN) . + +## Run directly (default port 18000, override with PORT env var) +.PHONY: run +run: + @echo "==> Running $(APP_NAME)..." + @$(GO) run . --port=$${PORT:-18000} --nodeID=test-id + +## Run tests +.PHONY: test +test: + @echo "==> Running tests..." + $(GO) test -v $(PKG) + +## Lint (basic formatting + vet) +.PHONY: lint +lint: + @echo "==> Running go fmt and go vet..." + @$(GO) fmt $(PKG) + @$(GO) vet $(PKG) + +## Clean up build artifacts +.PHONY: clean +clean: + @echo "==> Cleaning build artifacts..." + rm -rf $(BIN_DIR) + +## Install deps (tidy) +.PHONY: deps +deps: + @echo "==> Downloading dependencies..." + $(GO) mod tidy diff --git a/README.md b/README.md new file mode 100644 index 0000000..08ccfc1 --- /dev/null +++ b/README.md @@ -0,0 +1,246 @@ +# Envoy Control Plane + +A lightweight **Envoy xDS control plane** with **REST API support** to dynamically manage clusters and routes. +Implemented in **Go** using **Envoy Go Control Plane v3**. + +--- + +## Features + +* Dynamic xDS snapshot management (clusters & routes) +* REST API to: + + * Add/remove clusters + * Add/remove routes + * Load/save snapshot to JSON files +* Optional initial snapshot from file at startup +* Runs an xDS gRPC server compatible with Envoy +* Fully compatible with `types.Resource` and Envoy v3 snapshot APIs + +--- + +## Prerequisites + +* Go 1.21+ +* Envoy 1.30+ (or compatible) +* Ensure `GOPATH` is set and dependencies are downloaded via `go mod tidy` + +--- + +## Project Structure + +``` +├── internal +│ ├── snapshot.go # SnapshotManager for clusters/routes +│ ├── rest_api.go # REST API for managing snapshots +│ └── ... # other helpers +├── main.go # Main entry: starts gRPC + REST servers +├── go.mod +└── go.sum +``` + +--- + +## Build + +```bash +# Clone repo +git clone +cd envoy-control-plane + +# Download dependencies +go mod tidy + +# Build binary +make all +``` + +Binary will be located at `bin/xds-server`. + +--- + +## Run + +```bash +./bin/xds-server \ + -port=18000 \ + -rest-port=8080 \ + -nodeID=test-node \ + -snapshot-file=snapshot.json +``` + +--- + +## Flags + +| Flag | Description | Default | +| ---------------- | ------------------------------------------- | ------- | +| `-port` | xDS gRPC server port | 18000 | +| `-rest-port` | REST API port | 8080 | +| `-nodeID` | Node ID for snapshot | test-id | +| `-snapshot-file` | Optional JSON file to load initial snapshot | "" | + +--- + +## REST API + +The REST server allows dynamic control over clusters and routes. + +### Add Cluster + +**POST** `/add-cluster` + +```json +{ + "name": "cluster1" +} +``` + +**Response:** + +```json +{ + "cuid": "cluster1" +} +``` + +--- + +### Remove Cluster + +**POST** `/remove-cluster` + +```json +{ + "name": "cluster1" +} +``` + +**Response:** `200 OK` + +--- + +### Add Route + +**POST** `/add-route` + +```json +{ + "name": "route1", + "cluster": "cluster1", + "path_prefix": "/api" +} +``` + +**Response:** + +```json +{ + "route": "route1" +} +``` + +--- + +### Remove Route + +**POST** `/remove-route` + +```json +{ + "name": "route1" +} +``` + +**Response:** `200 OK` + +--- + +### Load Snapshot From File + +**POST** `/load-snapshot` + +```json +{ + "path": "snapshot.json" +} +``` + +**Response:** `200 OK` + +--- + +### Save Snapshot To File + +**POST** `/save-snapshot` + +```json +{ + "path": "snapshot.json" +} +``` + +**Response:** `200 OK` + +--- + +## Snapshot JSON Format + +```json +{ + "envoy.config.cluster.v3.Cluster": [ + { + "name": "cluster1", + "connect_timeout": "5s", + "lb_policy": "ROUND_ROBIN", + "cluster_discovery_type": { "type": "EDS" } + } + ], + "envoy.config.route.v3.RouteConfiguration": [ + { + "name": "route1", + "virtual_hosts": [ + { + "name": "vh-route1", + "domains": ["*"], + "routes": [ + { + "match": { "prefix": "/api" }, + "route": { "cluster": "cluster1" } + } + ] + } + ] + } + ] +} +``` + +> Each top-level key is the **full type URL** of the resource. +> Clusters and routes can be added dynamically via REST or preloaded from a snapshot JSON file. + +--- + +## Connecting Envoy + +Configure Envoy with: + +```yaml +dynamic_resources: + ads_config: + api_type: GRPC + grpc_services: + - envoy_grpc: + cluster_name: xds_cluster + cds_config: {} + lds_config: {} +``` + +* Set `xds_cluster` to point to your control plane gRPC server (e.g., `localhost:18000`) +* Envoy will pull clusters and routes dynamically + +--- + +## License + +**Apache 2.0 License** diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..be4474c --- /dev/null +++ b/go.mod @@ -0,0 +1,24 @@ +module envoy-control-plane + +go 1.24.6 + +require ( + github.com/envoyproxy/go-control-plane v0.13.4 + github.com/envoyproxy/go-control-plane/envoy v1.35.0 + github.com/google/uuid v1.6.0 + google.golang.org/grpc v1.75.1 + google.golang.org/protobuf v1.36.10 +) + +require ( + cel.dev/expr v0.24.0 // indirect + github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 // indirect + github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 // indirect + github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect + github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..997d5f7 --- /dev/null +++ b/go.sum @@ -0,0 +1,62 @@ +cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY= +cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= +github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls= +github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M= +github.com/envoyproxy/go-control-plane v0.13.4/go.mod h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA= +github.com/envoyproxy/go-control-plane/envoy v1.35.0 h1:ixjkELDE+ru6idPxcHLj8LBVc2bFP7iBytj353BoHUo= +github.com/envoyproxy/go-control-plane/envoy v1.35.0/go.mod h1:09qwbGVuSWWAyN5t/b3iyVfz5+z8QWGrzkoqm/8SbEs= +github.com/envoyproxy/go-control-plane/ratelimit v0.1.0 h1:/G9QYbddjL25KvtKTv3an9lx6VBE2cnb8wp1vEGNYGI= +github.com/envoyproxy/go-control-plane/ratelimit v0.1.0/go.mod h1:Wk+tMFAFbCXaJPzVVHnPgRKdUdwW/KdbRt94AzgRee4= +github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8= +github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo= +github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 h1:FiusG7LWj+4byqhbvmB+Q93B/mOxJLN2DTozDuZm4EU= +google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:kXqgZtrWaf6qS3jZOCnCH7WYfrvFjkC51bM8fz3RsCA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/rest_api.go b/internal/rest_api.go new file mode 100644 index 0000000..5d195b2 --- /dev/null +++ b/internal/rest_api.go @@ -0,0 +1,171 @@ +package internal + +import ( + "encoding/json" + "net/http" + "os" + + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/google/uuid" +) + +// API holds reference to snapshot manager +type API struct { + Manager *SnapshotManager +} + +// AddClusterRequest defines payload to add a cluster +type AddClusterRequest struct { + Name string `json:"name"` +} + +// RemoveClusterRequest defines payload to remove a cluster +type RemoveClusterRequest struct { + Name string `json:"name"` +} + +// AddRouteRequest defines payload to add a route +type AddRouteRequest struct { + Name string `json:"name"` + Cluster string `json:"cluster"` + PathPrefix string `json:"path_prefix"` +} + +// RemoveRouteRequest defines payload to remove a route +type RemoveRouteRequest struct { + Name string `json:"name"` +} + +// SnapshotFileRequest defines payload to load/save snapshot from/to file +type SnapshotFileRequest struct { + Path string `json:"path"` +} + +// NewAPI returns a new REST API handler +func NewAPI(cache cachev3.SnapshotCache, nodeID string) *API { + return &API{ + Manager: NewSnapshotManager(cache, nodeID), + } +} + +// RegisterRoutes mounts REST handlers +func (api *API) RegisterRoutes(mux *http.ServeMux) { + mux.HandleFunc("/add-cluster", api.addCluster) + mux.HandleFunc("/remove-cluster", api.removeCluster) + mux.HandleFunc("/add-route", api.addRoute) + mux.HandleFunc("/remove-route", api.removeRoute) + mux.HandleFunc("/load-snapshot", api.loadSnapshot) + mux.HandleFunc("/save-snapshot", api.saveSnapshot) +} + +// ---------------- Cluster Handlers ---------------- + +func (api *API) addCluster(w http.ResponseWriter, r *http.Request) { + var req AddClusterRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "invalid request", http.StatusBadRequest) + return + } + if req.Name == "" { + req.Name = uuid.NewString() + } + + cluster := NewCluster(req.Name) + if err := api.Manager.AddCluster(cluster); err != nil { + http.Error(w, "failed to add cluster", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]string{"cuid": req.Name}) +} + +func (api *API) removeCluster(w http.ResponseWriter, r *http.Request) { + var req RemoveClusterRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { + http.Error(w, "name required", http.StatusBadRequest) + return + } + + if err := api.Manager.RemoveCluster(req.Name); err != nil { + http.Error(w, "failed to remove cluster", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// ---------------- Route Handlers ---------------- + +func (api *API) addRoute(w http.ResponseWriter, r *http.Request) { + var req AddRouteRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || + req.Name == "" || req.Cluster == "" || req.PathPrefix == "" { + http.Error(w, "invalid request", http.StatusBadRequest) + return + } + + route := NewRoute(req.Name, req.Cluster, req.PathPrefix) + if err := api.Manager.AddRoute(route); err != nil { + http.Error(w, "failed to add route", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]string{"route": req.Name}) +} + +func (api *API) removeRoute(w http.ResponseWriter, r *http.Request) { + var req RemoveRouteRequest + // Decode request and check for required 'Name' field + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Name == "" { + http.Error(w, "route name required", http.StatusBadRequest) + return + } + + // Call the SnapshotManager's RemoveRoute method + if err := api.Manager.RemoveRoute(req.Name); err != nil { + // If the route doesn't exist, the manager handles the snapshot update anyway, + // so we mainly worry about cache read/write failures here. + http.Error(w, "failed to remove route", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// ---------------- Snapshot File Handlers ---------------- + +func (api *API) loadSnapshot(w http.ResponseWriter, r *http.Request) { + var req SnapshotFileRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Path == "" { + http.Error(w, "path required", http.StatusBadRequest) + return + } + if _, err := os.Stat(req.Path); os.IsNotExist(err) { + http.Error(w, "file not found", http.StatusBadRequest) + return + } + + if err := api.Manager.LoadSnapshotFromFile(req.Path); err != nil { + http.Error(w, "failed to load snapshot", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (api *API) saveSnapshot(w http.ResponseWriter, r *http.Request) { + var req SnapshotFileRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Path == "" { + http.Error(w, "path required", http.StatusBadRequest) + return + } + + if err := api.Manager.SaveSnapshotToFile(req.Path); err != nil { + http.Error(w, "failed to save snapshot", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/internal/run_server.go b/internal/run_server.go new file mode 100644 index 0000000..a4f5bfc --- /dev/null +++ b/internal/run_server.go @@ -0,0 +1,66 @@ +package internal + +import ( + "fmt" + "log" + "net" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + + clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3" + discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" + listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3" + routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3" + runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3" + secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" + + "github.com/envoyproxy/go-control-plane/pkg/server/v3" +) + +const ( + grpcKeepaliveTime = 30 * time.Second + grpcKeepaliveTimeout = 5 * time.Second + grpcKeepaliveMinTime = 30 * time.Second + grpcMaxConcurrent = 1000000 +) + +// RunServer starts the xDS management server on the given port. +func RunServer(srv server.Server, port uint) { + var grpcOptions []grpc.ServerOption + grpcOptions = append(grpcOptions, + grpc.MaxConcurrentStreams(grpcMaxConcurrent), + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: grpcKeepaliveTime, + Timeout: grpcKeepaliveTimeout, + }), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: grpcKeepaliveMinTime, + PermitWithoutStream: true, + }), + ) + + grpcServer := grpc.NewServer(grpcOptions...) + + // Register all xDS services + discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, srv) + endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, srv) + clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, srv) + routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, srv) + listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, srv) + secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, srv) + runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, srv) + + addr := fmt.Sprintf(":%d", port) + lis, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("failed to listen on %s: %v", addr, err) + } + + log.Printf("management server listening on %s\n", addr) + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("gRPC server failed: %v", err) + } +} diff --git a/internal/snapshot.go b/internal/snapshot.go new file mode 100644 index 0000000..ba72f84 --- /dev/null +++ b/internal/snapshot.go @@ -0,0 +1,256 @@ +package internal + +import ( + "context" + "encoding/json" + "os" + "time" + + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "google.golang.org/protobuf/types/known/durationpb" +) + +// SnapshotManager wraps a SnapshotCache and provides file loading/modifying +type SnapshotManager struct { + Cache cachev3.SnapshotCache + NodeID string +} + +// NewSnapshotManager creates a new manager for a given cache and node +func NewSnapshotManager(cache cachev3.SnapshotCache, nodeID string) *SnapshotManager { + return &SnapshotManager{ + Cache: cache, + NodeID: nodeID, + } +} + +// LoadSnapshotFromFile loads a snapshot from a JSON file +func (sm *SnapshotManager) LoadSnapshotFromFile(filePath string) error { + data, err := os.ReadFile(filePath) + if err != nil { + return err + } + + var raw map[string][]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + resources := make(map[resourcev3.Type][]types.Resource) + + for typStr, arr := range raw { + typ := resourcev3.Type(typStr) + for _, r := range arr { + switch typ { + case resourcev3.ClusterType: + var c clusterv3.Cluster + if err := json.Unmarshal(r, &c); err != nil { + return err + } + resources[typ] = append(resources[typ], &c) + case resourcev3.RouteType: + var rt routev3.RouteConfiguration + if err := json.Unmarshal(r, &rt); err != nil { + return err + } + resources[typ] = append(resources[typ], &rt) + default: + // skip unknown types + } + } + } + + snap, _ := cachev3.NewSnapshot("snap-from-file", resources) + return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, snap) +} + +// AddCluster adds a cluster to the snapshot +func (sm *SnapshotManager) AddCluster(cluster *clusterv3.Cluster) error { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + var clusters []types.Resource + var routes []types.Resource + if err != nil { + clusters = []types.Resource{} + routes = []types.Resource{} + } else { + // Convert map to slice + clusters = mapToSlice(snap.GetResources(string(resourcev3.ClusterType))) + routes = mapToSlice(snap.GetResources(string(resourcev3.RouteType))) + } + + clusters = append(clusters, cluster) + + newSnap, _ := cachev3.NewSnapshot( + "snap-"+cluster.GetName(), + map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: clusters, + resourcev3.RouteType: routes, + }, + ) + + return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap) +} + +// AddRoute adds a route configuration to the snapshot +func (sm *SnapshotManager) AddRoute(route *routev3.RouteConfiguration) error { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + var clusters []types.Resource + var routes []types.Resource + if err != nil { + clusters = []types.Resource{} + routes = []types.Resource{} + } else { + clusters = mapToSlice(snap.GetResources(string(resourcev3.ClusterType))) + routes = mapToSlice(snap.GetResources(string(resourcev3.RouteType))) + } + + routes = append(routes, route) + + newSnap, _ := cachev3.NewSnapshot( + "snap-"+route.GetName(), + map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: clusters, + resourcev3.RouteType: routes, + }, + ) + + return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap) +} + +// RemoveRoute removes a route configuration by name +func (sm *SnapshotManager) RemoveRoute(name string) error { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + if err != nil { + return err + } + + // Keep clusters unchanged + clusters := mapToSlice(snap.GetResources(string(resourcev3.ClusterType))) + routes := []types.Resource{} + + // Filter routes: keep only those that do not match the given name + for _, r := range snap.GetResources(string(resourcev3.RouteType)) { + if rt, ok := r.(*routev3.RouteConfiguration); ok && rt.GetName() != name { + routes = append(routes, rt) + } + } + + // Create a new snapshot with the filtered route list + newSnap, _ := cachev3.NewSnapshot( + "snap-remove-route-"+name, + map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: clusters, + resourcev3.RouteType: routes, + }, + ) + + return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap) +} + +// RemoveCluster removes a cluster by name +func (sm *SnapshotManager) RemoveCluster(name string) error { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + if err != nil { + return err + } + + clusters := []types.Resource{} + routes := mapToSlice(snap.GetResources(string(resourcev3.RouteType))) + + for _, r := range snap.GetResources(string(resourcev3.ClusterType)) { + if c, ok := r.(*clusterv3.Cluster); ok && c.GetName() != name { + clusters = append(clusters, c) + } + } + + newSnap, _ := cachev3.NewSnapshot( + "snap-remove-"+name, + map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: clusters, + resourcev3.RouteType: routes, + }, + ) + + return sm.Cache.SetSnapshot(context.TODO(), sm.NodeID, newSnap) +} + +// SaveSnapshotToFile saves snapshot to a JSON file +func (sm *SnapshotManager) SaveSnapshotToFile(filePath string) error { + snap, err := sm.Cache.GetSnapshot(sm.NodeID) + if err != nil { + return err + } + + out := make(map[string][]interface{}) + + for _, r := range snap.GetResources(string(resourcev3.ClusterType)) { + if c, ok := r.(*clusterv3.Cluster); ok { + out[string(resourcev3.ClusterType)] = append(out[string(resourcev3.ClusterType)], c) + } + } + + for _, r := range snap.GetResources(string(resourcev3.RouteType)) { + if rt, ok := r.(*routev3.RouteConfiguration); ok { + out[string(resourcev3.RouteType)] = append(out[string(resourcev3.RouteType)], rt) + } + } + + data, err := json.MarshalIndent(out, "", " ") + if err != nil { + return err + } + + return os.WriteFile(filePath, data, 0644) +} + +// ----------------- Helpers ----------------- + +// Convert map[name]Resource → slice of Resource +func mapToSlice(m map[string]types.Resource) []types.Resource { + out := make([]types.Resource, 0, len(m)) + for _, r := range m { + out = append(out, r) + } + return out +} + +// NewCluster creates a simple cluster +func NewCluster(name string) *clusterv3.Cluster { + return &clusterv3.Cluster{ + Name: name, + ConnectTimeout: durationpb.New(5 * time.Second), + ClusterDiscoveryType: &clusterv3.Cluster_Type{ + Type: clusterv3.Cluster_EDS, + }, + LbPolicy: clusterv3.Cluster_ROUND_ROBIN, + } +} + +// NewRoute creates a simple route tied to a cluster +func NewRoute(name, clusterName, prefix string) *routev3.RouteConfiguration { + return &routev3.RouteConfiguration{ + Name: name, + VirtualHosts: []*routev3.VirtualHost{ + { + Name: "vh-" + name, + Domains: []string{"*"}, + Routes: []*routev3.Route{ + { + Match: &routev3.RouteMatch{ + PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: prefix}, + }, + Action: &routev3.Route_Route{ + Route: &routev3.RouteAction{ + ClusterSpecifier: &routev3.RouteAction_Cluster{Cluster: clusterName}, + }, + }, + }, + }, + }, + }, + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..b60ea9f --- /dev/null +++ b/main.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net/http" + "os" + + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/log" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/envoyproxy/go-control-plane/pkg/test/v3" + + "envoy-control-plane/internal" +) + +var ( + logger *log.DefaultLogger + port uint + nodeID string + restPort uint + snapshotFile string +) + +func init() { + logger = log.NewDefaultLogger() + + flag.UintVar(&port, "port", 18000, "xDS management server port") + flag.StringVar(&nodeID, "nodeID", "test-id", "Node ID") + flag.UintVar(&restPort, "rest-port", 8080, "REST API server port") + flag.StringVar(&snapshotFile, "snapshot-file", "", "Optional initial snapshot JSON file") +} + +func main() { + flag.Parse() + + // Create snapshot cache + cache := cache.NewSnapshotCache(false, cache.IDHash{}, logger) + + // Create SnapshotManager + manager := internal.NewSnapshotManager(cache, nodeID) + + // Load initial snapshot from file if provided + if snapshotFile != "" { + if _, err := os.Stat(snapshotFile); err == nil { + if err := manager.LoadSnapshotFromFile(snapshotFile); err != nil { + logger.Errorf("failed to load snapshot from file: %v", err) + os.Exit(1) + } + logger.Infof("loaded initial snapshot from %s", snapshotFile) + } else { + logger.Warnf("snapshot file not found: %s, starting empty snapshot", snapshotFile) + } + } + + // Ensure snapshot is consistent or create empty snapshot + snap, err := cache.GetSnapshot(nodeID) + if err != nil { + snap, _ = cachev3.NewSnapshot("snap-init", map[resourcev3.Type][]types.Resource{ + resourcev3.ClusterType: {}, + resourcev3.RouteType: {}, + }) + if err := cache.SetSnapshot(context.Background(), nodeID, snap); err != nil { + logger.Errorf("failed to set initial snapshot: %v", err) + os.Exit(1) + } + } + + logger.Infof("xDS snapshot ready: %+v", snap) + + // Start xDS gRPC server + ctx := context.Background() + cb := &test.Callbacks{Debug: true} + srv := server.NewServer(ctx, cache, cb) + go internal.RunServer(srv, port) // your existing RunServer implementation + + // Start REST API server + api := internal.NewAPI(cache, nodeID) + mux := http.NewServeMux() + api.RegisterRoutes(mux) + + restAddr := ":" + fmt.Sprint(restPort) + logger.Infof("starting REST API server on %s", restAddr) + if err := http.ListenAndServe(restAddr, mux); err != nil { + logger.Errorf("REST server error: %v", err) + os.Exit(1) + } +}