// AnthosCertManager / pkg / controller / certificates / requestmanager / requestmanager_controller.go
package requestmanager

import (
	"bytes"
	"context"
	"crypto"
	"encoding/pem"
	"fmt"
	"strconv"
	"time"

	apiutil "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/api/util"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/pki"

	acmapi "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/anthoscertmanager/v1"
	acmmeta "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/meta/v1"
	acmclient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
	acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1"

	controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/predicate"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"

	acminformers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	"github.com/go-logr/logr"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/clock"
)

const (
	// ControllerName is the name this controller is registered under; it
	// is also used to name the work queue and the controller's logger.
	ControllerName      = "certificates-request-manager"
	// reasonRequestFailed is the event reason recorded on a Certificate
	// when creating its CertificateRequest fails.
	reasonRequestFailed = "RequestFailed"
	// reasonRequested is the event reason recorded on a Certificate once
	// a new CertificateRequest has been created for it.
	reasonRequested     = "Requested"
)

var (
	// certificateGvk is the GroupVersionKind of the Certificate resource,
	// used when building the controller owner reference placed on created
	// CertificateRequests.
	certificateGvk = acmapi.SchemeGroupVersion.WithKind("Certificate")
)

// controller creates and garbage-collects CertificateRequest resources for
// Certificates that are currently being issued.
type controller struct {
	// certificateLister reads Certificates from the informer cache.
	certificateLister        acmlisters.CertificateLister
	// certificateRequestLister reads CertificateRequests from the
	// informer cache; it is also polled after a create to wait for the
	// new object to become visible.
	certificateRequestLister acmlisters.CertificateRequestLister
	// secretLister reads Secrets from the informer cache, in particular
	// the Secret named by status.nextPrivateKeySecretName.
	secretLister             corelisters.SecretLister

	// client is used to create and delete CertificateRequests.
	client       acmclient.Interface
	// recorder emits events on the Certificate being processed.
	recorder     record.EventRecorder
	// clock is stored at construction time; it is not read by any code
	// visible in this file.
	clock        clock.Clock
	// fieldManager is passed as CreateOptions.FieldManager when creating
	// CertificateRequests.
	fieldManager string
}

// NewController wires up the request manager controller.
//
// It creates the rate limited work queue, registers event handlers on the
// Certificate, CertificateRequest and Secret informers, and returns the
// controller together with the queue and the HasSynced functions of every
// informer it depends on.
//
// certificateControllerOptions is accepted but currently unused because the
// field that would consume it is commented out below.
func NewController(
	log logr.Logger,
	client acmclient.Interface,
	factory informers.SharedInformerFactory,
	acmFactory acminformers.SharedInformerFactory,
	recorder record.EventRecorder,
	clock clock.Clock,
	certificateControllerOptions controllerpkg.CertificateOptions,
	fieldManager string,
) (*controller, workqueue.RateLimitingInterface, []cache.InformerSynced) {

	// Work queue holding the keys of Certificates waiting to be processed;
	// its per-item exponential failure rate limiter is configured with 1s
	// and 30s.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 30*time.Second)
	queue := workqueue.NewNamedRateLimitingQueue(limiter, ControllerName)

	// Informers this controller reads from.
	certs := acmFactory.AnthosCertmanager().V1().Certificates()
	certRequests := acmFactory.AnthosCertmanager().V1().CertificateRequests()
	secrets := factory.Core().V1().Secrets()

	// Certificates are enqueued directly.
	certs.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: queue})

	// A change to an 'owned' CertificateRequest enqueues its owning
	// Certificate.
	certRequests.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(
			log, queue, certs.Lister(), labels.Everything(),
			predicate.ResourceOwnerOf,
		),
	})

	// A change to an 'owned' Secret enqueues its owning Certificate.
	secrets.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(
			log, queue, certs.Lister(), labels.Everything(),
			predicate.ResourceOwnerOf,
		),
	})

	// Every informer listed here must have synced before the controller
	// starts processing items.
	synced := []cache.InformerSynced{
		secrets.Informer().HasSynced,
		certRequests.Informer().HasSynced,
		certs.Informer().HasSynced,
	}

	ctrl := &controller{
		certificateLister:        certs.Lister(),
		certificateRequestLister: certRequests.Lister(),
		secretLister:             secrets.Lister(),
		client:                   client,
		recorder:                 recorder,
		clock:                    clock,
		// copiedAnnotationPrefixes: certificateControllerOptions.CopiedAnnotationPrefixes,
		fieldManager: fieldManager,
	}
	return ctrl, queue, synced
}

// ProcessItem reconciles the Certificate identified by key, a
// "namespace/name" cache key. While the Certificate has the Issuing condition
// set to True and its next private key is available, it makes sure a single
// up-to-date CertificateRequest exists for the Certificate's next revision,
// deleting stale owned requests and creating a new one when none remains.
//
// It returns nil without acting when the key is malformed, the Certificate is
// not found, the Certificate is not issuing, or the next private key Secret is
// missing, empty or undecodable. Errors from lister and API calls are returned
// to the caller.
func (c *controller) ProcessItem(ctx context.Context, key string) error {
	log := logf.FromContext(ctx).WithValues("key", key)

	ctx = logf.NewContext(ctx, log)
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.Error(err, "invalid resource key passed to ProcessItem")
		return nil
	}

	crt, err := c.certificateLister.Certificates(namespace).Get(name)
	if apierrors.IsNotFound(err) {
		log.V(logf.DebugLevel).Info("certificate not found for key", "error", err.Error())
		return nil
	}
	if err != nil {
		return err
	}

	// Confirm the certificate has the issuing condition
	if !apiutil.CertificateHasCondition(crt, acmapi.CertificateCondition{
		Type:   acmapi.CertificateConditionIssuing,
		Status: acmmeta.ConditionTrue,
	}) {
		return nil
	}

	// Check for and fetch the `status.nextPrivateKeySecretName` secret
	if crt.Status.NextPrivateKeySecretName == nil {
		log.V(logf.DebugLevel).Info("status.nextPrivateKeySecretName not yet set, waiting for keymanager before processing certificate")
		return nil
	}
	nextPrivateKeySecret, err := c.secretLister.Secrets(crt.Namespace).Get(*crt.Status.NextPrivateKeySecretName)
	if apierrors.IsNotFound(err) {
		log.V(logf.DebugLevel).Info("nextPrivateKeySecretName Secret resource does not exist, waiting for keymanager to create it before continuing")
		return nil
	}
	if err != nil {
		return err
	}
	if nextPrivateKeySecret.Data == nil || len(nextPrivateKeySecret.Data[corev1.TLSPrivateKeyKey]) == 0 {
		log.V(logf.DebugLevel).Info("Next private key secret does not contain any valid data, waiting for keymanager before processing certificate")
		return nil
	}
	pk, err := pki.DecodePrivateKeyBytes(nextPrivateKeySecret.Data[corev1.TLSPrivateKeyKey])
	if err != nil {
		log.Error(err, "Failed to decode next private key secret data, waiting for keymanager before processing certificate")
		return nil
	}

	// Discover all 'owned' CertificateRequests
	requests, err := certificates.ListCertificateRequestsMatchingPredicates(c.certificateRequestLister.CertificateRequests(crt.Namespace), labels.Everything(), predicate.ResourceOwnedBy(crt))
	if err != nil {
		return err
	}

	// delete any existing CertificateRequest resources that do not have a
	// revision annotation
	if requests, err = c.deleteRequestsWithoutRevision(ctx, requests...); err != nil {
		return err
	}

	// A Certificate that has never been issued has no revision yet and is
	// treated as revision 0, so its first request is for revision 1.
	currentCertificateRevision := 0
	if crt.Status.Revision != nil {
		currentCertificateRevision = *crt.Status.Revision
	}

	nextRevision := currentCertificateRevision + 1

	// Only requests for the revision being issued are relevant. New
	// requests are annotated with nextRevision when they are created at
	// the end of this function, so the existing ones must be matched
	// against that same revision; matching against the current revision
	// would never find them and a new request would be created on every
	// sync.
	requests, err = requestsWithRevision(requests, nextRevision)
	if err != nil {
		return err
	}

	requests, err = c.deleteRequestsNotMatchingSpec(ctx, crt, pk.Public(), requests...)
	if err != nil {
		return err
	}

	requests, err = c.deleteCurrentFailedRequests(ctx, crt, requests...)
	if err != nil {
		return err
	}

	if len(requests) > 1 {
		log.V(logf.ErrorLevel).Info("Multiple matching CertificateRequest resources exist, delete one of them. This is likely an error and should be reported on the issue tracker!")
		return nil
	}

	if len(requests) == 1 {
		// Nothing to do as we've already verified that the CertificateRequest
		// is up to date above.
		return nil
	}

	return c.createNewCertificateRequest(ctx, crt, pk, nextRevision, nextPrivateKeySecret.Name)
}

// requestsWithRevision returns the subset of reqs whose revision annotation
// equals revision, preserving their order.
//
// An error is returned as soon as a request is found whose revision
// annotation is absent, empty, or not parseable as a base-10 integer.
func requestsWithRevision(reqs []*acmapi.CertificateRequest, revision int) ([]*acmapi.CertificateRequest, error) {
	wanted := int64(revision)

	var matched []*acmapi.CertificateRequest
	for i := range reqs {
		candidate := reqs[i]

		// Reading from a nil map yields the empty string, so a missing
		// annotation map and a missing or empty annotation are handled
		// by the same check.
		raw := candidate.Annotations[acmapi.CertificateRequestRevisionAnnotationKey]
		if raw == "" {
			return nil, fmt.Errorf("certificaterequest %q does not contain revision annotation", candidate.Name)
		}

		parsed, err := strconv.ParseInt(raw, 10, 0)
		if err != nil {
			return nil, err
		}

		if parsed != wanted {
			continue
		}
		matched = append(matched, candidate)
	}
	return matched, nil
}

// deleteCurrentFailedRequests deletes those of reqs that failed during a
// previous issuance of crt and returns the requests that were kept.
//
// A request is deleted when its Ready condition is False with reason Failed
// and its failure time is before the last transition time of crt's Issuing
// condition. All other requests are returned unchanged and in order.
//
// If a failed request is encountered while crt has no Issuing condition, the
// function logs the situation and returns (nil, nil). An error from a delete
// call is returned immediately.
func (c *controller) deleteCurrentFailedRequests(ctx context.Context, crt *acmapi.Certificate, reqs ...*acmapi.CertificateRequest) ([]*acmapi.CertificateRequest, error) {
	log := logf.FromContext(ctx).WithValues("Certificate", crt.Name)
	var remaining []*acmapi.CertificateRequest
	for _, req := range reqs {
		// Shadow the logger for this iteration only, so that values
		// describing one request are not carried over into log lines
		// about the following requests.
		log := logf.WithRelatedResource(log, req)

		// Check if there are any 'current' CertificateRequests that
		// failed during the previous issuance cycle. Those should be
		// deleted so that a new one gets created and the issuance is
		// re-tried. In practice no more than one CertificateRequest is
		// expected at this point.
		crReadyCond := apiutil.GetCertificateRequestCondition(req, acmapi.CertificateRequestConditionReady)
		if crReadyCond == nil || crReadyCond.Status != acmmeta.ConditionFalse || crReadyCond.Reason != acmapi.CertificateRequestReasonFailed {
			remaining = append(remaining, req)
			continue
		}

		certIssuingCond := apiutil.GetCertificateCondition(crt, acmapi.CertificateConditionIssuing)
		if certIssuingCond == nil {
			// This should never happen
			log.V(logf.ErrorLevel).Info("Certificate does not have Issuing condition")
			return nil, nil
		}
		// If the Issuing condition on the Certificate is newer than the
		// failure time on CertificateRequest, it means that the
		// CertificateRequest failed during the previous issuance (for the
		// same revision). If it is a CertificateRequest that failed
		// during the previous issuance, then it should be deleted so
		// that we create a new one for this issuance.
		if req.Status.FailureTime.Before(certIssuingCond.LastTransitionTime) {
			log.V(logf.DebugLevel).Info("Found a failed CertificateRequest for previous issuance of this revision, deleting...")
			if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil {
				return nil, err
			}
			continue
		}
		remaining = append(remaining, req)
	}
	return remaining, nil
}

// deleteRequestsNotMatchingSpec deletes those of reqs that are not valid for
// the current state of crt and returns the requests that were kept, in order.
//
// A request is deleted when:
//   - it cannot be compared against crt.Spec,
//   - the comparison reports one or more violations, or
//   - the public key in its CSR does not match publicKey.
//
// An error from a delete call, from decoding the CSR, or from comparing the
// public keys is returned immediately.
func (c *controller) deleteRequestsNotMatchingSpec(ctx context.Context, crt *acmapi.Certificate, publicKey crypto.PublicKey, reqs ...*acmapi.CertificateRequest) ([]*acmapi.CertificateRequest, error) {
	log := logf.FromContext(ctx)
	var remaining []*acmapi.CertificateRequest
	for _, req := range reqs {
		// Per-request logger; shadows the outer one for this iteration.
		log := logf.WithRelatedResource(log, req)
		violations, err := certificates.RequestMatchesSpec(req, crt.Spec)
		if err != nil {
			log.Error(err, "Failed to check if CertificateRequest matches spec, deleting CertificateRequest")
			if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil {
				return nil, err
			}
			continue
		}
		if len(violations) > 0 {
			// The violations are attached to the log line exactly once.
			log.V(logf.InfoLevel).Info("CertificateRequest does not match requirements on certificate.spec, deleting CertificateRequest", "violations", violations)
			if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil {
				return nil, err
			}
			continue
		}
		x509Req, err := pki.DecodeX509CertificateRequestBytes(req.Spec.Request)
		if err != nil {
			// this case cannot happen as RequestMatchesSpec would have returned an error too
			return nil, err
		}
		matches, err := pki.PublicKeyMatchesCSR(publicKey, x509Req)
		if err != nil {
			return nil, err
		}
		if !matches {
			log.V(logf.DebugLevel).Info("CertificateRequest contains a CSR that does not have the same public key as the stored next private key secret, deleting CertificateRequest")
			if err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{}); err != nil {
				return nil, err
			}
			continue
		}
		remaining = append(remaining, req)
	}
	return remaining, nil
}

// deleteRequestsWithoutRevision deletes every request in reqs whose revision
// annotation is absent, empty, or not parseable as a base-10 integer, and
// returns the requests that were kept, in their original order.
//
// An error from a delete call is returned immediately.
func (c *controller) deleteRequestsWithoutRevision(ctx context.Context, reqs ...*acmapi.CertificateRequest) ([]*acmapi.CertificateRequest, error) {
	baseLog := logf.FromContext(ctx)

	var kept []*acmapi.CertificateRequest
	for _, req := range reqs {
		reqLog := logf.WithRelatedResource(baseLog, req)

		// Work out why, if at all, this request has to go. Reading from
		// a nil annotation map yields the empty string.
		deleteMsg := ""
		raw := req.Annotations[acmapi.CertificateRequestRevisionAnnotationKey]
		if raw == "" {
			deleteMsg = "Deleting CertificateRequest as it does not contain a revision annotation"
		} else if _, err := strconv.ParseInt(raw, 10, 0); err != nil {
			deleteMsg = "Deleting CertificateRequest as it contains an invalid revision annotation"
		}

		if deleteMsg == "" {
			kept = append(kept, req)
			continue
		}

		reqLog.V(logf.DebugLevel).Info(deleteMsg)
		err := c.client.AnthosCertmanagerV1().CertificateRequests(req.Namespace).Delete(ctx, req.Name, metav1.DeleteOptions{})
		if err != nil {
			return nil, err
		}
	}
	return kept, nil
}

// createNewCertificateRequest generates a CSR for crt, signs it with pk and
// creates a CertificateRequest carrying it in crt's namespace.
//
// The created request is controlled by crt, reuses crt's labels, and is
// annotated with nextRevision, nextPrivateKeySecretName and crt's name. After
// a successful create the function waits until the new request is visible
// through the CertificateRequest lister.
//
// A failure to generate the CSR is logged and nil is returned. Failures to
// encode the CSR, to create the request, or to observe it in the lister are
// returned as errors; a create failure is also recorded as a warning event on
// crt.
func (c *controller) createNewCertificateRequest(ctx context.Context, crt *acmapi.Certificate, pk crypto.Signer, nextRevision int, nextPrivateKeySecretName string) error {
	log := logf.FromContext(ctx)
	x509CSR, err := pki.GenerateCSR(crt)
	if err != nil {
		log.Error(err, "Failed to generate CSR - will not retry")
		return nil
	}
	csrDER, err := pki.EncodeCSR(x509CSR, pk)
	if err != nil {
		return err
	}

	var csrPEM bytes.Buffer
	if err := pem.Encode(&csrPEM, &pem.Block{Type: "CERTIFICATE REQUEST", Bytes: csrDER}); err != nil {
		return err
	}

	annotations := controllerpkg.BuildAnnotationsToCopy(crt.Annotations, []string{})
	annotations[acmapi.CertificateRequestRevisionAnnotationKey] = strconv.Itoa(nextRevision)
	annotations[acmapi.CertificateRequestPrivateKeyAnnotationKey] = nextPrivateKeySecretName
	annotations[acmapi.CertificateNameKey] = crt.Name

	cr := &acmapi.CertificateRequest{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       crt.Namespace,
			GenerateName:    apiutil.DNSSafeShortenTo52Characters(crt.Name) + "-",
			Annotations:     annotations,
			Labels:          crt.Labels,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(crt, certificateGvk)},
		},
		Spec: acmapi.CertificateRequestSpec{
			Duration:  crt.Spec.Duration,
			IssuerRef: crt.Spec.IssuerRef,
			Request:   csrPEM.Bytes(),
			IsCA:      crt.Spec.IsCA,
			Usages:    crt.Spec.Usages,
		},
	}

	cr, err = c.client.AnthosCertmanagerV1().CertificateRequests(cr.Namespace).Create(ctx, cr, metav1.CreateOptions{FieldManager: c.fieldManager})
	if err != nil {
		// Pass the error as an argument rather than splicing it into
		// the format string, so a '%' in the error text is not
		// interpreted as a formatting verb.
		c.recorder.Eventf(crt, corev1.EventTypeWarning, reasonRequestFailed, "Failed to create CertificateRequest: %v", err)
		return err
	}

	c.recorder.Eventf(crt, corev1.EventTypeNormal, reasonRequested, "Created new CertificateRequest resource %q", cr.Name)
	if err := c.waitForCertificateRequestToExist(cr.Namespace, cr.Name); err != nil {
		// Keep the underlying cause available to callers and in logs.
		return fmt.Errorf("failed whilst waiting for CertificateRequest to exist - this may indicate an apiserver running slowly. Request will be retried: %w", err)
	}
	return nil
}

// waitForCertificateRequestToExist polls the CertificateRequest lister every
// 100ms, for up to 5s, until the named request can be read from it.
//
// A not-found result keeps the poll going; any other lister error stops the
// poll and is returned. The error produced by wait.Poll is returned if the
// request does not appear in time.
func (c *controller) waitForCertificateRequestToExist(namespace, name string) error {
	const (
		pollInterval = 100 * time.Millisecond
		pollTimeout  = 5 * time.Second
	)

	visible := func() (bool, error) {
		_, err := c.certificateRequestLister.CertificateRequests(namespace).Get(name)
		switch {
		case err == nil:
			return true, nil
		case apierrors.IsNotFound(err):
			return false, nil
		default:
			return false, err
		}
	}

	return wait.Poll(pollInterval, pollTimeout, visible)
}

// controllerWrapper wraps the `controller` structure to make it implement
// the controllerpkg.queueingController interface.
//
// The embedded controller is nil until Register has been called, which
// constructs it and stores it here.
type controllerWrapper struct {
	*controller
}

// Register builds the wrapped controller from the dependencies carried by ctx
// and stores it on the wrapper. It returns the controller's work queue and the
// informer HasSynced functions that must report true before items are
// processed. The returned error is always nil.
func (c *controllerWrapper) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) {
	var (
		queue  workqueue.RateLimitingInterface
		synced []cache.InformerSynced
	)

	// The logger is named after the controller and handed to the event
	// handlers set up by NewController.
	c.controller, queue, synced = NewController(
		logf.FromContext(ctx.RootContext, ControllerName),
		ctx.ACMClient,
		ctx.KubeSharedInformerFactory,
		ctx.SharedInformerFactory,
		ctx.Recorder,
		ctx.Clock,
		ctx.CertificateOptions,
		ctx.FieldManager,
	)

	return queue, synced, nil
}

// init registers this controller's constructor with the controller package
// under ControllerName.
func init() {
	construct := func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) {
		builder := controllerpkg.NewBuilder(ctx, ControllerName)
		return builder.For(&controllerWrapper{}).Complete()
	}
	controllerpkg.Register(ControllerName, construct)
}