Newer
Older
AnthosCertManager / pkg / controller / certificates / issuing / issuing_controller.go
package issuing

import (
	"context"
	"crypto"
	"time"

	apiutil "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/api/util"
	acmapi "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/anthoscertmanager/v1"
	acmmeta "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/meta/v1"
	acmclient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
	acminformers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions"
	acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1"
	controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/issuing/internal"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	policies "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/policies"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	utilpki "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/pki"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/predicate"
	"github.com/go-logr/logr"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/clock"
)

const (
	// ControllerName is the name this controller is registered under. It is
	// also used to name the controller's work queue and its logger.
	ControllerName = "certificates-issuing"
)

// controller observes the state of a Certificate's 'Issuing' condition and
// copies the signed certificate and private key to the target Secret
// resource.
type controller struct {
	certificateLister        acmlisters.CertificateLister
	certificateRequestLister acmlisters.CertificateRequestLister
	secretLister             corelisters.SecretLister
	recorder                 record.EventRecorder
	clock                    clock.Clock
	client                   acmclient.Interface

	// postIssuancePolicyChain is the policy chain used to ensure that all
	// Secret metadata and output formats are present and correct.
	postIssuancePolicyChain policies.Chain

	// secretsUpdateData is used by the SecretTemplate controller for
	// re-reconciling Secrets where the SecretTemplate is not up to date with a
	// Certificate's secret. Within this file it is also what issueCertificate
	// calls to store the private key, certificate and CA.
	secretsUpdateData func(context.Context, *acmapi.Certificate, internal.SecretData) error
}

// NewController constructs the certificates-issuing controller.
//
// It returns the controller, the rate-limited work queue that the registered
// event handlers feed, and the list of informer sync functions that must all
// report true before items from the queue are processed.
//
// Event handlers are registered on three informers:
//   - Certificates, through a QueuingEventHandler on the queue.
//   - CertificateRequests, enqueuing Certificates matched by
//     predicate.ResourceOwnerOf.
//   - Secrets, enqueuing Certificates matched by predicate.ResourceOwnerOf and
//     by the Certificate's next private key Secret name.
//
// kubeClient and fieldManager are handed to the internal secrets manager;
// certificateControllerOptions.EnableOwnerRef is passed to both the secrets
// manager and the post-issuance policy chain.
func NewController(
	log logr.Logger,
	kubeClient kubernetes.Interface,
	client acmclient.Interface,
	factory informers.SharedInformerFactory,
	acmFactory acminformers.SharedInformerFactory,
	recorder record.EventRecorder,
	clock clock.Clock,
	certificateControllerOptions controllerpkg.CertificateOptions,
	fieldManager string,
) (*controller, workqueue.RateLimitingInterface, []cache.InformerSynced) {
	// create a queue used to queue up items to be processed
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second, time.Second*30), ControllerName)

	// obtain references to all the informers used by this controller
	certificateInformer := acmFactory.AnthosCertmanager().V1().Certificates()
	certificateRequestInformer := acmFactory.AnthosCertmanager().V1().CertificateRequests()
	secretsInformer := factory.Core().V1().Secrets()

	certificateInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: queue})
	certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf),
	})
	secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		// Issuer reconciles on changes to the Secret named `spec.nextPrivateKeySecretName`
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf, predicate.ExtractResourceName(predicate.CertificateNextPrivateKeySecretName)),
	})

	// build a list of InformerSynced functions that will be returned by the Register method.
	// the controller will only begin processing items once all of these informers have synced.
	// Every informer whose lister is handed to the controller below must be
	// listed here, otherwise the controller could read from an unsynced cache.
	mustSync := []cache.InformerSynced{
		certificateInformer.Informer().HasSynced,
		certificateRequestInformer.Informer().HasSynced,
		secretsInformer.Informer().HasSynced,
	}

	secretsManager := internal.NewSecretsManager(
		kubeClient.CoreV1(), secretsInformer.Lister(),
		fieldManager, certificateControllerOptions.EnableOwnerRef,
	)

	return &controller{
		certificateLister:        certificateInformer.Lister(),
		certificateRequestLister: certificateRequestInformer.Lister(),
		secretLister:             secretsInformer.Lister(),
		recorder:                 recorder,
		clock:                    clock,
		secretsUpdateData:        secretsManager.UpdateData,
		client:                   client,
		postIssuancePolicyChain: policies.NewSecretPostIssuancePolicyChain(certificateControllerOptions.EnableOwnerRef,
			fieldManager),
	}, queue, mustSync
}

// ProcessItem reconciles the Certificate identified by key, which is expected
// to be in "namespace/name" form as split by cache.SplitMetaNamespaceKey.
//
// The work is bounded by a 10 second timeout derived from ctx.
//
// If the Certificate no longer exists in the lister, this is logged at debug
// level and nil is returned, since there is nothing left to reconcile. Any
// other lookup error, or a malformed key, is returned to the caller.
//
// When the Certificate does not have the Issuing condition set to True, the
// result of ensureSecretData is returned. When it does, ProcessItem currently
// returns nil without doing further work.
func (c *controller) ProcessItem(ctx context.Context, key string) error {
	ctx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()

	log := logf.FromContext(ctx).WithValues("key", key)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	crt, err := c.certificateLister.Certificates(namespace).Get(name)
	if apierrors.IsNotFound(err) {
		// The Certificate is gone; surfacing this as an error would only
		// report a failure for an object that can no longer be reconciled.
		log.V(logf.DebugLevel).Info("Certificate not found for key", "error", err.Error())
		return nil
	}
	if err != nil {
		return err
	}

	log = logf.WithResource(log, crt)
	ctx = logf.NewContext(ctx, log)

	// if the certificate condition is not equal to issuing, we need to ensure all non-issuing related secret Data is correct.
	if !apiutil.CertificateHasCondition(crt, acmapi.CertificateCondition{
		Type:   acmapi.CertificateConditionIssuing,
		Status: acmmeta.ConditionTrue,
	}) {
		return c.ensureSecretData(ctx, log, crt)
	}

	return nil
}

// updateOrApplyStatus persists the status of crt through the Certificates
// UpdateStatus call in crt's namespace and returns any error from that call.
// The conditionRemoved argument is accepted but not consulted yet.
func (c *controller) updateOrApplyStatus(ctx context.Context, crt *acmapi.Certificate, conditionRemoved bool) error {
	// TODO: conditionally doing the operation based on SSA feature gate

	certs := c.client.AnthosCertmanagerV1().Certificates(crt.Namespace)
	if _, err := certs.UpdateStatus(ctx, crt, metav1.UpdateOptions{}); err != nil {
		return err
	}
	return nil
}

// issueCertificate stores the result of a completed issuance and updates the
// Certificate's status accordingly.
//
// It encodes pk using the encoding from crt.Spec.PrivateKey (a nil PrivateKey
// spec is treated as an empty one), writes the encoded key together with the
// certificate and CA from req.Status through secretsUpdateData, and then
// updates the Certificate status: the revision is set to nextRevision, the
// Issuing condition is removed, and FailedIssuanceAttempts and
// LastFailureTime are cleared. On success a Normal "Issuing" event is
// recorded.
//
// The first error from key encoding, the secret update, or the status update
// is returned and the remaining steps are skipped.
func (c *controller) issueCertificate(ctx context.Context, nextRevision int, crt *acmapi.Certificate, req *acmapi.CertificateRequest, pk crypto.Signer) error {
	// Work on a copy so the mutations below do not touch the caller's object.
	crt = crt.DeepCopy()
	if crt.Spec.PrivateKey == nil {
		crt.Spec.PrivateKey = &acmapi.CertificatePrivateKey{}
	}

	pkData, err := utilpki.EncodePrivateKey(pk, crt.Spec.PrivateKey.Encoding)
	if err != nil {
		return err
	}

	secretData := internal.SecretData{
		PrivateKey:  pkData,
		Certificate: req.Status.Certificate,
		CA:          req.Status.CA,
	}

	if err := c.secretsUpdateData(ctx, crt, secretData); err != nil {
		return err
	}

	// Set status.revision to revision of the CertificateRequest
	crt.Status.Revision = &nextRevision

	// Remove Issuing status condition
	// TODO: Once we move to only server-side apply API calls, this should be changed to setting the Issuing condition to False
	apiutil.RemoveCertificateCondition(crt, acmapi.CertificateConditionIssuing)

	// Clean the failed attempts
	crt.Status.FailedIssuanceAttempts = nil

	// Clean status.lastFailureTime
	crt.Status.LastFailureTime = nil

	if err := c.updateOrApplyStatus(ctx, crt, true); err != nil {
		return err
	}

	message := "The certificate has been successfully issued"
	c.recorder.Event(crt, corev1.EventTypeNormal, "Issuing", message)
	return nil
}

// controllerWrapper wraps the `controller` structure to make it implement
// the controllerpkg.queueingController interface. The embedded controller is
// nil until Register assigns it.
type controllerWrapper struct {
	*controller
}

// Register builds the wrapped controller from the dependencies carried by ctx
// and stores it on the wrapper. It returns the controller's work queue and the
// informer sync functions produced by NewController; the error is always nil.
func (c *controllerWrapper) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) {
	var (
		queue    workqueue.RateLimitingInterface
		mustSync []cache.InformerSynced
	)

	c.controller, queue, mustSync = NewController(
		// a named logger that is reused throughout the controller
		logf.FromContext(ctx.RootContext, ControllerName),
		ctx.Client,
		ctx.ACMClient,
		ctx.KubeSharedInformerFactory,
		ctx.SharedInformerFactory,
		ctx.Recorder,
		ctx.Clock,
		ctx.CertificateOptions,
		ctx.FieldManager,
	)

	return queue, mustSync, nil
}

// init registers a constructor for this controller with controllerpkg under
// ControllerName.
func init() {
	construct := func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) {
		builder := controllerpkg.NewBuilder(ctx, ControllerName)
		return builder.For(&controllerWrapper{}).Complete()
	}
	controllerpkg.Register(ControllerName, construct)
}