refactor resume command
Signed-off-by: Somtochi Onyekwere <somtochionyekwere@gmail.com>
This commit is contained in:
@@ -17,20 +17,9 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fluxcd/flux2/internal/utils"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
helmv2 "github.com/fluxcd/helm-controller/api/v2beta1"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var resumeHrCmd = &cobra.Command{
|
||||
@@ -42,76 +31,24 @@ finish the apply.`,
|
||||
Example: ` # Resume reconciliation for an existing Helm release
|
||||
flux resume hr podinfo
|
||||
`,
|
||||
RunE: resumeHrCmdRun,
|
||||
RunE: resumeCommand{
|
||||
apiType: helmReleaseType,
|
||||
object: helmReleaseAdapter{&helmv2.HelmRelease{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
resumeCmd.AddCommand(resumeHrCmd)
|
||||
}
|
||||
|
||||
func resumeHrCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("HelmRelease name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: rootArgs.namespace,
|
||||
Name: name,
|
||||
}
|
||||
var helmRelease helmv2.HelmRelease
|
||||
err = kubeClient.Get(ctx, namespacedName, &helmRelease)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Actionf("resuming HelmRelease %s in %s namespace", name, rootArgs.namespace)
|
||||
helmRelease.Spec.Suspend = false
|
||||
if err := kubeClient.Update(ctx, &helmRelease); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("HelmRelease resumed")
|
||||
|
||||
logger.Waitingf("waiting for HelmRelease reconciliation")
|
||||
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
|
||||
isHelmReleaseResumed(ctx, kubeClient, namespacedName, &helmRelease)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("HelmRelease reconciliation completed")
|
||||
|
||||
logger.Successf("applied revision %s", helmRelease.Status.LastAppliedRevision)
|
||||
return nil
|
||||
func (obj helmReleaseAdapter) getObservedGeneration() int64 {
|
||||
return obj.HelmRelease.Status.ObservedGeneration
|
||||
}
|
||||
|
||||
func isHelmReleaseResumed(ctx context.Context, kubeClient client.Client,
|
||||
namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, helmRelease)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if helmRelease.Generation != helmRelease.Status.ObservedGeneration {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(helmRelease.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
func (obj helmReleaseAdapter) setUnsuspended() {
|
||||
obj.HelmRelease.Spec.Suspend = false
|
||||
}
|
||||
|
||||
func (obj helmReleaseAdapter) successMessage() string {
|
||||
return fmt.Sprintf("applied revision %s", obj.Status.LastAppliedRevision)
|
||||
}
|
||||
|
||||
@@ -17,19 +17,10 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fluxcd/flux2/internal/utils"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta1"
|
||||
"github.com/spf13/cobra"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
var resumeKsCmd = &cobra.Command{
|
||||
@@ -41,76 +32,24 @@ finish the apply.`,
|
||||
Example: ` # Resume reconciliation for an existing Kustomization
|
||||
flux resume ks podinfo
|
||||
`,
|
||||
RunE: resumeKsCmdRun,
|
||||
RunE: resumeCommand{
|
||||
apiType: kustomizationType,
|
||||
object: kustomizationAdapter{&kustomizev1.Kustomization{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
resumeCmd.AddCommand(resumeKsCmd)
|
||||
}
|
||||
|
||||
func resumeKsCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("Kustomization name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: rootArgs.namespace,
|
||||
Name: name,
|
||||
}
|
||||
var kustomization kustomizev1.Kustomization
|
||||
err = kubeClient.Get(ctx, namespacedName, &kustomization)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Actionf("resuming Kustomization %s in %s namespace", name, rootArgs.namespace)
|
||||
kustomization.Spec.Suspend = false
|
||||
if err := kubeClient.Update(ctx, &kustomization); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Kustomization resumed")
|
||||
|
||||
logger.Waitingf("waiting for Kustomization reconciliation")
|
||||
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
|
||||
isKustomizationResumed(ctx, kubeClient, namespacedName, &kustomization)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Kustomization reconciliation completed")
|
||||
|
||||
logger.Successf("applied revision %s", kustomization.Status.LastAppliedRevision)
|
||||
return nil
|
||||
func (obj kustomizationAdapter) getObservedGeneration() int64 {
|
||||
return obj.Kustomization.Status.ObservedGeneration
|
||||
}
|
||||
|
||||
func isKustomizationResumed(ctx context.Context, kubeClient client.Client,
|
||||
namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, kustomization)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if kustomization.Generation != kustomization.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
func (obj kustomizationAdapter) setUnsuspended() {
|
||||
obj.Kustomization.Spec.Suspend = false
|
||||
}
|
||||
|
||||
func (obj kustomizationAdapter) successMessage() string {
|
||||
return fmt.Sprintf("applied revision %s", obj.Status.LastAppliedRevision)
|
||||
}
|
||||
|
||||
@@ -17,20 +17,8 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fluxcd/flux2/internal/utils"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var resumeSourceBucketCmd = &cobra.Command{
|
||||
@@ -40,76 +28,20 @@ var resumeSourceBucketCmd = &cobra.Command{
|
||||
Example: ` # Resume reconciliation for an existing Bucket
|
||||
flux resume source bucket podinfo
|
||||
`,
|
||||
RunE: resumeSourceBucketCmdRun,
|
||||
RunE: resumeCommand{
|
||||
apiType: bucketType,
|
||||
object: &bucketAdapter{&sourcev1.Bucket{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
resumeSourceCmd.AddCommand(resumeSourceBucketCmd)
|
||||
}
|
||||
|
||||
func resumeSourceBucketCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("source name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: rootArgs.namespace,
|
||||
Name: name,
|
||||
}
|
||||
var bucket sourcev1.Bucket
|
||||
err = kubeClient.Get(ctx, namespacedName, &bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace)
|
||||
bucket.Spec.Suspend = false
|
||||
if err := kubeClient.Update(ctx, &bucket); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("source resumed")
|
||||
|
||||
logger.Waitingf("waiting for Bucket reconciliation")
|
||||
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
|
||||
isBucketResumed(ctx, kubeClient, namespacedName, &bucket)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("Bucket reconciliation completed")
|
||||
|
||||
logger.Successf("fetched revision %s", bucket.Status.Artifact.Revision)
|
||||
return nil
|
||||
func (obj bucketAdapter) getObservedGeneration() int64 {
|
||||
return obj.Bucket.Status.ObservedGeneration
|
||||
}
|
||||
|
||||
func isBucketResumed(ctx context.Context, kubeClient client.Client,
|
||||
namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, bucket)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if bucket.Generation != bucket.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
func (obj bucketAdapter) setUnsuspended() {
|
||||
obj.Bucket.Spec.Suspend = false
|
||||
}
|
||||
|
||||
@@ -17,20 +17,10 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fluxcd/flux2/internal/utils"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var resumeSourceHelmChartCmd = &cobra.Command{
|
||||
@@ -40,76 +30,24 @@ var resumeSourceHelmChartCmd = &cobra.Command{
|
||||
Example: ` # Resume reconciliation for an existing HelmChart
|
||||
flux resume source chart podinfo
|
||||
`,
|
||||
RunE: resumeSourceHelmChartCmdRun,
|
||||
RunE: resumeCommand{
|
||||
apiType: helmChartType,
|
||||
object: &helmChartAdapter{&sourcev1.HelmChart{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
resumeSourceCmd.AddCommand(resumeSourceHelmChartCmd)
|
||||
}
|
||||
|
||||
func resumeSourceHelmChartCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("source name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: rootArgs.namespace,
|
||||
Name: name,
|
||||
}
|
||||
var repository sourcev1.HelmChart
|
||||
err = kubeClient.Get(ctx, namespacedName, &repository)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace)
|
||||
repository.Spec.Suspend = false
|
||||
if err := kubeClient.Update(ctx, &repository); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("source resumed")
|
||||
|
||||
logger.Waitingf("waiting for HelmChart reconciliation")
|
||||
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
|
||||
isHelmChartResumed(ctx, kubeClient, namespacedName, &repository)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("HelmChart reconciliation completed")
|
||||
|
||||
logger.Successf("fetched revision %s", repository.Status.Artifact.Revision)
|
||||
return nil
|
||||
func (obj helmChartAdapter) getObservedGeneration() int64 {
|
||||
return obj.HelmChart.Status.ObservedGeneration
|
||||
}
|
||||
|
||||
func isHelmChartResumed(ctx context.Context, kubeClient client.Client,
|
||||
namespacedName types.NamespacedName, chart *sourcev1.HelmChart) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, chart)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if chart.Generation != chart.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(chart.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
func (obj helmChartAdapter) setUnsuspended() {
|
||||
obj.HelmChart.Spec.Suspend = false
|
||||
}
|
||||
|
||||
func (obj helmChartAdapter) successMessage() string {
|
||||
return fmt.Sprintf("fetched revision %s", obj.Status.Artifact.Revision)
|
||||
}
|
||||
|
||||
@@ -17,20 +17,8 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fluxcd/flux2/internal/utils"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var resumeSourceGitCmd = &cobra.Command{
|
||||
@@ -40,76 +28,20 @@ var resumeSourceGitCmd = &cobra.Command{
|
||||
Example: ` # Resume reconciliation for an existing GitRepository
|
||||
flux resume source git podinfo
|
||||
`,
|
||||
RunE: resumeSourceGitCmdRun,
|
||||
RunE: resumeCommand{
|
||||
apiType: gitRepositoryType,
|
||||
object: gitRepositoryAdapter{&sourcev1.GitRepository{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
resumeSourceCmd.AddCommand(resumeSourceGitCmd)
|
||||
}
|
||||
|
||||
func resumeSourceGitCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("source name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: rootArgs.namespace,
|
||||
Name: name,
|
||||
}
|
||||
var repository sourcev1.GitRepository
|
||||
err = kubeClient.Get(ctx, namespacedName, &repository)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace)
|
||||
repository.Spec.Suspend = false
|
||||
if err := kubeClient.Update(ctx, &repository); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("source resumed")
|
||||
|
||||
logger.Waitingf("waiting for GitRepository reconciliation")
|
||||
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
|
||||
isGitRepositoryResumed(ctx, kubeClient, namespacedName, &repository)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("GitRepository reconciliation completed")
|
||||
|
||||
logger.Successf("fetched revision %s", repository.Status.Artifact.Revision)
|
||||
return nil
|
||||
func (obj gitRepositoryAdapter) getObservedGeneration() int64 {
|
||||
return obj.GitRepository.Status.ObservedGeneration
|
||||
}
|
||||
|
||||
func isGitRepositoryResumed(ctx context.Context, kubeClient client.Client,
|
||||
namespacedName types.NamespacedName, repository *sourcev1.GitRepository) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, repository)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if repository.Generation != repository.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
func (obj gitRepositoryAdapter) setUnsuspended() {
|
||||
obj.GitRepository.Spec.Suspend = false
|
||||
}
|
||||
|
||||
@@ -17,20 +17,8 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fluxcd/flux2/internal/utils"
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
apimeta "k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var resumeSourceHelmCmd = &cobra.Command{
|
||||
@@ -40,76 +28,20 @@ var resumeSourceHelmCmd = &cobra.Command{
|
||||
Example: ` # Resume reconciliation for an existing HelmRepository
|
||||
flux resume source helm bitnami
|
||||
`,
|
||||
RunE: resumeSourceHelmCmdRun,
|
||||
RunE: resumeCommand{
|
||||
apiType: helmRepositoryType,
|
||||
object: helmRepositoryAdapter{&sourcev1.HelmRepository{}},
|
||||
}.run,
|
||||
}
|
||||
|
||||
func init() {
|
||||
resumeSourceCmd.AddCommand(resumeSourceHelmCmd)
|
||||
}
|
||||
|
||||
func resumeSourceHelmCmdRun(cmd *cobra.Command, args []string) error {
|
||||
if len(args) < 1 {
|
||||
return fmt.Errorf("source name is required")
|
||||
}
|
||||
name := args[0]
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
|
||||
defer cancel()
|
||||
|
||||
kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
namespacedName := types.NamespacedName{
|
||||
Namespace: rootArgs.namespace,
|
||||
Name: name,
|
||||
}
|
||||
var repository sourcev1.HelmRepository
|
||||
err = kubeClient.Get(ctx, namespacedName, &repository)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Actionf("resuming source %s in %s namespace", name, rootArgs.namespace)
|
||||
repository.Spec.Suspend = false
|
||||
if err := kubeClient.Update(ctx, &repository); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("source resumed")
|
||||
|
||||
logger.Waitingf("waiting for HelmRepository reconciliation")
|
||||
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
|
||||
isHelmRepositoryResumed(ctx, kubeClient, namespacedName, &repository)); err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Successf("HelmRepository reconciliation completed")
|
||||
|
||||
logger.Successf("fetched revision %s", repository.Status.Artifact.Revision)
|
||||
return nil
|
||||
func (obj helmRepositoryAdapter) getObservedGeneration() int64 {
|
||||
return obj.HelmRepository.Status.ObservedGeneration
|
||||
}
|
||||
|
||||
func isHelmRepositoryResumed(ctx context.Context, kubeClient client.Client,
|
||||
namespacedName types.NamespacedName, repository *sourcev1.HelmRepository) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
err := kubeClient.Get(ctx, namespacedName, repository)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Confirm the state we are observing is for the current generation
|
||||
if repository.Generation != repository.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if c := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); c != nil {
|
||||
switch c.Status {
|
||||
case metav1.ConditionTrue:
|
||||
return true, nil
|
||||
case metav1.ConditionFalse:
|
||||
return false, fmt.Errorf(c.Message)
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
func (obj helmRepositoryAdapter) setUnsuspended() {
|
||||
obj.HelmRepository.Spec.Suspend = false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user