From b3d7730e7932ba7764ba7ae3f41783c2b47b22f4 Mon Sep 17 00:00:00 2001 From: jonathan-innis Date: Mon, 1 Feb 2021 10:54:15 -0800 Subject: [PATCH] Use status polling in bootstrap command Signed-off-by: jonathan-innis --- cmd/flux/bootstrap.go | 24 ++++++++-- cmd/flux/status.go | 105 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 5 deletions(-) diff --git a/cmd/flux/bootstrap.go b/cmd/flux/bootstrap.go index e2683546..a2d0d7e7 100644 --- a/cmd/flux/bootstrap.go +++ b/cmd/flux/bootstrap.go @@ -162,12 +162,26 @@ func applyInstallManifests(ctx context.Context, manifestPath string, components return fmt.Errorf("install failed") } - for _, deployment := range components { - kubectlArgs = []string{"-n", rootArgs.namespace, "rollout", "status", "deployment", deployment, "--timeout", rootArgs.timeout.String()} - if _, err := utils.ExecKubectlCommand(ctx, utils.ModeOS, rootArgs.kubeconfig, rootArgs.kubecontext, kubectlArgs...); err != nil { - return fmt.Errorf("install failed") - } + kubeConfig, err := utils.KubeConfig(rootArgs.kubeconfig, rootArgs.kubecontext) + if err != nil { + return fmt.Errorf("install failed") + } + timeout, err := time.ParseDuration(rootArgs.timeout.String()) + if err != nil { + return fmt.Errorf("install failed") + } + + statusChecker := StatusChecker{} + if err = statusChecker.New(kubeConfig, timeout); err != nil { + return fmt.Errorf("install failed") + } + if err = statusChecker.AddChecks(components); err != nil { + return fmt.Errorf("install failed") } + if err = statusChecker.Assess(time.Second); err != nil { + return fmt.Errorf("install failed") + } + return nil } diff --git a/cmd/flux/status.go b/cmd/flux/status.go index 1e421e3e..8a8db105 100644 --- a/cmd/flux/status.go +++ b/cmd/flux/status.go @@ -39,6 +39,13 @@ type statusable interface { GetStatusConditions() *[]metav1.Condition } +type StatusChecker struct { + client client.Client + timeout time.Duration + objRefs []object.ObjMetadata + statusPoller *polling.StatusPoller +} + func isReady(ctx context.Context, kubeClient client.Client, namespacedName types.NamespacedName, object statusable) wait.ConditionFunc { return func() (bool, error) { @@ -63,3 +70,101 @@ func isReady(ctx context.Context, kubeClient client.Client, return false, nil } } + +func (sc *StatusChecker) New(kubeConfig *rest.Config, timeout time.Duration) error { + restMapper, err := apiutil.NewDynamicRESTMapper(kubeConfig) + if err != nil { + return err + } + client, err := client.New(kubeConfig, client.Options{Mapper: restMapper}) + if err != nil { + return err + } + statusPoller := polling.NewStatusPoller(client, restMapper) + sc.client = client + sc.statusPoller = statusPoller + sc.timeout = timeout + return err +} + +func (sc *StatusChecker) AddChecks(components []string) error { + var componentRefs []object.ObjMetadata + for _, deployment := range components { + objMeta, err := object.CreateObjMetadata(rootArgs.namespace, deployment, schema.GroupKind{Group: "apps", Kind: "Deployment"}) + if err != nil { + return err + } + componentRefs = append(componentRefs, objMeta) + } + sc.objRefs = componentRefs + return nil +} + +func (sc *StatusChecker) Assess(pollInterval time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), sc.timeout) + defer cancel() + + opts := polling.Options{PollInterval: pollInterval, UseCache: true} + eventsChan := sc.statusPoller.Poll(ctx, sc.objRefs, opts) + coll := collector.NewResourceStatusCollector(sc.objRefs) + done := coll.ListenWithObserver(eventsChan, collector.ObserverFunc( + func(statusCollector *collector.ResourceStatusCollector, e event.Event) { + var rss []*event.ResourceStatus + for _, rs := range statusCollector.ResourceStatuses { + rss = append(rss, rs) + } + desired := status.CurrentStatus + aggStatus := aggregator.AggregateStatus(rss, desired) + if aggStatus == desired { + cancel() + return + } + }), + ) + <-done + + + if coll.Error != nil { + return coll.Error + } + + if ctx.Err() == context.DeadlineExceeded { + ids := []string{} + for _, rs := range coll.ResourceStatuses { + if rs.Status != status.CurrentStatus { + id := sc.objMetadataToString(rs.Identifier) + ids = append(ids, id) + } + } + return fmt.Errorf("Health check timed out for [%v]", strings.Join(ids, ", ")) + } + return nil +} + +func (sc *StatusChecker) toObjMetadata(cr []meta.NamespacedObjectKindReference) ([]object.ObjMetadata, error) { + oo := []object.ObjMetadata{} + for _, c := range cr { + // For backwards compatibility + if c.APIVersion == "" { + c.APIVersion = "apps/v1" + } + + gv, err := schema.ParseGroupVersion(c.APIVersion) + if err != nil { + return []object.ObjMetadata{}, err + } + + gk := schema.GroupKind{Group: gv.Group, Kind: c.Kind} + o, err := object.CreateObjMetadata(c.Namespace, c.Name, gk) + if err != nil { + return []object.ObjMetadata{}, err + } + + oo = append(oo, o) + } + return oo, nil +} + +func (sc *StatusChecker) objMetadataToString(om object.ObjMetadata) string { + return fmt.Sprintf("%s '%s/%s'", om.GroupKind.Kind, om.Namespace, om.Name) +}