diff --git a/cmd/flux/create.go b/cmd/flux/create.go index 2368b771..795d9367 100644 --- a/cmd/flux/create.go +++ b/cmd/flux/create.go @@ -131,8 +131,8 @@ func (names apiType) upsertAndWait(object upsertWaitable, mutate func() error) e } logger.Waitingf("waiting for %s reconciliation", names.kind) - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isReady(ctx, kubeClient, namespacedName, object)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isReady(kubeClient, namespacedName, object)); err != nil { return err } logger.Successf("%s reconciliation completed", names.kind) diff --git a/cmd/flux/create_alert.go b/cmd/flux/create_alert.go index a980c1fa..0f50bf02 100644 --- a/cmd/flux/create_alert.go +++ b/cmd/flux/create_alert.go @@ -132,8 +132,8 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for Alert reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isAlertReady(ctx, kubeClient, namespacedName, &alert)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isAlertReady(kubeClient, namespacedName, &alert)); err != nil { return err } logger.Successf("Alert %s is ready", name) @@ -171,9 +171,8 @@ func upsertAlert(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isAlertReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionFunc { - return func() (bool, error) { +func isAlertReady(kubeClient client.Client, namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, alert) if err != nil { return false, err diff --git a/cmd/flux/create_alertprovider.go b/cmd/flux/create_alertprovider.go index cef69baf..8523d1c5 100644 --- a/cmd/flux/create_alertprovider.go +++ b/cmd/flux/create_alertprovider.go @@ -127,8 +127,8 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for Provider reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isAlertProviderReady(ctx, kubeClient, namespacedName, &provider)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isAlertProviderReady(kubeClient, namespacedName, &provider)); err != nil { return err } @@ -168,9 +168,8 @@ func upsertAlertProvider(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isAlertProviderReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionFunc { - return func() (bool, error) { +func isAlertProviderReady(kubeClient client.Client, namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, provider) if err != nil { return false, err diff --git a/cmd/flux/create_helmrelease.go b/cmd/flux/create_helmrelease.go index 3721c1e7..b6ef92b3 100644 --- a/cmd/flux/create_helmrelease.go +++ b/cmd/flux/create_helmrelease.go @@ -303,8 +303,8 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for HelmRelease reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isHelmReleaseReady(ctx, kubeClient, namespacedName, &helmRelease)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isHelmReleaseReady(kubeClient, namespacedName, &helmRelease)); err != nil { return err } logger.Successf("HelmRelease %s is ready", name) @@ -344,9 +344,8 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isHelmReleaseReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionFunc { - return func() (bool, error) { +func isHelmReleaseReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, helmRelease) if err != nil { return false, err diff --git a/cmd/flux/create_kustomization.go b/cmd/flux/create_kustomization.go index fea9d836..eb2f8fc1 100644 --- a/cmd/flux/create_kustomization.go +++ b/cmd/flux/create_kustomization.go @@ -263,8 +263,8 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for Kustomization reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isKustomizationReady(ctx, kubeClient, namespacedName, &kustomization)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isKustomizationReady(kubeClient, namespacedName, &kustomization)); err != nil { return err } logger.Successf("Kustomization %s is ready", name) @@ -304,9 +304,8 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isKustomizationReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionFunc { - return func() (bool, error) { +func isKustomizationReady(kubeClient client.Client, namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, kustomization) if err != nil { return false, err diff --git a/cmd/flux/create_receiver.go b/cmd/flux/create_receiver.go index 5597dae2..045ff771 100644 --- a/cmd/flux/create_receiver.go +++ b/cmd/flux/create_receiver.go @@ -139,8 +139,8 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for Receiver reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isReceiverReady(ctx, kubeClient, namespacedName, &receiver)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil { return err } logger.Successf("Receiver %s is ready", name) @@ -180,9 +180,8 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isReceiverReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionFunc { - return func() (bool, error) { +func isReceiverReady(kubeClient client.Client, namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, receiver) if err != nil { return false, err diff --git a/cmd/flux/create_source_bucket.go b/cmd/flux/create_source_bucket.go index 68ef8f3e..ebb6a834 100644 --- a/cmd/flux/create_source_bucket.go +++ b/cmd/flux/create_source_bucket.go @@ -204,8 +204,8 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for Bucket source reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isBucketReady(ctx, kubeClient, namespacedName, bucket)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isBucketReady(kubeClient, namespacedName, bucket)); err != nil { return err } logger.Successf("Bucket source reconciliation completed") @@ -248,9 +248,8 @@ func upsertBucket(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isBucketReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionFunc { - return func() (bool, error) { +func isBucketReady(kubeClient client.Client, namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, bucket) if err != nil { return false, err diff --git a/cmd/flux/create_source_git.go b/cmd/flux/create_source_git.go index 2c6cc59a..8c0d031d 100644 --- a/cmd/flux/create_source_git.go +++ b/cmd/flux/create_source_git.go @@ -325,8 +325,8 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for GitRepository source reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isGitRepositoryReady(ctx, kubeClient, namespacedName, &gitRepository)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isGitRepositoryReady(kubeClient, namespacedName, &gitRepository)); err != nil { return err } logger.Successf("GitRepository source reconciliation completed") @@ -369,9 +369,8 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isGitRepositoryReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionFunc { - return func() (bool, error) { +func isGitRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, gitRepository) if err != nil { return false, err diff --git a/cmd/flux/create_source_helm.go b/cmd/flux/create_source_helm.go index e7fa876b..ea82fa8a 100644 --- a/cmd/flux/create_source_helm.go +++ b/cmd/flux/create_source_helm.go @@ -231,8 +231,8 @@ func createSourceHelmCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for HelmRepository source reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isHelmRepositoryReady(ctx, kubeClient, namespacedName, helmRepository)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isHelmRepositoryReady(kubeClient, namespacedName, helmRepository)); err != nil { return err } logger.Successf("HelmRepository source reconciliation completed") @@ -280,9 +280,8 @@ func upsertHelmRepository(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isHelmRepositoryReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionFunc { - return func() (bool, error) { +func isHelmRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, helmRepository *sourcev1.HelmRepository) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, helmRepository) if err != nil { return false, err diff --git a/cmd/flux/create_source_oci.go b/cmd/flux/create_source_oci.go index 388ab18c..30fc4d8d 100644 --- a/cmd/flux/create_source_oci.go +++ b/cmd/flux/create_source_oci.go @@ -192,8 +192,8 @@ func createSourceOCIRepositoryCmdRun(cmd *cobra.Command, args []string) error { } logger.Waitingf("waiting for OCIRepository reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isOCIRepositoryReady(ctx, kubeClient, namespacedName, repository)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isOCIRepositoryReady(kubeClient, namespacedName, repository)); err != nil { return err } logger.Successf("OCIRepository reconciliation completed") @@ -236,9 +236,8 @@ func upsertOCIRepository(ctx context.Context, kubeClient client.Client, return namespacedName, nil } -func isOCIRepositoryReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, ociRepository *sourcev1.OCIRepository) wait.ConditionFunc { - return func() (bool, error) { +func isOCIRepositoryReady(kubeClient client.Client, namespacedName types.NamespacedName, ociRepository *sourcev1.OCIRepository) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, ociRepository) if err != nil { return false, err diff --git a/cmd/flux/reconcile.go b/cmd/flux/reconcile.go index 56f5f271..08b47a2a 100644 --- a/cmd/flux/reconcile.go +++ b/cmd/flux/reconcile.go @@ -113,8 +113,8 @@ func (reconcile reconcileCommand) run(cmd *cobra.Command, args []string) error { logger.Successf("%s annotated", reconcile.kind) if reconcile.kind == notificationv1b2.AlertKind || reconcile.kind == notificationv1.ReceiverKind { - if err = wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isReconcileReady(ctx, kubeClient, namespacedName, reconcile.object)); err != nil { + if err = wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isReconcileReady(kubeClient, namespacedName, reconcile.object)); err != nil { return err } @@ -124,8 +124,8 @@ func (reconcile reconcileCommand) run(cmd *cobra.Command, args []string) error { lastHandledReconcileAt := reconcile.object.lastHandledReconcileRequest() logger.Waitingf("waiting for %s reconciliation", reconcile.kind) - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - reconciliationHandled(ctx, kubeClient, namespacedName, reconcile.object, lastHandledReconcileAt)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + reconciliationHandled(kubeClient, namespacedName, reconcile.object, lastHandledReconcileAt)); err != nil { return err } readyCond := apimeta.FindStatusCondition(reconcilableConditions(reconcile.object), meta.ReadyCondition) @@ -140,9 +140,8 @@ func (reconcile reconcileCommand) run(cmd *cobra.Command, args []string) error { return nil } -func reconciliationHandled(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, obj reconcilable, lastHandledReconcileAt string) wait.ConditionFunc { - return func() (bool, error) { +func reconciliationHandled(kubeClient client.Client, namespacedName types.NamespacedName, obj reconcilable, lastHandledReconcileAt string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, obj.asClientObject()) if err != nil { return false, err @@ -176,9 +175,8 @@ func requestReconciliation(ctx context.Context, kubeClient client.Client, }) } -func isReconcileReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, obj reconcilable) wait.ConditionFunc { - return func() (bool, error) { +func isReconcileReady(kubeClient client.Client, namespacedName types.NamespacedName, obj reconcilable) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, obj.asClientObject()) if err != nil { return false, err diff --git a/cmd/flux/reconcile_alertprovider.go b/cmd/flux/reconcile_alertprovider.go index becf980f..4f6ce136 100644 --- a/cmd/flux/reconcile_alertprovider.go +++ b/cmd/flux/reconcile_alertprovider.go @@ -84,8 +84,8 @@ func reconcileAlertProviderCmdRun(cmd *cobra.Command, args []string) error { logger.Successf("Provider annotated") logger.Waitingf("waiting for reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isAlertProviderReady(ctx, kubeClient, namespacedName, &alertProvider)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isAlertProviderReady(kubeClient, namespacedName, &alertProvider)); err != nil { return err } logger.Successf("Provider reconciliation completed") diff --git a/cmd/flux/reconcile_receiver.go b/cmd/flux/reconcile_receiver.go index 8a04ea7a..a16946a7 100644 --- a/cmd/flux/reconcile_receiver.go +++ b/cmd/flux/reconcile_receiver.go @@ -88,8 +88,8 @@ func reconcileReceiverCmdRun(cmd *cobra.Command, args []string) error { logger.Successf("Receiver annotated") logger.Waitingf("waiting for Receiver reconciliation") - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isReceiverReady(ctx, kubeClient, namespacedName, &receiver)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isReceiverReady(kubeClient, namespacedName, &receiver)); err != nil { return err } diff --git a/cmd/flux/reconcile_with_source.go b/cmd/flux/reconcile_with_source.go index d0c3ae78..ac0a6f15 100644 --- a/cmd/flux/reconcile_with_source.go +++ b/cmd/flux/reconcile_with_source.go @@ -82,8 +82,8 @@ func (reconcile reconcileWithSourceCommand) run(cmd *cobra.Command, args []strin logger.Successf("%s annotated", reconcile.kind) logger.Waitingf("waiting for %s reconciliation", reconcile.kind) - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - reconciliationHandled(ctx, kubeClient, namespacedName, reconcile.object, lastHandledReconcileAt)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + reconciliationHandled(kubeClient, namespacedName, reconcile.object, lastHandledReconcileAt)); err != nil { return err } diff --git a/cmd/flux/resume.go b/cmd/flux/resume.go index bc27c4eb..d62895ca 100644 --- a/cmd/flux/resume.go +++ b/cmd/flux/resume.go @@ -212,8 +212,8 @@ func (resume resumeCommand) reconcile(ctx context.Context, res resumable) reconc logger.Waitingf("waiting for %s reconciliation", resume.kind) - if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout, - isReady(ctx, resume.client, namespacedName, res)); err != nil { + if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true, + isReady(resume.client, namespacedName, res)); err != nil { return reconcileResponse{ resumable: res, err: err, diff --git a/cmd/flux/status.go b/cmd/flux/status.go index e6c4b5fc..6fb41565 100644 --- a/cmd/flux/status.go +++ b/cmd/flux/status.go @@ -56,9 +56,8 @@ func statusableConditions(object statusable) []metav1.Condition { return []metav1.Condition{} } -func isReady(ctx context.Context, kubeClient client.Client, - namespacedName types.NamespacedName, object statusable) wait.ConditionFunc { - return func() (bool, error) { +func isReady(kubeClient client.Client, namespacedName types.NamespacedName, object statusable) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { err := kubeClient.Get(ctx, namespacedName, object.asClientObject()) if err != nil { return false, err diff --git a/pkg/bootstrap/bootstrap.go b/pkg/bootstrap/bootstrap.go index 5d10b724..1111f6e9 100644 --- a/pkg/bootstrap/bootstrap.go +++ b/pkg/bootstrap/bootstrap.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "k8s.io/apimachinery/pkg/util/wait" "strings" "time" @@ -172,10 +173,8 @@ func kustomizationPathDiffers(ctx context.Context, kube client.Client, objKey cl return k.Spec.Path, nil } -func kustomizationReconciled(ctx context.Context, kube client.Client, objKey client.ObjectKey, - kustomization *kustomizev1.Kustomization, expectRevision string) func() (bool, error) { - - return func() (bool, error) { +func kustomizationReconciled(kube client.Client, objKey client.ObjectKey, kustomization *kustomizev1.Kustomization, expectRevision string) wait.ConditionWithContextFunc { + return func(ctx context.Context) (bool, error) { if err := kube.Get(ctx, objKey, kustomization); err != nil { return false, err } diff --git a/pkg/bootstrap/bootstrap_plain_git.go b/pkg/bootstrap/bootstrap_plain_git.go index da341d6a..ba5a7f58 100644 --- a/pkg/bootstrap/bootstrap_plain_git.go +++ b/pkg/bootstrap/bootstrap_plain_git.go @@ -401,9 +401,8 @@ func (b *PlainGitBootstrapper) ReportKustomizationHealth(ctx context.Context, op expectRevision := fmt.Sprintf("%s@%s", options.Branch, git.Hash(head).Digest()) var k kustomizev1.Kustomization - if err := wait.PollImmediate(pollInterval, timeout, kustomizationReconciled( - ctx, b.kube, objKey, &k, expectRevision), - ); err != nil { + if err := wait.PollUntilContextTimeout(ctx, pollInterval, timeout, true, + kustomizationReconciled(b.kube, objKey, &k, expectRevision)); err != nil { b.logger.Failuref(err.Error()) return err }