Newer
Older
EnvoyControlPlane / internal / app / app.go
// internal/app/app.go
package app

import (
	"context"
	"database/sql"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"strings"

	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
	"github.com/envoyproxy/go-control-plane/pkg/server/v3"
	"github.com/envoyproxy/go-control-plane/pkg/test/v3"

	"envoy-control-plane/internal"
	"envoy-control-plane/internal/config"
	internallog "envoy-control-plane/internal/log"
	"envoy-control-plane/internal/pkg/api"
	"envoy-control-plane/internal/pkg/cert"
	rtspserver "envoy-control-plane/internal/pkg/server"
	"envoy-control-plane/internal/pkg/snapshot"
	internalstorage "envoy-control-plane/internal/pkg/storage"
)

// loadConfigFiles has been moved or recreated to accept and use a context
// It is the same logic extracted from your original main.go
func loadConfigFiles(ctx context.Context, dir string) (map[string][]types.Resource, error) {
	log := internallog.LogFromContext(ctx)
	log.Infof("loading configuration files from directory: %s", dir)

	files, err := os.ReadDir(dir)
	if err != nil {
		return nil, fmt.Errorf("failed to read directory %s: %w", dir, err)
	}

	resourceFiles := make(map[string][]types.Resource)
	for _, file := range files {
		if file.IsDir() {
			continue
		}
		fileName := file.Name()
		if strings.HasSuffix(fileName, ".yaml") || strings.HasSuffix(fileName, ".yml") || strings.HasSuffix(fileName, ".json") {
			filePath := filepath.Join(dir, fileName)
			log.Infof("  -> loading config file: %s", filePath)

			rf, err := snapshot.LoadSnapshotFromFile(ctx, filePath)
			if err != nil {
				return nil, fmt.Errorf("failed to load snapshot from file %s: %w", filePath, err)
			}
			for k, v := range rf {
				resourceFiles[k] = append(resourceFiles[k], v...)
			}
			log.Infof("loaded %d resources from %s", len(rf), filePath)
		}
	}
	return resourceFiles, nil
}

// loadInitialConfig attempts to load the configuration from DB, config dir, or snapshot file.
// It also ensures the loaded config is saved to the DB if it came from files.
func loadInitialConfig(
	ctx context.Context,
	manager *snapshot.SnapshotManager,
	storage *internalstorage.Storage,
	cfg *config.Config,
) error {
	log := internallog.LogFromContext(ctx)
	loadedConfigs := false

	// --- 1. Attempt to load from Database ---
	snapCfg, err := storage.RebuildSnapshot(ctx)
	if err == nil && (len(snapCfg.EnabledClusters) > 0 || len(snapCfg.EnabledListeners) > 0) {
		if err := manager.SetSnapshotFromConfig(ctx, "snap-from-db", snapCfg); err != nil {
			return fmt.Errorf("failed to set DB snapshot: %w", err)
		}
		loadedConfigs = true
		log.Infof("loaded snapshot from database")
	}

	if !loadedConfigs {
		var resources map[string][]types.Resource
		snapSource := ""

		// --- 2. Attempt to load from Config Directory ---
		if cfg.ConfigDir != "" {
			resources, err = loadConfigFiles(ctx, cfg.ConfigDir)
			if err != nil {
				return fmt.Errorf("failed to load configs from directory: %w", err)
			}
			snapSource = "snap-from-dir"
			loadedConfigs = len(resources) > 0
			if loadedConfigs {
				log.Infof("loaded snapshot from directory: %s", cfg.ConfigDir)
			}
		}

		// --- 3. Attempt to load from Snapshot File (only if dir didn't load any) ---
		if !loadedConfigs && cfg.SnapshotFile != "" {
			if _, err := os.Stat(cfg.SnapshotFile); err == nil {
				resources, err = snapshot.LoadSnapshotFromFile(ctx, cfg.SnapshotFile)
				if err != nil {
					return fmt.Errorf("failed to load snapshot from file: %w", err)
				}
				snapSource = "snap-from-file"
				loadedConfigs = len(resources) > 0
				if loadedConfigs {
					log.Infof("loaded snapshot from file: %s", cfg.SnapshotFile)
				}
			} else {
				log.Warnf("snapshot file not found: %s", cfg.SnapshotFile)
			}
		}

		// --- 4. Apply and Save Loaded Config (if any) ---
		if loadedConfigs {
			if err := manager.SetSnapshot(ctx, snapSource, resources); err != nil {
				return fmt.Errorf("failed to set loaded snapshot: %w", err)
			}

			// Save the newly loaded snapshot to the DB for persistence
			snapCfgToSave, err := manager.SnapshotToConfig(ctx, cfg.NodeID)
			if err != nil {
				return fmt.Errorf("failed to convert snapshot to DB config: %w", err)
			}
			if err := storage.SaveSnapshot(ctx, snapCfgToSave, internalstorage.DeleteLogical); err != nil {
				return fmt.Errorf("failed to save initial snapshot into DB: %w", err)
			}
			log.Infof("initial snapshot written into database from %s", snapSource)
		}
	}

	// --- 5. Ensure an initial snapshot exists (even an empty one) ---
	snap, err := manager.Cache.GetSnapshot(cfg.NodeID)
	if err != nil || !loadedConfigs {
		log.Warnf("no valid snapshot found, creating empty snapshot")
		snap, _ = cachev3.NewSnapshot("snap-init", map[resourcev3.Type][]types.Resource{
			resourcev3.ClusterType:  {},
			resourcev3.RouteType:    {},
			resourcev3.ListenerType: {},
			// Add other resource types if needed (e.g., resourcev3.SecretType)
		})
		if err := manager.Cache.SetSnapshot(ctx, cfg.NodeID, snap); err != nil {
			return fmt.Errorf("failed to set initial empty snapshot: %w", err)
		}
	}

	log.Infof("xDS snapshot ready: version %s", snap.GetVersion(string(resourcev3.ClusterType)))

	return nil
}

// Run encapsulates the entire application startup logic.
func Run(ctx context.Context) error {
	log := internallog.LogFromContext(ctx)
	cfg := config.GetConfig() // Use the globally available config

	// 1. Conditional Certificate Issuance (Non-blocking)
	cert.RunCertIssuance(ctx)

	// 2. Database Initialization
	dbConnStr, dbDriver, err := internalstorage.SetupDBConnection(ctx, cfg.DBConnStr)
	if err != nil {
		return fmt.Errorf("database setup failed: %w", err)
	}
	db, err := sql.Open(dbDriver, dbConnStr)
	if err != nil {
		return fmt.Errorf("failed to connect to DB: %w", err)
	}
	defer db.Close()

	// 3. Storage and Snapshot Manager Setup
	storage := internalstorage.NewStorage(db, dbDriver)
	if err := storage.InitSchema(ctx); err != nil {
		return fmt.Errorf("failed to initialize DB schema: %w", err)
	}

	cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, log)
	manager := snapshot.NewSnapshotManager(cache, cfg.NodeID, storage)

	// 4. Load Initial Configuration (DB, Dir, or File)
	if err := loadInitialConfig(ctx, manager, storage, cfg); err != nil {
		return fmt.Errorf("failed to load initial configuration: %w", err)
	}

	// 5. Start xDS gRPC Server
	cb := &test.Callbacks{Debug: true}
	srv := server.NewServer(ctx, cache, cb)
	go internal.RunServer(srv, cfg.Port) // Assuming internal.RunServer is correct

	// 6. Start REST API Server
	mux := http.NewServeMux()
	api.RegisterRoutes(mux, manager, cfg.EnableCertIssuance, cfg.WebrootPath) // Pass needed dependencies

	return rtspserver.RunRESTServer(ctx, mux, cfg.RESTPort, cfg.WebrootPath, cfg.EnableCertIssuance)
}