Newer
Older
AnthosCertManager / pkg / controller / certificates / trigger / trigger_controller.go
package trigger

import (
	"context"
	"time"

	acmapi "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/anthoscertmanager/v1"
	acmmeta "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/meta/v1"
	acmclient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
	acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1"
	controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/predicate"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"

	apiutil "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/api/util"
	acminformers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions"
	policies "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/policies"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/utils/clock"
)

// ControllerName is the name under which the trigger controller is
// registered; it is also used to name the controller's work queue and logger.
const ControllerName = "certificates-trigger"

// controller holds the dependencies of the certificates-trigger controller.
// Its ProcessItem method decides whether a Certificate needs to be
// (re-)issued and, if so, marks it with the Issuing condition.
//
// Fields, as used by the code in this file:
//   - certificateLister: read in ProcessItem to fetch the Certificate for a key.
//   - certificateRequestLister, secretLister: listers obtained from the shared
//     informers in NewController.
//   - client: used to write the Certificate status (UpdateStatus).
//   - recorder: emits the "Issuing" event once the status update succeeded.
type controller struct {
	certificateLister        acmlisters.CertificateLister
	certificateRequestLister acmlisters.CertificateRequestLister
	secretLister             corelisters.SecretLister
	client                   acmclient.Interface
	recorder                 record.EventRecorder
	// TODO: not wired up yet; intended for delayed re-checks of a Certificate.
	// scheduledWorkQueue       scheduler.ScheduledWorkQueue

	// fieldManager is the string which will be used as the Field Manager on
	// fields created or edited by the cert-manager Kubernetes client during
	// Apply API calls.
	// NOTE(review): the value is stored by NewController but not read anywhere
	// in this file — confirm whether it is still needed.
	fieldManager string

	// The following are injectable so that tests can substitute them.
	// shouldReissue is the policy evaluated by ProcessItem and
	// dataForCertificate gathers the policy input for a Certificate.
	clock              clock.Clock
	shouldReissue      policies.Func
	dataForCertificate func(context.Context, *acmapi.Certificate) (policies.Input, error)
}

// NewController builds the certificates-trigger controller and returns it
// together with the rate-limited work queue it consumes and the list of
// informer HasSynced functions that must report true before items are
// processed.
//
// Event wiring performed here:
//   - Certificate events are queued directly.
//   - CertificateRequest events enqueue the Certificate that owns the request.
//   - Secret events enqueue every Certificate that names the Secret in
//     spec.secretName.
//
// client is used to update Certificate status, recorder to emit events, and
// shouldReissue is the policy ProcessItem evaluates to decide whether a
// Certificate must be re-issued. clock and fieldManager are stored on the
// controller as-is.
func NewController(log logr.Logger,
	client acmclient.Interface,
	factory informers.SharedInformerFactory,
	acmFactory acminformers.SharedInformerFactory,
	recorder record.EventRecorder,
	clock clock.Clock,
	shouldReissue policies.Func,
	fieldManager string,
) (*controller, workqueue.RateLimitingInterface, []cache.InformerSynced) {
	// Queue of Certificate keys to process; failed items are retried with a
	// per-item exponential backoff between 1s and 30s.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second*1, time.Second*30), ControllerName)

	// Obtain references to all the informers used by this controller.
	certificateInformer := acmFactory.AnthosCertmanager().V1().Certificates()
	certificateRequestInformer := acmFactory.AnthosCertmanager().V1().CertificateRequests()
	secretsInformer := factory.Core().V1().Secrets()

	certificateLister := certificateInformer.Lister()
	certificateRequestLister := certificateRequestInformer.Lister()
	secretLister := secretsInformer.Lister()

	// Certificate events are handled asynchronously: the key is put on the
	// queue and processed later by ProcessItem.
	certificateInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: queue})

	// When a CertificateRequest resource changes, enqueue the Certificate
	// resource that owns it.
	certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateLister, labels.Everything(), predicate.ResourceOwnerOf),
	})

	// When a Secret resource changes, enqueue any Certificate resource that
	// names it as spec.secretName. The match must be made on the secret name:
	// an owner-reference predicate would only select Certificates that the
	// Secret owns, which is not the relationship between the two resources.
	// NOTE(review): relies on predicate.ExtractResourceName and
	// predicate.CertificateSecretName being available in this fork, as they
	// are in upstream cert-manager — confirm.
	secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
		WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateLister, labels.Everything(),
			predicate.ExtractResourceName(predicate.CertificateSecretName)),
	})

	// The controller only begins processing items once all of these
	// informers have synced.
	mustSync := []cache.InformerSynced{
		certificateRequestInformer.Informer().HasSynced,
		secretsInformer.Informer().HasSynced,
		certificateInformer.Informer().HasSynced,
	}

	return &controller{
		certificateLister:        certificateLister,
		certificateRequestLister: certificateRequestLister,
		secretLister:             secretLister,
		client:                   client,
		recorder:                 recorder,
		// TODO: scheduledWorkQueue: scheduler.NewScheduledWorkQueue(clock, queue.Add),
		fieldManager: fieldManager,

		// The following are injectable for testing purposes.
		clock:         clock,
		shouldReissue: shouldReissue,
		dataForCertificate: (&policies.Gatherer{
			CertificateRequestLister: certificateRequestLister,
			SecretLister:             secretLister,
		}).DataForCertificate,
	}, queue, mustSync
}

// ProcessItem handles one work-queue key of the form "namespace/name".
//
// It looks the Certificate up in the lister cache and, unless an issuance is
// already in progress, gathers the policy input and evaluates shouldReissue.
// When re-issuance is required it sets the Issuing condition to True on a copy
// of the Certificate, persists the status and records an "Issuing" event.
//
// A malformed key or a Certificate that no longer exists is logged and treated
// as done (nil is returned, so the item is not retried). Any other lookup,
// gathering or status-update error is returned to the caller.
func (c *controller) ProcessItem(ctx context.Context, key string) error {
	logger := logf.FromContext(ctx).WithValues("key", key)
	ctx = logf.NewContext(ctx, logger)

	ns, certName, splitErr := cache.SplitMetaNamespaceKey(key)
	if splitErr != nil {
		logger.Error(splitErr, "invalid resource key passed to ProcessItem")
		return nil
	}

	cert, getErr := c.certificateLister.Certificates(ns).Get(certName)
	switch {
	case apierrors.IsNotFound(getErr):
		logger.V(logf.DebugLevel).Info("certificate not found for key", "error", getErr.Error())
		return nil
	case getErr != nil:
		return getErr
	}

	// Nothing to trigger while an issuance is already in progress.
	issuing := acmapi.CertificateCondition{
		Type:   acmapi.CertificateConditionIssuing,
		Status: acmmeta.ConditionTrue,
	}
	if apiutil.CertificateHasCondition(cert, issuing) {
		return nil
	}

	policyInput, gatherErr := c.dataForCertificate(ctx, cert)
	if gatherErr != nil {
		return gatherErr
	}

	if cert.Status.RenewalTime != nil {
		// TODO: schedule a re-check of this Certificate so that it is
		// examined again when it gets close to expiry.
	}

	reason, message, mustReissue := c.shouldReissue(policyInput)
	if !mustReissue {
		// The policy chain found no reason to re-issue.
		return nil
	}

	logger.V(logf.InfoLevel).Info("Certificate must be re-issued", "reason", reason, "message", message)

	// Work on a copy: objects returned by the lister must not be mutated.
	updated := cert.DeepCopy()
	apiutil.SetCertificateCondition(updated, updated.Generation, acmapi.CertificateConditionIssuing, acmmeta.ConditionTrue, reason, message)
	if statusErr := c.updateOrApplyStatus(ctx, updated); statusErr != nil {
		return statusErr
	}
	c.recorder.Event(updated, corev1.EventTypeNormal, "Issuing", message)

	return nil
}

// updateOrApplyStatus persists the status of crt through the UpdateStatus
// call of the Certificates client for crt's namespace. The object returned by
// the API server is discarded; only the error is reported.
func (c *controller) updateOrApplyStatus(ctx context.Context, crt *acmapi.Certificate) error {
	certificates := c.client.AnthosCertmanagerV1().Certificates(crt.Namespace)
	if _, err := certificates.UpdateStatus(ctx, crt, metav1.UpdateOptions{}); err != nil {
		return err
	}
	return nil
}

// controllerWrapper embeds *controller so that, together with its Register
// method, it can be handed to the controllerpkg builder (see init). The
// embedded controller is nil until Register has been called.
type controllerWrapper struct {
	*controller
}

// Register constructs the underlying controller from the shared controller
// context, stores it on the wrapper and returns the work queue plus the
// informer-synced functions to wait for. The returned error is always nil.
func (c *controllerWrapper) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) {
	// Named logger reused throughout the controller.
	logger := logf.FromContext(ctx.RootContext, ControllerName)

	// Policy chain that decides whether a Certificate must be re-issued.
	reissuePolicy := policies.NewTriggerPolicyChain(ctx.Clock).Evaluate

	inner, queue, synced := NewController(
		logger,
		ctx.ACMClient,
		ctx.KubeSharedInformerFactory,
		ctx.SharedInformerFactory,
		ctx.Recorder,
		ctx.Clock,
		reissuePolicy,
		ctx.FieldManager,
	)
	c.controller = inner

	return queue, synced, nil
}

// init registers the trigger controller under ControllerName. The registered
// constructor builds the controller around a fresh controllerWrapper.
func init() {
	construct := func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) {
		builder := controllerpkg.NewBuilder(ctx, ControllerName)
		return builder.For(&controllerWrapper{}).Complete()
	}
	controllerpkg.Register(ControllerName, construct)
}