From cbdd71e44eca6e877036ec16666b9b27d466bb66 Mon Sep 17 00:00:00 2001 From: Max Jonas Werner Date: Thu, 1 Jun 2023 15:53:05 +0200 Subject: [PATCH] Make `flux logs` more lenient UX changes: - Only print an error when a pod doesn't have a matching container instead of exiting early. - Return a non-zero status code when no pod is found at all. Details: In certain situations there might be 3rd-party pods running in the Flux namespace that cause the command to fail streaming logs, e.g. when they have multiple containers but none of them is called `manager` (which all Flux-maintained pods have). An example of such a situation is when Flux is installed with the 3rd-party Flux extension on AKS. The `logs` command is now more forgiving and merely logs an error in these situations instead of completely bailing out. It still returns a non-zero exit code. For the parallel log streaming with `-f` the code is now a little more complex so that errors are written to stderr in parallel with all other logs written to stdout. That's what `asyncCopy` is for. 
refs #3944 Signed-off-by: Max Jonas Werner --- cmd/flux/logs.go | 49 ++++++++--- cmd/flux/logs_e2e_test.go | 88 ++++++++++++++++++++ cmd/flux/{logs_test.go => logs_unit_test.go} | 70 ++-------------- cmd/flux/main_test.go | 4 + 4 files changed, 138 insertions(+), 73 deletions(-) create mode 100644 cmd/flux/logs_e2e_test.go rename cmd/flux/{logs_test.go => logs_unit_test.go} (76%) diff --git a/cmd/flux/logs.go b/cmd/flux/logs.go index d8c27ff2..84d68acb 100644 --- a/cmd/flux/logs.go +++ b/cmd/flux/logs.go @@ -20,6 +20,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -76,7 +77,7 @@ type logsFlags struct { sinceSeconds time.Duration } -var logsArgs = &logsFlags{ +var logsArgs = logsFlags{ tail: -1, } @@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error { return fmt.Errorf("no argument required") } - pods, err := getPods(ctx, clientset, fluxSelector) + pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector) if err != nil { return err } @@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error { return podLogs(ctx, requests) } -func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) { +// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods +// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running +// state). If no Pod is found, an error is returned. 
+func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) { var ret []corev1.Pod opts := metav1.ListOptions{ LabelSelector: label, } - deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts) + deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts) if err != nil { return ret, err } @@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core opts := metav1.ListOptions{ LabelSelector: createLabelStringFromMap(label), } - podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts) + podList, err := c.CoreV1().Pods(ns).List(ctx, opts) if err != nil { return ret, err } @@ -196,11 +200,16 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core } } + if len(ret) == 0 { + return nil, fmt.Errorf("no Flux pods found in namespace %q", ns) + } + return ret, nil } func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error { reader, writer := io.Pipe() + errReader, errWriter := io.Pipe() wg := &sync.WaitGroup{} wg.Add(len(requests)) @@ -208,7 +217,7 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error go func(req rest.ResponseWrapper) { defer wg.Done() if err := logRequest(ctx, req, writer); err != nil { - writer.CloseWithError(err) + fmt.Fprintf(errWriter, "failed getting logs: %s\n", err) return } }(request) @@ -217,20 +226,40 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error go func() { wg.Wait() writer.Close() + errWriter.Close() }() - _, err := io.Copy(os.Stdout, reader) - return err + stdoutErrCh := asyncCopy(os.Stdout, reader) + stderrErrCh := asyncCopy(os.Stderr, errReader) + + return errors.Join(<-stdoutErrCh, <-stderrErrCh) +} + +// asyncCopy copies all data from src to dst asynchronously and returns a channel for reading an error value. +// This is basically an asynchronous wrapper around `io.Copy`. 
The returned channel is unbuffered and always is sent +// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns. +// This function lets you copy from multiple sources into multiple destinations in parallel. +func asyncCopy(dst io.Writer, src io.Reader) <-chan error { + errCh := make(chan error) + go func(errCh chan error) { + _, err := io.Copy(dst, src) + errCh <- err + }(errCh) + + return errCh } func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error { + var retErr error for _, req := range requests { if err := logRequest(ctx, req, os.Stdout); err != nil { - return err + fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err) + retErr = fmt.Errorf("failed to collect logs from all Flux pods") + continue } } - return nil + return retErr } func createLabelStringFromMap(m map[string]string) string { diff --git a/cmd/flux/logs_e2e_test.go b/cmd/flux/logs_e2e_test.go new file mode 100644 index 00000000..65a2d76e --- /dev/null +++ b/cmd/flux/logs_e2e_test.go @@ -0,0 +1,88 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "testing" +) + +func TestLogsNoArgs(t *testing.T) { + cmd := cmdTestCase{ + args: "logs", + assert: assertSuccess(), + } + cmd.runTestCmd(t) +} + +func TestLogsWrongNamespace(t *testing.T) { + cmd := cmdTestCase{ + args: "logs --flux-namespace=default", + assert: assertError(`no Flux pods found in namespace "default"`), + } + cmd.runTestCmd(t) +} + +func TestLogsAllNamespaces(t *testing.T) { + cmd := cmdTestCase{ + args: "logs --all-namespaces", + assert: assertSuccess(), + } + cmd.runTestCmd(t) +} + +func TestLogsSince(t *testing.T) { + cmd := cmdTestCase{ + args: "logs --since=2m", + assert: assertSuccess(), + } + cmd.runTestCmd(t) +} + +func TestLogsSinceInvalid(t *testing.T) { + cmd := cmdTestCase{ + args: "logs --since=XXX", + assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`), + } + cmd.runTestCmd(t) +} + +func TestLogsSinceTime(t *testing.T) { + cmd := cmdTestCase{ + args: "logs --since-time=2021-08-06T14:26:25.546Z", + assert: assertSuccess(), + } + cmd.runTestCmd(t) +} + +func TestLogsSinceTimeInvalid(t *testing.T) { + cmd := cmdTestCase{ + args: "logs --since-time=XXX", + assert: assertError("XXX is not a valid (RFC3339) time"), + } + cmd.runTestCmd(t) +} + +func TestLogsSinceOnlyOneAllowed(t *testing.T) { + cmd := cmdTestCase{ + args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z", + assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"), + } + cmd.runTestCmd(t) +} diff --git a/cmd/flux/logs_test.go b/cmd/flux/logs_unit_test.go similarity index 76% rename from cmd/flux/logs_test.go rename to cmd/flux/logs_unit_test.go index cb07234e..336f3314 100644 --- a/cmd/flux/logs_test.go +++ b/cmd/flux/logs_unit_test.go @@ -30,73 +30,17 @@ import ( . 
"github.com/onsi/gomega" ) -func TestLogsNoArgs(t *testing.T) { - cmd := cmdTestCase{ - args: "logs", - assert: assertSuccess(), - } - cmd.runTestCmd(t) -} - -func TestLogsAllNamespaces(t *testing.T) { - cmd := cmdTestCase{ - args: "logs --all-namespaces", - assert: assertSuccess(), - } - cmd.runTestCmd(t) -} - -func TestLogsSince(t *testing.T) { - cmd := cmdTestCase{ - args: "logs --since=2m", - assert: assertSuccess(), - } - cmd.runTestCmd(t) -} - -func TestLogsSinceInvalid(t *testing.T) { - cmd := cmdTestCase{ - args: "logs --since=XXX", - assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`), - } - cmd.runTestCmd(t) -} - -func TestLogsSinceTime(t *testing.T) { - cmd := cmdTestCase{ - args: "logs --since-time=2021-08-06T14:26:25.546Z", - assert: assertSuccess(), - } - cmd.runTestCmd(t) -} - -func TestLogsSinceTimeInvalid(t *testing.T) { - cmd := cmdTestCase{ - args: "logs --since-time=XXX", - assert: assertError("XXX is not a valid (RFC3339) time"), - } - cmd.runTestCmd(t) -} - -func TestLogsSinceOnlyOneAllowed(t *testing.T) { - cmd := cmdTestCase{ - args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z", - assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"), - } - cmd.runTestCmd(t) -} - func TestLogRequest(t *testing.T) { mapper := &testResponseMapper{} tests := []struct { name string namespace string - flags *logsFlags + flags logsFlags assertFile string }{ { name: "all logs", - flags: &logsFlags{ + flags: logsFlags{ tail: -1, allNamespaces: true, }, @@ -105,14 +49,14 @@ func TestLogRequest(t *testing.T) { { name: "filter by namespace", namespace: "default", - flags: &logsFlags{ + flags: logsFlags{ tail: -1, }, assertFile: "testdata/logs/namespace.txt", }, { name: "filter by kind and namespace", - flags: &logsFlags{ + flags: logsFlags{ tail: -1, kind: "Kustomization", }, @@ -120,7 +64,7 @@ func TestLogRequest(t *testing.T) { }, { name: "filter by loglevel", - flags: &logsFlags{ + 
flags: logsFlags{ tail: -1, logLevel: "error", allNamespaces: true, @@ -130,7 +74,7 @@ func TestLogRequest(t *testing.T) { { name: "filter by namespace, name, loglevel and kind", namespace: "flux-system", - flags: &logsFlags{ + flags: logsFlags{ tail: -1, logLevel: "error", kind: "Kustomization", @@ -163,7 +107,7 @@ func TestLogRequest(t *testing.T) { // reset flags to default *kubeconfigArgs.Namespace = rootArgs.defaults.Namespace - logsArgs = &logsFlags{ + logsArgs = logsFlags{ tail: -1, } }) diff --git a/cmd/flux/main_test.go b/cmd/flux/main_test.go index d153d479..dd9a6c28 100644 --- a/cmd/flux/main_test.go +++ b/cmd/flux/main_test.go @@ -392,6 +392,10 @@ func resetCmdArgs() { alertProviderArgs = alertProviderFlags{} bootstrapArgs = NewBootstrapFlags() bServerArgs = bServerFlags{} + logsArgs = logsFlags{ + tail: -1, + fluxNamespace: rootArgs.defaults.Namespace, + } buildKsArgs = buildKsFlags{} checkArgs = checkFlags{} createArgs = createFlags{}