diff --git a/cmd/flux/resume_helmrelease.go b/cmd/flux/resume_helmrelease.go index c9538af3..b9ede2c9 100644 --- a/cmd/flux/resume_helmrelease.go +++ b/cmd/flux/resume_helmrelease.go @@ -17,20 +17,9 @@ limitations under the License. package main import ( - "context" "fmt" - - "github.com/fluxcd/flux2/internal/utils" - "github.com/fluxcd/pkg/apis/meta" - - "github.com/spf13/cobra" - apimeta "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" + "github.com/spf13/cobra" ) var resumeHrCmd = &cobra.Command{ @@ -42,76 +31,24 @@ finish the apply.`, Example: ` # Resume reconciliation for an existing Helm release flux resume hr podinfo `, - RunE: resumeHrCmdRun, + RunE: resumeCommand{ + apiType: helmReleaseType, + object: helmReleaseAdapter{&helmv2.HelmRelease{}}, + }.run, } func init() { resumeCmd.AddCommand(resumeHrCmd) } -func resumeHrCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("HelmRelease name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var helmRelease helmv2.HelmRelease - err = kubeClient.Get(ctx, namespacedName, &helmRelease) - if err != nil { - return err - } - - logger.Actionf("resuming HelmRelease %s in %s namespace", name, rootArgs.namespace) - helmRelease.Spec.Suspend = false - if err := kubeClient.Update(ctx, &helmRelease); err != nil { - return err - } - logger.Successf("HelmRelease resumed") - - logger.Waitingf("waiting for HelmRelease reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isHelmReleaseResumed(ctx, kubeClient, namespacedName, &helmRelease)); err != nil { - return err - } - logger.Successf("HelmRelease reconciliation completed") - - logger.Successf("applied revision %s", helmRelease.Status.LastAppliedRevision) - return nil +func (obj helmReleaseAdapter) getObservedGeneration() int64 { + return obj.HelmRelease.Status.ObservedGeneration } -func isHelmReleaseResumed(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, helmRelease) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if helmRelease.Generation != helmRelease.Status.ObservedGeneration { - return false, err - } +func (obj helmReleaseAdapter) setUnsuspended() { + obj.HelmRelease.Spec.Suspend = false +} - if c := apimeta.FindStatusCondition(helmRelease.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } +func (obj helmReleaseAdapter) successMessage() string { + return fmt.Sprintf("applied revision %s", obj.Status.LastAppliedRevision) } diff --git a/cmd/flux/resume_kustomization.go b/cmd/flux/resume_kustomization.go index 286a8be6..d7d0d35a 100644 --- a/cmd/flux/resume_kustomization.go +++ b/cmd/flux/resume_kustomization.go @@ -17,19 +17,10 @@ limitations under the License. package main import ( - "context" "fmt" - "github.com/fluxcd/flux2/internal/utils" - "github.com/fluxcd/pkg/apis/meta" - kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta1" "github.com/spf13/cobra" - apimeta "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" ) var resumeKsCmd = &cobra.Command{ @@ -41,76 +32,24 @@ finish the apply.`, Example: ` # Resume reconciliation for an existing Kustomization flux resume ks podinfo `, - RunE: resumeKsCmdRun, + RunE: resumeCommand{ + apiType: kustomizationType, + object: kustomizationAdapter{&kustomizev1.Kustomization{}}, + }.run, } func init() { resumeCmd.AddCommand(resumeKsCmd) } -func resumeKsCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("Kustomization name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var kustomization kustomizev1.Kustomization - err = kubeClient.Get(ctx, namespacedName, &kustomization) - if err != nil { - return err - } - - logger.Actionf("resuming Kustomization %s in %s namespace", name, rootArgs.namespace) - kustomization.Spec.Suspend = false - if err := kubeClient.Update(ctx, &kustomization); err != nil { - return err - } - logger.Successf("Kustomization resumed") - - logger.Waitingf("waiting for Kustomization reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isKustomizationResumed(ctx, kubeClient, namespacedName, &kustomization)); err != nil { - return err - } - logger.Successf("Kustomization reconciliation completed") - - logger.Successf("applied revision %s", kustomization.Status.LastAppliedRevision) - return nil +func (obj kustomizationAdapter) getObservedGeneration() int64 { + return obj.Kustomization.Status.ObservedGeneration } -func isKustomizationResumed(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, kustomization) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if kustomization.Generation != kustomization.Status.ObservedGeneration { - return false, nil - } +func (obj kustomizationAdapter) setUnsuspended() { + obj.Kustomization.Spec.Suspend = false +} - if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } +func (obj kustomizationAdapter) successMessage() string { + return fmt.Sprintf("applied revision %s", obj.Status.LastAppliedRevision) } diff --git a/cmd/flux/resume_source_bucket.go b/cmd/flux/resume_source_bucket.go index deba5916..2b094986 100644 --- a/cmd/flux/resume_source_bucket.go +++ b/cmd/flux/resume_source_bucket.go @@ -17,20 +17,8 @@ limitations under the License. package main import ( - "context" - "fmt" - - "github.com/fluxcd/flux2/internal/utils" - "github.com/fluxcd/pkg/apis/meta" - - "github.com/spf13/cobra" - apimeta "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/spf13/cobra" ) var resumeSourceBucketCmd = &cobra.Command{ @@ -40,76 +28,20 @@ var resumeSourceBucketCmd = &cobra.Command{ Example: ` # Resume reconciliation for an existing Bucket flux resume source bucket podinfo `, - RunE: resumeSourceBucketCmdRun, + RunE: resumeCommand{ + apiType: bucketType, + object: &bucketAdapter{&sourcev1.Bucket{}}, + }.run, } func init() { resumeSourceCmd.AddCommand(resumeSourceBucketCmd) } -func resumeSourceBucketCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("source name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var bucket sourcev1.Bucket - err = kubeClient.Get(ctx, namespacedName, &bucket) - if err != nil { - return err - } - - logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace) - bucket.Spec.Suspend = false - if err := kubeClient.Update(ctx, &bucket); err != nil { - return err - } - logger.Successf("source resumed") - - logger.Waitingf("waiting for Bucket reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isBucketResumed(ctx, kubeClient, namespacedName, &bucket)); err != nil { - return err - } - logger.Successf("Bucket reconciliation completed") - - logger.Successf("fetched revision %s", bucket.Status.Artifact.Revision) - return nil +func (obj bucketAdapter) getObservedGeneration() int64 { + return obj.Bucket.Status.ObservedGeneration } -func isBucketResumed(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, bucket) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if bucket.Generation != bucket.Status.ObservedGeneration { - return false, nil - } - - if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } +func (obj bucketAdapter) setUnsuspended() { + obj.Bucket.Spec.Suspend = false } diff --git a/cmd/flux/resume_source_chart.go b/cmd/flux/resume_source_chart.go index bcfa465f..8f58fd6e 100644 --- a/cmd/flux/resume_source_chart.go +++ b/cmd/flux/resume_source_chart.go @@ -17,20 +17,10 @@ limitations under the License. package main import ( - "context" "fmt" - "github.com/fluxcd/flux2/internal/utils" - "github.com/fluxcd/pkg/apis/meta" - - "github.com/spf13/cobra" - apimeta "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/spf13/cobra" ) var resumeSourceHelmChartCmd = &cobra.Command{ @@ -40,76 +30,24 @@ var resumeSourceHelmChartCmd = &cobra.Command{ Example: ` # Resume reconciliation for an existing HelmChart flux resume source chart podinfo `, - RunE: resumeSourceHelmChartCmdRun, + RunE: resumeCommand{ + apiType: helmChartType, + object: &helmChartAdapter{&sourcev1.HelmChart{}}, + }.run, } func init() { resumeSourceCmd.AddCommand(resumeSourceHelmChartCmd) } -func resumeSourceHelmChartCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("source name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var repository sourcev1.HelmChart - err = kubeClient.Get(ctx, namespacedName, &repository) - if err != nil { - return err - } - - logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace) - repository.Spec.Suspend = false - if err := kubeClient.Update(ctx, &repository); err != nil { - return err - } - logger.Successf("source resumed") - - logger.Waitingf("waiting for HelmChart reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isHelmChartResumed(ctx, kubeClient, namespacedName, &repository)); err != nil { - return err - } - logger.Successf("HelmChart reconciliation completed") - - logger.Successf("fetched revision %s", repository.Status.Artifact.Revision) - return nil +func (obj helmChartAdapter) getObservedGeneration() int64 { + return obj.HelmChart.Status.ObservedGeneration } -func isHelmChartResumed(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, chart *sourcev1.HelmChart) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, chart) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if chart.Generation != chart.Status.ObservedGeneration { - return false, nil - } +func (obj helmChartAdapter) setUnsuspended() { + obj.HelmChart.Spec.Suspend = false +} - if c := apimeta.FindStatusCondition(chart.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } +func (obj helmChartAdapter) successMessage() string { + return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision) } diff --git a/cmd/flux/resume_source_git.go b/cmd/flux/resume_source_git.go index 56c7c47f..44ccd58f 100644 --- a/cmd/flux/resume_source_git.go +++ b/cmd/flux/resume_source_git.go @@ -17,20 +17,8 @@ limitations under the License. package main import ( - "context" - "fmt" - - "github.com/fluxcd/flux2/internal/utils" - "github.com/fluxcd/pkg/apis/meta" - - "github.com/spf13/cobra" - apimeta "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/spf13/cobra" ) var resumeSourceGitCmd = &cobra.Command{ @@ -40,76 +28,20 @@ var resumeSourceGitCmd = &cobra.Command{ Example: ` # Resume reconciliation for an existing GitRepository flux resume source git podinfo `, - RunE: resumeSourceGitCmdRun, + RunE: resumeCommand{ + apiType: gitRepositoryType, + object: gitRepositoryAdapter{&sourcev1.GitRepository{}}, + }.run, } func init() { resumeSourceCmd.AddCommand(resumeSourceGitCmd) } -func resumeSourceGitCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("source name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var repository sourcev1.GitRepository - err = kubeClient.Get(ctx, namespacedName, &repository) - if err != nil { - return err - } - - logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace) - repository.Spec.Suspend = false - if err := kubeClient.Update(ctx, &repository); err != nil { - return err - } - logger.Successf("source resumed") - - logger.Waitingf("waiting for GitRepository reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isGitRepositoryResumed(ctx, kubeClient, namespacedName, &repository)); err != nil { - return err - } - logger.Successf("GitRepository reconciliation completed") - - logger.Successf("fetched revision %s", repository.Status.Artifact.Revision) - return nil +func (obj gitRepositoryAdapter) getObservedGeneration() int64 { + return obj.GitRepository.Status.ObservedGeneration } -func isGitRepositoryResumed(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, repository *sourcev1.GitRepository) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, repository) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if repository.Generation != repository.Status.ObservedGeneration { - return false, nil - } - - if c := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } +func (obj gitRepositoryAdapter) setUnsuspended() { + obj.GitRepository.Spec.Suspend = false } diff --git a/cmd/flux/resume_source_helm.go b/cmd/flux/resume_source_helm.go index 97a7a7b1..f266c7f1 100644 --- a/cmd/flux/resume_source_helm.go +++ b/cmd/flux/resume_source_helm.go @@ -17,20 +17,8 @@ limitations under the License. package main import ( - "context" - "fmt" - - "github.com/fluxcd/flux2/internal/utils" - "github.com/fluxcd/pkg/apis/meta" - - "github.com/spf13/cobra" - apimeta "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + "github.com/spf13/cobra" ) var resumeSourceHelmCmd = &cobra.Command{ @@ -40,76 +28,20 @@ var resumeSourceHelmCmd = &cobra.Command{ Example: ` # Resume reconciliation for an existing HelmRepository flux resume source helm bitnami `, - RunE: resumeSourceHelmCmdRun, + RunE: resumeCommand{ + apiType: helmRepositoryType, + object: helmRepositoryAdapter{&sourcev1.HelmRepository{}}, + }.run, } func init() { resumeSourceCmd.AddCommand(resumeSourceHelmCmd) } -func resumeSourceHelmCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("source name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: rootArgs.namespace, - Name: name, - } - var repository sourcev1.HelmRepository - err = kubeClient.Get(ctx, namespacedName, &repository) - if err != nil { - return err - } - - logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace) - repository.Spec.Suspend = false - if err := kubeClient.Update(ctx, &repository); err != nil { - return err - } - logger.Successf("source resumed") - - logger.Waitingf("waiting for HelmRepository reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isHelmRepositoryResumed(ctx, kubeClient, namespacedName, &repository)); err != nil { - return err - } - logger.Successf("HelmRepository reconciliation completed") - - logger.Successf("fetched revision %s", repository.Status.Artifact.Revision) - return nil +func (obj helmRepositoryAdapter) getObservedGeneration() int64 { + return obj.HelmRepository.Status.ObservedGeneration } -func isHelmRepositoryResumed(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, repository *sourcev1.HelmRepository) wait.ConditionFunc { - return func() (bool, error) { - err := kubeClient.Get(ctx, namespacedName, repository) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if repository.Generation != repository.Status.ObservedGeneration { - return false, nil - } - - if c := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } +func (obj helmRepositoryAdapter) setUnsuspended() { + obj.HelmRepository.Spec.Suspend = false }