From 3c8716f6ac797ca78f4b06a78a38abd6aeb20b1c Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Fri, 20 Nov 2020 13:10:35 +0100 Subject: [PATCH] Utilize LastHandledReconcileAt for reconcile cmds Signed-off-by: Hidde Beydals --- cmd/flux/create_source_helm.go | 27 +++++++++++++ cmd/flux/reconcile_source_bucket.go | 55 ++++++++++++++++++------- cmd/flux/reconcile_source_git.go | 56 +++++++++++++++++-------- cmd/flux/reconcile_source_helm.go | 63 ++++++++++++++--------------- 4 files changed, 137 insertions(+), 64 deletions(-) diff --git a/cmd/flux/create_source_helm.go b/cmd/flux/create_source_helm.go index d6c94617..c187fa3e 100644 --- a/cmd/flux/create_source_helm.go +++ b/cmd/flux/create_source_helm.go @@ -23,9 +23,11 @@ import ( "net/url" "os" + "github.com/fluxcd/pkg/apis/meta" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -242,3 +244,28 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client, logger.Successf("source updated") return namespacedName, nil } + +func isHelmRepositoryReady(ctx context.Context, kubeClient client.Client, + namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionFunc { + return func() (bool, error) { + err := kubeClient.Get(ctx, namespacedName, helmRepository) + if err != nil { + return false, err + } + + // Confirm the state we are observing is for the current generation + if helmRepository.Generation != helmRepository.Status.ObservedGeneration { + return false, nil + } + + if c := apimeta.FindStatusCondition(helmRepository.Status.Conditions, meta.ReadyCondition); c != nil { + switch c.Status { + case metav1.ConditionTrue: + return true, nil + case metav1.ConditionFalse: + return false, fmt.Errorf(c.Message) + } + } + return false, nil + } +} diff --git a/cmd/flux/reconcile_source_bucket.go b/cmd/flux/reconcile_source_bucket.go index 2a6e69b4..f9520507 100644 --- a/cmd/flux/reconcile_source_bucket.go +++ b/cmd/flux/reconcile_source_bucket.go @@ -21,8 +21,10 @@ import ( "fmt" "time" - "github.com/fluxcd/flux2/internal/utils" "github.com/fluxcd/pkg/apis/meta" + "k8s.io/client-go/util/retry" + + "github.com/fluxcd/flux2/internal/utils" "github.com/spf13/cobra" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -66,35 +68,30 @@ func reconcileSourceBucketCmdRun(cmd *cobra.Command, args []string) error { Namespace: namespace, Name: name, } - - logger.Actionf("annotating Bucket source %s in %s namespace", name, namespace) var bucket sourcev1.Bucket err = kubeClient.Get(ctx, namespacedName, &bucket) if err != nil { return err } - if bucket.Annotations == nil { - bucket.Annotations = map[string]string{ - meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - bucket.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) - } - if err := kubeClient.Update(ctx, &bucket); err != nil { + lastHandledReconcileAt := bucket.Status.LastHandledReconcileAt + logger.Actionf("annotating Bucket source %s in %s namespace", name, namespace) + if err := requestBucketReconciliation(ctx, kubeClient, namespacedName, &bucket); err != nil { return err } logger.Successf("Bucket source annotated") logger.Waitingf("waiting for Bucket source reconciliation") - if err := wait.PollImmediate(pollInterval, timeout, - isBucketReady(ctx, kubeClient, namespacedName, &bucket)); err != nil { + if err := wait.PollImmediate( + pollInterval, timeout, + bucketReconciliationHandled(ctx, kubeClient, namespacedName, &bucket, lastHandledReconcileAt), + ); err != nil { return err } logger.Successf("Bucket source reconciliation completed") - if bucket.Status.Artifact == nil { - return fmt.Errorf("Bucket source reconciliation completed but no artifact was found") + if apimeta.IsStatusConditionFalse(bucket.Status.Conditions, meta.ReadyCondition) { + return fmt.Errorf("Bucket source reconciliation failed") } logger.Successf("fetched revision %s", bucket.Status.Artifact.Revision) return nil @@ -124,3 +121,31 @@ func isBucketReady(ctx context.Context, kubeClient client.Client, return false, nil } } + +func bucketReconciliationHandled(ctx context.Context, kubeClient client.Client, + namespacedName types.NamespacedName, bucket *sourcev1.Bucket, lastHandledReconcileAt string) wait.ConditionFunc { + return func() (bool, error) { + err := kubeClient.Get(ctx, namespacedName, bucket) + if err != nil { + return false, err + } + return bucket.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil + } +} + +func requestBucketReconciliation(ctx context.Context, kubeClient client.Client, + namespacedName types.NamespacedName, bucket *sourcev1.Bucket) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { + if err := kubeClient.Get(ctx, namespacedName, bucket); err != nil { + return err + } + if bucket.Annotations == nil { + bucket.Annotations = map[string]string{ + meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), + } + } else { + bucket.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) + } + return kubeClient.Update(ctx, bucket) + }) +} diff --git a/cmd/flux/reconcile_source_git.go b/cmd/flux/reconcile_source_git.go index 31cbc0b1..cb95abbc 100644 --- a/cmd/flux/reconcile_source_git.go +++ b/cmd/flux/reconcile_source_git.go @@ -23,6 +23,9 @@ import ( "github.com/fluxcd/flux2/internal/utils" "github.com/fluxcd/pkg/apis/meta" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/types" @@ -63,36 +66,57 @@ func reconcileSourceGitCmdRun(cmd *cobra.Command, args []string) error { Namespace: namespace, Name: name, } - - logger.Actionf("annotating GitRepository source %s in %s namespace", name, namespace) - var gitRepository sourcev1.GitRepository - err = kubeClient.Get(ctx, namespacedName, &gitRepository) + var repository sourcev1.GitRepository + err = kubeClient.Get(ctx, namespacedName, &repository) if err != nil { return err } - if gitRepository.Annotations == nil { - gitRepository.Annotations = map[string]string{ - meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - gitRepository.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) - } - if err := kubeClient.Update(ctx, &gitRepository); err != nil { + logger.Actionf("annotating GitRepository source %s in %s namespace", name, namespace) + if err := requestGitRepositoryReconciliation(ctx, kubeClient, namespacedName, &repository); err != nil { return err } logger.Successf("GitRepository source annotated") + lastHandledReconcileAt := repository.Status.LastHandledReconcileAt logger.Waitingf("waiting for GitRepository source reconciliation") if err := wait.PollImmediate(pollInterval, timeout, - isGitRepositoryReady(ctx, kubeClient, namespacedName, &gitRepository)); err != nil { + gitRepositoryReconciliationHandled(ctx, kubeClient, namespacedName, &repository, lastHandledReconcileAt)); err != nil { return err } logger.Successf("GitRepository source reconciliation completed") - if gitRepository.Status.Artifact == nil { - return fmt.Errorf("GitRepository source reconciliation completed but no artifact was found") + if apimeta.IsStatusConditionFalse(repository.Status.Conditions, meta.ReadyCondition) { + return fmt.Errorf("GitRepository source reconciliation failed") } - logger.Successf("fetched revision %s", gitRepository.Status.Artifact.Revision) + logger.Successf("fetched revision %s", repository.Status.Artifact.Revision) return nil } + +func gitRepositoryReconciliationHandled(ctx context.Context, kubeClient client.Client, + namespacedName types.NamespacedName, repository *sourcev1.GitRepository, lastHandledReconcileAt string) wait.ConditionFunc { + return func() (bool, error) { + err := kubeClient.Get(ctx, namespacedName, repository) + if err != nil { + return false, err + } + return repository.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil + } +} + +func requestGitRepositoryReconciliation(ctx context.Context, kubeClient client.Client, + namespacedName types.NamespacedName, repository *sourcev1.GitRepository) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { + if err := kubeClient.Get(ctx, namespacedName, repository); err != nil { + return err + } + if repository.Annotations == nil { + repository.Annotations = map[string]string{ + meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), + } + } else { + repository.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) + } + return kubeClient.Update(ctx, repository) + }) +} diff --git a/cmd/flux/reconcile_source_helm.go b/cmd/flux/reconcile_source_helm.go index d2adaa7b..076fcf08 100644 --- a/cmd/flux/reconcile_source_helm.go +++ b/cmd/flux/reconcile_source_helm.go @@ -21,12 +21,13 @@ import ( "fmt" "time" - "github.com/fluxcd/flux2/internal/utils" "github.com/fluxcd/pkg/apis/meta" + "k8s.io/client-go/util/retry" + + "github.com/fluxcd/flux2/internal/utils" "github.com/spf13/cobra" apimeta "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" @@ -66,61 +67,57 @@ func reconcileSourceHelmCmdRun(cmd *cobra.Command, args []string) error { Namespace: namespace, Name: name, } - - logger.Actionf("annotating HelmRepository source %s in %s namespace", name, namespace) - var helmRepository sourcev1.HelmRepository - err = kubeClient.Get(ctx, namespacedName, &helmRepository) + var repository sourcev1.HelmRepository + err = kubeClient.Get(ctx, namespacedName, &repository) if err != nil { return err } - if helmRepository.Annotations == nil { - helmRepository.Annotations = map[string]string{ - meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - helmRepository.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) - } - if err := kubeClient.Update(ctx, &helmRepository); err != nil { + logger.Actionf("annotating HelmRepository source %s in %s namespace", name, namespace) + if err := requestHelmRepositoryReconciliation(ctx, kubeClient, namespacedName, &repository); err != nil { return err } logger.Successf("HelmRepository source annotated") + lastHandledReconcileAt := repository.Status.LastHandledReconcileAt logger.Waitingf("waiting for HelmRepository source reconciliation") if err := wait.PollImmediate(pollInterval, timeout, - isHelmRepositoryReady(ctx, kubeClient, namespacedName, &helmRepository)); err != nil { + helmRepositoryReconciliationHandled(ctx, kubeClient, namespacedName, &repository, lastHandledReconcileAt)); err != nil { return err } logger.Successf("HelmRepository source reconciliation completed") - if helmRepository.Status.Artifact == nil { - return fmt.Errorf("HelmRepository source reconciliation completed but no artifact was found") + if apimeta.IsStatusConditionFalse(repository.Status.Conditions, meta.ReadyCondition) { + return fmt.Errorf("HelmRepository source reconciliation failed") } - logger.Successf("fetched revision %s", helmRepository.Status.Artifact.Revision) + logger.Successf("fetched revision %s", repository.Status.Artifact.Revision) return nil } -func isHelmRepositoryReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionFunc { +func helmRepositoryReconciliationHandled(ctx context.Context, kubeClient client.Client, + namespacedName types.NamespacedName, repository *sourcev1.HelmRepository, lastHandledReconcileAt string) wait.ConditionFunc { return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, helmRepository) + err := kubeClient.Get(ctx, namespacedName, repository) if err != nil { return false, err } + return repository.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil + } +} - // Confirm the state we are observing is for the current generation - if helmRepository.Generation != helmRepository.Status.ObservedGeneration { - return false, nil +func requestHelmRepositoryReconciliation(ctx context.Context, kubeClient client.Client, + namespacedName types.NamespacedName, repository *sourcev1.HelmRepository) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { + if err := kubeClient.Get(ctx, namespacedName, repository); err != nil { + return err } - - if c := apimeta.FindStatusCondition(helmRepository.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) + if repository.Annotations == nil { + repository.Annotations = map[string]string{ + meta.ReconcileAtAnnotation: time.Now().Format(time.RFC3339Nano), } + } else { + repository.Annotations[meta.ReconcileAtAnnotation] = time.Now().Format(time.RFC3339Nano) } - return false, nil - } + return kubeClient.Update(ctx, repository) + }) }