Make `flux logs` more lenient

UX changes:

- Only print an error when a pod doesn't have a matching container
  instead of exiting early.
- Return a non-zero status code when no pod is found at all.

Details:

In certain situations there might be 3rd-party pods running in the
Flux namespace that cause the command to fail streaming logs, e.g.
when they have multiple containers but none of them is called
`manager` (which all Flux-maintained pods do). An example of such a
situation is when Flux is installed with the 3rd-party Flux extension
on AKS.

The `logs` command is now more forgiving and merely logs an error in
these situations instead of completely bailing out. It still returns a
non-zero exit code.

For the parallel log streaming with `-f` the code is now a little more
complex so that errors are now written to stderr in parallel with all
other logs written to stdout. That's what `asyncCopy` is for.

refs #3944

Signed-off-by: Max Jonas Werner <mail@makk.es>
pull/3945/head
Max Jonas Werner 2 years ago
parent f01cf5e04c
commit cbdd71e44e
No known key found for this signature in database
GPG Key ID: EB525E0F02B52140

@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
@ -76,7 +77,7 @@ type logsFlags struct {
sinceSeconds time.Duration
}
var logsArgs = &logsFlags{
var logsArgs = logsFlags{
tail: -1,
}
@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no argument required")
}
pods, err := getPods(ctx, clientset, fluxSelector)
pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector)
if err != nil {
return err
}
@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return podLogs(ctx, requests)
}
func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods
// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running
// state). If no Pod is found, an error is returned.
func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) {
var ret []corev1.Pod
opts := metav1.ListOptions{
LabelSelector: label,
}
deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts)
deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts)
if err != nil {
return ret, err
}
@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
opts := metav1.ListOptions{
LabelSelector: createLabelStringFromMap(label),
}
podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts)
podList, err := c.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
return ret, err
}
@ -196,11 +200,16 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
}
}
if len(ret) == 0 {
return nil, fmt.Errorf("no Flux pods found in namespace %q", ns)
}
return ret, nil
}
func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
reader, writer := io.Pipe()
errReader, errWriter := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))
@ -208,7 +217,7 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
go func(req rest.ResponseWrapper) {
defer wg.Done()
if err := logRequest(ctx, req, writer); err != nil {
writer.CloseWithError(err)
fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
return
}
}(request)
@ -217,20 +226,40 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
go func() {
wg.Wait()
writer.Close()
errWriter.Close()
}()
_, err := io.Copy(os.Stdout, reader)
return err
stdoutErrCh := asyncCopy(os.Stdout, reader)
stderrErrCh := asyncCopy(os.Stderr, errReader)
return errors.Join(<-stdoutErrCh, <-stderrErrCh)
}
// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value.
// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
// This function lets you copy from multiple sources into multiple destinations in parallel.
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
errCh := make(chan error)
go func(errCh chan error) {
_, err := io.Copy(dst, src)
errCh <- err
}(errCh)
return errCh
}
func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
var retErr error
for _, req := range requests {
if err := logRequest(ctx, req, os.Stdout); err != nil {
return err
fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
retErr = fmt.Errorf("failed to collect logs from all Flux pods")
continue
}
}
return nil
return retErr
}
func createLabelStringFromMap(m map[string]string) string {

@ -0,0 +1,88 @@
//go:build e2e
// +build e2e
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"testing"
)
func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsWrongNamespace(t *testing.T) {
cmd := cmdTestCase{
args: "logs --flux-namespace=default",
assert: assertError(`no Flux pods found in namespace "default"`),
}
cmd.runTestCmd(t)
}
func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}
func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}
func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}

@ -30,73 +30,17 @@ import (
. "github.com/onsi/gomega"
)
func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}
func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}
func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}
func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}
func TestLogRequest(t *testing.T) {
mapper := &testResponseMapper{}
tests := []struct {
name string
namespace string
flags *logsFlags
flags logsFlags
assertFile string
}{
{
name: "all logs",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
allNamespaces: true,
},
@ -105,14 +49,14 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace",
namespace: "default",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
},
assertFile: "testdata/logs/namespace.txt",
},
{
name: "filter by kind and namespace",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
kind: "Kustomization",
},
@ -120,7 +64,7 @@ func TestLogRequest(t *testing.T) {
},
{
name: "filter by loglevel",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
allNamespaces: true,
@ -130,7 +74,7 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace, name, loglevel and kind",
namespace: "flux-system",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
kind: "Kustomization",
@ -163,7 +107,7 @@ func TestLogRequest(t *testing.T) {
// reset flags to default
*kubeconfigArgs.Namespace = rootArgs.defaults.Namespace
logsArgs = &logsFlags{
logsArgs = logsFlags{
tail: -1,
}
})

@ -392,6 +392,10 @@ func resetCmdArgs() {
alertProviderArgs = alertProviderFlags{}
bootstrapArgs = NewBootstrapFlags()
bServerArgs = bServerFlags{}
logsArgs = logsFlags{
tail: -1,
fluxNamespace: rootArgs.defaults.Namespace,
}
buildKsArgs = buildKsFlags{}
checkArgs = checkFlags{}
createArgs = createFlags{}

Loading…
Cancel
Save