diff --git a/cmd/flux/kustomization_test.go b/cmd/flux/kustomization_test.go index e49aba46..2f6176ac 100644 --- a/cmd/flux/kustomization_test.go +++ b/cmd/flux/kustomization_test.go @@ -61,11 +61,26 @@ func TestKustomizationFromGit(t *testing.T) { "testdata/kustomization/suspend_kustomization_from_git.golden", tmpl, }, + { + "suspend kustomization tkfg foo tkfg bar", + "testdata/kustomization/suspend_kustomization_from_git_multiple_args.golden", + tmpl, + }, + { + "resume kustomization tkfg foo --wait", + "testdata/kustomization/resume_kustomization_from_git_multiple_args_wait.golden", + tmpl, + }, { "resume kustomization tkfg", "testdata/kustomization/resume_kustomization_from_git.golden", tmpl, }, + { + "resume kustomization tkfg tkfg", + "testdata/kustomization/resume_kustomization_from_git_multiple_args.golden", + tmpl, + }, { "delete kustomization tkfg --silent", "testdata/kustomization/delete_kustomization_from_git.golden", diff --git a/cmd/flux/main_test.go b/cmd/flux/main_test.go index dd9a6c28..37398309 100644 --- a/cmd/flux/main_test.go +++ b/cmd/flux/main_test.go @@ -365,6 +365,12 @@ func executeTemplate(content string, templateValues map[string]string) (string, // Run the command and return the captured output. func executeCommand(cmd string) (string, error) { defer resetCmdArgs() + defer func() { + // need to set this explicitly because apparently its value isn't changed + // in subsequent executions which causes tests to fail that rely on the value + // of "Changed". + resumeCmd.PersistentFlags().Lookup("wait").Changed = false + }() args, err := shellwords.Parse(cmd) if err != nil { return "", err diff --git a/cmd/flux/object.go b/cmd/flux/object.go index 26b3a040..681d86e0 100644 --- a/cmd/flux/object.go +++ b/cmd/flux/object.go @@ -47,7 +47,7 @@ type copyable interface { deepCopyClientObject() client.Object } -// listAdapater is the analogue to adapter, but for lists; the +// listAdapter is the analogue to adapter, but for lists; the // controller runtime distinguishes between methods dealing with // objects and lists. type listAdapter interface { diff --git a/cmd/flux/resume.go b/cmd/flux/resume.go index 9c592d6c..bc27c4eb 100644 --- a/cmd/flux/resume.go +++ b/cmd/flux/resume.go @@ -19,6 +19,8 @@ package main import ( "context" "fmt" + "sort" + "sync" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/types" @@ -59,8 +61,10 @@ type resumable interface { type resumeCommand struct { apiType - object resumable - list listResumable + client client.WithWatch + list listResumable + namespace string + shouldReconcile bool } type listResumable interface { @@ -68,6 +72,11 @@ type listResumable interface { resumeItem(i int) resumable } +type reconcileResponse struct { + resumable + err error +} + func (resume resumeCommand) run(cmd *cobra.Command, args []string) error { if len(args) < 1 && !resumeArgs.all { return fmt.Errorf("%s name is required", resume.humanKind) @@ -80,52 +89,162 @@ func (resume resumeCommand) run(cmd *cobra.Command, args []string) error { if err != nil { return err } + resume.client = kubeClient + resume.namespace = *kubeconfigArgs.Namespace - var listOpts []client.ListOption - listOpts = append(listOpts, client.InNamespace(*kubeconfigArgs.Namespace)) - if len(args) > 0 { - listOpts = append(listOpts, client.MatchingFields{ - "metadata.name": args[0], - }) - } + // require waiting for the object(s) if the user has not provided the --wait flag and gave exactly + // one object to resume. This is necessary to maintain backwards compatibility with prior versions + // of this command. Otherwise just follow the value of the --wait flag (including its default). + resume.shouldReconcile = !resumeCmd.PersistentFlags().Changed("wait") && len(args) == 1 || resumeArgs.wait - err = kubeClient.List(ctx, resume.list.asClientList(), listOpts...) + resumables, err := resume.getPatchedResumables(ctx, args) if err != nil { return err } + var wg sync.WaitGroup + wg.Add(len(resumables)) + + resultChan := make(chan reconcileResponse, len(resumables)) + for _, r := range resumables { + go func(res resumable) { + defer wg.Done() + resultChan <- resume.reconcile(ctx, res) + }(r) + } + + go func() { + defer close(resultChan) + wg.Wait() + }() + + reconcileResps := make([]reconcileResponse, 0, len(resumables)) + for c := range resultChan { + reconcileResps = append(reconcileResps, c) + } + + resume.printMessage(reconcileResps) + + return nil +} + +// getPatchedResumables returns a list of the given resumable objects that have been patched to be resumed. +// If the args slice is empty, it patches all resumable objects in the given namespace. +func (resume *resumeCommand) getPatchedResumables(ctx context.Context, args []string) ([]resumable, error) { + if len(args) < 1 { + objs, err := resume.patch(ctx, []client.ListOption{ + client.InNamespace(resume.namespace), + }) + if err != nil { + return nil, fmt.Errorf("failed patching objects: %w", err) + } + + return objs, nil + } + + var resumables []resumable + processed := make(map[string]struct{}, len(args)) + for _, arg := range args { + if _, has := processed[arg]; has { + continue // skip object that user might have provided more than once + } + processed[arg] = struct{}{} + + objs, err := resume.patch(ctx, []client.ListOption{ + client.InNamespace(resume.namespace), + client.MatchingFields{ + "metadata.name": arg, + }, + }) + if err != nil { + return nil, err + } + + resumables = append(resumables, objs...) + } + + return resumables, nil +} + +// Patches resumable objects by setting their status to unsuspended. +// Returns a slice of resumables that have been patched and any error encountered during patching. +func (resume resumeCommand) patch(ctx context.Context, listOpts []client.ListOption) ([]resumable, error) { + if err := resume.client.List(ctx, resume.list.asClientList(), listOpts...); err != nil { + return nil, err + } + if resume.list.len() == 0 { - logger.Failuref("no %s objects found in %s namespace", resume.kind, *kubeconfigArgs.Namespace) - return nil + logger.Failuref("no %s objects found in %s namespace", resume.kind, resume.namespace) + return nil, nil } + var resumables []resumable + for i := 0; i < resume.list.len(); i++ { - logger.Actionf("resuming %s %s in %s namespace", resume.humanKind, resume.list.resumeItem(i).asClientObject().GetName(), *kubeconfigArgs.Namespace) obj := resume.list.resumeItem(i) + logger.Actionf("resuming %s %s in %s namespace", resume.humanKind, obj.asClientObject().GetName(), resume.namespace) + patch := client.MergeFrom(obj.deepCopyClientObject()) obj.setUnsuspended() - if err := kubeClient.Patch(ctx, obj.asClientObject(), patch); err != nil { - return err + if err := resume.client.Patch(ctx, obj.asClientObject(), patch); err != nil { + return nil, err } + resumables = append(resumables, obj) + logger.Successf("%s resumed", resume.humanKind) + } + + return resumables, nil +} + +// Waits for resumable object to be reconciled and returns the object and any error encountered while waiting. +// Returns an empty reconcileResponse, if shouldReconcile is false. +func (resume resumeCommand) reconcile(ctx context.Context, res resumable) reconcileResponse { + if !resume.shouldReconcile { + return reconcileResponse{} + } - if resumeArgs.wait || !resumeArgs.all { - namespacedName := types.NamespacedName{ - Name: resume.list.resumeItem(i).asClientObject().GetName(), - Namespace: *kubeconfigArgs.Namespace, - } - - logger.Waitingf("waiting for %s reconciliation", resume.kind) - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isReady(ctx, kubeClient, namespacedName, resume.list.resumeItem(i))); err != nil { - logger.Failuref(err.Error()) - continue - } - logger.Successf("%s reconciliation completed", resume.kind) - logger.Successf(resume.list.resumeItem(i).successMessage()) + namespacedName := types.NamespacedName{ + Name: res.asClientObject().GetName(), + Namespace: resume.namespace, + } + + logger.Waitingf("waiting for %s reconciliation", resume.kind) + + if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, + isReady(ctx, resume.client, namespacedName, res)); err != nil { + return reconcileResponse{ + resumable: res, + err: err, } } - return nil + return reconcileResponse{ + resumable: res, + err: nil, + } +} + +// Sorts the given reconcileResponses by resumable name and prints the success/error message for each response. +func (resume resumeCommand) printMessage(responses []reconcileResponse) { + sort.Slice(responses, func(i, j int) bool { + r1, r2 := responses[i], responses[j] + if r1.resumable == nil || r2.resumable == nil { + return false + } + return r1.asClientObject().GetName() <= r2.asClientObject().GetName() + }) + + // Print success/error message. + for _, r := range responses { + if r.resumable == nil { + continue + } + if r.err != nil { + logger.Failuref(r.err.Error()) + } + logger.Successf("%s %s reconciliation completed", resume.kind, r.asClientObject().GetName()) + logger.Successf(r.successMessage()) + } } diff --git a/cmd/flux/resume_alert.go b/cmd/flux/resume_alert.go index 5b86fb45..eee79658 100644 --- a/cmd/flux/resume_alert.go +++ b/cmd/flux/resume_alert.go @@ -32,7 +32,6 @@ finish the apply.`, ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.AlertKind)), RunE: resumeCommand{ apiType: alertType, - object: alertAdapter{¬ificationv1.Alert{}}, list: &alertListAdapter{¬ificationv1.AlertList{}}, }.run, } diff --git a/cmd/flux/resume_helmrelease.go b/cmd/flux/resume_helmrelease.go index 0de4fb90..aadf7bd0 100644 --- a/cmd/flux/resume_helmrelease.go +++ b/cmd/flux/resume_helmrelease.go @@ -35,7 +35,6 @@ finish the apply.`, ValidArgsFunction: resourceNamesCompletionFunc(helmv2.GroupVersion.WithKind(helmv2.HelmReleaseKind)), RunE: resumeCommand{ apiType: helmReleaseType, - object: helmReleaseAdapter{&helmv2.HelmRelease{}}, list: helmReleaseListAdapter{&helmv2.HelmReleaseList{}}, }.run, } diff --git a/cmd/flux/resume_image_repository.go b/cmd/flux/resume_image_repository.go index a5e69c26..44bae7e0 100644 --- a/cmd/flux/resume_image_repository.go +++ b/cmd/flux/resume_image_repository.go @@ -31,7 +31,6 @@ var resumeImageRepositoryCmd = &cobra.Command{ ValidArgsFunction: resourceNamesCompletionFunc(imagev1.GroupVersion.WithKind(imagev1.ImageRepositoryKind)), RunE: resumeCommand{ apiType: imageRepositoryType, - object: imageRepositoryAdapter{&imagev1.ImageRepository{}}, list: imageRepositoryListAdapter{&imagev1.ImageRepositoryList{}}, }.run, } diff --git a/cmd/flux/resume_image_updateauto.go b/cmd/flux/resume_image_updateauto.go index 8cc40bfc..f24a4833 100644 --- a/cmd/flux/resume_image_updateauto.go +++ b/cmd/flux/resume_image_updateauto.go @@ -31,7 +31,6 @@ var resumeImageUpdateCmd = &cobra.Command{ ValidArgsFunction: resourceNamesCompletionFunc(autov1.GroupVersion.WithKind(autov1.ImageUpdateAutomationKind)), RunE: resumeCommand{ apiType: imageUpdateAutomationType, - object: imageUpdateAutomationAdapter{&autov1.ImageUpdateAutomation{}}, list: imageUpdateAutomationListAdapter{&autov1.ImageUpdateAutomationList{}}, }.run, } diff --git a/cmd/flux/resume_kustomization.go b/cmd/flux/resume_kustomization.go index fb8da244..a7cc07e2 100644 --- a/cmd/flux/resume_kustomization.go +++ b/cmd/flux/resume_kustomization.go @@ -35,7 +35,6 @@ finish the apply.`, ValidArgsFunction: resourceNamesCompletionFunc(kustomizev1.GroupVersion.WithKind(kustomizev1.KustomizationKind)), RunE: resumeCommand{ apiType: kustomizationType, - object: kustomizationAdapter{&kustomizev1.Kustomization{}}, list: kustomizationListAdapter{&kustomizev1.KustomizationList{}}, }.run, } diff --git a/cmd/flux/resume_receiver.go b/cmd/flux/resume_receiver.go index eef0d1e0..e8c3fefa 100644 --- a/cmd/flux/resume_receiver.go +++ b/cmd/flux/resume_receiver.go @@ -32,7 +32,6 @@ finish the apply.`, ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ReceiverKind)), RunE: resumeCommand{ apiType: receiverType, - object: receiverAdapter{¬ificationv1.Receiver{}}, list: receiverListAdapter{¬ificationv1.ReceiverList{}}, }.run, } diff --git a/cmd/flux/resume_source_bucket.go b/cmd/flux/resume_source_bucket.go index 4fd32bc2..51188cfe 100644 --- a/cmd/flux/resume_source_bucket.go +++ b/cmd/flux/resume_source_bucket.go @@ -31,7 +31,6 @@ var resumeSourceBucketCmd = &cobra.Command{ ValidArgsFunction: resourceNamesCompletionFunc(sourcev1.GroupVersion.WithKind(sourcev1.BucketKind)), RunE: resumeCommand{ apiType: bucketType, - object: bucketAdapter{&sourcev1.Bucket{}}, list: bucketListAdapter{&sourcev1.BucketList{}}, }.run, } diff --git a/cmd/flux/resume_source_chart.go b/cmd/flux/resume_source_chart.go index bbe895c1..2fe50f9d 100644 --- a/cmd/flux/resume_source_chart.go +++ b/cmd/flux/resume_source_chart.go @@ -33,7 +33,6 @@ var resumeSourceHelmChartCmd = &cobra.Command{ ValidArgsFunction: resourceNamesCompletionFunc(sourcev1.GroupVersion.WithKind(sourcev1.HelmChartKind)), RunE: resumeCommand{ apiType: helmChartType, - object: &helmChartAdapter{&sourcev1.HelmChart{}}, list: &helmChartListAdapter{&sourcev1.HelmChartList{}}, }.run, } diff --git a/cmd/flux/resume_source_git.go b/cmd/flux/resume_source_git.go index 00655285..6918eec0 100644 --- a/cmd/flux/resume_source_git.go +++ b/cmd/flux/resume_source_git.go @@ -31,7 +31,6 @@ var resumeSourceGitCmd = &cobra.Command{ ValidArgsFunction: resourceNamesCompletionFunc(sourcev1.GroupVersion.WithKind(sourcev1.GitRepositoryKind)), RunE: resumeCommand{ apiType: gitRepositoryType, - object: gitRepositoryAdapter{&sourcev1.GitRepository{}}, list: gitRepositoryListAdapter{&sourcev1.GitRepositoryList{}}, }.run, } diff --git a/cmd/flux/resume_source_helm.go b/cmd/flux/resume_source_helm.go index 6e80895f..65670fbc 100644 --- a/cmd/flux/resume_source_helm.go +++ b/cmd/flux/resume_source_helm.go @@ -31,7 +31,6 @@ var resumeSourceHelmCmd = &cobra.Command{ ValidArgsFunction: resourceNamesCompletionFunc(sourcev1.GroupVersion.WithKind(sourcev1.HelmRepositoryKind)), RunE: resumeCommand{ apiType: helmRepositoryType, - object: helmRepositoryAdapter{&sourcev1.HelmRepository{}}, list: helmRepositoryListAdapter{&sourcev1.HelmRepositoryList{}}, }.run, } diff --git a/cmd/flux/resume_source_oci.go b/cmd/flux/resume_source_oci.go index 3c121c47..979d30bd 100644 --- a/cmd/flux/resume_source_oci.go +++ b/cmd/flux/resume_source_oci.go @@ -31,7 +31,6 @@ var resumeSourceOCIRepositoryCmd = &cobra.Command{ ValidArgsFunction: resourceNamesCompletionFunc(sourcev1.GroupVersion.WithKind(sourcev1.OCIRepositoryKind)), RunE: resumeCommand{ apiType: ociRepositoryType, - object: ociRepositoryAdapter{&sourcev1.OCIRepository{}}, list: ociRepositoryListAdapter{&sourcev1.OCIRepositoryList{}}, }.run, } diff --git a/cmd/flux/suspend.go b/cmd/flux/suspend.go index a7abe356..e0023c7e 100644 --- a/cmd/flux/suspend.go +++ b/cmd/flux/suspend.go @@ -18,6 +18,7 @@ package main import ( "context" + "errors" "fmt" "github.com/spf13/cobra" @@ -75,22 +76,53 @@ func (suspend suspendCommand) run(cmd *cobra.Command, args []string) error { return err } - var listOpts []client.ListOption - listOpts = append(listOpts, client.InNamespace(*kubeconfigArgs.Namespace)) - if len(args) > 0 { - listOpts = append(listOpts, client.MatchingFields{ - "metadata.name": args[0], - }) + if len(args) < 1 && suspendArgs.all { + listOpts := []client.ListOption{ + client.InNamespace(*kubeconfigArgs.Namespace), + } + + if err := suspend.patch(ctx, kubeClient, listOpts); err != nil { + return err + } + + return nil } - err = kubeClient.List(ctx, suspend.list.asClientList(), listOpts...) - if err != nil { + processed := make(map[string]struct{}, len(args)) + for _, arg := range args { + if _, has := processed[arg]; has { + continue // skip object that user might have provided more than once + } + processed[arg] = struct{}{} + + listOpts := []client.ListOption{ + client.InNamespace(*kubeconfigArgs.Namespace), + client.MatchingFields{ + "metadata.name": arg, + }, + } + + if err := suspend.patch(ctx, kubeClient, listOpts); err != nil { + if err == ErrNoObjectsFound { + logger.Failuref("%s %s not found in %s namespace", suspend.kind, arg, *kubeconfigArgs.Namespace) + } else { + logger.Failuref("failed suspending %s %s in %s namespace: %s", suspend.kind, arg, *kubeconfigArgs.Namespace, err.Error()) + } + } + } + + return nil +} + +var ErrNoObjectsFound = errors.New("no objects found") + +func (suspend suspendCommand) patch(ctx context.Context, kubeClient client.WithWatch, listOpts []client.ListOption) error { + if err := kubeClient.List(ctx, suspend.list.asClientList(), listOpts...); err != nil { return err } if suspend.list.len() == 0 { - logger.Failuref("no %s objects found in %s namespace", suspend.kind, *kubeconfigArgs.Namespace) - return nil + return ErrNoObjectsFound } for i := 0; i < suspend.list.len(); i++ { @@ -102,8 +134,8 @@ func (suspend suspendCommand) run(cmd *cobra.Command, args []string) error { if err := kubeClient.Patch(ctx, obj.asClientObject(), patch); err != nil { return err } - logger.Successf("%s suspended", suspend.humanKind) + logger.Successf("%s suspended", suspend.humanKind) } return nil diff --git a/cmd/flux/testdata/helmrelease/resume_helmrelease_from_git.golden b/cmd/flux/testdata/helmrelease/resume_helmrelease_from_git.golden index 4369d16a..0ceaf0d4 100644 --- a/cmd/flux/testdata/helmrelease/resume_helmrelease_from_git.golden +++ b/cmd/flux/testdata/helmrelease/resume_helmrelease_from_git.golden @@ -1,5 +1,5 @@ ► resuming helmrelease thrfg in {{ .ns }} namespace ✔ helmrelease resumed ◎ waiting for HelmRelease reconciliation -✔ HelmRelease reconciliation completed +✔ HelmRelease thrfg reconciliation completed ✔ applied revision 6.3.5 diff --git a/cmd/flux/testdata/kustomization/resume_kustomization_from_git.golden b/cmd/flux/testdata/kustomization/resume_kustomization_from_git.golden index 93d8a012..f8492110 100644 --- a/cmd/flux/testdata/kustomization/resume_kustomization_from_git.golden +++ b/cmd/flux/testdata/kustomization/resume_kustomization_from_git.golden @@ -1,5 +1,5 @@ ► resuming kustomization tkfg in {{ .ns }} namespace ✔ kustomization resumed ◎ waiting for Kustomization reconciliation -✔ Kustomization reconciliation completed +✔ Kustomization tkfg reconciliation completed ✔ applied revision 6.3.5@sha1:67e2c98a60dc92283531412a9e604dd4bae005a9 diff --git a/cmd/flux/testdata/kustomization/resume_kustomization_from_git_multiple_args.golden b/cmd/flux/testdata/kustomization/resume_kustomization_from_git_multiple_args.golden new file mode 100644 index 00000000..318f29f2 --- /dev/null +++ b/cmd/flux/testdata/kustomization/resume_kustomization_from_git_multiple_args.golden @@ -0,0 +1,2 @@ +► resuming kustomization tkfg in {{ .ns }} namespace +✔ kustomization resumed diff --git a/cmd/flux/testdata/kustomization/resume_kustomization_from_git_multiple_args_wait.golden b/cmd/flux/testdata/kustomization/resume_kustomization_from_git_multiple_args_wait.golden new file mode 100644 index 00000000..e0775140 --- /dev/null +++ b/cmd/flux/testdata/kustomization/resume_kustomization_from_git_multiple_args_wait.golden @@ -0,0 +1,6 @@ +► resuming kustomization tkfg in {{ .ns }} namespace +✔ kustomization resumed +✗ no Kustomization objects found in {{ .ns }} namespace +◎ waiting for Kustomization reconciliation +✔ Kustomization tkfg reconciliation completed +✔ applied revision 6.3.5@sha1:67e2c98a60dc92283531412a9e604dd4bae005a9 diff --git a/cmd/flux/testdata/kustomization/suspend_kustomization_from_git_multiple_args.golden b/cmd/flux/testdata/kustomization/suspend_kustomization_from_git_multiple_args.golden new file mode 100644 index 00000000..42abd122 --- /dev/null +++ b/cmd/flux/testdata/kustomization/suspend_kustomization_from_git_multiple_args.golden @@ -0,0 +1,4 @@ +► suspending kustomization tkfg in {{ .ns }} namespace +✔ kustomization suspended +✗ Kustomization foo not found in {{ .ns }} namespace +✗ Kustomization bar not found in {{ .ns }} namespace diff --git a/cmd/flux/testdata/oci/resume_oci.golden b/cmd/flux/testdata/oci/resume_oci.golden index 00e628d5..34a0deba 100644 --- a/cmd/flux/testdata/oci/resume_oci.golden +++ b/cmd/flux/testdata/oci/resume_oci.golden @@ -1,5 +1,5 @@ ► resuming source oci thrfg in {{ .ns }} namespace ✔ source oci resumed ◎ waiting for OCIRepository reconciliation -✔ OCIRepository reconciliation completed +✔ OCIRepository thrfg reconciliation completed ✔ fetched revision 6.3.5@sha256:6c959c51ccbb952e5fe4737563338a0aaf975675dcf812912cf09e5463181871