From 5df8f7313c40e224043e2b50c7959cb07951dbe5 Mon Sep 17 00:00:00 2001 From: Somtochi Onyekwere Date: Thu, 28 Jan 2021 12:10:40 +0100 Subject: [PATCH] Refactor reconcile commands Signed-off-by: Somtochi Onyekwere --- cmd/flux/reconcile_helmrelease.go | 15 ++++- cmd/flux/reconcile_kustomization.go | 10 ++- cmd/flux/reconcile_source_bucket.go | 90 +++----------------------- cmd/flux/reconcile_source_git.go | 96 +++------------------------- cmd/flux/reconcile_source_helm.go | 97 +++-------------------------- 5 files changed, 46 insertions(+), 262 deletions(-) diff --git a/cmd/flux/reconcile_helmrelease.go b/cmd/flux/reconcile_helmrelease.go index 0444d4b3..5d919582 100644 --- a/cmd/flux/reconcile_helmrelease.go +++ b/cmd/flux/reconcile_helmrelease.go @@ -99,11 +99,20 @@ func reconcileHrCmdRun(cmd *cobra.Command, args []string) error { } switch helmRelease.Spec.Chart.Spec.SourceRef.Kind { case sourcev1.HelmRepositoryKind: - err = reconcileSourceHelmCmdRun(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) + err = reconcileCommand{ + apiType: helmRepositoryType, + object: helmRepositoryAdapter{&sourcev1.HelmRepository{}}, + }.run(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) case sourcev1.GitRepositoryKind: - err = reconcileSourceGitCmdRun(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) + err = reconcileCommand{ + apiType: gitRepositoryType, + object: gitRepositoryAdapter{&sourcev1.GitRepository{}}, + }.run(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) case sourcev1.BucketKind: - err = reconcileSourceBucketCmdRun(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) + err = reconcileCommand{ + apiType: bucketType, + object: bucketAdapter{&sourcev1.Bucket{}}, + }.run(nil, []string{helmRelease.Spec.Chart.Spec.SourceRef.Name}) } if err != nil { return err diff --git a/cmd/flux/reconcile_kustomization.go b/cmd/flux/reconcile_kustomization.go index 24f29094..1b0aaaa0 100644 --- a/cmd/flux/reconcile_kustomization.go +++ b/cmd/flux/reconcile_kustomization.go @@ -97,9 +97,15 @@ func reconcileKsCmdRun(cmd *cobra.Command, args []string) error { } switch kustomization.Spec.SourceRef.Kind { case sourcev1.GitRepositoryKind: - err = reconcileSourceGitCmdRun(nil, []string{kustomization.Spec.SourceRef.Name}) + err = reconcileCommand{ + apiType: gitRepositoryType, + object: gitRepositoryAdapter{&sourcev1.GitRepository{}}, + }.run(nil, []string{kustomization.Spec.SourceRef.Name}) case sourcev1.BucketKind: - err = reconcileSourceBucketCmdRun(nil, []string{kustomization.Spec.SourceRef.Name}) + err = reconcileCommand{ + apiType: bucketType, + object: bucketAdapter{&sourcev1.Bucket{}}, + }.run(nil, []string{kustomization.Spec.SourceRef.Name}) } if err != nil { return err diff --git a/cmd/flux/reconcile_source_bucket.go b/cmd/flux/reconcile_source_bucket.go index 3fe8e017..143718f2 100644 --- a/cmd/flux/reconcile_source_bucket.go +++ b/cmd/flux/reconcile_source_bucket.go @@ -19,13 +19,7 @@ package main import ( "context" "fmt" - "time" - "github.com/fluxcd/pkg/apis/meta" - "k8s.io/client-go/util/retry" - - "github.com/fluxcd/flux2/internal/utils" - "github.com/spf13/cobra" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,64 +37,16 @@ var reconcileSourceBucketCmd = &cobra.Command{ Example: ` # Trigger a reconciliation for an existing source flux reconcile source bucket podinfo `, - RunE: reconcileSourceBucketCmdRun, + RunE: reconcileCommand{ + apiType: bucketType, + object: bucketAdapter{&sourcev1.Bucket{}}, + }.run, } func init() { reconcileSourceCmd.AddCommand(reconcileSourceBucketCmd) } -func reconcileSourceBucketCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("source name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var bucket sourcev1.Bucket - err = kubeClient.Get(ctx, namespacedName, &bucket) - if err != nil { - return err - } - - if bucket.Spec.Suspend { - return fmt.Errorf("resource is suspended") - } - - lastHandledReconcileAt := bucket.Status.LastHandledReconcileAt - logger.Actionf("annotating Bucket source %s in %s namespace", name, rootArgs.namespace) - if err := requestBucketReconciliation(ctx, kubeClient, namespacedName, &bucket); err != nil { - return err - } - logger.Successf("Bucket source annotated") - - logger.Waitingf("waiting for Bucket source reconciliation") - if err := wait.PollImmediate( - rootArgs.pollInterval, rootArgs.timeout, - bucketReconciliationHandled(ctx, kubeClient, namespacedName, &bucket, lastHandledReconcileAt), - ); err != nil { - return err - } - logger.Successf("Bucket source reconciliation completed") - - if apimeta.IsStatusConditionFalse(bucket.Status.Conditions, meta.ReadyCondition) { - return fmt.Errorf("Bucket source reconciliation failed") - } - logger.Successf("fetched revision %s", bucket.Status.Artifact.Revision) - return nil -} - func isBucketReady(ctx context.Context, kubeClient client.Client, namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionFunc { return func() (bool, error) { @@ -126,30 +72,10 @@ func isBucketReady(ctx context.Context, kubeClient client.Client, } } -func bucketReconciliationHandled(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, bucket *sourcev1.Bucket, lastHandledReconcileAt string) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, bucket) - if err != nil { - return false, err - } - return bucket.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil - } +func (obj bucketAdapter) lastHandledReconcileRequest() string { + return obj.Status.GetLastHandledReconcileRequest() } -func requestBucketReconciliation(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, bucket *sourcev1.Bucket) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { - if err := kubeClient.Get(ctx, namespacedName, bucket); err != nil { - return err - } - if bucket.Annotations == nil { - bucket.Annotations = map[string]string{ - meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - bucket.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) - } - return kubeClient.Update(ctx, bucket) - }) +func (obj bucketAdapter) successMessage() string { + return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision) } diff --git a/cmd/flux/reconcile_source_git.go b/cmd/flux/reconcile_source_git.go index d3185970..c737e67b 100644 --- a/cmd/flux/reconcile_source_git.go +++ b/cmd/flux/reconcile_source_git.go @@ -17,21 +17,9 @@ limitations under the License. package main import ( - "context" "fmt" - "time" - - "github.com/fluxcd/flux2/internal/utils" - "github.com/fluxcd/pkg/apis/meta" - apimeta "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/client-go/util/retry" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/spf13/cobra" ) var reconcileSourceGitCmd = &cobra.Command{ @@ -41,86 +29,20 @@ var reconcileSourceGitCmd = &cobra.Command{ Example: ` # Trigger a git pull for an existing source flux reconcile source git podinfo `, - RunE: reconcileSourceGitCmdRun, + RunE: reconcileCommand{ + apiType: gitRepositoryType, + object: gitRepositoryAdapter{&sourcev1.GitRepository{}}, + }.run, } func init() { reconcileSourceCmd.AddCommand(reconcileSourceGitCmd) } -func reconcileSourceGitCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("source name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var repository sourcev1.GitRepository - err = kubeClient.Get(ctx, namespacedName, &repository) - if err != nil { - return err - } - - if repository.Spec.Suspend { - return fmt.Errorf("resource is suspended") - } - - logger.Actionf("annotating GitRepository source %s in %s namespace", name, rootArgs.namespace) - if err := requestGitRepositoryReconciliation(ctx, kubeClient, namespacedName, &repository); err != nil { - return err - } - logger.Successf("GitRepository source annotated") - - lastHandledReconcileAt := repository.Status.LastHandledReconcileAt - logger.Waitingf("waiting for GitRepository source reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - gitRepositoryReconciliationHandled(ctx, kubeClient, namespacedName, &repository, lastHandledReconcileAt)); err != nil { - return err - } - logger.Successf("GitRepository source reconciliation completed") - - if apimeta.IsStatusConditionFalse(repository.Status.Conditions, meta.ReadyCondition) { - return fmt.Errorf("GitRepository source reconciliation failed") - } - logger.Successf("fetched revision %s", repository.Status.Artifact.Revision) - return nil -} - -func gitRepositoryReconciliationHandled(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, repository *sourcev1.GitRepository, lastHandledReconcileAt string) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, repository) - if err != nil { - return false, err - } - return repository.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil - } +func (obj gitRepositoryAdapter) lastHandledReconcileRequest() string { + return obj.Status.GetLastHandledReconcileRequest() } -func requestGitRepositoryReconciliation(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, repository *sourcev1.GitRepository) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { - if err := kubeClient.Get(ctx, namespacedName, repository); err != nil { - return err - } - if repository.Annotations == nil { - repository.Annotations = map[string]string{ - meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - repository.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) - } - return kubeClient.Update(ctx, repository) - }) +func (obj gitRepositoryAdapter) successMessage() string { + return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision) } diff --git a/cmd/flux/reconcile_source_helm.go b/cmd/flux/reconcile_source_helm.go index 97c9a168..e72d711f 100644 --- a/cmd/flux/reconcile_source_helm.go +++ b/cmd/flux/reconcile_source_helm.go @@ -17,22 +17,9 @@ limitations under the License. package main import ( - "context" "fmt" - "time" - - "github.com/fluxcd/pkg/apis/meta" - "k8s.io/client-go/util/retry" - - "github.com/fluxcd/flux2/internal/utils" - - "github.com/spf13/cobra" - apimeta "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/spf13/cobra" ) var reconcileSourceHelmCmd = &cobra.Command{ @@ -42,86 +29,20 @@ var reconcileSourceHelmCmd = &cobra.Command{ Example: ` # Trigger a reconciliation for an existing source flux reconcile source helm podinfo `, - RunE: reconcileSourceHelmCmdRun, + RunE: reconcileCommand{ + apiType: helmRepositoryType, + object: helmRepositoryAdapter{&sourcev1.HelmRepository{}}, + }.run, } func init() { reconcileSourceCmd.AddCommand(reconcileSourceHelmCmd) } -func reconcileSourceHelmCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("HelmRepository source name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var repository sourcev1.HelmRepository - err = kubeClient.Get(ctx, namespacedName, &repository) - if err != nil { - return err - } - - if repository.Spec.Suspend { - return fmt.Errorf("resource is suspended") - } - - logger.Actionf("annotating HelmRepository source %s in %s namespace", name, rootArgs.namespace) - if err := requestHelmRepositoryReconciliation(ctx, kubeClient, namespacedName, &repository); err != nil { - return err - } - logger.Successf("HelmRepository source annotated") - - lastHandledReconcileAt := repository.Status.LastHandledReconcileAt - logger.Waitingf("waiting for HelmRepository source reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - helmRepositoryReconciliationHandled(ctx, kubeClient, namespacedName, &repository, lastHandledReconcileAt)); err != nil { - return err - } - logger.Successf("HelmRepository source reconciliation completed") - - if apimeta.IsStatusConditionFalse(repository.Status.Conditions, meta.ReadyCondition) { - return fmt.Errorf("HelmRepository source reconciliation failed") - } - logger.Successf("fetched revision %s", repository.Status.Artifact.Revision) - return nil -} - -func helmRepositoryReconciliationHandled(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, repository *sourcev1.HelmRepository, lastHandledReconcileAt string) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, repository) - if err != nil { - return false, err - } - return repository.Status.LastHandledReconcileAt != lastHandledReconcileAt, nil - } +func (obj helmRepositoryAdapter) lastHandledReconcileRequest() string { + return obj.Status.GetLastHandledReconcileRequest() } -func requestHelmRepositoryReconciliation(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, repository *sourcev1.HelmRepository) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { - if err := kubeClient.Get(ctx, namespacedName, repository); err != nil { - return err - } - if repository.Annotations == nil { - repository.Annotations = map[string]string{ - meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - repository.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) - } - return kubeClient.Update(ctx, repository) - }) +func (obj helmRepositoryAdapter) successMessage() string { + return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision) }