From 1e4922146352151aa30d50c7f5c829c796267b8f Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Sun, 31 Jan 2021 22:14:08 -0800 Subject: [PATCH] Add kstatus checker to install and check cmds Signed-off-by: jonathan-innis --- cmd/flux/bootstrap.go | 22 ++++-------- cmd/flux/check.go | 15 +++++--- cmd/flux/install.go | 14 +++++--- cmd/flux/status.go | 80 ++++++++++++++++++------------------------- 4 files changed, 60 insertions(+), 71 deletions(-) diff --git a/cmd/flux/bootstrap.go b/cmd/flux/bootstrap.go index 1c92e4f6..d3650dc0 100644 --- a/cmd/flux/bootstrap.go +++ b/cmd/flux/bootstrap.go @@ -162,24 +162,14 @@ func applyInstallManifests(ctx context.Context, manifestPath string, components return fmt.Errorf("install failed") } - kubeConfig, err := utils.KubeConfig(rootArgs.kubeconfig, rootArgs.kubecontext) + statusChecker := StatusChecker{} + err := statusChecker.New(time.Second, rootArgs.timeout) if err != nil { - return fmt.Errorf("install failed") + return fmt.Errorf("install failed with: %v", err) } - timeout, err := time.ParseDuration(rootArgs.timeout.String()) + err = statusChecker.Assess(components...) if err != nil { - return fmt.Errorf("install failed") - } - - statusChecker := StatusChecker{} - if err = statusChecker.New(kubeConfig, timeout); err != nil { - return fmt.Errorf("install failed") - } - if err = statusChecker.AddChecks(components); err != nil { - return fmt.Errorf("install failed") - } - if err = statusChecker.Assess(time.Second); err != nil { - return fmt.Errorf("install failed") + return fmt.Errorf("install failed with: %v", err) } return nil @@ -305,4 +295,4 @@ func checkIfBootstrapPathDiffers(ctx context.Context, kubeClient client.Client, } return fluxSystemKustomization.Spec.Path, true -} \ No newline at end of file +} diff --git a/cmd/flux/check.go b/cmd/flux/check.go index b2037cb7..c7d7afb6 100644 --- a/cmd/flux/check.go +++ b/cmd/flux/check.go @@ -22,6 +22,7 @@ import ( "os" "os/exec" "strings" + "time" "github.com/blang/semver/v4" "github.com/fluxcd/flux2/internal/utils" @@ -172,16 +173,22 @@ func componentsCheck() bool { ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) defer cancel() + statusChecker := StatusChecker{} + err := statusChecker.New(time.Second, rootArgs.timeout) + if err != nil { + return false + } + ok := true for _, deployment := range checkArgs.components { - kubectlArgs := []string{"-n", rootArgs.namespace, "rollout", "status", "deployment", deployment, "--timeout", rootArgs.timeout.String()} - if output, err := utils.ExecKubectlCommand(ctx, utils.ModeCapture, rootArgs.kubeconfig, rootArgs.kubecontext, kubectlArgs...); err != nil { - logger.Failuref("%s: %s", deployment, strings.TrimSuffix(output, "\n")) + err = statusChecker.Assess(deployment) + if err != nil { + logger.Failuref("%s: %s", deployment, err) ok = false } else { logger.Successf("%s is healthy", deployment) } - kubectlArgs = []string{"-n", rootArgs.namespace, "get", "deployment", deployment, "-o", "jsonpath=\"{..image}\""} + kubectlArgs := []string{"-n", rootArgs.namespace, "get", "deployment", deployment, "-o", "jsonpath=\"{..image}\""} if output, err := utils.ExecKubectlCommand(ctx, utils.ModeCapture, rootArgs.kubeconfig, rootArgs.kubecontext, kubectlArgs...); err == nil { logger.Actionf(strings.TrimPrefix(strings.TrimSuffix(output, "\""), "\"")) } diff --git a/cmd/flux/install.go b/cmd/flux/install.go index 819aab9d..4f7e7d8b 100644 --- a/cmd/flux/install.go +++ b/cmd/flux/install.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/spf13/cobra" @@ -178,14 +179,19 @@ func installCmdRun(cmd *cobra.Command, args []string) error { logger.Successf("install completed") } + statusChecker := StatusChecker{} + err = statusChecker.New(time.Second, rootArgs.timeout) + if err != nil { + return fmt.Errorf("install failed with: %v", err) + } + logger.Waitingf("verifying installation") for _, deployment := range components { - kubectlArgs = []string{"-n", rootArgs.namespace, "rollout", "status", "deployment", deployment, "--timeout", rootArgs.timeout.String()} - if _, err := utils.ExecKubectlCommand(ctx, applyOutput, rootArgs.kubeconfig, rootArgs.kubecontext, kubectlArgs...); err != nil { + err := statusChecker.Assess(deployment) + if err != nil { return fmt.Errorf("install failed") - } else { - logger.Successf("%s ready", deployment) } + logger.Successf("%s ready", deployment) } logger.Successf("install finished") diff --git a/cmd/flux/status.go b/cmd/flux/status.go index 006d2a77..e1918a32 100644 --- a/cmd/flux/status.go +++ b/cmd/flux/status.go @@ -19,12 +19,14 @@ package main import ( "context" "fmt" + "strings" + "time" + apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/rest" "sigs.k8s.io/cli-utils/pkg/kstatus/polling" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector" @@ -33,9 +35,8 @@ import ( "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" - "strings" - "time" + "github.com/fluxcd/flux2/internal/utils" "github.com/fluxcd/pkg/apis/meta" ) @@ -50,10 +51,10 @@ type statusable interface { } type StatusChecker struct { - client client.Client - timeout time.Duration - objRefs []object.ObjMetadata + pollInterval time.Duration + timeout time.Duration statusPoller *polling.StatusPoller + messageQueue chan string } func isReady(ctx context.Context, kubeClient client.Client, @@ -81,7 +82,11 @@ func isReady(ctx context.Context, kubeClient client.Client, } } -func (sc *StatusChecker) New(kubeConfig *rest.Config, timeout time.Duration) error { +func (sc *StatusChecker) New(pollInterval time.Duration, timeout time.Duration) error { + kubeConfig, err := utils.KubeConfig(rootArgs.kubeconfig, rootArgs.kubecontext) + if err != nil { + return err + } restMapper, err := apiutil.NewDynamicRESTMapper(kubeConfig) if err != nil { return err @@ -91,32 +96,26 @@ func (sc *StatusChecker) New(kubeConfig *rest.Config, timeout time.Duration) err return err } statusPoller := polling.NewStatusPoller(client, restMapper) - sc.client = client sc.statusPoller = statusPoller + sc.pollInterval = pollInterval sc.timeout = timeout + sc.messageQueue = make(chan string) return err } -func (sc *StatusChecker) AddChecks(components []string) error { - var componentRefs []object.ObjMetadata - for _, deployment := range components { - objMeta, err := object.CreateObjMetadata(rootArgs.namespace, deployment, schema.GroupKind{Group: "apps", Kind: "Deployment"}) - if err != nil { - return err - } - componentRefs = append(componentRefs, objMeta) - } - sc.objRefs = componentRefs - return nil -} - -func (sc *StatusChecker) Assess(pollInterval time.Duration) error { +func (sc *StatusChecker) Assess(components ...string) error { ctx, cancel := context.WithTimeout(context.Background(), sc.timeout) defer cancel() - opts := polling.Options{PollInterval: pollInterval, UseCache: true} - eventsChan := sc.statusPoller.Poll(ctx, sc.objRefs, opts) - coll := collector.NewResourceStatusCollector(sc.objRefs) + objRefs, err := sc.getObjectRefs(components) + if err != nil { + return err + } + + opts := polling.Options{PollInterval: sc.pollInterval, UseCache: true} + eventsChan := sc.statusPoller.Poll(ctx, objRefs, opts) + + coll := collector.NewResourceStatusCollector(objRefs) done := coll.ListenWithObserver(eventsChan, collector.ObserverFunc( func(statusCollector *collector.ResourceStatusCollector, e event.Event) { var rss []*event.ResourceStatus @@ -133,7 +132,6 @@ func (sc *StatusChecker) Assess(pollInterval time.Duration) error { ) <-done - if coll.Error != nil { return coll.Error } @@ -146,35 +144,23 @@ func (sc *StatusChecker) Assess(pollInterval time.Duration) error { ids = append(ids, id) } } - return fmt.Errorf("Health check timed out for [%v]", strings.Join(ids, ", ")) + return fmt.Errorf("Status check timed out for component(s): [%v]", strings.Join(ids, ", ")) } return nil } -func (sc *StatusChecker) toObjMetadata(cr []meta.NamespacedObjectKindReference) ([]object.ObjMetadata, error) { - oo := []object.ObjMetadata{} - for _, c := range cr { - // For backwards compatibility - if c.APIVersion == "" { - c.APIVersion = "apps/v1" - } - - gv, err := schema.ParseGroupVersion(c.APIVersion) - if err != nil { - return []object.ObjMetadata{}, err - } - - gk := schema.GroupKind{Group: gv.Group, Kind: c.Kind} - o, err := object.CreateObjMetadata(c.Namespace, c.Name, gk) +func (sc *StatusChecker) getObjectRefs(components []string) ([]object.ObjMetadata, error) { + var objRefs []object.ObjMetadata + for _, deployment := range components { + objMeta, err := object.CreateObjMetadata(rootArgs.namespace, deployment, schema.GroupKind{Group: "apps", Kind: "Deployment"}) if err != nil { - return []object.ObjMetadata{}, err + return nil, err } - - oo = append(oo, o) + objRefs = append(objRefs, objMeta) } - return oo, nil + return objRefs, nil } func (sc *StatusChecker) objMetadataToString(om object.ObjMetadata) string { return fmt.Sprintf("%s '%s/%s'", om.GroupKind.Kind, om.Namespace, om.Name) -} \ No newline at end of file +}