Newer
Older
AnthosCertManager / pkg / controller / issuers / controller.go
@Yangyang Xie Yangyang Xie on 22 Nov 2022 3 KB fix issues
package issuers

import (
	"context"

	"github.com/go-logr/logr"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"

	corev1 "k8s.io/api/core/v1"
	k8sErrors "k8s.io/apimachinery/pkg/api/errors"

	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/issuer"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"

	acmClient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
	acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1"
	controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
)

// ControllerName is the name this controller is registered under. The same
// string is passed when creating the controller's logger and work queue.
const ControllerName = "issuers"

// keyFunc is controllerpkg.KeyFunc. secreMutated calls it to obtain the work
// queue key for each issuer it enqueues.
var keyFunc = controllerpkg.KeyFunc

// controller carries the listers, queue, clients and helpers used by the
// issuers controller. Every field is populated by Register.
type controller struct {
	// issuerLister and secretLister are the listers of the Issuer and Secret
	// shared informers resolved in Register.
	issuerLister acmlisters.IssuerLister
	secretLister corelisters.SecretLister

	// queue is the named rate-limiting work queue created in Register; keys
	// added to it are later handed to ProcessItem.
	queue workqueue.RateLimitingInterface

	// log is the logger obtained from the root context with ControllerName.
	log logr.Logger

	// acmClient is the clientset copied from the controller context.
	acmClient acmClient.Interface

	// recorder is the event recorder copied from the controller context;
	// issuerFactory is built from that same context via issuer.NewFactory.
	recorder      record.EventRecorder
	issuerFactory issuer.Factory

	// fieldManager is the manager name used for the Apply operations.
	fieldManager string
}

// Register prepares the controller for use with the supplied controller
// context. It creates the logger and the rate-limited work queue, resolves the
// Issuer and Secret informers together with their listers, attaches the event
// handlers, and copies the shared helpers off the context.
//
// It returns the work queue that items are enqueued on, the InformerSynced
// functions that must be synced, and an error (always nil in this
// implementation).
func (c *controller) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) {
	c.log = logf.FromContext(ctx.RootContext, ControllerName)

	// Work queue holding the items waiting to be processed.
	c.queue = workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName)

	issuers := ctx.SharedInformerFactory.AnthosCertmanager().V1().Issuers()
	secrets := ctx.KubeSharedInformerFactory.Core().V1().Secrets()
	issuerInf, secretInf := issuers.Informer(), secrets.Informer()

	c.issuerLister, c.secretLister = issuers.Lister(), secrets.Lister()

	// Issuer events go to a handler backed by the queue; Secret events are
	// routed through secreMutated.
	issuerInf.AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: c.queue})
	secretInf.AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: c.secreMutated})

	// Additional helpers used by this controller.
	c.issuerFactory = issuer.NewFactory(ctx)
	c.acmClient = ctx.ACMClient
	c.fieldManager = ctx.FieldManager
	c.recorder = ctx.Recorder

	return c.queue, []cache.InformerSynced{issuerInf.HasSynced, secretInf.HasSynced}, nil
}

// secreMutated is the work function invoked for Secret informer events. It
// looks up the issuers associated with the Secret via issuersForSecret and
// adds each one's key to the rate-limited work queue. Objects that are not
// *corev1.Secret, lookup failures, and per-issuer key failures are logged and
// otherwise ignored.
//
// TODO: replace with generic handleObject function (like Navigator)
func (c *controller) secreMutated(obj interface{}) {
	logger := c.log.WithName("secreMutated")

	sec, isSecret := obj.(*corev1.Secret)
	if !isSecret {
		logger.Error(nil, "object was not a secret object")
		return
	}
	logger = logf.WithResource(logger, sec)

	affected, lookupErr := c.issuersForSecret(sec)
	if lookupErr != nil {
		logger.Error(lookupErr, "error looking up issuers observing secret")
		return
	}

	for _, item := range affected {
		// A key failure for one issuer must not stop the others from being enqueued.
		if key, keyErr := keyFunc(item); keyErr != nil {
			logger.Error(keyErr, "error computing key for resource")
		} else {
			c.queue.AddRateLimited(key)
		}
	}
}

// ProcessItem handles a single work queue key.
//
// key is split into namespace and name with cache.SplitMetaNamespaceKey. A key
// that cannot be split is logged and nil is returned. The Issuer is then read
// from the issuer lister: if it is not found, that is logged and nil is
// returned; any other lister error is returned as-is. Otherwise the Issuer is
// attached to the logger stored in ctx and the result of c.Sync is returned.
func (c *controller) ProcessItem(ctx context.Context, key string) error {
	log := logf.FromContext(ctx)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.Error(err, "invalid resource key")
		return nil
	}

	// Named iss rather than issuer so the imported issuer package is not
	// shadowed inside this method.
	iss, err := c.issuerLister.Issuers(namespace).Get(name)
	if err != nil {
		if k8sErrors.IsNotFound(err) {
			log.Error(err, "issuer in work queue no longer exists")
			return nil
		}
		return err
	}

	ctx = logf.NewContext(ctx, logf.WithResource(log, iss))
	return c.Sync(ctx, iss)
}

// init registers a constructor for this controller under ControllerName. The
// constructor builds the controller through controllerpkg.NewBuilder around a
// zero-valued controller.
func init() {
	construct := func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) {
		builder := controllerpkg.NewBuilder(ctx, ControllerName)
		return builder.For(&controller{}).Complete()
	}
	controllerpkg.Register(ControllerName, construct)
}