package issuers import ( "context" "github.com/go-logr/logr" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/issuer" logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs" acmClient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned" acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1" controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller" ) const ( ControllerName = "issuers" ) var keyFunc = controllerpkg.KeyFunc type controller struct { issuerLister acmlisters.IssuerLister secretLister corelisters.SecretLister queue workqueue.RateLimitingInterface log logr.Logger acmClient acmClient.Interface recorder record.EventRecorder issuerFactory issuer.Factory // fieldManager is the manager name used for the Apply operations. fieldManager string } // Register registers and constructs the controller using the provided context. // It returns the workqueue to be used to enqueue items, a list of // InformerSynced functions that must be synced, or an error. func (c *controller) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) { c.log = logf.FromContext(ctx.RootContext, ControllerName) // create a queue used to queue up items to be processed c.queue = workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName) issuerInformer := ctx.SharedInformerFactory.AnthosCertmanager().V1().Issuers() secretInformer := ctx.KubeSharedInformerFactory.Core().V1().Secrets() mustSync := []cache.InformerSynced{ issuerInformer.Informer().HasSynced, secretInformer.Informer().HasSynced, } c.issuerLister = issuerInformer.Lister() c.secretLister = secretInformer.Lister() issuerInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: c.queue}) secretInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: c.secreMutated}) // instantiate additional helpers used by this controller c.issuerFactory = issuer.NewFactory(ctx) c.acmClient = ctx.ACMClient c.fieldManager = ctx.FieldManager c.recorder = ctx.Recorder return c.queue, mustSync, nil } // TODO: replace with generic handleObject function (like Navigator) func (c *controller) secreMutated(obj interface{}) { log := c.log.WithName("secreMutated") var secret *corev1.Secret var ok bool secret, ok = obj.(*corev1.Secret) if !ok { log.Error(nil, "object was not a secret object") return } log = logf.WithResource(log, secret) issuers, err := c.issuersForSecret(secret) if err != nil { log.Error(err, "error looking up issuers observing secret") return } for _, iss := range issuers { key, err := keyFunc(iss) if err != nil { log.Error(err, "error computing key for resource") continue } c.queue.AddRateLimited(key) } } func (c *controller) ProcessItem(ctx context.Context, key string) error { log := logf.FromContext(ctx) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { log.Error(err, "invalid resource key") return nil } issuer, err := c.issuerLister.Issuers(namespace).Get(name) if err != nil { if k8sErrors.IsNotFound(err) { log.Error(err, "issuer in work queue on longer exists") return nil } return err } ctx = logf.NewContext(ctx, logf.WithResource(log, issuer)) return c.Sync(ctx, issuer) } func init() { controllerpkg.Register(ControllerName, func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) { return controllerpkg.NewBuilder(ctx, ControllerName). For(&controller{}). Complete() }) }