Newer
Older
AnthosCertManager / pkg / controller / controller.go
package controller

import (
	"context"
	"sync"
	"time"

	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type runFunc func(context.Context)

type runPeriodicFunc struct {
	fn       runFunc
	interval time.Duration
}

type controller struct {
	// ctx is the root golang context for the controller
	ctx context.Context

	// name of the controller
	name string

	// the function that should be called when an item is popped
	// off the workqueue
	syncHandler func(ctx context.Context, key string) error

	// mustSync is a slice of informers that must have synced before
	// this controller can start
	mustSync []cache.InformerSynced

	// a set of functions will be called just after the controller initialization, once.
	runFirstFuncs []runFunc

	// a set of functions will be called every interval, periodic.
	periodicFuncs []runPeriodicFunc

	// queue is a reference to the queue used to enqueue resources
	// to be processed
	queue workqueue.RateLimitingInterface
}

type queueingController interface {
	Register(*Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error)
	ProcessItem(ctx context.Context, key string) error
}

// Create a new controller instance
func NewController(ctx context.Context,
	name string,
	syncFunc func(ctx context.Context, key string) error,
	mustSync []cache.InformerSynced,
	runPeriodicFunc []runPeriodicFunc,
	queue workqueue.RateLimitingInterface) Interface {
	return &controller{
		ctx:           ctx,
		name:          name,
		syncHandler:   syncFunc,
		mustSync:      mustSync,
		periodicFuncs: runPeriodicFunc,
		queue:         queue,
	}
}

// Run starts the controller loop
func (c *controller) Run(workers int, stopCh <-chan struct{}) error {
	ctx, cancel := context.WithCancel(c.ctx)
	defer cancel()
	log := logf.FromContext(ctx)

	log.V(logf.DebugLevel).Info("starting control loop")

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.worker(ctx)
		}()
	}

	// run the intial funcs
	for _, f := range c.runFirstFuncs {
		f(ctx)
	}

	// fun the periodical funcs
	for _, f := range c.periodicFuncs {
		f := f // capture range variable
		go wait.Until(func() { f.fn(ctx) }, f.interval, stopCh)
	}

	<-stopCh
	log.V(logf.InfoLevel).Info("shutting down queue as workqueue signaled shutdown")
	c.queue.ShutDown()
	log.V(logf.DebugLevel).Info("waiting for workers to exit...")
	wg.Wait()
	log.V(logf.DebugLevel).Info("workers exited")
	return nil
}

func (c *controller) worker(ctx context.Context) {
	log := logf.FromContext(ctx)
	log.V(logf.DebugLevel).Info("starting worker")
	for {
		obj, shutdown := c.queue.Get()
		if shutdown {
			break
		}

		var key string
		// use an inlined function so we can use defer
		func() {
			defer c.queue.Done(obj)
			var ok bool
			if key, ok = obj.(string); !ok {
				return
			}
			log := log.WithValues("key", key)
			log.V(logf.DebugLevel).Info("syncing item")

			err := c.syncHandler(ctx, key)
			if err != nil {
				log.Error(err, "re-queuing item due to error processing")

				c.queue.AddRateLimited(obj)
				return
			}

			log.V(logf.DebugLevel).Info("finished processing work item")
			c.queue.Forget(obj)
		}()
	}
	log.V(logf.DebugLevel).Info("exiting worker loop")
}