package controller import ( "reflect" "strings" "time" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) var ( // KeyFunc creates a key for an API object. The key can be passed to a // worker function that processes an object from a queue such as // ProcessItem. KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc ) // QueuingEventHandler is an implementation of cache.ResourceEventHandler that // simply queues objects that are added/updated/deleted. type QueuingEventHandler struct { Queue workqueue.RateLimitingInterface } // DefaultItemBasedRateLimiter returns a new rate limiter with base delay of 5 // seconds, max delay of 5 minutes. func DefaultItemBasedRateLimiter() workqueue.RateLimiter { return workqueue.NewItemExponentialFailureRateLimiter(time.Second*5, time.Minute*5) } // Enqueue adds a key for an object to the workqueue. func (q *QueuingEventHandler) Enqueue(obj interface{}) { key, err := KeyFunc(obj) if err != nil { runtime.HandleError(err) return } q.Queue.Add(key) } // OnAdd adds a newly created object to the workqueue. func (q *QueuingEventHandler) OnAdd(obj interface{}) { q.Enqueue(obj) } // OnUpdate adds an updated object to the workqueue. func (q *QueuingEventHandler) OnUpdate(old, new interface{}) { if reflect.DeepEqual(old, new) { return } q.Enqueue(new) } // OnDelete adds a deleted object to the workqueue for processing. func (q *QueuingEventHandler) OnDelete(obj interface{}) { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if ok { obj = tombstone.Obj } q.Enqueue(obj) } // BlockingEventHandler is an implementation of cache.ResourceEventHandler that // simply synchronously calls it's WorkFunc upon calls to OnAdd, OnUpdate or // OnDelete. type BlockingEventHandler struct { WorkFunc func(obj interface{}) } // Enqueue synchronously adds a key for an object to the workqueue. 
func (b *BlockingEventHandler) Enqueue(obj interface{}) {
	b.WorkFunc(obj)
}

// OnAdd synchronously passes a newly created object to WorkFunc.
func (b *BlockingEventHandler) OnAdd(obj interface{}) {
	b.WorkFunc(obj)
}

// OnUpdate synchronously passes the updated object to WorkFunc. Updates whose
// old and new objects are deeply equal are ignored.
func (b *BlockingEventHandler) OnUpdate(old, new interface{}) {
	if reflect.DeepEqual(old, new) {
		return
	}
	b.WorkFunc(new)
}

// OnDelete synchronously passes a deleted object to WorkFunc. If obj is a
// cache.DeletedFinalStateUnknown tombstone, the object it wraps is passed
// instead of the tombstone itself.
func (b *BlockingEventHandler) OnDelete(obj interface{}) {
	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
	if ok {
		obj = tombstone.Obj
	}
	b.WorkFunc(obj)
}

// BuildAnnotationsToCopy takes a map of annotations and a list of prefix
// filters and builds a filtered map of annotations. It is used to filter
// annotations to be copied from Certificate to CertificateRequest and from
// CertificateSigningRequest to Order.
//
// The entries of prefixes are applied in order against the annotation keys:
//
//   - "*" selects every annotation;
//   - an entry starting with "-" removes every annotation selected so far
//     whose key starts with the remainder of the entry;
//   - any other entry selects the annotations whose key starts with it.
//
// Because entries are applied in order, a later entry overrides an earlier one
// for the keys both of them match. The wildcard applies only at its own
// position in the list: a plain prefix listed after an exclusion selects just
// the keys it matches and does not bring back annotations that the exclusion
// removed.
//
// The returned map is always non-nil and allAnnotations is never modified.
func BuildAnnotationsToCopy(allAnnotations map[string]string, prefixes []string) map[string]string {
	filteredAnnotations := make(map[string]string)

	for _, rule := range prefixes {
		// Exclusion rule: drop whatever earlier rules selected under this prefix.
		if strings.HasPrefix(rule, "-") {
			excluded := strings.TrimPrefix(rule, "-")
			for k := range allAnnotations {
				if strings.HasPrefix(k, excluded) {
					delete(filteredAnnotations, k)
				}
			}
			continue
		}

		// Inclusion rule. Only the "*" entry itself matches every key; treating
		// every inclusion as a wildcard whenever "*" appears anywhere in the
		// list would silently undo exclusions that precede it.
		includeAll := rule == "*"
		for k, v := range allAnnotations {
			if includeAll || strings.HasPrefix(k, rule) {
				filteredAnnotations[k] = v
			}
		}
	}

	return filteredAnnotations
}