Newer
Older
EnvoyControlPlane / main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"net/http"
	"os"
	"path/filepath" // ADDED: for directory and file path operations
	"strings"       // ADDED: for string manipulation

	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
	"github.com/envoyproxy/go-control-plane/pkg/server/v3"
	"github.com/envoyproxy/go-control-plane/pkg/test/v3"
	"k8s.io/klog/v2"

	"envoy-control-plane/internal"
)

// Process-wide state: the shared logger plus the values bound to command-line
// flags. All of these are populated by init (registration) and flag.Parse in main.
var (
	// logger is the package logger; it is also handed to the snapshot cache.
	logger       *internal.DefaultLogger
	// port is the xDS management (gRPC) server port, set by -port.
	port         uint
	// nodeID is the node ID snapshots are stored under, set by -nodeID.
	nodeID       string
	// restPort is the REST API server port, set by -rest-port.
	restPort     uint
	// snapshotFile is an optional single initial snapshot file, set by
	// -snapshot-file. Kept for backwards compatibility; main only consults it
	// when nothing was loaded from configDir.
	snapshotFile string
	// configDir is an optional directory of config files, set by -config-dir.
	configDir    string
)

// init creates the package logger, lets klog register its flags, and then
// registers this binary's own command-line flags on the default flag set.
// The flags are only parsed later, by flag.Parse in main.
func init() {
	logger = internal.NewDefaultLogger()
	klog.InitFlags(nil)

	// The package-level flag.XxxVar helpers are thin wrappers around
	// flag.CommandLine, so registering through it directly is equivalent.
	cmd := flag.CommandLine

	// Server endpoints and identity.
	cmd.UintVar(&port, "port", 18000, "xDS management server port")
	cmd.StringVar(&nodeID, "nodeID", "test-id", "Node ID")
	cmd.UintVar(&restPort, "rest-port", 8080, "REST API server port")

	// Initial configuration sources. -snapshot-file is retained for backwards
	// compatibility; -config-dir is the preferred way to supply multiple files.
	cmd.StringVar(&snapshotFile, "snapshot-file", "", "Optional initial snapshot JSON/YAML file (single file)")
	cmd.StringVar(&configDir, "config-dir", "", "Optional directory containing multiple config files (.yaml, .json)")
}

// loadConfigFiles loads every .yaml, .yml, or .json file found directly in dir
// (the extension match is case-insensitive; subdirectories are skipped), merges
// the resources returned for each file by their map key, and installs the
// combined result through manager.SetSnapshot under the version
// "snap-from-file".
//
// Files are visited in the order returned by os.ReadDir (sorted by filename),
// so resources that share a key are appended in that order. No de-duplication
// is performed here: overlapping definitions from different files are passed
// to SetSnapshot as-is.
//
// It returns an error if the directory cannot be read, if any matching file
// fails to load, or if the combined snapshot cannot be set. A directory with
// no matching files is not an error; a warning is logged and an empty resource
// set is applied.
func loadConfigFiles(manager *internal.SnapshotManager, dir string) error {
	logger.Infof("loading configuration files from directory: %s", dir)

	entries, err := os.ReadDir(dir)
	if err != nil {
		return fmt.Errorf("failed to read directory %s: %w", dir, err)
	}

	merged := make(map[string][]types.Resource)
	loadedFiles := 0
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}

		// Only process recognised config extensions, regardless of case.
		switch strings.ToLower(filepath.Ext(entry.Name())) {
		case ".yaml", ".yml", ".json":
		default:
			continue
		}

		filePath := filepath.Join(dir, entry.Name())
		logger.Infof("  -> loading config file: %s", filePath)

		rf, err := manager.LoadSnapshotFromFile(filePath)
		if err != nil {
			return fmt.Errorf("failed to load snapshot from file %s: %w", filePath, err)
		}

		// Accumulate this file's resources into the combined set and count the
		// individual resources (not the number of map keys) for the log line.
		resourceCount := 0
		for k, v := range rf {
			merged[k] = append(merged[k], v...)
			resourceCount += len(v)
		}
		logger.Infof("loaded %d resources from %s", resourceCount, filePath)
		loadedFiles++
	}

	if loadedFiles == 0 {
		logger.Warnf("no .yaml, .yml, or .json files found in %s", dir)
	}

	if err := manager.SetSnapshot(context.TODO(), "snap-from-file", merged); err != nil {
		return fmt.Errorf("failed to set combined snapshot from files: %w", err)
	}
	// Report the number of files actually loaded, not the number of directory entries.
	logger.Infof("successfully loaded %d configuration files from %s", loadedFiles, dir)
	return nil
}

// main wires the control plane together:
//
//  1. parses flags and creates the snapshot cache and SnapshotManager;
//  2. loads initial configuration from -config-dir, or, if nothing was loaded
//     from a directory, from -snapshot-file;
//  3. makes sure the cache holds a snapshot for nodeID, falling back to an
//     empty one;
//  4. starts the xDS gRPC server in a goroutine and serves the REST API on the
//     main goroutine.
//
// Any unrecoverable error is logged and the process exits with status 1.
func main() {
	flag.Parse()

	// Flush klog on a normal return from main.
	defer klog.Flush()

	// os.Exit does not run deferred functions, so fatal paths flush explicitly
	// before exiting to avoid losing buffered log output.
	fatalf := func(format string, args ...interface{}) {
		logger.Errorf(format, args...)
		klog.Flush()
		os.Exit(1)
	}

	// Create snapshot cache
	cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, logger)

	// Create SnapshotManager
	manager := internal.NewSnapshotManager(cache, nodeID)

	loadedConfigs := false

	// OPTION 1: Load multiple config files from a directory
	if configDir != "" {
		if err := loadConfigFiles(manager, configDir); err != nil {
			fatalf("failed to load configs from directory: %v", err)
		}

		loadedConfigs = true
		logger.Infof("successfully loaded all configuration files from %s", configDir)
	}

	// OPTION 2: Load single initial snapshot file (backwards compatible/single file mode)
	if snapshotFile != "" && !loadedConfigs {
		if _, err := os.Stat(snapshotFile); err != nil {
			// A missing snapshot file is tolerated; startup continues with an
			// empty snapshot.
			logger.Warnf("snapshot file not found: %s", snapshotFile)
		} else {
			resources, err := manager.LoadSnapshotFromFile(snapshotFile)
			if err != nil {
				fatalf("failed to load snapshot from file: %v", err)
			}
			if err := manager.SetSnapshot(context.TODO(), "snap-from-file", resources); err != nil {
				fatalf("failed to set loaded snapshot: %v", err)
			}
			loadedConfigs = true
			logger.Infof("loaded initial snapshot from %s", snapshotFile)
		}
	}

	// Ensure the cache holds a snapshot for this node, creating an empty one
	// if no configs were loaded or the lookup failed.
	snap, err := manager.Cache.GetSnapshot(nodeID)
	if err != nil || !loadedConfigs {
		if !loadedConfigs {
			logger.Warnf("no configuration files loaded, creating empty snapshot.")
		} else {
			// This case should ideally not happen if loading was successful, but handles initial cache miss.
			logger.Warnf("no snapshot found in cache, creating empty snapshot.")
		}

		// Build the empty snapshot in both cases: on a cache miss snap is nil
		// and must not be stored or dereferenced.
		empty, err := cachev3.NewSnapshot("snap-init", map[resourcev3.Type][]types.Resource{
			resourcev3.ClusterType:  {},
			resourcev3.RouteType:    {},
			resourcev3.ListenerType: {},
		})
		if err != nil {
			fatalf("failed to create empty snapshot: %v", err)
		}

		if err := cache.SetSnapshot(context.Background(), nodeID, empty); err != nil {
			fatalf("failed to set initial snapshot: %v", err)
		}
		snap = empty
	}

	// Versions are tracked per resource type; report the cluster version.
	logger.Infof("xDS snapshot ready: version %s", snap.GetVersion(string(resourcev3.ClusterType)))

	// Start xDS gRPC server
	ctx := context.Background()
	cb := &test.Callbacks{Debug: true}
	srv := server.NewServer(ctx, cache, cb)
	go internal.RunServer(srv, port)

	// Start REST API server
	api := internal.NewAPI(manager)
	mux := http.NewServeMux()
	api.RegisterRoutes(mux)

	restAddr := ":" + fmt.Sprint(restPort)
	logger.Infof("starting REST API server on %s", restAddr)
	if err := http.ListenAndServe(restAddr, mux); err != nil {
		fatalf("REST server error: %v", err)
	}
}