package issuing
import (
"context"
"crypto"
"time"
apiutil "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/api/util"
acmapi "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/anthoscertmanager/v1"
acmmeta "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/apis/meta/v1"
acmClient "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/clientset/versioned"
acmInformers "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/informers/externalversions"
acmlisters "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/client/listers/anthoscertmanager/v1"
controllerpkg "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller"
"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates"
"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/issuing/internal"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
policies "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/controller/certificates/policies"
logf "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/logs"
utilpki "gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/pki"
"gitbucket.jerxie.com/yangyangxie/AnthosCertManager/pkg/util/predicate"
"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
)
const (
ControllerName = "certificates-issuing"
)
// This controller observes the state of the certificate's 'Issuing' condition,
// which will then copy the signed certificates and private key to the target
// Secret resource.
type controller struct {
certificateLister acmlisters.CertificateLister
certificateRequestLister acmlisters.CertificateRequestLister
secretLister corelisters.SecretLister
recorder record.EventRecorder
clock clock.Clock
client acmClient.Interface
// postIssuancePolicyChain is the policies chain to ensure that all Secret
// metadata and output formats are kept are present and correct.
postIssuancePolicyChain policies.Chain
// secretsUpdateData is used by the SecretTemplate controller for
// re-reconciling Secrets where the SecretTemplate is not up to date with a
// Certificate's secret.
secretsUpdateData func(context.Context, *acmapi.Certificate, internal.SecretData) error
}
func NewController(
log logr.Logger,
kubeClient kubernetes.Interface,
client acmClient.Interface,
factory informers.SharedInformerFactory,
acmFactory acmInformers.SharedInformerFactory,
recorder record.EventRecorder,
clock clock.Clock,
certificateControllerOptions controllerpkg.CertificateOptions,
fieldManager string,
) (*controller, workqueue.RateLimitingInterface, []cache.InformerSynced) {
// create a queue used to queue up items to be processed
queue := workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(time.Second, time.Second*30), ControllerName)
// obtain references to all the informers used by this controller
certificateInformer := acmFactory.AnthosCertmanager().V1().Certificates()
certificateRequestInformer := acmFactory.AnthosCertmanager().V1().CertificateRequests()
secretsInformer := factory.Core().V1().Secrets()
certificateInformer.Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{Queue: queue})
certificateRequestInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf),
})
secretsInformer.Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
// Issuer reconciles on changes to the Secret named `spec.nextPrivateKeySecretName`
WorkFunc: certificates.EnqueueCertificatesForResourceUsingPredicates(log, queue, certificateInformer.Lister(), labels.Everything(), predicate.ResourceOwnerOf, predicate.ExtractResourceName(predicate.CertificateNextPrivateKeySecretName)),
})
// build a list of InformerSynced functions that will be returned by the Register method.
// the controller will only begin processing items once all of these informers have synced.
mustSync := []cache.InformerSynced{
certificateInformer.Informer().HasSynced,
certificateInformer.Informer().HasSynced,
secretsInformer.Informer().HasSynced,
}
secretsManager := internal.NewSecretsManager(
kubeClient.CoreV1(), secretsInformer.Lister(),
fieldManager, certificateControllerOptions.EnableOwnerRef,
)
return &controller{
certificateLister: certificateInformer.Lister(),
certificateRequestLister: certificateRequestInformer.Lister(),
secretLister: secretsInformer.Lister(),
recorder: recorder,
clock: clock,
secretsUpdateData: secretsManager.UpdateData,
client: client,
postIssuancePolicyChain: policies.NewSecretPostIssuancePolicyChain(certificateControllerOptions.EnableOwnerRef,
fieldManager),
}, queue, mustSync
}
func (c *controller) ProcessItem(ctx context.Context, key string) error {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
log := logf.FromContext(ctx).WithValues("key", key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return nil
}
crt, err := c.certificateLister.Certificates(namespace).Get(name)
if apierrors.IsNotFound(err) {
log.V(logf.DebugLevel).Info("Certificate not found for key", "error", err.Error())
}
if err != nil {
return err
}
log = logf.WithResource(log, crt)
ctx = logf.NewContext(ctx, log)
// if the certificate condition is not equal to issuing, we need to ensure all non-issuing related secret Data is correct.
if !apiutil.CertificateHasCondition(crt, acmapi.CertificateCondition{
Type: acmapi.CertificateConditionIssuing,
Status: acmmeta.ConditionTrue,
}) {
return c.ensureSecretData(ctx, log, crt)
}
return nil
}
// updateOrApplyStatus will update the controller status.
func (c *controller) updateOrApplyStatus(ctx context.Context, crt *acmapi.Certificate, conditionRemoved bool) error {
// TODO: conditionally doing the operation based on SSA feature gate
_, err := c.client.AnthosCertmanagerV1().Certificates(crt.Namespace).UpdateStatus(ctx, crt, metav1.UpdateOptions{})
return err
}
// issueCertificate will ensure the public key of the CSR matches the signed
// certificate, and then store the certificate, CA and private key into the
// Secret in the appropriate format type.
func (c *controller) issueCertificate(ctx context.Context, nextRevision int, crt *acmapi.Certificate, req *acmapi.CertificateRequest, pk crypto.Signer) error {
crt = crt.DeepCopy()
if crt.Spec.PrivateKey == nil {
crt.Spec.PrivateKey = &acmapi.CertificatePrivateKey{}
}
pkData, err := utilpki.EncodePrivateKey(pk, crt.Spec.PrivateKey.Encoding)
if err != nil {
return err
}
secretData := internal.SecretData{
PrivateKey: pkData,
Certificate: req.Status.Certificate,
CA: req.Status.CA,
}
if err := c.secretsUpdateData(ctx, crt, secretData); err != nil {
return err
}
// Set status.revision to revision of the CertificateRequest
crt.Status.Revision = &nextRevision
// Remove Issuing status condition
// TODO: Once we move to only server-side apply API calls, this should be changed to setting the Issuing condition to False
apiutil.RemoveCertificateCondition(crt, acmapi.CertificateConditionIssuing)
// Clearn the failed attempts
crt.Status.FailedIssuanceAttempts = nil
// Clean status.lastFailureTime
crt.Status.LastFailureTime = nil
if err := c.updateOrApplyStatus(ctx, crt, true); err != nil {
return err
}
message := "The certificate has been successfully issused"
c.recorder.Event(crt, corev1.EventTypeNormal, "Issuing", message)
return nil
}
// controllerWrapper wraps the `controller` structure to make it implement
// the controllerpkg.queueingController interface
type controllerWrapper struct {
*controller
}
func (c *controllerWrapper) Register(ctx *controllerpkg.Context) (workqueue.RateLimitingInterface, []cache.InformerSynced, error) {
// construct a new named logger to be reused throughout the controller
log := logf.FromContext(ctx.RootContext, ControllerName)
ctrl, queue, mustSync := NewController(log,
ctx.Client,
ctx.ACMClient,
ctx.KubeSharedInformerFactory,
ctx.SharedInformerFactory,
ctx.Recorder,
ctx.Clock,
ctx.CertificateOptions,
ctx.FieldManager,
)
c.controller = ctrl
return queue, mustSync, nil
}
func init() {
controllerpkg.Register(ControllerName, func(ctx *controllerpkg.ContextFactory) (controllerpkg.Interface, error) {
return controllerpkg.NewBuilder(ctx, ControllerName).For(&controllerWrapper{}).Complete()
})
}