package issuing import ( "context" "crypto" "time" apiutil "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/api/util" acmapi "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/anthoscertmanager/v1" acmmeta "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/meta/v1" acmclient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned" acminformers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions" acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1" controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller" "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates" "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/issuing/internal" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" policies "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/policies" logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs" utilpki "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/pki" "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/predicate" "github.com/go-logr/logr" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" ) const ( ControllerName = "certificates-issuing" ) // This controller observes the state of the certificate's 'Issuing' condition, // which will then copy the signed certificates and private key to the target // Secret resource. type controller struct { certificateLister acmlisters.CertificateLister certificateRequestLister acmlisters.CertificateRequestLister secretLister corelisters.SecretLister recorder record.EventRecorder clock clock.Clock client acmclient.Interface // postIssuancePolicyChain is the policies chain to ensure that all Secret // metadata and output formats are kept are present and correct. postIssuancePolicyChain policies.Chain // secretsUpdateData is used by the SecretTemplate controller for // re-reconciling Secrets where the SecretTemplate is not up to date with a // Certificate's secret. secretsUpdateData func(context.Context, *acmapi.Certificate, internal.SecretData) error } func NewController( log logr.Logger, kubeClient kubernetes.Interface, client acmclient.Interface, factory informers.SharedInformerFactory, acmFactory acminformers.SharedInformerFactory, recorder record.EventRecorder, clock clock.Clock, certificateControllerOptions controllerpkg.CertificateOptions, fieldManager string, ) (*controller, workqueue.RateLimitingInterface, []cache.InformerSynced) { // create a queue used to queue up items to be processed queue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second, time.Second*30), ControllerName) // obtain references to all the informers used by this controller certificateInformer := acmFactory.AnthosCertmanager().V1().Certificates() certificateRequestInformer := acmFactory.AnthosCertmanager().V1().CertificateRequests() secretsInformer := factory.Core().V1().Secrets() certificateInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: queue}) certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf), }) secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ // Issuer reconciles on changes to the Secret named `spec.nextPrivateKeySecretName` WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf, predicate.ExtractResourceName(predicate.CertificateNextPrivateKeySecretName)), }) // build a list of InformerSynced functions that will be returned by the Register method. // the controller will only begin processing items once all of these informers have synced. mustSync := []cache.InformerSynced{ certificateInformer.Informer().HasSynced, certificateInformer.Informer().HasSynced, secretsInformer.Informer().HasSynced, } secretsManager := internal.NewSecretsManager( kubeClient.CoreV1(), secretsInformer.Lister(), fieldManager, certificateControllerOptions.EnableOwnerRef, ) return &controller{ certificateLister: certificateInformer.Lister(), certificateRequestLister: certificateRequestInformer.Lister(), secretLister: secretsInformer.Lister(), recorder: recorder, clock: clock, secretsUpdateData: secretsManager.UpdateData, client: client, postIssuancePolicyChain: policies.NewSecretPostIssuancePolicyChain(certificateControllerOptions.EnableOwnerRef, fieldManager), }, queue, mustSync } func (c *controller) ProcessItem(ctx context.Context, key string) error { ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() log := logf.FromContext(ctx).WithValues("key", key) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } crt, err := c.certificateLister.Certificates(namespace).Get(name) if apierrors.IsNotFound(err) { log.V(logf.DebugLevel).Info("Certificate not found for key", "error", err.Error()) } if err != nil { return err } log = logf.WithResource(log, crt) ctx = logf.NewContext(ctx, log) // if the certificate condition is not equal to issuing, we need to ensure all non-issuing related secret Data is correct. if !apiutil.CertificateHasCondition(crt, acmapi.CertificateCondition{ Type: acmapi.CertificateConditionIssuing, Status: acmmeta.ConditionTrue, }) { return c.ensureSecretData(ctx, log, crt) } return nil } // updateOrApplyStatus will update the controller status. func (c *controller) updateOrApplyStatus(ctx context.Context, crt *acmapi.Certificate, conditionRemoved bool) error { // TODO: conditionally doing the operation based on SSA feature gate _, err := c.client.AnthosCertmanagerV1().Certificates(crt.Namespace).UpdateStatus(ctx, crt, metav1.UpdateOptions{}) return err } // issueCertificate will ensure the public key of the CSR matches the signed // certificate, and then store the certificate, CA and private key into the // Secret in the appropriate format type. func (c *controller) issueCertificate(ctx context.Context, nextRevision int, crt *acmapi.Certificate, req *acmapi.CertificateRequest, pk crypto.Signer) error { crt = crt.DeepCopy() if crt.Spec.PrivateKey == nil { crt.Spec.PrivateKey = &acmapi.CertificatePrivateKey{} } pkData, err := utilpki.EncodePrivateKey(pk, crt.Spec.PrivateKey.Encoding) if err != nil { return err } secretData := internal.SecretData{ PrivateKey: pkData, Certificate: req.Status.Certificate, CA: req.Status.CA, } if err := c.secretsUpdateData(ctx, crt, secretData); err != nil { return err } // Set status.revision to revision of the CertificateRequest crt.Status.Revision = &nextRevision // Remove Issuing status condition // TODO: Once we move to only server-side apply API calls, this should be changed to setting the Issuing condition to False apiutil.RemoveCertificateCondition(crt, acmapi.CertificateConditionIssuing) // Clean the failed attempts crt.Status.FailedIssuanceAttempts = nil // Clean status.lastFailureTime crt.Status.LastFailureTime = nil if err := c.updateOrApplyStatus(ctx, crt, true); err != nil { return err } message := "The certificate has been successfully issused" c.recorder.Event(crt, corev1.EventTypeNormal, "Issuing", message) return nil } // controllerWrapper wraps the `controller` structure to make it implement // the controllerpkg.queueingController interface type controllerWrapper struct { *controller } func (c *controllerWrapper) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) { // construct a new named logger to be reused throughout the controller log := logf.FromContext(ctx.RootContext, ControllerName) ctrl, queue, mustSync := NewController(log, ctx.Client, ctx.ACMClient, ctx.KubeSharedInformerFactory, ctx.SharedInformerFactory, ctx.Recorder, ctx.Clock, ctx.CertificateOptions, ctx.FieldManager, ) c.controller = ctrl return queue, mustSync, nil } func init() { controllerpkg.Register(ControllerName, func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) { return controllerpkg.NewBuilder(ctx, ControllerName).For(&controllerWrapper{}).Complete() }) }