diff --git a/cmd/flux/bootstrap.go b/cmd/flux/bootstrap.go index ea12e0a4..ba84a91d 100644 --- a/cmd/flux/bootstrap.go +++ b/cmd/flux/bootstrap.go @@ -36,6 +36,7 @@ import ( "github.com/fluxcd/flux2/pkg/manifestgen/install" kus "github.com/fluxcd/flux2/pkg/manifestgen/kustomization" "github.com/fluxcd/flux2/pkg/manifestgen/sync" + "github.com/fluxcd/flux2/pkg/status" ) var bootstrapCmd = &cobra.Command{ @@ -176,19 +177,24 @@ func generateInstallManifests(targetPath, namespace, tmpDir string, localManifes func applyInstallManifests(ctx context.Context, manifestPath string, components []string) error { kubectlArgs := []string{"apply", "-f", manifestPath} if _, err := utils.ExecKubectlCommand(ctx, utils.ModeOS, rootArgs.kubeconfig, rootArgs.kubecontext, kubectlArgs...); err != nil { - return fmt.Errorf("install failed") + return fmt.Errorf("install failed: %w", err) } - - statusChecker, err := NewStatusChecker(time.Second, rootArgs.timeout) + kubeConfig, err := utils.KubeConfig(rootArgs.kubeconfig, rootArgs.kubecontext) + if err != nil { + return fmt.Errorf("install failed: %w", err) + } + statusChecker, err := status.NewStatusChecker(kubeConfig, time.Second, rootArgs.timeout, logger) + if err != nil { + return fmt.Errorf("install failed: %w", err) + } + componentRefs, err := buildComponentObjectRefs(components...) if err != nil { return fmt.Errorf("install failed: %w", err) } - logger.Waitingf("verifying installation") - if err := statusChecker.Assess(components...); err != nil { + if err := statusChecker.Assess(componentRefs...); err != nil { return fmt.Errorf("install failed") } - return nil } diff --git a/cmd/flux/check.go b/cmd/flux/check.go index 5e1a24f3..da4fca96 100644 --- a/cmd/flux/check.go +++ b/cmd/flux/check.go @@ -34,6 +34,7 @@ import ( "github.com/fluxcd/flux2/internal/utils" "github.com/fluxcd/flux2/pkg/manifestgen/install" + "github.com/fluxcd/flux2/pkg/status" ) var checkCmd = &cobra.Command{ @@ -205,12 +206,17 @@ func componentsCheck() bool { ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) defer cancel() - kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) + kubeConfig, err := utils.KubeConfig(rootArgs.kubeconfig, rootArgs.kubecontext) + if err != nil { + return false + } + + statusChecker, err := status.NewStatusChecker(kubeConfig, time.Second, rootArgs.timeout, logger) if err != nil { return false } - statusChecker, err := NewStatusChecker(time.Second, rootArgs.timeout) + kubeClient, err := utils.KubeClient(rootArgs.kubeconfig, rootArgs.kubecontext) if err != nil { return false } @@ -220,10 +226,10 @@ func componentsCheck() bool { var list v1.DeploymentList if err := kubeClient.List(ctx, &list, client.InNamespace(rootArgs.namespace), selector); err == nil { for _, d := range list.Items { - if err := statusChecker.Assess(d.Name); err != nil { - ok = false - } else { - logger.Successf("%s: healthy", d.Name) + if ref, err := buildComponentObjectRefs(d.Name); err == nil { + if err := statusChecker.Assess(ref...); err != nil { + ok = false + } } for _, c := range d.Spec.Template.Spec.Containers { logger.Actionf(c.Image) diff --git a/cmd/flux/install.go b/cmd/flux/install.go index fb2740e2..fbbc586b 100644 --- a/cmd/flux/install.go +++ b/cmd/flux/install.go @@ -30,6 +30,7 @@ import ( "github.com/fluxcd/flux2/internal/flags" "github.com/fluxcd/flux2/internal/utils" "github.com/fluxcd/flux2/pkg/manifestgen/install" + "github.com/fluxcd/flux2/pkg/status" ) var installCmd = &cobra.Command{ @@ -200,7 +201,7 @@ func installCmdRun(cmd *cobra.Command, args []string) error { applyOutput = utils.ModeOS } if _, err := utils.ExecKubectlCommand(ctx, applyOutput, rootArgs.kubeconfig, rootArgs.kubecontext, kubectlArgs...); err != nil { - return fmt.Errorf("install failed") + return fmt.Errorf("install failed: %w", err) } if installArgs.dryRun { @@ -208,13 +209,20 @@ func installCmdRun(cmd *cobra.Command, args []string) error { return nil } - statusChecker, err := NewStatusChecker(time.Second, time.Minute) + kubeConfig, err := utils.KubeConfig(rootArgs.kubeconfig, rootArgs.kubecontext) + if err != nil { + return fmt.Errorf("install failed: %w", err) + } + statusChecker, err := status.NewStatusChecker(kubeConfig, time.Second, rootArgs.timeout, logger) + if err != nil { + return fmt.Errorf("install failed: %w", err) + } + componentRefs, err := buildComponentObjectRefs(components...) if err != nil { return fmt.Errorf("install failed: %w", err) } - logger.Waitingf("verifying installation") - if err := statusChecker.Assess(components...); err != nil { + if err := statusChecker.Assess(componentRefs...); err != nil { return fmt.Errorf("install failed") } diff --git a/cmd/flux/status.go b/cmd/flux/status.go index fab26c2b..631a3e2f 100644 --- a/cmd/flux/status.go +++ b/cmd/flux/status.go @@ -19,26 +19,16 @@ package main import ( "context" "fmt" - "time" - appsv1 "k8s.io/api/apps/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - "sigs.k8s.io/cli-utils/pkg/kstatus/polling" - "sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator" - "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" - "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" - "sigs.k8s.io/cli-utils/pkg/kstatus/status" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "github.com/fluxcd/pkg/apis/meta" - - "github.com/fluxcd/flux2/internal/utils" ) // statusable is used to see if a resource is considered ready in the usual way @@ -51,13 +41,6 @@ type statusable interface { GetStatusConditions() *[]metav1.Condition } -type StatusChecker struct { - pollInterval time.Duration - timeout time.Duration - client client.Client - statusPoller *polling.StatusPoller -} - func isReady(ctx context.Context, kubeClient client.Client, namespacedName types.NamespacedName, object statusable) wait.ConditionFunc { return func() (bool, error) { @@ -83,74 +66,7 @@ func isReady(ctx context.Context, kubeClient client.Client, } } -func NewStatusChecker(pollInterval time.Duration, timeout time.Duration) (*StatusChecker, error) { - kubeConfig, err := utils.KubeConfig(rootArgs.kubeconfig, rootArgs.kubecontext) - if err != nil { - return nil, err - } - restMapper, err := apiutil.NewDynamicRESTMapper(kubeConfig) - if err != nil { - return nil, err - } - client, err := client.New(kubeConfig, client.Options{Mapper: restMapper}) - if err != nil { - return nil, err - } - - return &StatusChecker{ - pollInterval: pollInterval, - timeout: timeout, - client: client, - statusPoller: polling.NewStatusPoller(client, restMapper), - }, nil -} - -func (sc *StatusChecker) Assess(components ...string) error { - ctx, cancel := context.WithTimeout(context.Background(), sc.timeout) - defer cancel() - - objRefs, err := sc.getObjectRefs(components) - if err != nil { - return err - } - - opts := polling.Options{PollInterval: sc.pollInterval, UseCache: true} - eventsChan := sc.statusPoller.Poll(ctx, objRefs, opts) - - coll := collector.NewResourceStatusCollector(objRefs) - done := coll.ListenWithObserver(eventsChan, collector.ObserverFunc( - func(statusCollector *collector.ResourceStatusCollector, e event.Event) { - var rss []*event.ResourceStatus - for _, rs := range statusCollector.ResourceStatuses { - rss = append(rss, rs) - } - desired := status.CurrentStatus - aggStatus := aggregator.AggregateStatus(rss, desired) - if aggStatus == desired { - cancel() - return - } - }), - ) - <-done - - if coll.Error != nil || ctx.Err() == context.DeadlineExceeded { - for _, rs := range coll.ResourceStatuses { - if rs.Status != status.CurrentStatus { - if !sc.deploymentExists(rs.Identifier) { - logger.Failuref("%s: deployment not found", rs.Identifier.Name) - } else { - logger.Failuref("%s: unhealthy (timed out waiting for rollout)", rs.Identifier.Name) - } - } - } - return fmt.Errorf("timed out waiting for condition") - } - - return nil -} - -func (sc *StatusChecker) getObjectRefs(components []string) ([]object.ObjMetadata, error) { +func buildComponentObjectRefs(components ...string) ([]object.ObjMetadata, error) { var objRefs []object.ObjMetadata for _, deployment := range components { objMeta, err := object.CreateObjMetadata(rootArgs.namespace, deployment, schema.GroupKind{Group: "apps", Kind: "Deployment"}) @@ -161,20 +77,3 @@ func (sc *StatusChecker) getObjectRefs(components []string) ([]object.ObjMetadat } return objRefs, nil } - -func (sc *StatusChecker) objMetadataToString(om object.ObjMetadata) string { - return fmt.Sprintf("%s '%s/%s'", om.GroupKind.Kind, om.Namespace, om.Name) -} - -func (sc *StatusChecker) deploymentExists(om object.ObjMetadata) bool { - ctx, cancel := context.WithTimeout(context.Background(), sc.timeout) - defer cancel() - - namespacedName := types.NamespacedName{ - Namespace: om.Namespace, - Name: om.Name, - } - var existing appsv1.Deployment - err := sc.client.Get(ctx, namespacedName, &existing) - return err == nil -} diff --git a/pkg/log/log.go b/pkg/log/log.go index c9ed2128..2833e077 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -22,7 +22,7 @@ type Logger interface { Generatef(format string, a ...interface{}) // Waitingf logs a formatted waiting message. Waitingf(format string, a ...interface{}) - // Waitingf logs a formatted success message. + // Successf logs a formatted success message. Successf(format string, a ...interface{}) // Failuref logs a formatted failure message. Failuref(format string, a ...interface{}) diff --git a/pkg/status/status.go b/pkg/status/status.go new file mode 100644 index 00000000..1a5abc37 --- /dev/null +++ b/pkg/status/status.go @@ -0,0 +1,109 @@ +/* +Copyright 2020, 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package status + +import ( + "context" + "fmt" + "strings" + "time" + + "k8s.io/client-go/rest" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" + "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/cli-utils/pkg/object" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + "github.com/fluxcd/flux2/pkg/log" +) + +type StatusChecker struct { + pollInterval time.Duration + timeout time.Duration + client client.Client + statusPoller *polling.StatusPoller + logger log.Logger +} + +func NewStatusChecker(kubeConfig *rest.Config, pollInterval time.Duration, timeout time.Duration, log log.Logger) (*StatusChecker, error) { + restMapper, err := apiutil.NewDynamicRESTMapper(kubeConfig) + if err != nil { + return nil, err + } + c, err := client.New(kubeConfig, client.Options{Mapper: restMapper}) + if err != nil { + return nil, err + } + + return &StatusChecker{ + pollInterval: pollInterval, + timeout: timeout, + client: c, + statusPoller: polling.NewStatusPoller(c, restMapper), + logger: log, + }, nil +} + +func (sc *StatusChecker) Assess(identifiers ...object.ObjMetadata) error { + ctx, cancel := context.WithTimeout(context.Background(), sc.timeout) + defer cancel() + + opts := polling.Options{PollInterval: sc.pollInterval, UseCache: true} + eventsChan := sc.statusPoller.Poll(ctx, identifiers, opts) + + coll := collector.NewResourceStatusCollector(identifiers) + done := coll.ListenWithObserver(eventsChan, desiredStatusNotifierFunc(cancel, status.CurrentStatus)) + + <-done + + for _, rs := range coll.ResourceStatuses { + switch rs.Status { + case status.CurrentStatus: + sc.logger.Successf("%s: %s ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)) + case status.NotFoundStatus: + sc.logger.Failuref("%s: %s not found", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)) + default: + sc.logger.Failuref("%s: %s not ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)) + } + } + + if coll.Error != nil || ctx.Err() == context.DeadlineExceeded { + return fmt.Errorf("timed out waiting for condition") + } + return nil +} + +// desiredStatusNotifierFunc returns an Observer function for the +// ResourceStatusCollector that will cancel the context (using the cancelFunc) +// when all resources have reached the desired status. +func desiredStatusNotifierFunc(cancelFunc context.CancelFunc, + desired status.Status) collector.ObserverFunc { + return func(rsc *collector.ResourceStatusCollector, _ event.Event) { + var rss []*event.ResourceStatus + for _, rs := range rsc.ResourceStatuses { + rss = append(rss, rs) + } + aggStatus := aggregator.AggregateStatus(rss, desired) + if aggStatus == desired { + cancelFunc() + } + } +}