h3nryc0ding 1 week ago committed by GitHub
commit b963abc105
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -18,8 +18,8 @@ package main
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"os"
"time" "time"
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
@ -59,6 +59,28 @@ type checkFlags struct {
pollInterval time.Duration pollInterval time.Duration
} }
type checkResult struct {
Title string
Entries []checkEntry
}
type checkEntry struct {
Name string
Failed bool
}
func (cr *checkResult) Failed() bool {
if cr == nil {
return false
}
for _, entry := range cr.Entries {
if entry.Failed {
return true
}
}
return false
}
var kubernetesConstraints = []string{ var kubernetesConstraints = []string{
">=1.30.0-0", ">=1.30.0-0",
} }
@ -77,187 +99,298 @@ func init() {
rootCmd.AddCommand(checkCmd) rootCmd.AddCommand(checkCmd)
} }
func runCheckCmd(cmd *cobra.Command, args []string) error { func runCheckCmd(_ *cobra.Command, _ []string) error {
logger.Actionf("checking prerequisites")
checkFailed := false
fluxCheck()
ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
defer cancel() defer cancel()
cfg, err := utils.KubeConfig(kubeconfigArgs, kubeclientOptions) cfg, err := utils.KubeConfig(kubeconfigArgs, kubeclientOptions)
if err != nil { if err != nil {
return fmt.Errorf("Kubernetes client initialization failed: %s", err.Error()) return fmt.Errorf("kubernetes client initialization failed: %w", err)
} }
kubeClient, err := client.New(cfg, client.Options{Scheme: utils.NewScheme()}) kubeClient, err := client.New(cfg, client.Options{Scheme: utils.NewScheme()})
if err != nil { if err != nil {
return err return fmt.Errorf("error creating kubernetes client: %w", err)
} }
if !kubernetesCheck(cfg, kubernetesConstraints) { if !runPreChecks(ctx, cfg) {
checkFailed = true return errors.New("pre-installation checks failed")
} }
if checkArgs.pre { if checkArgs.pre {
if checkFailed { logger.Actionf("All pre-installation checks passed")
os.Exit(1) return nil
}
if !runChecks(ctx, kubeClient) {
return errors.New("checks failed")
} }
logger.Successf("prerequisites checks passed")
logger.Actionf("All checks passed")
return nil return nil
} }
logger.Actionf("checking version in cluster") func runPreChecks(_ context.Context, kConfig *rest.Config) bool {
if !fluxClusterVersionCheck(ctx, kubeClient) { checks := []func() checkResult{
checkFailed = true fluxCheck,
func() checkResult {
return kubernetesCheck(kConfig, kubernetesConstraints)
},
} }
logger.Actionf("checking controllers") return runGenericChecks(checks)
if !componentsCheck(ctx, kubeClient) {
checkFailed = true
} }
logger.Actionf("checking crds") func runChecks(ctx context.Context, kClient client.Client) bool {
if !crdsCheck(ctx, kubeClient) { checks := []func() checkResult{
checkFailed = true func() checkResult { return fluxClusterVersionCheck(ctx, kClient) },
func() checkResult { return controllersCheck(ctx, kClient) },
func() checkResult { return crdsCheck(ctx, kClient) },
} }
if checkFailed { return runGenericChecks(checks)
logger.Failuref("check failed")
os.Exit(1)
} }
logger.Successf("all checks passed") func runGenericChecks(checks []func() checkResult) bool {
return nil success := true
for _, check := range checks {
result := check()
logCheckResult(result)
if result.Failed() {
success = false
}
}
return success
}
func logCheckResult(res checkResult) {
logger.Actionf("Checking %s", res.Title)
for _, entry := range res.Entries {
if entry.Failed {
logger.Failuref("%s", entry.Name)
} else {
logger.Successf("%s", entry.Name)
}
} }
}
func fluxCheck() checkResult {
res := checkResult{Title: "flux pre-installation"}
func fluxCheck() {
curSv, err := version.ParseVersion(VERSION) curSv, err := version.ParseVersion(VERSION)
if err != nil { if err != nil {
return res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("error parsing current version: %s", err.Error()),
Failed: true,
})
return res
} }
// Exclude development builds. // Exclude development builds.
if curSv.Prerelease() != "" { if curSv.Prerelease() != "" {
return res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("flux %s is a development build", curSv),
})
return res
} }
latest, err := install.GetLatestVersion() latest, err := install.GetLatestVersion()
if err != nil { if err != nil {
return res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("error getting latest version: %s", err.Error()),
Failed: true,
})
return res
} }
latestSv, err := version.ParseVersion(latest) latestSv, err := version.ParseVersion(latest)
if err != nil { if err != nil {
return res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("error parsing latest version: %s", err.Error()),
Failed: true,
})
return res
} }
if latestSv.GreaterThan(curSv) { if latestSv.GreaterThan(curSv) {
logger.Failuref("flux %s <%s (new CLI version is available, please upgrade)", curSv, latestSv) res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("flux %s <%s (new CLI version is available, please upgrade)", curSv, latestSv),
Failed: true,
})
} else {
res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("flux %s >=%s (latest CLI version)", curSv, latestSv),
})
} }
return res
} }
func kubernetesCheck(cfg *rest.Config, constraints []string) bool { func kubernetesCheck(cfg *rest.Config, constraints []string) checkResult {
res := checkResult{Title: "kubernetes pre-installation"}
clientSet, err := kubernetes.NewForConfig(cfg) clientSet, err := kubernetes.NewForConfig(cfg)
if err != nil { if err != nil {
logger.Failuref("Kubernetes client initialization failed: %s", err.Error()) res.Entries = append(res.Entries, checkEntry{
return false Name: fmt.Sprintf("error creating kubernetes client: %s", err.Error()),
Failed: true,
})
return res
} }
kv, err := clientSet.Discovery().ServerVersion() kv, err := clientSet.Discovery().ServerVersion()
if err != nil { if err != nil {
logger.Failuref("Kubernetes API call failed: %s", err.Error()) res.Entries = append(res.Entries, checkEntry{
return false Name: fmt.Sprintf("error getting kubernetes version: %s", err.Error()),
Failed: true,
})
return res
} }
v, err := version.ParseVersion(kv.String()) v, err := version.ParseVersion(kv.String())
if err != nil { if err != nil {
logger.Failuref("Kubernetes version can't be determined") res.Entries = append(res.Entries, checkEntry{
return false Name: fmt.Sprintf("error parsing kubernetes version: %s", err.Error()),
Failed: true,
})
return res
} }
var valid bool
var vrange string
for _, constraint := range constraints { for _, constraint := range constraints {
c, _ := semver.NewConstraint(constraint) c, _ := semver.NewConstraint(constraint)
if c.Check(v) { entry := checkEntry{
valid = true Name: fmt.Sprintf("kubernetes %s%s", v.String(), constraint),
vrange = constraint
break
} }
if !c.Check(v) {
entry.Failed = true
} }
res.Entries = append(res.Entries, entry)
if !valid {
logger.Failuref("Kubernetes version %s does not match %s", v.Original(), constraints[0])
return false
} }
logger.Successf("Kubernetes %s %s", v.String(), vrange) return res
return true
} }
func componentsCheck(ctx context.Context, kubeClient client.Client) bool { func controllersCheck(ctx context.Context, kubeClient client.Client) checkResult {
res := checkResult{Title: "flux controllers"}
statusChecker, err := status.NewStatusCheckerWithClient(kubeClient, checkArgs.pollInterval, rootArgs.timeout, logger) statusChecker, err := status.NewStatusCheckerWithClient(kubeClient, checkArgs.pollInterval, rootArgs.timeout, logger)
if err != nil { if err != nil {
return false res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("error creating status checker: %s", err.Error()),
Failed: true,
})
return res
} }
ok := true
selector := client.MatchingLabels{manifestgen.PartOfLabelKey: manifestgen.PartOfLabelValue} selector := client.MatchingLabels{manifestgen.PartOfLabelKey: manifestgen.PartOfLabelValue}
var list v1.DeploymentList var list v1.DeploymentList
ns := *kubeconfigArgs.Namespace ns := *kubeconfigArgs.Namespace
if err := kubeClient.List(ctx, &list, client.InNamespace(ns), selector); err == nil {
if err := kubeClient.List(ctx, &list, client.InNamespace(ns), selector); err != nil {
res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("error listing deployments: %s", err.Error()),
Failed: true,
})
return res
}
if len(list.Items) == 0 { if len(list.Items) == 0 {
logger.Failuref("no controllers found in the '%s' namespace with the label selector '%s=%s'", res.Entries = append(res.Entries, checkEntry{
ns, manifestgen.PartOfLabelKey, manifestgen.PartOfLabelValue) Name: fmt.Sprintf("no controllers found in the '%s' namespace with the label selector '%s=%s'",
return false ns, manifestgen.PartOfLabelKey, manifestgen.PartOfLabelValue),
Failed: true,
})
return res
} }
for _, d := range list.Items { for _, d := range list.Items {
if ref, err := buildComponentObjectRefs(d.Name); err == nil { ref, err := buildComponentObjectRefs(d.Name)
if err := statusChecker.Assess(ref...); err != nil { if err != nil {
ok = false res.Entries = append(res.Entries, checkEntry{
} Name: fmt.Sprintf("error building component object refs for %s: %s", d.Name, err.Error()),
Failed: true,
})
continue
} }
if err := statusChecker.Assess(ref...); err != nil {
res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("error checking status of %s: %s", d.Name, err.Error()),
Failed: true,
})
} else {
for _, c := range d.Spec.Template.Spec.Containers { for _, c := range d.Spec.Template.Spec.Containers {
logger.Actionf(c.Image) res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf(c.Image),
})
} }
} }
} }
return ok
return res
} }
func crdsCheck(ctx context.Context, kubeClient client.Client) bool { func crdsCheck(ctx context.Context, kubeClient client.Client) checkResult {
ok := true res := checkResult{Title: "flux crds"}
selector := client.MatchingLabels{manifestgen.PartOfLabelKey: manifestgen.PartOfLabelValue} selector := client.MatchingLabels{manifestgen.PartOfLabelKey: manifestgen.PartOfLabelValue}
var list apiextensionsv1.CustomResourceDefinitionList var list apiextensionsv1.CustomResourceDefinitionList
if err := kubeClient.List(ctx, &list, client.InNamespace(*kubeconfigArgs.Namespace), selector); err == nil {
if err := kubeClient.List(ctx, &list, client.InNamespace(*kubeconfigArgs.Namespace), selector); err != nil {
res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("error listing CRDs: %s", err.Error()),
Failed: true,
})
return res
}
if len(list.Items) == 0 { if len(list.Items) == 0 {
logger.Failuref("no crds found with the label selector '%s=%s'", res.Entries = append(res.Entries, checkEntry{
manifestgen.PartOfLabelKey, manifestgen.PartOfLabelValue) Name: fmt.Sprintf("no crds found with the label selector '%s=%s'",
return false manifestgen.PartOfLabelKey, manifestgen.PartOfLabelValue),
Failed: true,
})
return res
} }
for _, crd := range list.Items { for _, crd := range list.Items {
versions := crd.Status.StoredVersions versions := crd.Status.StoredVersions
if len(versions) > 0 { if len(versions) > 0 {
logger.Successf(crd.Name + "/" + versions[len(versions)-1]) res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("%s/%s", crd.Name, versions[len(versions)-1]),
})
} else { } else {
ok = false res.Entries = append(res.Entries, checkEntry{
logger.Failuref("no stored versions for %s", crd.Name) Name: fmt.Sprintf("no stored versions for %s", crd.Name),
} Failed: true,
})
} }
} }
return ok
return res
} }
func fluxClusterVersionCheck(ctx context.Context, kubeClient client.Client) bool { func fluxClusterVersionCheck(ctx context.Context, kubeClient client.Client) checkResult {
res := checkResult{Title: "flux installation status"}
clusterInfo, err := getFluxClusterInfo(ctx, kubeClient) clusterInfo, err := getFluxClusterInfo(ctx, kubeClient)
if err != nil { if err != nil {
logger.Failuref("checking failed: %s", err.Error()) res.Entries = append(res.Entries, checkEntry{
return false Name: fmt.Sprintf("error getting cluster info: %s", err.Error()),
Failed: true,
})
return res
} }
if clusterInfo.distribution() != "" { if clusterInfo.distribution() != "" {
logger.Successf("distribution: %s", clusterInfo.distribution()) res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("distribution: %s", clusterInfo.distribution()),
})
} }
logger.Successf("bootstrapped: %t", clusterInfo.bootstrapped)
return true res.Entries = append(res.Entries, checkEntry{
Name: fmt.Sprintf("bootstrapped: %t", clusterInfo.bootstrapped),
})
return res
} }

@ -277,7 +277,7 @@ func installCmdRun(cmd *cobra.Command, args []string) error {
} }
logger.Waitingf("verifying installation") logger.Waitingf("verifying installation")
if err := statusChecker.Assess(componentRefs...); err != nil { if err := statusChecker.Assess(componentRefs...); err != nil {
return fmt.Errorf("install failed") return fmt.Errorf("install failed: %w", err)
} }
logger.Successf("install finished") logger.Successf("install finished")

@ -1,3 +1,5 @@
► checking prerequisites ► Checking flux pre-installation
✔ Kubernetes {{ .serverVersion }} >=1.30.0-0 ✔ flux 0.0.0-dev.0 is a development build
✔ prerequisites checks passed ► Checking kubernetes pre-installation
✔ kubernetes {{ .serverVersion }}>=1.30.0-0
► All pre-installation checks passed

@ -18,6 +18,7 @@ package status
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
@ -85,22 +86,20 @@ func (sc *StatusChecker) Assess(identifiers ...object.ObjMetadata) error {
sort.SliceStable(identifiers, func(i, j int) bool { sort.SliceStable(identifiers, func(i, j int) bool {
return strings.Compare(identifiers[i].Name, identifiers[j].Name) < 0 return strings.Compare(identifiers[i].Name, identifiers[j].Name) < 0
}) })
var errs []error
for _, id := range identifiers { for _, id := range identifiers {
rs := coll.ResourceStatuses[id] rs := coll.ResourceStatuses[id]
switch rs.Status { if rs.Status == status.NotFoundStatus {
case status.CurrentStatus: errs = append(errs, fmt.Errorf("%s: %s not found", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)))
sc.logger.Successf("%s: %s ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)) } else if rs.Status != status.CurrentStatus {
case status.NotFoundStatus: errs = append(errs, fmt.Errorf("%s: %s not ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind)))
sc.logger.Failuref("%s: %s not found", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
default:
sc.logger.Failuref("%s: %s not ready", rs.Identifier.Name, strings.ToLower(rs.Identifier.GroupKind.Kind))
} }
} }
if coll.Error != nil || ctx.Err() == context.DeadlineExceeded { if coll.Error != nil || errors.Is(ctx.Err(), context.DeadlineExceeded) {
return fmt.Errorf("timed out waiting for all resources to be ready") errs = append(errs, fmt.Errorf("timeout waiting for resources to be ready"))
} }
return nil return errors.Join(errs...)
} }
// desiredStatusNotifierFunc returns an Observer function for the // desiredStatusNotifierFunc returns an Observer function for the

Loading…
Cancel
Save