package controller import ( "context" "sync" "time" logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) type runFunc func(context.Context) type runPeriodicFunc struct { fn runFunc interval time.Duration } type controller struct { // ctx is the root golang context for the controller ctx context.Context // name of the controller name string // the function that should be called when an item is popped // off the workqueue syncHandler func(ctx context.Context, key string) error // mustSync is a slice of informers that must have synced before // this controller can start mustSync []cache.InformerSynced // a set of functions will be called just after the controller initialization, once. runFirstFuncs []runFunc // a set of functions will be called every interval, periodic. periodicFuncs []runPeriodicFunc // queue is a reference to the queue used to enqueue resources // to be processed queue workqueue.RateLimitingInterface } type queueingController interface { Register(*Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) ProcessItem(ctx context.Context, key string) error } // Create a new controller instance func NewController(ctx context.Context, name string, syncFunc func(ctx context.Context, key string) error, mustSync []cache.InformerSynced, runPeriodicFunc []runPeriodicFunc, queue workqueue.RateLimitingInterface) Interface { return &controller{ ctx: ctx, name: name, syncHandler: syncFunc, mustSync: mustSync, periodicFuncs: runPeriodicFunc, queue: queue, } } // Run starts the controller loop func (c *controller) Run(workers int, stopCh <-chan struct{}) error { ctx, cancel := context.WithCancel(c.ctx) defer cancel() log := logf.FromContext(ctx) log.V(logf.DebugLevel).Info("starting control loop") var wg sync.WaitGroup for i := 0; i < workers; i++ { wg.Add(1) go func() { defer wg.Done() c.worker(ctx) }() } // run the intial funcs for _, f := range c.runFirstFuncs { f(ctx) } // fun the periodical funcs for _, f := range c.periodicFuncs { f := f // capture range variable go wait.Until(func() { f.fn(ctx) }, f.interval, stopCh) } <-stopCh log.V(logf.InfoLevel).Info("shutting down queue as workqueue signaled shutdown") c.queue.ShutDown() log.V(logf.DebugLevel).Info("waiting for workers to exit...") wg.Wait() log.V(logf.DebugLevel).Info("workers exited") return nil } func (c *controller) worker(ctx context.Context) { log := logf.FromContext(ctx) log.V(logf.DebugLevel).Info("starting worker") for { obj, shutdown := c.queue.Get() if shutdown { break } var key string // use an inlined function so we can use defer func() { defer c.queue.Done(obj) var ok bool if key, ok = obj.(string); !ok { return } log := log.WithValues("key", key) log.V(logf.DebugLevel).Info("syncing item") err := c.syncHandler(ctx, key) if err != nil { log.Error(err, "re-queuing item due to error processing") c.queue.AddRateLimited(obj) return } log.V(logf.DebugLevel).Info("finished processing work item") c.queue.Forget(obj) }() } log.V(logf.DebugLevel).Info("exiting worker loop") }