package controller
import (
"context"
"sync"
"time"
logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type runFunc func(context.Context)
type runPeriodicFunc struct {
fn runFunc
interval time.Duration
}
type controller struct {
// ctx is the root golang context for the controller
ctx context.Context
// name of the controller
name string
// the function that should be called when an item is popped
// off the workqueue
syncHandler func(ctx context.Context, key string) error
// mustSync is a slice of informers that must have synced before
// this controller can start
mustSync []cache.InformerSynced
// a set of functions will be called just after the controller initialization, once.
runFirstFuncs []runFunc
// a set of functions will be called every interval, periodic.
periodicFuncs []runPeriodicFunc
// queue is a reference to the queue used to enqueue resources
// to be processed
queue workqueue.RateLimitingInterface
}
type queueingController interface {
Register(*Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error)
ProcessItem(ctx context.Context, key string) error
}
// Create a new controller instance
func NewController(ctx context.Context,
name string,
syncFunc func(ctx context.Context, key string) error,
mustSync []cache.InformerSynced,
runPeriodicFunc []runPeriodicFunc,
queue workqueue.RateLimitingInterface) Interface {
return &controller{
ctx: ctx,
name: name,
syncHandler: syncFunc,
mustSync: mustSync,
periodicFuncs: runPeriodicFunc,
queue: queue,
}
}
// Run starts the controller loop
func (c *controller) Run(workers int, stopCh <-chan struct{}) error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
log := logf.FromContext(ctx)
log.V(logf.DebugLevel).Info("starting control loop")
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.worker(ctx)
}()
}
// run the intial funcs
for _, f := range c.runFirstFuncs {
f(ctx)
}
// fun the periodical funcs
for _, f := range c.periodicFuncs {
f := f // capture range variable
go wait.Until(func() { f.fn(ctx) }, f.interval, stopCh)
}
<-stopCh
log.V(logf.InfoLevel).Info("shutting down queue as workqueue signaled shutdown")
c.queue.ShutDown()
log.V(logf.DebugLevel).Info("waiting for workers to exit...")
wg.Wait()
log.V(logf.DebugLevel).Info("workers exited")
return nil
}
func (c *controller) worker(ctx context.Context) {
log := logf.FromContext(ctx)
log.V(logf.DebugLevel).Info("starting worker")
for {
obj, shutdown := c.queue.Get()
if shutdown {
break
}
var key string
// use an inlined function so we can use defer
func() {
defer c.queue.Done(obj)
var ok bool
if key, ok = obj.(string); !ok {
return
}
log := log.WithValues("key", key)
log.V(logf.DebugLevel).Info("syncing item")
err := c.syncHandler(ctx, key)
if err != nil {
log.Error(err, "re-queuing item due to error processing")
c.queue.AddRateLimited(obj)
return
}
log.V(logf.DebugLevel).Info("finished processing work item")
c.queue.Forget(obj)
}()
}
log.V(logf.DebugLevel).Info("exiting worker loop")
}