package controller

import (
	"context"
	"errors"
	"fmt"
	"time"

	clientset "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned/scheme"
	informers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/discovery"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/utils/clock"
)

// resyncPeriod is the resync period handed to the shared informer factories.
// It is set to 10 hours, following the controller-runtime defaults and the
// discussion at:
// https://github.com/kubernetes-sigs/controller-runtime/pull/88#issuecomment-408500629
const resyncPeriod = 10 * time.Hour

// Context contains various types that are used by controller implementations.
// We purposely don't have specific informers/listers here, and instead keep a
// reference to a SharedInformerFactory so that controllers can choose
// themselves which listers are required.
// Each component should be given a distinct Context, built from the
// ContextFactory that has configured the underlying client to use a separate
// User Agent.
type Context struct {
	// RootContext is the root context for the controller.
	RootContext context.Context

	// StopCh is a channel that will be closed when the controller is signalled
	// to exit.
	StopCh <-chan struct{}

	// FieldManager is the string that should be used as the field manager when
	// applying API objects. This value is derived from the user agent.
	FieldManager string

	// RESTConfig is the loaded Kubernetes apiserver rest client configuration.
	RESTConfig *rest.Config

	// Client is a Kubernetes clientset.
	Client kubernetes.Interface

	// ACMClient is an anthos-cert-manager clientset.
	ACMClient clientset.Interface

	// DiscoveryClient is a discovery interface. Usually set to
	// Client.Discovery unless a fake client is in use.
	DiscoveryClient discovery.DiscoveryInterface

	// Recorder is the event recorder that events are recorded to.
	Recorder record.EventRecorder

	// KubeSharedInformerFactory can be used to obtain shared
	// SharedIndexInformer instances for Kubernetes types.
	KubeSharedInformerFactory kubeinformers.SharedInformerFactory

	// SharedInformerFactory can be used to obtain shared SharedIndexInformer
	// instances for the types served by ACMClient.
	SharedInformerFactory informers.SharedInformerFactory

	// ContextOptions are the static options this Context was built with. They
	// are embedded, so their fields are accessible directly on the Context.
	ContextOptions
}

// ContextOptions are static Controller Context options.
type ContextOptions struct {
	// APIServerHost is the host address of the target Kubernetes API server.
	APIServerHost string

	// Kubeconfig is the optional file path location to a kubeconfig to connect
	// and authenticate to the API server.
	Kubeconfig string

	// KubernetesAPIQPS is the value of the maximum QPS to the API server from
	// clients.
	KubernetesAPIQPS float32

	// KubernetesAPIBurst is the value of the maximum burst for throttle.
	KubernetesAPIBurst int

	// Namespace is the namespace to operate within.
	// If unset, operates on all namespaces.
	Namespace string

	// Clock should be used to access the current time instead of relying on
	// time.Now, to make it easier to test controllers that utilise time.
	Clock clock.Clock

	// The option groups below are embedded, so their fields are accessible
	// directly on ContextOptions (and on Context).
	IssuerOptions
	CertificateOptions
	SchedulerOptions
}

// IssuerOptions are the static options that apply to Issuer and ClusterIssuer
// handling.
type IssuerOptions struct {
	// ClusterResourceNamespace is the namespace to store resources created by
	// non-namespaced resources (e.g. ClusterIssuer) in.
	ClusterResourceNamespace string

	// ClusterIssuerAmbientCredentials controls whether a cluster issuer should
	// pick up ambient credentials, such as those from metadata services, to
	// construct clients.
	ClusterIssuerAmbientCredentials bool

	// IssuerAmbientCredentials controls whether an issuer should pick up ambient
	// credentials, such as those from metadata services, to construct clients.
	IssuerAmbientCredentials bool
}

// CertificateOptions are the static options that apply to Certificate
// handling.
type CertificateOptions struct {
	// EnableOwnerRef controls whether the certificate is configured as an owner
	// of the secret where the effective TLS certificate is stored.
	EnableOwnerRef bool

	// CopiedAnnotationPrefixes defines which annotations should be copied
	// Certificate -> CertificateRequest, CertificateRequest -> Order.
	CopiedAnnotationPrefixes []string
}

// SchedulerOptions are the static options that apply to challenge scheduling.
type SchedulerOptions struct {
	// MaxConcurrentChallenges determines the maximum number of challenges that
	// can be scheduled as 'processing' at once.
	MaxConcurrentChallenges int
}

// ContextFactory is used for constructing new Contexts whose clients have been
// configured with a User Agent built from the component name.
type ContextFactory struct {
	// baseRestConfig is the base Kubernetes REST config that can authenticate to
	// the Kubernetes API server.
	baseRestConfig *rest.Config

	// log is the factory logger which is used to construct event broadcasters.
	log logr.Logger

	// ctx is the base controller Context that all Contexts will be built from.
	ctx *Context
}

// NewContextFactory builds a ContextFactory that builds controller Contexts
// that have been configured for that component's User Agent.
// All resulting Contexts and clients contain the same RateLimiter and
// corresponding QPS and Burst buckets.
func NewContextFactory(ctx context.Context, opts ContextOptions) (*ContextFactory, error) { // Load the users Kubernetes config restConfig, err := clientcmd.BuildConfigFromFlags(opts.APIServerHost, opts.Kubeconfig) if err != nil { return nil, fmt.Errorf("error creating rest config: %w", err) } restConfig = util.RestConfigWithUserAgent(restConfig) restConfig.QPS = opts.KubernetesAPIQPS restConfig.Burst = opts.KubernetesAPIBurst // Construct a single RateLimiter used across all built Context's clients. A // single rate limiter (with corresponding QPS and Burst buckets) are // preserved for all Contexts. // Adapted from // https://github.com/kubernetes/client-go/blob/v0.23.3/kubernetes/clientset.go#L431-L435 if restConfig.RateLimiter == nil && restConfig.QPS > 0 { if restConfig.Burst <= 0 { return nil, errors.New("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") } restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(restConfig.QPS, restConfig.Burst) } clients, err := buildClients(restConfig) if err != nil { return nil, err } sharedInformarFactory := informers.NewSharedInformerFactoryWithOptions(clients.acmClient, resyncPeriod, informers.WithNamespace(opts.Namespace)) kubeSharedInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(clients.kubeClient, resyncPeriod, kubeinformers.WithNamespace(opts.Namespace)) return &ContextFactory{ baseRestConfig: restConfig, log: logf.FromContext(ctx), ctx: &Context{ RootContext: ctx, StopCh: ctx.Done(), KubeSharedInformerFactory: kubeSharedInformerFactory, SharedInformerFactory: sharedInformarFactory, ContextOptions: opts, }, }, nil } // Build builds a new controller Context who's clients have a User Agent // derived from the optional component name. func (c *ContextFactory) Build(component ...string) (*Context, error) { restConfig := util.RestConfigWithUserAgent(c.baseRestConfig, component...) 
clients, err := buildClients(restConfig) if err != nil { return nil, err } eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(logf.WithInfof(c.log.V(logf.DebugLevel)).Infof) eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: clients.kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: util.PrefixFromUserAgent(restConfig.UserAgent)}) ctx := *c.ctx ctx.FieldManager = util.PrefixFromUserAgent(restConfig.UserAgent) ctx.RESTConfig = restConfig ctx.Client = clients.kubeClient ctx.ACMClient = clients.acmClient ctx.DiscoveryClient = clients.kubeClient.Discovery() ctx.Recorder = recorder return &ctx, nil } // contextClients is a helper struct containing API clients. type contextClients struct { kubeClient kubernetes.Interface acmClient clientset.Interface } // buildClients builds all required clients for the context using the given // REST config. func buildClients(restConfig *rest.Config) (contextClients, error) { // create an anthos cert manager client acmClient, err := clientset.NewForConfig(restConfig) if err != nil { return contextClients{}, fmt.Errorf("error creating internal group client: %w", err) } // create a Kubernetes api client kubeClient, err := kubernetes.NewForConfig(restConfig) if err != nil { return contextClients{}, fmt.Errorf("error creating kubernetes client: %w", err) } return contextClients{kubeClient, acmClient}, nil }