Newer
Older
AnthosCertManager / pkg / controller / context.go
@Yangyang Xie Yangyang Xie on 22 Nov 2022 8 KB fix issues
package controller

import (
	"context"
	"errors"
	"fmt"
	"time"

	clientset "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned/scheme"
	informers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions"
	logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
	"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/discovery"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/utils/clock"
)

// This sets the informer's resync period to 10 hours
// following the controller-runtime defaults
// and following discussion: https://github.com/kubernetes-sigs/controller-runtime/pull/88#issuecomment-408500629
const resyncPeriod = 10 * time.Hour

// Context contains various types that are used by controller implementations.
// We purposely don't have specific informers/listers here, and instead keep a
// reference to a SharedInformerFactory so that controllers can choose
// themselves which listers are required.
// Each component should be given distinct Contexts, built from the
// ContextFactory that has configured the underlying client to use separate
// User Agents.
type Context struct {
	// RootContext is the root context for the controller
	RootContext context.Context

	// StopCh is a channel that will be closed when the controller is signalled
	// to exit
	StopCh <-chan struct{}

	// FieldManager is the string that should be used as the field manager when
	// applying API object. This value is derived from the user agent.
	FieldManager string
	// RESTConfig is the loaded Kubernetes apiserver rest client configuration
	RESTConfig *rest.Config
	// Client is a Kubernetes clientset
	Client kubernetes.Interface

	// ACMClient is a anthos-cert-manager clientset
	ACMClient clientset.Interface

	// DiscoveryClient is a discovery interface. Usually set to Client.Discovery unless a fake client is in use.
	DiscoveryClient discovery.DiscoveryInterface

	// Recorder to record events to
	Recorder record.EventRecorder

	// KubeSharedInformerFactory can be used to obtain shared
	// SharedIndexInformer instances for Kubernetes types
	KubeSharedInformerFactory kubeinformers.SharedInformerFactory
	// SharedInformerFactory can be used to obtain shared SharedIndexInformer
	// instances
	SharedInformerFactory informers.SharedInformerFactory

	ContextOptions
}

// ContextOptions are static Controller Context options.
type ContextOptions struct {
	// APIServerHost is the host address of the target Kubernetes API server.
	APIServerHost string

	// Kubeconfig is the optional file path location to a kubeconfig to connect
	// and authenticate to the API server.
	Kubeconfig string

	// Kubernetes API QPS is the value of the maximum QPS to the API server from
	// clients.
	KubernetesAPIQPS float32

	// KubernetesAPIBurst is the value of the Maximum burst for throttle.
	KubernetesAPIBurst int

	// Namespace is the namespace to operate within.
	// If unset, operates on all namespaces
	Namespace string

	// Clock should be used to access the current time instead of relying on
	// time.Now, to make it easier to test controllers that utilise time
	Clock clock.Clock

	IssuerOptions
	CertificateOptions
	SchedulerOptions
}

type IssuerOptions struct {
	// ClusterResourceNamespace is the namespace to store resources created by
	// non-namespaced resources (e.g. ClusterIssuer) in.
	ClusterResourceNamespace string

	// ClusterIssuerAmbientCredentials controls whether a cluster issuer should
	// pick up ambient credentials, such as those from metadata services, to
	// construct clients.
	ClusterIssuerAmbientCredentials bool

	// IssuerAmbientCredentials controls whether an issuer should pick up ambient
	// credentials, such as those from metadata services, to construct clients.
	IssuerAmbientCredentials bool
}

type CertificateOptions struct {
	// EnableOwnerRef controls whether the certificate is configured as an owner of
	// secret where the effective TLS certificate is stored.
	EnableOwnerRef bool
	// CopiedAnnotationPrefixes defines which annotations should be copied
	// Certificate -> CertificateRequest, CertificateRequest -> Order.
	CopiedAnnotationPrefixes []string
}

type SchedulerOptions struct {
	// MaxConcurrentChallenges determines the maximum number of challenges that can be
	// scheduled as 'processing' at once.
	MaxConcurrentChallenges int
}

// ContextFactory is used for constructing new Contexts who's clients have been
// configured with a User Agent built from the component name.
type ContextFactory struct {
	// baseRestConfig is the base Kubernetes REST config that can authenticate to
	// the Kubernetes API server.
	baseRestConfig *rest.Config

	// log is the factory logger which is used to construct event broadcasters.
	log logr.Logger

	// ctx is the base controller Context that all Contexts will be built from.
	ctx *Context
}

// NewContextFactory builds a ContextFactory that builds controller Contexts
// that have been configured for that components User Agent.
// All resulting Context's and clients contain the same RateLimiter and
// corresponding QPS and Burst buckets.
func NewContextFactory(ctx context.Context, opts ContextOptions) (*ContextFactory, error) {
	// Load the users Kubernetes config
	restConfig, err := clientcmd.BuildConfigFromFlags(opts.APIServerHost, opts.Kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("error creating rest config: %w", err)
	}
	restConfig = util.RestConfigWithUserAgent(restConfig)
	restConfig.QPS = opts.KubernetesAPIQPS
	restConfig.Burst = opts.KubernetesAPIBurst

	// Construct a single RateLimiter used across all built Context's clients. A
	// single rate limiter (with corresponding QPS and Burst buckets) are
	// preserved for all Contexts.
	// Adapted from
	// https://github.com/kubernetes/client-go/blob/v0.23.3/kubernetes/clientset.go#L431-L435
	if restConfig.RateLimiter == nil && restConfig.QPS > 0 {
		if restConfig.Burst <= 0 {
			return nil, errors.New("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
		}
		restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(restConfig.QPS, restConfig.Burst)
	}

	clients, err := buildClients(restConfig)
	if err != nil {
		return nil, err
	}

	sharedInformarFactory := informers.NewSharedInformerFactoryWithOptions(clients.acmClient, resyncPeriod, informers.WithNamespace(opts.Namespace))
	kubeSharedInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(clients.kubeClient, resyncPeriod, kubeinformers.WithNamespace(opts.Namespace))

	return &ContextFactory{
		baseRestConfig: restConfig,
		log:            logf.FromContext(ctx),
		ctx: &Context{
			RootContext:               ctx,
			StopCh:                    ctx.Done(),
			KubeSharedInformerFactory: kubeSharedInformerFactory,
			SharedInformerFactory:     sharedInformarFactory,
			ContextOptions:            opts,
		},
	}, nil
}

// Build builds a new controller Context who's clients have a User Agent
// derived from the optional component name.
func (c *ContextFactory) Build(component ...string) (*Context, error) {
	restConfig := util.RestConfigWithUserAgent(c.baseRestConfig, component...)

	clients, err := buildClients(restConfig)
	if err != nil {
		return nil, err
	}
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(logf.WithInfof(c.log.V(logf.DebugLevel)).Infof)
	eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: clients.kubeClient.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: util.PrefixFromUserAgent(restConfig.UserAgent)})

	ctx := *c.ctx
	ctx.FieldManager = util.PrefixFromUserAgent(restConfig.UserAgent)
	ctx.RESTConfig = restConfig
	ctx.Client = clients.kubeClient
	ctx.ACMClient = clients.acmClient
	ctx.DiscoveryClient = clients.kubeClient.Discovery()
	ctx.Recorder = recorder

	return &ctx, nil
}

// contextClients is a helper struct containing API clients.
type contextClients struct {
	kubeClient kubernetes.Interface
	acmClient  clientset.Interface
}

// buildClients builds all required clients for the context using the given
// REST config.
func buildClients(restConfig *rest.Config) (contextClients, error) {

	// create an anthos cert manager client
	acmClient, err := clientset.NewForConfig(restConfig)
	if err != nil {
		return contextClients{}, fmt.Errorf("error creating internal group client: %w", err)
	}
	// create a Kubernetes api client
	kubeClient, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return contextClients{}, fmt.Errorf("error creating kubernetes client: %w", err)
	}

	return contextClients{kubeClient, acmClient}, nil
}