mirror of https://github.com/fluxcd/flux2.git
				
				
				
			
			You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			331 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Go
		
	
			
		
		
	
	
			331 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			Go
		
	
| /*
 | |
| Copyright 2021 The Flux authors
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package main
 | |
| 
 | |
| import (
 | |
| 	"bufio"
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"text/template"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/spf13/cobra"
 | |
| 	corev1 "k8s.io/api/core/v1"
 | |
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | |
| 	"k8s.io/client-go/kubernetes"
 | |
| 	"k8s.io/client-go/rest"
 | |
| 	"k8s.io/kubectl/pkg/util"
 | |
| 	"k8s.io/kubectl/pkg/util/podutils"
 | |
| 
 | |
| 	"github.com/fluxcd/flux2/v2/internal/flags"
 | |
| 	"github.com/fluxcd/flux2/v2/internal/utils"
 | |
| 	"github.com/fluxcd/flux2/v2/pkg/manifestgen"
 | |
| )
 | |
| 
 | |
| var logsCmd = &cobra.Command{
 | |
| 	Use:   "logs",
 | |
| 	Short: "Display formatted logs for Flux components",
 | |
| 	Long:  withPreviewNote("The logs command displays formatted logs from various Flux components."),
 | |
| 	Example: `  # Print the reconciliation logs of all Flux custom resources in your cluster
 | |
|   flux logs --all-namespaces
 | |
|   
 | |
|   # Print all logs of all Flux custom resources newer than 2 minutes
 | |
|   flux logs --all-namespaces --since=2m
 | |
| 
 | |
|   # Stream logs for a particular log level
 | |
|   flux logs --follow --level=error --all-namespaces
 | |
| 
 | |
|   # Filter logs by kind, name and namespace
 | |
|   flux logs --kind=Kustomization --name=podinfo --namespace=default
 | |
| 
 | |
|   # Print logs when Flux is installed in a different namespace than flux-system
 | |
|   flux logs --flux-namespace=my-namespace
 | |
|     `,
 | |
| 	RunE: logsCmdRun,
 | |
| }
 | |
| 
 | |
| type logsFlags struct {
 | |
| 	logLevel      flags.LogLevel
 | |
| 	follow        bool
 | |
| 	tail          int64
 | |
| 	kind          string
 | |
| 	name          string
 | |
| 	fluxNamespace string
 | |
| 	allNamespaces bool
 | |
| 	sinceTime     string
 | |
| 	sinceDuration time.Duration
 | |
| }
 | |
| 
 | |
| var logsArgs = logsFlags{
 | |
| 	tail: -1,
 | |
| }
 | |
| 
 | |
| const controllerContainer = "manager"
 | |
| 
 | |
| func init() {
 | |
| 	logsCmd.Flags().Var(&logsArgs.logLevel, "level", logsArgs.logLevel.Description())
 | |
| 	logsCmd.Flags().StringVarP(&logsArgs.kind, "kind", "", logsArgs.kind, "displays errors of a particular toolkit kind e.g GitRepository")
 | |
| 	logsCmd.Flags().StringVarP(&logsArgs.name, "name", "", logsArgs.name, "specifies the name of the object logs to be displayed")
 | |
| 	logsCmd.Flags().BoolVarP(&logsArgs.follow, "follow", "f", logsArgs.follow, "specifies if the logs should be streamed")
 | |
| 	logsCmd.Flags().Int64VarP(&logsArgs.tail, "tail", "", logsArgs.tail, "lines of recent log file to display")
 | |
| 	logsCmd.Flags().StringVarP(&logsArgs.fluxNamespace, "flux-namespace", "", rootArgs.defaults.Namespace, "the namespace where the Flux components are running")
 | |
| 	logsCmd.Flags().BoolVarP(&logsArgs.allNamespaces, "all-namespaces", "A", false, "displays logs for objects across all namespaces")
 | |
| 	logsCmd.Flags().DurationVar(&logsArgs.sinceDuration, "since", logsArgs.sinceDuration, "Only return logs newer than a relative duration like 5s, 2m, or 3h. Defaults to all logs. Only one of since-time / since may be used.")
 | |
| 	logsCmd.Flags().StringVar(&logsArgs.sinceTime, "since-time", logsArgs.sinceTime, "Only return logs after a specific date (RFC3339). Defaults to all logs. Only one of since-time / since may be used.")
 | |
| 	rootCmd.AddCommand(logsCmd)
 | |
| }
 | |
| 
 | |
| func logsCmdRun(cmd *cobra.Command, args []string) error {
 | |
| 	fluxSelector := fmt.Sprintf("%s=%s", manifestgen.PartOfLabelKey, manifestgen.PartOfLabelValue)
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	cfg, err := utils.KubeConfig(kubeconfigArgs, kubeclientOptions)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	clientset, err := kubernetes.NewForConfig(cfg)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if len(args) > 0 {
 | |
| 		return fmt.Errorf("no argument required")
 | |
| 	}
 | |
| 
 | |
| 	pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	logOpts := &corev1.PodLogOptions{
 | |
| 		Follow: logsArgs.follow,
 | |
| 	}
 | |
| 
 | |
| 	if logsArgs.tail > -1 {
 | |
| 		logOpts.TailLines = &logsArgs.tail
 | |
| 	}
 | |
| 
 | |
| 	if len(logsArgs.sinceTime) > 0 && logsArgs.sinceDuration != 0 {
 | |
| 		return fmt.Errorf("at most one of `sinceTime` or `sinceDuration` may be specified")
 | |
| 	}
 | |
| 
 | |
| 	if len(logsArgs.sinceTime) > 0 {
 | |
| 		t, err := util.ParseRFC3339(logsArgs.sinceTime, metav1.Now)
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("%s is not a valid (RFC3339) time", logsArgs.sinceTime)
 | |
| 		}
 | |
| 		logOpts.SinceTime = &t
 | |
| 	}
 | |
| 
 | |
| 	if logsArgs.sinceDuration != 0 {
 | |
| 		// round up to the nearest second
 | |
| 		sec := int64(logsArgs.sinceDuration.Round(time.Second).Seconds())
 | |
| 		logOpts.SinceSeconds = &sec
 | |
| 	}
 | |
| 
 | |
| 	var requests []rest.ResponseWrapper
 | |
| 	for _, pod := range pods {
 | |
| 		logOpts := logOpts.DeepCopy()
 | |
| 		if len(pod.Spec.Containers) > 1 {
 | |
| 			logOpts.Container = controllerContainer
 | |
| 		}
 | |
| 		req := clientset.CoreV1().Pods(logsArgs.fluxNamespace).GetLogs(pod.Name, logOpts)
 | |
| 		requests = append(requests, req)
 | |
| 	}
 | |
| 
 | |
| 	if logsArgs.follow && len(requests) > 1 {
 | |
| 		return parallelPodLogs(ctx, requests)
 | |
| 	}
 | |
| 
 | |
| 	return podLogs(ctx, requests)
 | |
| }
 | |
| 
 | |
| // getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods
 | |
| // from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running
 | |
| // state). If no Pod is found, an error is returned.
 | |
| func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) {
 | |
| 	var ret []corev1.Pod
 | |
| 
 | |
| 	opts := metav1.ListOptions{
 | |
| 		LabelSelector: label,
 | |
| 	}
 | |
| 	deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts)
 | |
| 	if err != nil {
 | |
| 		return ret, err
 | |
| 	}
 | |
| 
 | |
| 	for _, deploy := range deployList.Items {
 | |
| 		label := deploy.Spec.Template.Labels
 | |
| 		opts := metav1.ListOptions{
 | |
| 			LabelSelector: createLabelStringFromMap(label),
 | |
| 		}
 | |
| 		podList, err := c.CoreV1().Pods(ns).List(ctx, opts)
 | |
| 		if err != nil {
 | |
| 			return ret, err
 | |
| 		}
 | |
| 		pods := []*corev1.Pod{}
 | |
| 		for i := range podList.Items {
 | |
| 			pod := podList.Items[i]
 | |
| 			pods = append(pods, &pod)
 | |
| 		}
 | |
| 
 | |
| 		if len(pods) > 0 {
 | |
| 			// sort pods to prioritize running pods over others
 | |
| 			sort.Sort(podutils.ByLogging(pods))
 | |
| 			ret = append(ret, *pods[0])
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if len(ret) == 0 {
 | |
| 		return nil, fmt.Errorf("no Flux pods found in namespace %q", ns)
 | |
| 	}
 | |
| 
 | |
| 	return ret, nil
 | |
| }
 | |
| 
 | |
| func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
 | |
| 	reader, writer := io.Pipe()
 | |
| 	errReader, errWriter := io.Pipe()
 | |
| 	wg := &sync.WaitGroup{}
 | |
| 	wg.Add(len(requests))
 | |
| 
 | |
| 	for _, request := range requests {
 | |
| 		go func(req rest.ResponseWrapper) {
 | |
| 			defer wg.Done()
 | |
| 			if err := logRequest(ctx, req, writer); err != nil {
 | |
| 				fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
 | |
| 				return
 | |
| 			}
 | |
| 		}(request)
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		wg.Wait()
 | |
| 		writer.Close()
 | |
| 		errWriter.Close()
 | |
| 	}()
 | |
| 
 | |
| 	stdoutErrCh := asyncCopy(os.Stdout, reader)
 | |
| 	stderrErrCh := asyncCopy(os.Stderr, errReader)
 | |
| 
 | |
| 	return errors.Join(<-stdoutErrCh, <-stderrErrCh)
 | |
| }
 | |
| 
 | |
| // asyncCopy copies all data from dst to src asynchronously and returns a channel for reading an error value.
 | |
| // This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
 | |
| // a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
 | |
| // This function lets you copy from multiple sources into multiple destinations in parallel.
 | |
| func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
 | |
| 	errCh := make(chan error)
 | |
| 	go func(errCh chan error) {
 | |
| 		_, err := io.Copy(dst, src)
 | |
| 		errCh <- err
 | |
| 	}(errCh)
 | |
| 
 | |
| 	return errCh
 | |
| }
 | |
| 
 | |
| func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
 | |
| 	var retErr error
 | |
| 	for _, req := range requests {
 | |
| 		if err := logRequest(ctx, req, os.Stdout); err != nil {
 | |
| 			fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
 | |
| 			retErr = fmt.Errorf("failed to collect logs from all Flux pods")
 | |
| 			continue
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return retErr
 | |
| }
 | |
| 
 | |
| func createLabelStringFromMap(m map[string]string) string {
 | |
| 	var strArr []string
 | |
| 	for key, val := range m {
 | |
| 		pair := fmt.Sprintf("%v=%v", key, val)
 | |
| 		strArr = append(strArr, pair)
 | |
| 	}
 | |
| 
 | |
| 	return strings.Join(strArr, ",")
 | |
| }
 | |
| 
 | |
| func logRequest(ctx context.Context, request rest.ResponseWrapper, w io.Writer) error {
 | |
| 	stream, err := request.Stream(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer stream.Close()
 | |
| 
 | |
| 	scanner := bufio.NewScanner(stream)
 | |
| 
 | |
| 	const logTmpl = "{{.Timestamp}} {{.Level}} {{or .Kind .ControllerKind}}{{if .Name}}/{{.Name}}.{{.Namespace}}{{end}} - {{.Message}} {{.Error}}\n"
 | |
| 	t, err := template.New("log").Parse(logTmpl)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("unable to create template, err: %s", err)
 | |
| 	}
 | |
| 
 | |
| 	bw := bufio.NewWriter(w)
 | |
| 	for scanner.Scan() {
 | |
| 		line := scanner.Text()
 | |
| 		if !strings.HasPrefix(line, "{") {
 | |
| 			continue
 | |
| 		}
 | |
| 		var l ControllerLogEntry
 | |
| 		if err := json.Unmarshal([]byte(line), &l); err != nil {
 | |
| 			logger.Failuref("parse error: %s", err)
 | |
| 			break
 | |
| 		}
 | |
| 		filterPrintLog(t, &l, bw)
 | |
| 		bw.Flush()
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func filterPrintLog(t *template.Template, l *ControllerLogEntry, w io.Writer) {
 | |
| 	if (logsArgs.logLevel == "" || logsArgs.logLevel == l.Level) &&
 | |
| 		(logsArgs.kind == "" || strings.EqualFold(logsArgs.kind, l.Kind) || strings.EqualFold(logsArgs.kind, l.ControllerKind)) &&
 | |
| 		(logsArgs.name == "" || strings.EqualFold(logsArgs.name, l.Name)) &&
 | |
| 		(logsArgs.allNamespaces || strings.EqualFold(*kubeconfigArgs.Namespace, l.Namespace)) {
 | |
| 		err := t.Execute(w, l)
 | |
| 		if err != nil {
 | |
| 			logger.Failuref("log template error: %s", err)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| type ControllerLogEntry struct {
 | |
| 	Timestamp      string         `json:"ts"`
 | |
| 	Level          flags.LogLevel `json:"level"`
 | |
| 	Message        string         `json:"msg"`
 | |
| 	Error          string         `json:"error,omitempty"`
 | |
| 	Logger         string         `json:"logger"`
 | |
| 	Kind           string         `json:"reconciler kind,omitempty"`
 | |
| 	ControllerKind string         `json:"controllerKind,omitempty"`
 | |
| 	Name           string         `json:"name,omitempty"`
 | |
| 	Namespace      string         `json:"namespace,omitempty"`
 | |
| }
 |