package issuers
import (
"context"
"github.com/go-logr/logr"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/issuer"
logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
acmClient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1"
controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
)
// ControllerName is the name this controller uses for its logger, its
// workqueue and its registration with the controller package.
const ControllerName = "issuers"
// keyFunc is a package-level alias for controllerpkg.KeyFunc. secreMutated
// calls it to compute the workqueue key for each issuer it enqueues.
var keyFunc = controllerpkg.KeyFunc
// controller holds the state of the issuers controller. Except where noted,
// every field is populated by Register.
type controller struct {
	// issuerLister reads Issuer resources from the shared informer cache.
	issuerLister acmlisters.IssuerLister
	// secretLister reads Secret resources from the shared informer cache.
	secretLister corelisters.SecretLister

	// queue is the rate-limited workqueue holding the keys of issuers to process.
	queue workqueue.RateLimitingInterface

	// log is the logger named after ControllerName.
	log logr.Logger

	// acmClient is the clientset taken from the controller context.
	acmClient acmClient.Interface

	// recorder is the event recorder taken from the controller context.
	recorder record.EventRecorder

	// issuerFactory is not assigned anywhere in the visible part of this file
	// (the assignment in Register is commented out).
	// NOTE(review): confirm it is set elsewhere before it is dereferenced.
	issuerFactory issuer.Factory

	// fieldManager is the manager name used for the Apply operations.
	fieldManager string
}
// Register wires the controller up against the supplied controller context:
// it copies the shared helpers out of ctx, creates the workqueue, obtains the
// Issuer and Secret informers, and attaches the event handlers.
//
// It returns the workqueue that items are enqueued on and the InformerSynced
// functions that must report true before processing starts. The returned
// error is always nil in the current implementation.
func (c *controller) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) {
	// Helpers taken straight from the controller context.
	c.log = logf.FromContext(ctx.RootContext, ControllerName)
	c.acmClient = ctx.ACMClient
	c.fieldManager = ctx.FieldManager
	c.recorder = ctx.Recorder
	// c.issuerFactory = issuer.NewFactory(ctx)

	// Queue of issuer keys waiting to be processed.
	c.queue = workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName)

	issuerInformer := ctx.SharedInformerFactory.AnthosCertmanager().V1().Issuers()
	secretInformer := ctx.KubeSharedInformerFactory.Core().V1().Secrets()

	c.issuerLister = issuerInformer.Lister()
	c.secretLister = secretInformer.Lister()

	// Issuer events are handed to a QueuingEventHandler bound to the queue;
	// Secret events invoke secreMutated through a BlockingEventHandler.
	issuerInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: c.queue})
	secretInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{WorkFunc: c.secreMutated})

	return c.queue, []cache.InformerSynced{
		issuerInformer.Informer().HasSynced,
		secretInformer.Informer().HasSynced,
	}, nil
}
// secreMutated is the work function registered for Secret informer events.
// It looks up the issuers associated with the given secret via
// issuersForSecret and adds each one's key to the rate-limited workqueue.
// Objects that are not *corev1.Secret, lookup failures and key computation
// failures are logged and otherwise ignored.
//
// TODO: replace with generic handleObject function (like Navigator)
func (c *controller) secreMutated(obj interface{}) {
	log := c.log.WithName("secreMutated")

	secret, isSecret := obj.(*corev1.Secret)
	if !isSecret {
		log.Error(nil, "object was not a secret object")
		return
	}
	log = logf.WithResource(log, secret)

	observers, lookupErr := c.issuersForSecret(secret)
	if lookupErr != nil {
		log.Error(lookupErr, "error looking up issuers observing secret")
		return
	}

	for _, observer := range observers {
		queueKey, keyErr := keyFunc(observer)
		if keyErr != nil {
			// Skip this issuer only; the remaining ones are still enqueued.
			log.Error(keyErr, "error computing key for resource")
			continue
		}
		c.queue.AddRateLimited(queueKey)
	}
}
// ProcessItem handles a single workqueue key of the form "namespace/name".
//
// It splits the key, fetches the matching Issuer from the lister and passes it
// to Sync with a context whose logger is annotated with that resource.
//
// It returns nil when the key is malformed or when the issuer is not found in
// the lister (both cases are only logged), the lister error for any other
// lookup failure, and otherwise whatever Sync returns.
func (c *controller) ProcessItem(ctx context.Context, key string) error {
	log := logf.FromContext(ctx)

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		// A malformed key cannot become valid later, so no error is reported
		// back to the caller.
		log.Error(err, "invalid resource key")
		return nil
	}

	// Named iss rather than issuer so the imported issuer package is not shadowed.
	iss, err := c.issuerLister.Issuers(namespace).Get(name)
	if err != nil {
		if k8sErrors.IsNotFound(err) {
			log.Error(err, "issuer in work queue no longer exists")
			return nil
		}
		return err
	}

	ctx = logf.NewContext(ctx, logf.WithResource(log, iss))
	return c.Sync(ctx, iss)
}
// init registers a constructor for this controller with the controller
// package under ControllerName. The constructor builds the controller from a
// zero-value controller struct using the controller package's builder.
func init() {
	construct := func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) {
		builder := controllerpkg.NewBuilder(ctx, ControllerName)
		return builder.For(&controller{}).Complete()
	}
	controllerpkg.Register(ControllerName, construct)
}