Newer
Older
AnthosCertManager / cmd / controller / app / controller.go
package app

import (
	"context"
	"fmt"

	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/cmd/controller/app/options"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/cmd/util"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	"golang.org/x/sync/errgroup"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/utils/clock"
)

func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error {
	rootCtx, cancelContext := context.WithCancel(util.ContextWithStopCh(context.Background(), stopCh))
	defer cancelContext()

	rootCtx = logf.NewContext(rootCtx, logf.Log, "controller")
	log := logf.FromContext(rootCtx)
	g, rootCtx := errgroup.WithContext(rootCtx)

	ctxFactory, err := buildControllerContextFactory(rootCtx, opts)
	if err != nil {
		return err
	}

	// Build the base controller context for the cert-manager controller manager
	// used here.
	ctx, err := ctxFactory.Build()
	if err != nil {
		return err
	}

	enabledControllers := opts.EnabledControllers()
	logf.Log.V(logf.DebugLevel).Info(fmt.Sprintf("enabled controllers: %s", enabledControllers))

	//TODO: Start metrics server
	//TODO: Start profiler
	//TODO: Start leader elect

	for n, fn := range controller.Known() {
		log := log.WithValues("controller", n)

		// only run a controller if it is been enabled
		if !enabledControllers.Has(n) {
			log.V(logf.InfoLevel).Info("not starting controller as it's disabled")
			continue
		}

		iface, err := fn(ctxFactory)
		if err != nil {
			if err != nil {
				err = fmt.Errorf("error starting controller: %v", err)
				cancelContext()
				if err1 := g.Wait(); err1 != nil {
					return utilerrors.NewAggregate([]error{err, err1})
				}
				return err
			}
		}

		g.Go(func() error {
			log.V(logf.InfoLevel).Info("starting controller")

			// TODO: make this either a constant or a command line flag
			workers := 5
			return iface.Run(workers, rootCtx.Done())
		})
	}

	log.V(logf.DebugLevel).Info("starting shared informer factories")
	ctx.SharedInformerFactory.Start(rootCtx.Done())
	ctx.KubeSharedInformerFactory.Start(rootCtx.Done())

	err = g.Wait()
	if err != nil {
		return fmt.Errorf("error starting controller: %v", err)
	}
	log.V(logf.InfoLevel).Info("control loops exited")

	return nil
}

func buildControllerContextFactory(ctx context.Context, opts *options.ControllerOptions) (*controller.ContextFactory, error) {
	// log := logf.FromContext(ctx)

	ctxFactory, err := controller.NewContextFactory(ctx, controller.ContextOptions{
		Clock:         clock.RealClock{},
		Kubeconfig:    opts.Kubeconfig,
		APIServerHost: opts.APIServerHost,
		Namespace:     opts.Namespace,
	})

	if err != nil {
		return nil, err
	}

	return ctxFactory, nil
}