package requestmanager import ( "bytes" "context" "crypto" "encoding/pem" "fmt" "strconv" "time" apiutil "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/api/util" "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/pki" acmapi "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/anthoscertmanager/v1" acmmeta "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/meta/v1" acmclient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned" acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1" controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller" "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates" "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/predicate" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" acminformers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions" logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs" "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/utils/clock" ) const ( ControllerName = "certificates-request-manager" reasonRequestFailed = "RequestFailed" reasonRequested = "Requested" ) var ( certificateGvk = acmapi.SchemeGroupVersion.WithKind("Certificate") ) type controller struct { certificateLister acmlisters.CertificateLister certificateRequestLister acmlisters.CertificateRequestLister secretLister corelisters.SecretLister client acmclient.Interface recorder record.EventRecorder clock clock.Clock fieldManager string } func NewController( log logr.Logger, client acmclient.Interface, factory informers.SharedInformerFactory, acmFactory acminformers.SharedInformerFactory, recorder record.EventRecorder, clock clock.Clock, certificateControllerOptions controllerpkg.CertificateOptions, fieldManager string, ) (*controller, workqueue.RateLimitingInterface, []cache.InformerSynced) { // create a queue used to queue up items to be processed queue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second*1, time.Second*30), ControllerName) // obtain references to all the informers used by this controller certificateInformer := acmFactory.AnthosCertmanager().V1().Certificates() certificateRequestInformer := acmFactory.AnthosCertmanager().V1().CertificateRequests() secretsInformer := factory.Core().V1().Secrets() certificateInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: queue}) certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ // Trigger reconciles on changes to any 'owned' CertificateRequest resources WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf, ), }) secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{ // Trigger reconciles on changes to any 'owned' secret resources WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf, ), }) // build a list of InformerSynced functions that will be returned by the Register method. // the controller will only begin processing items once all of these informers have synced. mustSync := []cache.InformerSynced{ secretsInformer.Informer().HasSynced, certificateRequestInformer.Informer().HasSynced, certificateInformer.Informer().HasSynced, } return &controller{ certificateLister: certificateInformer.Lister(), certificateRequestLister: certificateRequestInformer.Lister(), secretLister: secretsInformer.Lister(), client: client, recorder: recorder, clock: clock, // copiedAnnotationPrefixes: certificateControllerOptions.CopiedAnnotationPrefixes, fieldManager: fieldManager, }, queue, mustSync } func (c *controller) ProcessItem(ctx context.Context, key string) error { log := logf.FromContext(ctx).WithValues("key", key) ctx = logf.NewContext(ctx, log) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { log.Error(err, "invalid resource key passed to ProcessItem") return nil } crt, err := c.certificateLister.Certificates(namespace).Get(name) if apierrors.IsNotFound(err) { log.V(logf.DebugLevel).Info("certificate not found for key", "error", err.Error()) return nil } if err != nil { return err } // Confirm the certificate has the issuing condition if !apiutil.CertificateHasCondition(crt, acmapi.CertificateCondition{ Type: acmapi.CertificateConditionIssuing, Status: acmmeta.ConditionTrue, }) { return nil } // Check for and fetch the `status.nextPrivateKeySecretName` secret if crt.Status.NextPrivateKeySecretName == nil { log.V(logf.DebugLevel).Info("status.nextPrivateKeySecretName not yet set, waiting for keymanager before processing certificate") return nil } nextPrivateKeySecret, err := c.secretLister.Secrets(crt.Namespace).Get(*crt.Status.NextPrivateKeySecretName) if apierrors.IsNotFound(err) { log.V(logf.DebugLevel).Info("nextPrivateKeySecretName Secret resource does not exist, waiting for keymanager to create it before continuing") return nil } if err != nil { return err } if nextPrivateKeySecret.Data == nil || len(nextPrivateKeySecret.Data[corev1.TLSPrivateKeyKey]) == 0 { log.V(logf.DebugLevel).Info("Next private key secret does not contain any valid data, waiting for keymanager before processing certificate") return nil } pk, err := pki.DecodePrivateKeyBytes(nextPrivateKeySecret.Data[corev1.TLSPrivateKeyKey]) if err != nil { log.Error(err, "Failed to decode next private key secret data, waiting for keymanager before processing certificate") return nil } // Discover all 'owned' CertificateRequests requests, err := certificates.ListCertificateRequestsMatchingPredicates(c.certificateRequestLister.CertificateRequests(crt.Namespace), labels.Everything(), predicate.ResourceOwnedBy(crt)) if err != nil { return err } // delete any existing CertificateRequest resources that do not have a // revision annotation if requests, err = c.deleteRequestsWithoutRevision(ctx, requests...); err != nil { return err } currentCertificateRevision := 0 if crt.Status.Revision != nil { currentCertificateRevision = *crt.Status.Revision } nextRevision := currentCertificateRevision + 1 requests, err = requestsWithRevision(requests, currentCertificateRevision) if err != nil { return err } requests, err = c.deleteRequestsNotMatchingSpec(ctx, crt, pk.Public(), requests...) if err != nil { return err } requests, err = c.deleteCurrentFailedRequests(ctx, crt, requests...) if err != nil { return err } if len(requests) > 1 { log.V(logf.ErrorLevel).Info("Multiple matching CertificateRequest resources exist, delete one of them. This is likely an error and should be reported on the issue tracker!") return nil } if len(requests) == 1 { // Nothing to do as we've already verified that the CertificateRequest // is up to date above. return nil } return c.createNewCertificateRequest(ctx, crt, pk, nextRevision, nextPrivateKeySecret.Name) } func requestsWithRevision(reqs []*acmapi.CertificateRequest, revision int) ([]*acmapi.CertificateRequest, error) { var remaining []*acmapi.CertificateRequest for _, req := range reqs { if req.Annotations == nil || req.Annotations[acmapi.CertificateRequestRevisionAnnotationKey] == "" { return nil, fmt.Errorf("certificaterequest %q does not contain revision annotation", req.Name) } reqRevisionStr := req.Annotations[acmapi.CertificateRequestRevisionAnnotationKey] reqRevision, err := strconv.ParseInt(reqRevisionStr, 10, 0) if err != nil { return nil, err } if reqRevision == int64(revision) { remaining = append(remaining, req) } } return remaining, nil } func (c *controller) deleteCurrentFailedRequests(ctx context.Context, crt *acmapi.Certificate, reqs ...*acmapi.CertificateRequest) ([]*acmapi.CertificateRequest, error) { log := logf.FromContext(ctx).WithValues("Certificate", crt.Name) var remaining []*acmapi.CertificateRequest for _, req := range reqs { log = logf.WithRelatedResource(log, req) // Check if there are any 'current' CertificateRequests that // failed during the previous issuance cycle. Those should be // deleted so that a new one gets created and the issuance is // re-tried. In practice no more than one CertificateRequest is // expected at this point. crReadyCond := apiutil.GetCertificateRequestCondition(req, acmapi.CertificateRequestConditionReady) if crReadyCond == nil || crReadyCond.Status != acmmeta.ConditionFalse || crReadyCond.Reason != acmapi.CertificateRequestReasonFailed { remaining = append(remaining, req) continue } certIssuingCond := apiutil.GetCertificateCondition(crt, acmapi.CertificateConditionIssuing) if certIssuingCond == nil { // This should never happen log.V(logf.ErrorLevel).Info("Certificate does not have Issuing condition") return nil, nil } // If the Issuing condition on the Certificate is newer than the // failure time on CertificateRequest, it means that the // CertificateRequest failed during the previous issuance (for the // same revision). If it is a CertificateRequest that failed // during the previous issuance, then it should be deleted so // that we create a new one for this issuance. if req.Status.FailureTime.Before(certIssuingCond.LastTransitionTime) { log.V(logf.DebugLevel).Info("Found a failed CertificateRequest for previous issuance of this revision, deleting...") if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil { return nil, err } continue } remaining = append(remaining, req) } return remaining, nil } func (c *controller) deleteRequestsNotMatchingSpec(ctx context.Context, crt *acmapi.Certificate, publicKey crypto.PublicKey, reqs ...*acmapi.CertificateRequest) ([]*acmapi.CertificateRequest, error) { log := logf.FromContext(ctx) var remaining []*acmapi.CertificateRequest for _, req := range reqs { log := logf.WithRelatedResource(log, req) violations, err := certificates.RequestMatchesSpec(req, crt.Spec) if err != nil { log.Error(err, "Failed to check if CertificateRequest matches spec, deleting CertificateRequest") if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil { return nil, err } continue } if len(violations) > 0 { log.V(logf.InfoLevel).WithValues("violations", violations).Info("CertificateRequest does not match requirements on certificate.spec, deleting CertificateRequest", "violations", violations) if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil { return nil, err } continue } x509Req, err := pki.DecodeX509CertificateRequestBytes(req.Spec.Request) if err != nil { // this case cannot happen as RequestMatchesSpec would have returned an error too return nil, err } matches, err := pki.PublicKeyMatchesCSR(publicKey, x509Req) if err != nil { return nil, err } if !matches { log.V(logf.DebugLevel).Info("CertificateRequest contains a CSR that does not have the same public key as the stored next private key secret, deleting CertificateRequest") if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil { return nil, err } continue } remaining = append(remaining, req) } return remaining, nil } func (c *controller) deleteRequestsWithoutRevision(ctx context.Context, reqs ...*acmapi.CertificateRequest) ([]*acmapi.CertificateRequest, error) { log := logf.FromContext(ctx) var remaining []*acmapi.CertificateRequest for _, req := range reqs { log := logf.WithRelatedResource(log, req) if req.Annotations == nil || req.Annotations[acmapi.CertificateRequestRevisionAnnotationKey] == "" { log.V(logf.DebugLevel).Info("Deleting CertificateRequest as it does not contain a revision annotation") if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil { return nil, err } continue } reqRevisionStr := req.Annotations[acmapi.CertificateRequestRevisionAnnotationKey] _, err := strconv.ParseInt(reqRevisionStr, 10, 0) if err != nil { log.V(logf.DebugLevel).Info("Deleting CertificateRequest as it contains an invalid revision annotation") if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil { return nil, err } continue } remaining = append(remaining, req) } return remaining, nil } func (c *controller) createNewCertificateRequest(ctx context.Context, crt *acmapi.Certificate, pk crypto.Signer, nextRevision int, nextPrivateKeySecretName string) error { log := logf.FromContext(ctx) x509CSR, err := pki.GenerateCSR(crt) if err != nil { log.Error(err, "Failed to generate CSR - will not retry") return nil } csrDER, err := pki.EncodeCSR(x509CSR, pk) if err != nil { return err } csrPEM := bytes.NewBuffer([]byte{}) err = pem.Encode(csrPEM, &pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csrDER}) if err != nil { return err } annotations := controllerpkg.BuildAnnotationsToCopy(crt.Annotations, []string{}) annotations[acmapi.CertificateRequestRevisionAnnotationKey] = strconv.Itoa(nextRevision) annotations[acmapi.CertificateRequestPrivateKeyAnnotationKey] = nextPrivateKeySecretName annotations[acmapi.CertificateNameKey] = crt.Name cr := &acmapi.CertificateRequest{ ObjectMeta: metav1.ObjectMeta{ Namespace: crt.Namespace, GenerateName: apiutil.DNSSafeShortenTo52Characters(crt.Name) + "-", Annotations: annotations, Labels: crt.Labels, OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(crt, certificateGvk)}, }, Spec: acmapi.CertificateRequestSpec{ Duration: crt.Spec.Duration, IssuerRef: crt.Spec.IssuerRef, Request: csrPEM.Bytes(), IsCA: crt.Spec.IsCA, Usages: crt.Spec.Usages, }, } cr, err = c.client.AnthosCertmanagerV1().CertificateRequests(cr.Namespace).Create(ctx, cr, metav1.CreateOptions{FieldManager: c.fieldManager}) if err != nil { c.recorder.Eventf(crt, corev1.EventTypeWarning, reasonRequestFailed, "Failed to create CertificateRequest: "+err.Error()) return err } c.recorder.Eventf(crt, corev1.EventTypeNormal, reasonRequested, "Created new CertificateRequest resource %q", cr.Name) if err := c.waitForCertificateRequestToExist(cr.Namespace, cr.Name); err != nil { return fmt.Errorf("failed whilst waiting for CertificateRequest to exist - this may indicate an apiserver running slowly. Request will be retried") } return nil } func (c *controller) waitForCertificateRequestToExist(namespace, name string) error { return wait.Poll(time.Millisecond*100, time.Second*5, func() (bool, error) { _, err := c.certificateRequestLister.CertificateRequests(namespace).Get(name) if apierrors.IsNotFound(err) { return false, nil } if err != nil { return false, err } return true, nil }) } // controllerWrapper wraps the `controller` structure to make it implement // the controllerpkg.queueingController interface type controllerWrapper struct { *controller } func (c *controllerWrapper) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) { // construct a new named logger to be reused throughout the controller log := logf.FromContext(ctx.RootContext, ControllerName) ctrl, queue, mustSync := NewController(log, ctx.ACMClient, ctx.KubeSharedInformerFactory, ctx.SharedInformerFactory, ctx.Recorder, ctx.Clock, ctx.CertificateOptions, ctx.FieldManager, ) c.controller = ctrl return queue, mustSync, nil } func init() { controllerpkg.Register(ControllerName, func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) { return controllerpkg.NewBuilder(ctx, ControllerName). For(&controllerWrapper{}). Complete() }) }