// Package trigger contains the "certificates-trigger" controller, which marks
// a Certificate as Issuing when the configured policy function decides that a
// re-issuance is required.
package trigger

import (
	"context"
	"time"

	apiutil "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/api/util"
	acmapi "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/anthoscertmanager/v1"
	acmmeta "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/meta/v1"
	acmclient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
	acminformers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions"
	acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1"
	controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates"
	policies "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/policies"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/predicate"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/clock"
)

const (
	// ControllerName is the name this controller is registered under; it is
	// also used to name the work queue and the controller's logger.
	ControllerName = "certificates-trigger"
)

// controller holds the listers, clients and policy hooks that ProcessItem
// needs in order to decide whether a Certificate must be re-issued.
type controller struct {
	// Listers backed by the shared informers wired up in NewController.
	certificateLister        acmlisters.CertificateLister
	certificateRequestLister acmlisters.CertificateRequestLister
	secretLister             corelisters.SecretLister

	// client is used to write Certificate status updates.
	client acmclient.Interface
	// recorder emits Kubernetes events for Certificates.
	recorder record.EventRecorder
	// scheduledWorkQueue scheduler.ScheduledWorkQueue

	// fieldManager is the string which will be used as the Field Manager on
	// fields created or edited by the cert-manager Kubernetes client during
	// Apply API calls.
	// NOTE(review): stored but not read by any code visible in this file.
	fieldManager string

	// The following are used for testing purposes.
	clock              clock.Clock
	shouldReissue      policies.Func
	dataForCertificate func(context.Context, *acmapi.Certificate) (policies.Input, error)
}

// NewController builds the trigger controller together with its work queue
// and the informer-synced functions that must report true before items are
// processed.
//
// It registers event handlers on the Certificate, CertificateRequest and
// Secret informers obtained from the supplied factories. shouldReissue is the
// policy function consulted by ProcessItem; clock and fieldManager are stored
// on the returned controller.
func NewController(log logr.Logger,
	client acmclient.Interface,
	factory informers.SharedInformerFactory,
	acmFactory acminformers.SharedInformerFactory,
	recorder record.EventRecorder,
	clock clock.Clock,
	shouldReissue policies.Func,
	fieldManager string,
) (*controller, workqueue.RateLimitingInterface, []cache.InformerSynced) {
	// Create the queue used to hold items to be processed. Failed items are
	// retried with an exponential back-off between 1s and 30s.
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.NewItemExponentialFailureRateLimiter(time.Second, time.Second*30),
		ControllerName,
	)

	// Obtain references to all the informers used by this controller.
	certificateInformer := acmFactory.AnthosCertmanager().V1().Certificates()
	certificateRequestInformer := acmFactory.AnthosCertmanager().V1().CertificateRequests()
	secretsInformer := factory.Core().V1().Secrets()

	// Certificate events are handled asynchronously by adding them to the queue.
	certificateInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: queue})

	// When a CertificateRequest resource changes, enqueue the Certificate
	// resource that owns it.
	certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue,
			certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf),
	})

	// When a Secret resource changes, enqueue any Certificate resource that
	// names it as spec.SecretName.
	// NOTE(review): the predicate below is ResourceOwnerOf, identical to the
	// CertificateRequest handler above, which does not obviously match on
	// spec.SecretName as this comment describes. Confirm whether a
	// secret-name based predicate was intended here.
	secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue,
			certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf),
	})

	// Build the list of InformerSynced functions returned to the caller. The
	// controller will only begin processing items once all of these informers
	// have synced.
	mustSync := []cache.InformerSynced{
		certificateRequestInformer.Informer().HasSynced,
		secretsInformer.Informer().HasSynced,
		certificateInformer.Informer().HasSynced,
	}

	return &controller{
		certificateLister:        certificateInformer.Lister(),
		certificateRequestLister: certificateRequestInformer.Lister(),
		secretLister:             secretsInformer.Lister(),
		client:                   client,
		recorder:                 recorder,
		//scheduledWorkQueue: scheduler.NewScheduledWorkQueue(clock, queue.Add),
		fieldManager: fieldManager,

		// The following are used for testing purposes.
		clock:         clock,
		shouldReissue: shouldReissue,
		dataForCertificate: (&policies.Gatherer{
			CertificateRequestLister: certificateRequestInformer.Lister(),
			SecretLister:             secretsInformer.Lister(),
		}).DataForCertificate,
	}, queue, mustSync
}

// ProcessItem evaluates the Certificate identified by key (a
// "namespace/name" cache key) and, if the policy function reports that a
// re-issuance is required, sets its Issuing condition to True, persists the
// status and records an "Issuing" event.
//
// It returns nil without doing anything when the key is malformed (the error
// is logged only), when the Certificate is not found, when an issuance is
// already in progress, or when no re-issuance is required. Errors from the
// lister, from gathering policy input and from the status update are returned
// to the caller.
func (c *controller) ProcessItem(ctx context.Context, key string) error {
	log := logf.FromContext(ctx).WithValues("key", key)
	ctx = logf.NewContext(ctx, log)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// A malformed key is logged and dropped rather than returned as an error.
		log.Error(err, "invalid resource key passed to ProcessItem")
		return nil
	}

	crt, err := c.certificateLister.Certificates(namespace).Get(name)
	if apierrors.IsNotFound(err) {
		log.V(logf.DebugLevel).Info("certificate not found for key", "error", err.Error())
		return nil
	}
	if err != nil {
		return err
	}

	if apiutil.CertificateHasCondition(crt, acmapi.CertificateCondition{
		Type:   acmapi.CertificateConditionIssuing,
		Status: acmmeta.ConditionTrue,
	}) {
		// Do nothing if an issuance is already in progress.
		return nil
	}

	input, err := c.dataForCertificate(ctx, crt)
	if err != nil {
		return err
	}

	// TODO: when crt.Status.RenewalTime is set, schedule a re-check so the
	// certificate is re-evaluated as it nears expiry.

	reason, message, reissue := c.shouldReissue(input)
	if !reissue {
		// No re-issuance required.
		return nil
	}

	log.V(logf.InfoLevel).Info("Certificate must be re-issued", "reason", reason, "message", message)

	// Work on a copy so the object held by the lister is never mutated.
	crt = crt.DeepCopy()
	apiutil.SetCertificateCondition(crt, crt.Generation, acmapi.CertificateConditionIssuing,
		acmmeta.ConditionTrue, reason, message)
	if err := c.updateOrApplyStatus(ctx, crt); err != nil {
		return err
	}

	c.recorder.Event(crt, corev1.EventTypeNormal, "Issuing", message)
	return nil
}

// updateOrApplyStatus writes the status of crt through the client's
// UpdateStatus call and returns any error from it. Despite the name, only the
// update path is implemented here.
func (c *controller) updateOrApplyStatus(ctx context.Context, crt *acmapi.Certificate) error {
	_, err := c.client.AnthosCertmanagerV1().Certificates(crt.Namespace).UpdateStatus(ctx, crt, metav1.UpdateOptions{})
	return err
}

// controllerWrapper wraps the `controller` structure to make it implement
// the controllerpkg.queueingController interface
type controllerWrapper struct {
	*controller
}

// Register constructs the underlying controller from the shared controller
// context and returns its work queue and the informers that must sync before
// processing starts. The returned error is always nil.
func (c *controllerWrapper) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) {
	// Construct a new named logger to be reused throughout the controller.
	log := logf.FromContext(ctx.RootContext, ControllerName)

	ctrl, queue, mustSync := NewController(log,
		ctx.ACMClient,
		ctx.KubeSharedInformerFactory,
		ctx.SharedInformerFactory,
		ctx.Recorder,
		ctx.Clock,
		policies.NewTriggerPolicyChain(ctx.Clock).Evaluate,
		ctx.FieldManager,
	)
	c.controller = ctrl

	return queue, mustSync, nil
}

// init registers a constructor for this controller under ControllerName.
func init() {
	controllerpkg.Register(ControllerName, func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) {
		return controllerpkg.NewBuilder(ctx, ControllerName).
			For(&controllerWrapper{}).
			Complete()
	})
}