diff --git a/cmd/flux/create.go b/cmd/flux/create.go index 795d9367..209c8f0a 100644 --- a/cmd/flux/create.go +++ b/cmd/flux/create.go @@ -132,7 +132,7 @@ func (names apiType) upsertAndWait(object upsertWaitable, mutate func() error) e logger.Waitingf("waiting for %s reconciliation", names.kind) if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isReady(kubeClient, namespacedName, object)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, object.asClientObject())); err != nil { return err } logger.Successf("%s reconciliation completed", names.kind) diff --git a/cmd/flux/create_alert.go b/cmd/flux/create_alert.go index 0f50bf02..22a5d289 100644 --- a/cmd/flux/create_alert.go +++ b/cmd/flux/create_alert.go @@ -22,7 +22,6 @@ import ( "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -133,7 +132,7 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for Alert reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isAlertReady(kubeClient, namespacedName, &alert)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, &alert)); err != nil { return err } logger.Successf("Alert %s is ready", name) @@ -170,22 +169,3 @@ func upsertAlert(ctx context.Context, kubeClient client.Client, logger.Successf("Alert updated") return namespacedName, nil } - -func isAlertReady(kubeClient client.Client, namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, alert) - if err != nil { - return false, err - } - - if c := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/create_alertprovider.go b/cmd/flux/create_alertprovider.go index 8523d1c5..468ec5d8 100644 --- a/cmd/flux/create_alertprovider.go +++ b/cmd/flux/create_alertprovider.go @@ -22,7 +22,6 @@ import ( "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -128,7 +127,7 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for Provider reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isAlertProviderReady(kubeClient, namespacedName, &provider)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, &provider)); err != nil { return err } @@ -167,22 +166,3 @@ func upsertAlertProvider(ctx context.Context, kubeClient client.Client, logger.Successf("Provider updated") return namespacedName, nil } - -func isAlertProviderReady(kubeClient client.Client, namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, provider) - if err != nil { - return false, err - } - - if c := apimeta.FindStatusCondition(provider.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/create_helmrelease.go b/cmd/flux/create_helmrelease.go index b6ef92b3..64bfcfe3 100644 --- a/cmd/flux/create_helmrelease.go +++ b/cmd/flux/create_helmrelease.go @@ -32,7 +32,6 @@ import ( "github.com/spf13/cobra" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -304,7 +303,7 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for HelmRelease reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isHelmReleaseReady(kubeClient, namespacedName, &helmRelease)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, &helmRelease)); err != nil { return err } logger.Successf("HelmRelease %s is ready", name) @@ -344,22 +343,6 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isHelmReleaseReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, helmRelease) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if helmRelease.Generation != helmRelease.Status.ObservedGeneration { - return false, nil - } - - return apimeta.IsStatusConditionTrue(helmRelease.Status.Conditions, meta.ReadyCondition), nil - } -} - func validateStrategy(input string) bool { allowedStrategy := []string{"Revision", "ChartVersion"} diff --git a/cmd/flux/create_kustomization.go b/cmd/flux/create_kustomization.go index eb2f8fc1..ec9a5293 100644 --- a/cmd/flux/create_kustomization.go +++ b/cmd/flux/create_kustomization.go @@ -24,7 +24,6 @@ import ( "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -264,7 +263,7 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for Kustomization reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isKustomizationReady(kubeClient, namespacedName, &kustomization)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, &kustomization)); err != nil { return err } logger.Successf("Kustomization %s is ready", name) @@ -303,27 +302,3 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client, logger.Successf("Kustomization updated") return namespacedName, nil } - -func isKustomizationReady(kubeClient client.Client, namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, kustomization) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if kustomization.Generation != kustomization.Status.ObservedGeneration { - return false, nil - } - - if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/create_receiver.go b/cmd/flux/create_receiver.go index 045ff771..ad6436cf 100644 --- a/cmd/flux/create_receiver.go +++ b/cmd/flux/create_receiver.go @@ -22,7 +22,6 @@ import ( "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/errors" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -140,7 +139,7 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for Receiver reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, &receiver)); err != nil { return err } logger.Successf("Receiver %s is ready", name) @@ -179,22 +178,3 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client, logger.Successf("Receiver updated") return namespacedName, nil } - -func isReceiverReady(kubeClient client.Client, namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, receiver) - if err != nil { - return false, err - } - - if c := apimeta.FindStatusCondition(receiver.Status.Conditions, meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/create_source_bucket.go b/cmd/flux/create_source_bucket.go index ebb6a834..32c88730 100644 --- a/cmd/flux/create_source_bucket.go +++ b/cmd/flux/create_source_bucket.go @@ -31,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/fluxcd/pkg/apis/meta" - "github.com/fluxcd/pkg/runtime/conditions" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" @@ -205,7 +204,7 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for Bucket source reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isBucketReady(kubeClient, namespacedName, bucket)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, bucket)); err != nil { return err } logger.Successf("Bucket source reconciliation completed") @@ -247,29 +246,3 @@ func upsertBucket(ctx context.Context, kubeClient client.Client, logger.Successf("Bucket source updated") return namespacedName, nil } - -func isBucketReady(kubeClient client.Client, namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, bucket) - if err != nil { - return false, err - } - - if c := conditions.Get(bucket, meta.ReadyCondition); c != nil { - // Confirm the Ready condition we are observing is for the - // current generation - if c.ObservedGeneration != bucket.GetGeneration() { - return false, nil - } - - // Further check the Status - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/create_source_git.go b/cmd/flux/create_source_git.go index 8c0d031d..67b03dab 100644 --- a/cmd/flux/create_source_git.go +++ b/cmd/flux/create_source_git.go @@ -35,7 +35,6 @@ import ( "sigs.k8s.io/yaml" "github.com/fluxcd/pkg/apis/meta" - "github.com/fluxcd/pkg/runtime/conditions" sourcev1 "github.com/fluxcd/source-controller/api/v1" @@ -326,7 +325,7 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for GitRepository source reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isGitRepositoryReady(kubeClient, namespacedName, &gitRepository)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, &gitRepository)); err != nil { return err } logger.Successf("GitRepository source reconciliation completed") @@ -368,29 +367,3 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client, logger.Successf("GitRepository source updated") return namespacedName, nil } - -func isGitRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, gitRepository) - if err != nil { - return false, err - } - - if c := conditions.Get(gitRepository, meta.ReadyCondition); c != nil { - // Confirm the Ready condition we are observing is for the - // current generation - if c.ObservedGeneration != gitRepository.GetGeneration() { - return false, nil - } - - // Further check the Status - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/create_source_git_test.go b/cmd/flux/create_source_git_test.go index 27365990..b34e4208 100644 --- a/cmd/flux/create_source_git_test.go +++ b/cmd/flux/create_source_git_test.go @@ -181,12 +181,21 @@ func TestCreateSourceGit(t *testing.T) { Time: time.Now(), }, } + repo.Status.ObservedGeneration = repo.GetGeneration() }, }, { "Failed", command, assertError("failed message"), func(repo *sourcev1.GitRepository) { + stalledCondition := metav1.Condition{ + Type: meta.StalledCondition, + Status: metav1.ConditionTrue, + Reason: sourcev1.URLInvalidReason, + Message: "failed message", + ObservedGeneration: repo.GetGeneration(), + } + apimeta.SetStatusCondition(&repo.Status.Conditions, stalledCondition) newCondition := metav1.Condition{ Type: meta.ReadyCondition, Status: metav1.ConditionFalse, @@ -195,6 +204,7 @@ func TestCreateSourceGit(t *testing.T) { ObservedGeneration: repo.GetGeneration(), } apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition) + repo.Status.ObservedGeneration = repo.GetGeneration() }, }, { "NoArtifact", @@ -210,6 +220,7 @@ func TestCreateSourceGit(t *testing.T) { ObservedGeneration: repo.GetGeneration(), } apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition) + repo.Status.ObservedGeneration = repo.GetGeneration() }, }, } diff --git a/cmd/flux/create_source_helm.go b/cmd/flux/create_source_helm.go index ea82fa8a..5b46f45e 100644 --- a/cmd/flux/create_source_helm.go +++ b/cmd/flux/create_source_helm.go @@ -23,7 +23,6 @@ import ( "os" "github.com/fluxcd/pkg/apis/meta" - "github.com/fluxcd/pkg/runtime/conditions" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -232,7 +231,7 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for HelmRepository source reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isHelmRepositoryReady(kubeClient, namespacedName, helmRepository)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, helmRepository)); err != nil { return err } logger.Successf("HelmRepository source reconciliation completed") @@ -279,29 +278,3 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client, logger.Successf("source updated") return namespacedName, nil } - -func isHelmRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, helmRepository) - if err != nil { - return false, err - } - - if c := conditions.Get(helmRepository, meta.ReadyCondition); c != nil { - // Confirm the Ready condition we are observing is for the - // current generation - if c.ObservedGeneration != helmRepository.GetGeneration() { - return false, nil - } - - // Further check the Status - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/create_source_oci.go b/cmd/flux/create_source_oci.go index 30fc4d8d..6393380c 100644 --- a/cmd/flux/create_source_oci.go +++ b/cmd/flux/create_source_oci.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/fluxcd/pkg/apis/meta" - "github.com/fluxcd/pkg/runtime/conditions" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" @@ -193,7 +192,7 @@ func createSourceOCIRepositoryCmdRun(cmd *cobra.Command, args []string) error { logger.Waitingf("waiting for OCIRepository reconciliation") if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isOCIRepositoryReady(kubeClient, namespacedName, repository)); err != nil { + isObjectReadyConditionFunc(kubeClient, namespacedName, repository)); err != nil { return err } logger.Successf("OCIRepository reconciliation completed") @@ -235,29 +234,3 @@ func upsertOCIRepository(ctx context.Context, kubeClient client.Client, logger.Successf("OCIRepository updated") return namespacedName, nil } - -func isOCIRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, ociRepository *sourcev1.OCIRepository) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, ociRepository) - if err != nil { - return false, err - } - - if c := conditions.Get(ociRepository, meta.ReadyCondition); c != nil { - // Confirm the Ready condition we are observing is for the - // current generation - if c.ObservedGeneration != ociRepository.GetGeneration() { - return false, nil - } - - // Further check the Status - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/main_e2e_test.go b/cmd/flux/main_e2e_test.go index 008b810f..f2b01cc6 100644 --- a/cmd/flux/main_e2e_test.go +++ b/cmd/flux/main_e2e_test.go @@ -25,10 +25,15 @@ import ( "os" "testing" + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/log" + "github.com/fluxcd/flux2/v2/internal/utils" ) func TestMain(m *testing.M) { + log.SetLogger(logr.New(log.NullLogSink{})) + // Ensure tests print consistent timestamps regardless of timezone os.Setenv("TZ", "UTC") diff --git a/cmd/flux/main_unit_test.go b/cmd/flux/main_unit_test.go index 8d3e6398..8e0af1aa 100644 --- a/cmd/flux/main_unit_test.go +++ b/cmd/flux/main_unit_test.go @@ -22,10 +22,13 @@ package main import ( "context" "fmt" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "testing" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/log" ) // The test environment is long running process shared between tests, initialized @@ -34,6 +37,8 @@ import ( var testEnv *testEnvKubeManager func TestMain(m *testing.M) { + log.SetLogger(logr.New(log.NullLogSink{})) + // Ensure tests print consistent timestamps regardless of timezone os.Setenv("TZ", "UTC") diff --git a/cmd/flux/readiness.go b/cmd/flux/readiness.go new file mode 100644 index 00000000..f201625f --- /dev/null +++ b/cmd/flux/readiness.go @@ -0,0 +1,149 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + + kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/object" + "github.com/fluxcd/pkg/runtime/patch" +) + +// objectStatusType is the type of object in terms of status when computing the +// readiness of an object. Readiness check method depends on the type of object. +// For a dynamic object, Ready status condition is considered only for the +// latest generation of the object. For a static object that don't have any +// condition, the object generation is not considered. +type objectStatusType int + +const ( + objectStatusDynamic objectStatusType = iota + objectStatusStatic +) + +// isObjectReady determines if an object is ready using the kstatus.Compute() +// result. statusType helps differenciate between static and dynamic objects to +// accurately check the object's readiness. A dynamic object may have some extra +// considerations depending on the object. +func isObjectReady(obj client.Object, statusType objectStatusType) (bool, error) { + observedGen, err := object.GetStatusObservedGeneration(obj) + if err != nil && err != object.ErrObservedGenerationNotFound { + return false, err + } + + if statusType == objectStatusDynamic { + // Object not reconciled yet. + if observedGen < 1 { + return false, nil + } + + cobj, ok := obj.(meta.ObjectWithConditions) + if !ok { + return false, fmt.Errorf("unable to get conditions from object") + } + + if c := apimeta.FindStatusCondition(cobj.GetConditions(), meta.ReadyCondition); c != nil { + // Ensure that the ready condition is for the latest generation of + // the object. + // NOTE: Some APIs like ImageUpdateAutomation and HelmRelease don't + // support per condition observed generation yet. Per condition + // observed generation for them are always zero. + // There are two strategies used across different object kinds to + // check the latest ready condition: + // - check that the ready condition's generation matches the + // object's generation. + // - check that the observed generation of the object in the + // status matches the object's generation. + // + // TODO: Once ImageUpdateAutomation and HelmRelease APIs have per + // condition observed generation, remove the object's observed + // generation and object's generation check (the second condition + // below). Also, try replacing this readiness check function with + // fluxcd/pkg/ssa's ResourceManager.Wait(), which uses kstatus + // internally to check readiness of the objects. + if c.ObservedGeneration != 0 && c.ObservedGeneration != obj.GetGeneration() { + return false, nil + } + if c.ObservedGeneration == 0 && observedGen != obj.GetGeneration() { + return false, nil + } + } else { + return false, nil + } + } + + u, err := patch.ToUnstructured(obj) + if err != nil { + return false, err + } + result, err := kstatus.Compute(u) + if err != nil { + return false, err + } + switch result.Status { + case kstatus.CurrentStatus: + return true, nil + case kstatus.InProgressStatus: + return false, nil + default: + return false, fmt.Errorf(result.Message) + } +} + +// isObjectReadyConditionFunc returns a wait.ConditionFunc to be used with +// wait.Poll* while polling for an object with dynamic status to be ready. +func isObjectReadyConditionFunc(kubeClient client.Client, namespaceName types.NamespacedName, obj client.Object) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + err := kubeClient.Get(ctx, namespaceName, obj) + if err != nil { + return false, err + } + + return isObjectReady(obj, objectStatusDynamic) + } +} + +// isStaticObjectReadyConditionFunc returns a wait.ConditionFunc to be used with +// wait.Poll* while polling for an object with static or no status to be +// ready. +func isStaticObjectReadyConditionFunc(kubeClient client.Client, namespaceName types.NamespacedName, obj client.Object) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { + err := kubeClient.Get(ctx, namespaceName, obj) + if err != nil { + return false, err + } + + return isObjectReady(obj, objectStatusStatic) + } +} + +// kstatusCompute returns the kstatus computed result of a given object. +func kstatusCompute(obj client.Object) (result *kstatus.Result, err error) { + u, err := patch.ToUnstructured(obj) + if err != nil { + return result, err + } + return kstatus.Compute(u) +} diff --git a/cmd/flux/readiness_test.go b/cmd/flux/readiness_test.go new file mode 100644 index 00000000..4d987042 --- /dev/null +++ b/cmd/flux/readiness_test.go @@ -0,0 +1,139 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + notificationv1 "github.com/fluxcd/notification-controller/api/v1beta3" + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + sourcev1 "github.com/fluxcd/source-controller/api/v1" +) + +func Test_isObjectReady(t *testing.T) { + // Ready object. + readyObj := &sourcev1.GitRepository{} + readyObj.Generation = 1 + readyObj.Status.ObservedGeneration = 1 + conditions.MarkTrue(readyObj, meta.ReadyCondition, "foo1", "bar1") + + // Not ready object. + notReadyObj := readyObj.DeepCopy() + conditions.MarkFalse(notReadyObj, meta.ReadyCondition, "foo2", "bar2") + + // Not reconciled object. + notReconciledObj := readyObj.DeepCopy() + notReconciledObj.Status = sourcev1.GitRepositoryStatus{ObservedGeneration: -1} + + // No condition. + noConditionObj := readyObj.DeepCopy() + noConditionObj.Status = sourcev1.GitRepositoryStatus{ObservedGeneration: 1} + + // Outdated condition. + readyObjOutdated := readyObj.DeepCopy() + readyObjOutdated.Generation = 2 + + // Object without per condition observed generation. + oldObj := readyObj.DeepCopy() + readyTrueCondn := conditions.TrueCondition(meta.ReadyCondition, "foo3", "bar3") + oldObj.Status.Conditions = []metav1.Condition{*readyTrueCondn} + + // Outdated object without per condition observed generation. + oldObjOutdated := oldObj.DeepCopy() + oldObjOutdated.Generation = 2 + + // Empty status object. + staticObj := readyObj.DeepCopy() + staticObj.Status = sourcev1.GitRepositoryStatus{} + + // No status object. + noStatusObj := ¬ificationv1.Provider{} + noStatusObj.Generation = 1 + + type args struct { + obj client.Object + statusType objectStatusType + } + tests := []struct { + name string + args args + want bool + wantErr bool + }{ + { + name: "dynamic ready", + args: args{obj: readyObj, statusType: objectStatusDynamic}, + want: true, + }, + { + name: "dynamic not ready", + args: args{obj: notReadyObj, statusType: objectStatusDynamic}, + want: false, + }, + { + name: "dynamic not reconciled", + args: args{obj: notReconciledObj, statusType: objectStatusDynamic}, + want: false, + }, + { + name: "dynamic not condition", + args: args{obj: noConditionObj, statusType: objectStatusDynamic}, + want: false, + }, + { + name: "dynamic ready outdated", + args: args{obj: readyObjOutdated, statusType: objectStatusDynamic}, + want: false, + }, + { + name: "dynamic ready without per condition gen", + args: args{obj: oldObj, statusType: objectStatusDynamic}, + want: true, + }, + { + name: "dynamic outdated ready status without per condition gen", + args: args{obj: oldObjOutdated, statusType: objectStatusDynamic}, + want: false, + }, + { + name: "static empty status", + args: args{obj: staticObj, statusType: objectStatusStatic}, + want: true, + }, + { + name: "static no status", + args: args{obj: noStatusObj, statusType: objectStatusStatic}, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := isObjectReady(tt.args.obj, tt.args.statusType) + if (err != nil) != tt.wantErr { + t.Errorf("isObjectReady() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("isObjectReady() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cmd/flux/reconcile.go b/cmd/flux/reconcile.go index 08b47a2a..7589dcb9 100644 --- a/cmd/flux/reconcile.go +++ b/cmd/flux/reconcile.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status" "github.com/spf13/cobra" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,8 +31,6 @@ import ( "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" - notificationv1 "github.com/fluxcd/notification-controller/api/v1" - notificationv1b2 "github.com/fluxcd/notification-controller/api/v1beta2" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/flux2/v2/internal/utils" @@ -112,16 +111,6 @@ func (reconcile reconcileCommand) run(cmd *cobra.Command, args []string) error { } logger.Successf("%s annotated", reconcile.kind) - if reconcile.kind == notificationv1b2.AlertKind || reconcile.kind == notificationv1.ReceiverKind { - if err = wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isReconcileReady(kubeClient, namespacedName, reconcile.object)); err != nil { - return err - } - - logger.Successf(reconcile.object.successMessage()) - return nil - } - lastHandledReconcileAt := reconcile.object.lastHandledReconcileRequest() logger.Waitingf("waiting for %s reconciliation", reconcile.kind) if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, @@ -146,9 +135,17 @@ func reconciliationHandled(kubeClient client.Client, namespacedName types.Namesp if err != nil { return false, err } - isProgressing := apimeta.IsStatusConditionPresentAndEqual(reconcilableConditions(obj), - meta.ReadyCondition, metav1.ConditionUnknown) - return obj.lastHandledReconcileRequest() != lastHandledReconcileAt && !isProgressing, nil + + if obj.lastHandledReconcileRequest() == lastHandledReconcileAt { + return false, nil + } + + result, err := kstatusCompute(obj.asClientObject()) + if err != nil { + return false, err + } + + return result.Status == kstatus.CurrentStatus, nil } } @@ -174,22 +171,3 @@ func requestReconciliation(ctx context.Context, kubeClient client.Client, return kubeClient.Patch(ctx, object, patch) }) } - -func isReconcileReady(kubeClient client.Client, namespacedName types.NamespacedName, obj reconcilable) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, obj.asClientObject()) - if err != nil { - return false, err - } - - if c := apimeta.FindStatusCondition(reconcilableConditions(obj), meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} diff --git a/cmd/flux/reconcile_alert.go b/cmd/flux/reconcile_alert.go index 655b366f..8dd8be61 100644 --- a/cmd/flux/reconcile_alert.go +++ b/cmd/flux/reconcile_alert.go @@ -40,5 +40,5 @@ func init() { } func (obj alertAdapter) lastHandledReconcileRequest() string { - return "" + return obj.Status.GetLastHandledReconcileRequest() } diff --git a/cmd/flux/reconcile_alertprovider.go b/cmd/flux/reconcile_alertprovider.go index 4f6ce136..5c3001af 100644 --- a/cmd/flux/reconcile_alertprovider.go +++ b/cmd/flux/reconcile_alertprovider.go @@ -17,18 +17,9 @@ limitations under the License. package main import ( - "context" - "fmt" - "time" - "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" notificationv1 "github.com/fluxcd/notification-controller/api/v1beta2" - "github.com/fluxcd/pkg/apis/meta" - - "github.com/fluxcd/flux2/v2/internal/utils" ) var reconcileAlertProviderCmd = &cobra.Command{ @@ -38,56 +29,16 @@ var reconcileAlertProviderCmd = &cobra.Command{ Example: ` # Trigger a reconciliation for an existing provider flux reconcile alert-provider slack`, ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ProviderKind)), - RunE: reconcileAlertProviderCmdRun, + RunE: reconcileCommand{ + apiType: alertProviderType, + object: alertProviderAdapter{¬ificationv1.Provider{}}, + }.run, } func init() { reconcileCmd.AddCommand(reconcileAlertProviderCmd) } -func reconcileAlertProviderCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("Provider name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(kubeconfigArgs, kubeclientOptions) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: *kubeconfigArgs.Namespace, - Name: name, - } - - logger.Actionf("annotating Provider %s in %s namespace", name, *kubeconfigArgs.Namespace) - var alertProvider notificationv1.Provider - err = kubeClient.Get(ctx, namespacedName, &alertProvider) - if err != nil { - return err - } - - if alertProvider.Annotations == nil { - alertProvider.Annotations = map[string]string{ - meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - alertProvider.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) - } - if err := kubeClient.Update(ctx, &alertProvider); err != nil { - return err - } - logger.Successf("Provider annotated") - - logger.Waitingf("waiting for reconciliation") - if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isAlertProviderReady(kubeClient, namespacedName, &alertProvider)); err != nil { - return err - } - logger.Successf("Provider reconciliation completed") - return nil +func (obj alertProviderAdapter) lastHandledReconcileRequest() string { + return obj.Status.GetLastHandledReconcileRequest() } diff --git a/cmd/flux/reconcile_receiver.go b/cmd/flux/reconcile_receiver.go index a16946a7..42a2dd24 100644 --- a/cmd/flux/reconcile_receiver.go +++ b/cmd/flux/reconcile_receiver.go @@ -17,18 +17,9 @@ limitations under the License. package main import ( - "context" - "fmt" - "time" - "github.com/spf13/cobra" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" notificationv1 "github.com/fluxcd/notification-controller/api/v1" - "github.com/fluxcd/pkg/apis/meta" - - "github.com/fluxcd/flux2/v2/internal/utils" ) var reconcileReceiverCmd = &cobra.Command{ @@ -38,62 +29,16 @@ var reconcileReceiverCmd = &cobra.Command{ Example: ` # Trigger a reconciliation for an existing receiver flux reconcile receiver main`, ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ReceiverKind)), - RunE: reconcileReceiverCmdRun, + RunE: reconcileCommand{ + apiType: receiverType, + object: receiverAdapter{¬ificationv1.Receiver{}}, + }.run, } func init() { reconcileCmd.AddCommand(reconcileReceiverCmd) } -func reconcileReceiverCmdRun(cmd *cobra.Command, args []string) error { - if len(args) < 1 { - return fmt.Errorf("receiver name is required") - } - name := args[0] - - ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) - defer cancel() - - kubeClient, err := utils.KubeClient(kubeconfigArgs, kubeclientOptions) - if err != nil { - return err - } - - namespacedName := types.NamespacedName{ - Namespace: *kubeconfigArgs.Namespace, - Name: name, - } - - var receiver notificationv1.Receiver - err = kubeClient.Get(ctx, namespacedName, &receiver) - if err != nil { - return err - } - - if receiver.Spec.Suspend { - return fmt.Errorf("resource is suspended") - } - - logger.Actionf("annotating Receiver %s in %s namespace", name, *kubeconfigArgs.Namespace) - if receiver.Annotations == nil { - receiver.Annotations = map[string]string{ - meta.ReconcileRequestAnnotation: time.Now().Format(time.RFC3339Nano), - } - } else { - receiver.Annotations[meta.ReconcileRequestAnnotation] = time.Now().Format(time.RFC3339Nano) - } - if err := kubeClient.Update(ctx, &receiver); err != nil { - return err - } - logger.Successf("Receiver annotated") - - logger.Waitingf("waiting for Receiver reconciliation") - if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil { - return err - } - - logger.Successf("Receiver reconciliation completed") - - return nil +func (obj receiverAdapter) lastHandledReconcileRequest() string { + return obj.Status.GetLastHandledReconcileRequest() } diff --git a/cmd/flux/resume.go b/cmd/flux/resume.go index d62895ca..4cb700a2 100644 --- a/cmd/flux/resume.go +++ b/cmd/flux/resume.go @@ -213,7 +213,7 @@ func (resume resumeCommand) reconcile(ctx context.Context, res resumable) reconc logger.Waitingf("waiting for %s reconciliation", resume.kind) if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, - isReady(resume.client, namespacedName, res)); err != nil { + isObjectReadyConditionFunc(resume.client, namespacedName, res.asClientObject())); err != nil { return reconcileResponse{ resumable: res, err: err, diff --git a/cmd/flux/resume_alertprovider.go b/cmd/flux/resume_alertprovider.go new file mode 100644 index 00000000..e0ba49b9 --- /dev/null +++ b/cmd/flux/resume_alertprovider.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "github.com/spf13/cobra" + + notificationv1 "github.com/fluxcd/notification-controller/api/v1beta2" +) + +var resumeAlertProviderCmd = &cobra.Command{ + Use: "alert-provider [name]", + Short: "Resume a suspended Provider", + Long: `The resume command marks a previously suspended Provider resource for reconciliation and waits for it to +finish the apply.`, + Example: ` # Resume reconciliation for an existing Provider + flux resume alert-provider main + + # Resume reconciliation for multiple Providers + flux resume alert-provider main-1 main-2`, + ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ProviderKind)), + RunE: resumeCommand{ + apiType: alertProviderType, + list: &alertProviderListAdapter{¬ificationv1.ProviderList{}}, + }.run, +} + +func init() { + resumeCmd.AddCommand(resumeAlertProviderCmd) +} + +func (obj alertProviderAdapter) getObservedGeneration() int64 { + return obj.Provider.Status.ObservedGeneration +} + +func (obj alertProviderAdapter) setUnsuspended() { + obj.Provider.Spec.Suspend = false +} + +func (obj alertProviderAdapter) successMessage() string { + return "Provider reconciliation completed" +} + +func (a alertProviderListAdapter) resumeItem(i int) resumable { + return &alertProviderAdapter{&a.ProviderList.Items[i]} +} diff --git a/cmd/flux/status.go b/cmd/flux/status.go index 6e4e18c5..95c6e9f3 100644 --- a/cmd/flux/status.go +++ b/cmd/flux/status.go @@ -17,18 +17,9 @@ limitations under the License. package main import ( - "context" - "fmt" - - apimeta "k8s.io/apimachinery/pkg/api/meta" + "github.com/fluxcd/cli-utils/pkg/object" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/fluxcd/cli-utils/pkg/object" - "github.com/fluxcd/pkg/apis/meta" ) // statusable is used to see if a resource is considered ready in the usual way @@ -45,42 +36,6 @@ type oldConditions interface { GetStatusConditions() *[]metav1.Condition } -func statusableConditions(object statusable) []metav1.Condition { - if s, ok := object.(meta.ObjectWithConditions); ok { - return s.GetConditions() - } - - if s, ok := object.(oldConditions); ok { - return *s.GetStatusConditions() - } - - return []metav1.Condition{} -} - -func isReady(kubeClient client.Client, namespacedName types.NamespacedName, object statusable) wait.ConditionWithContextFunc { - return func(ctx context.Context) (bool, error) { - err := kubeClient.Get(ctx, namespacedName, object.asClientObject()) - if err != nil { - return false, err - } - - // Confirm the state we are observing is for the current generation - if object.GetGeneration() != object.getObservedGeneration() { - return false, nil - } - - if c := apimeta.FindStatusCondition(statusableConditions(object), meta.ReadyCondition); c != nil { - switch c.Status { - case metav1.ConditionTrue: - return true, nil - case metav1.ConditionFalse: - return false, fmt.Errorf(c.Message) - } - } - return false, nil - } -} - func buildComponentObjectRefs(components ...string) ([]object.ObjMetadata, error) { var objRefs []object.ObjMetadata for _, deployment := range components { diff --git a/cmd/flux/suspend_alertprovider.go b/cmd/flux/suspend_alertprovider.go new file mode 100644 index 00000000..5a69513c --- /dev/null +++ b/cmd/flux/suspend_alertprovider.go @@ -0,0 +1,56 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "github.com/spf13/cobra" + + notificationv1 "github.com/fluxcd/notification-controller/api/v1beta2" +) + +var suspendAlertProviderCmd = &cobra.Command{ + Use: "alert-provider [name]", + Short: "Suspend reconciliation of Provider", + Long: `The suspend command disables the reconciliation of a Provider resource.`, + Example: ` # Suspend reconciliation for an existing Provider + flux suspend alert-provider main + + # Suspend reconciliation for multiple Providers + flux suspend alert-providers main-1 main-2`, + ValidArgsFunction: resourceNamesCompletionFunc(notificationv1.GroupVersion.WithKind(notificationv1.ProviderKind)), + RunE: suspendCommand{ + apiType: alertProviderType, + object: &alertProviderAdapter{¬ificationv1.Provider{}}, + list: &alertProviderListAdapter{¬ificationv1.ProviderList{}}, + }.run, +} + +func init() { + suspendCmd.AddCommand(suspendAlertProviderCmd) +} + +func (obj alertProviderAdapter) isSuspended() bool { + return obj.Provider.Spec.Suspend +} + +func (obj alertProviderAdapter) setSuspended() { + obj.Provider.Spec.Suspend = true +} + +func (a alertProviderListAdapter) item(i int) suspendable { + return &alertProviderAdapter{&a.ProviderList.Items[i]} +}