/* Copyright 2023 The Kubernetes Authors. Copyright 2023 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package main import ( "context" "fmt" "os" "sort" "strings" "time" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/duration" "k8s.io/apimachinery/pkg/watch" runtimeresource "k8s.io/cli-runtime/pkg/resource" cmdutil "k8s.io/kubectl/pkg/cmd/util" "sigs.k8s.io/controller-runtime/pkg/client" helmv2 "github.com/fluxcd/helm-controller/api/v2beta1" autov1 "github.com/fluxcd/image-automation-controller/api/v1beta1" imagev1 "github.com/fluxcd/image-reflector-controller/api/v1beta2" kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2" notificationv1 "github.com/fluxcd/notification-controller/api/v1beta2" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" "github.com/fluxcd/flux2/internal/utils" "github.com/fluxcd/flux2/pkg/printers" ) var eventsCmd = &cobra.Command{ Use: "events", Short: "Display Kubernetes events for Flux resources", Long: "The events sub-command shows Kubernetes events from Flux resources", Example: ` # Display events for flux resources in default namespace flux events -n default # Display events for flux resources in all namespaces flux events -A # Display events for flux resources flux events --for Kustomization/podinfo `, RunE: eventsCmdRun, } type eventFlags struct { allNamespaces bool watch bool forSelector string filterTypes []string } var eventArgs eventFlags func init() { eventsCmd.Flags().BoolVarP(&eventArgs.allNamespaces, "all-namespaces", "A", false, "display events from Flux resources across all namespaces") eventsCmd.Flags().BoolVarP(&eventArgs.watch, "watch", "w", false, "indicate if the events should be streamed") eventsCmd.Flags().StringVar(&eventArgs.forSelector, "for", "", "get events for a particular object") eventsCmd.Flags().StringSliceVar(&eventArgs.filterTypes, "types", []string{}, "filter events for certain types") rootCmd.AddCommand(eventsCmd) } func eventsCmdRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) defer cancel() kubeclient, err := utils.KubeClient(kubeconfigArgs, kubeclientOptions) if err != nil { return err } namespace := *kubeconfigArgs.Namespace if eventArgs.allNamespaces { namespace = "" } var diffRefNs bool clientListOpts := getListOpt(namespace, eventArgs.forSelector) var refListOpts [][]client.ListOption if eventArgs.forSelector != "" { refs, err := getObjectRef(ctx, kubeclient, eventArgs.forSelector, *kubeconfigArgs.Namespace) if err != nil { return err } for _, ref := range refs { kind, name, refNs := utils.ParseObjectKindNameNamespace(ref) if refNs != namespace { diffRefNs = true } refSelector := fmt.Sprintf("%s/%s", kind, name) refListOpts = append(refListOpts, getListOpt(refNs, refSelector)) } } showNamespace := namespace == "" || diffRefNs if eventArgs.watch { return eventsCmdWatchRun(ctx, kubeclient, clientListOpts, refListOpts, showNamespace) } rows, err := getRows(ctx, kubeclient, clientListOpts, refListOpts, showNamespace) if len(rows) == 0 { if eventArgs.allNamespaces { logger.Failuref("No flux events found.") } else { logger.Failuref("No flux events found in %s namespace.\n", *kubeconfigArgs.Namespace) } return nil } headers := getHeaders(showNamespace) err = printers.TablePrinter(headers).Print(cmd.OutOrStdout(), rows) return err } func getRows(ctx context.Context, kubeclient client.Client, clientListOpts []client.ListOption, refListOpts [][]client.ListOption, showNs bool) ([][]string, error) { el := &corev1.EventList{} if err := addEventsToList(ctx, kubeclient, el, clientListOpts); err != nil { return nil, err } for _, refOpts := range refListOpts { if err := addEventsToList(ctx, kubeclient, el, refOpts); err != nil { return nil, err } } sort.Sort(SortableEvents(el.Items)) var rows [][]string for _, item := range el.Items { if filterEvent(item) { continue } rows = append(rows, getEventRow(item, showNs)) } return rows, nil } func addEventsToList(ctx context.Context, kubeclient client.Client, el *corev1.EventList, clientListOpts []client.ListOption) error { listOpts := &metav1.ListOptions{} err := runtimeresource.FollowContinue(listOpts, func(options metav1.ListOptions) (runtime.Object, error) { newEvents := &corev1.EventList{} err := kubeclient.List(ctx, newEvents, clientListOpts...) if err != nil { return nil, fmt.Errorf("error getting events: %w", err) } el.Items = append(el.Items, newEvents.Items...) return newEvents, nil }) return err } func getListOpt(namespace, selector string) []client.ListOption { clientListOpts := []client.ListOption{client.Limit(cmdutil.DefaultChunkSize), client.InNamespace(namespace)} if selector != "" { kind, name := utils.ParseObjectKindName(selector) sel := fields.AndSelectors( fields.OneTermEqualSelector("involvedObject.kind", kind), fields.OneTermEqualSelector("involvedObject.name", name)) clientListOpts = append(clientListOpts, client.MatchingFieldsSelector{Selector: sel}) } return clientListOpts } func eventsCmdWatchRun(ctx context.Context, kubeclient client.WithWatch, listOpts []client.ListOption, refListOpts [][]client.ListOption, showNs bool) error { event := &corev1.EventList{} eventWatch, err := kubeclient.Watch(ctx, event, listOpts...) if err != nil { return err } defer eventWatch.Stop() firstIteration := true handleEvent := func(e watch.Event) error { if e.Type == watch.Deleted { return nil } event, ok := e.Object.(*corev1.Event) if !ok { return nil } if filterEvent(*event) { return nil } rows := getEventRow(*event, showNs) var hdr []string if firstIteration { hdr = getHeaders(showNs) firstIteration = false } err = printers.TablePrinter(hdr).Print(os.Stdout, [][]string{rows}) if err != nil { return err } return nil } for _, refOpts := range refListOpts { refEventWatch, err := kubeclient.Watch(ctx, event, refOpts...) if err != nil { return err } defer refEventWatch.Stop() go receiveEventChan(ctx, refEventWatch, handleEvent) } return receiveEventChan(ctx, eventWatch, handleEvent) } func receiveEventChan(ctx context.Context, eventWatch watch.Interface, f func(e watch.Event) error) error { for { select { case e, ok := <-eventWatch.ResultChan(): if !ok { return nil } err := f(e) if err != nil { return err } case <-ctx.Done(): return nil } } } func getHeaders(showNs bool) []string { headers := []string{"Last seen", "Type", "Reason", "Object", "Message"} if showNs { headers = append(namespaceHeader, headers...) } return headers } var fluxKinds = []string{sourcev1.GitRepositoryKind, sourcev1.HelmRepositoryKind, sourcev1.OCIRepositoryKind, sourcev1.BucketKind, sourcev1.HelmChartKind, kustomizev1.KustomizationKind, helmv2.HelmReleaseKind, notificationv1.AlertKind, notificationv1.ProviderKind, imagev1.ImageRepositoryKind, imagev1.ImagePolicyKind, autov1.ImageUpdateAutomationKind} func getEventRow(e corev1.Event, showNs bool) []string { var row []string if showNs { row = []string{e.Namespace} } row = append(row, getLastSeen(e), e.Type, e.Reason, fmt.Sprintf("%s/%s", e.InvolvedObject.Kind, e.InvolvedObject.Name), e.Message) return row } // getObjectRef is used to get the metadata of a resource that the selector(in the format ) references. // It returns an empty string if the resource doesn't reference any resource // and a string with the format `/.` if it does. func getObjectRef(ctx context.Context, kubeclient client.Client, selector string, ns string) ([]string, error) { kind, name := utils.ParseObjectKindName(selector) ref, err := getGroupVersionAndRef(kind, name, ns) if err != nil { return nil, fmt.Errorf("error getting groupversion: %w", err) } // the resource has no source ref if len(ref.field) == 0 { return nil, nil } obj := &unstructured.Unstructured{} obj.SetGroupVersionKind(schema.GroupVersionKind{ Kind: kind, Version: ref.gv.Version, Group: ref.gv.Group, }) objName := types.NamespacedName{ Namespace: ns, Name: name, } err = kubeclient.Get(ctx, objName, obj) if err != nil { return nil, err } var ok bool refKind := ref.kind if refKind == "" { kindField := append(ref.field, "kind") refKind, ok, err = unstructured.NestedString(obj.Object, kindField...) if err != nil { return nil, err } if !ok { return nil, fmt.Errorf("field '%s' for '%s' not found", strings.Join(kindField, "."), objName) } } nameField := append(ref.field, "name") refName, ok, err := unstructured.NestedString(obj.Object, nameField...) if err != nil { return nil, err } if !ok { return nil, fmt.Errorf("field '%s' for '%s' not found", strings.Join(nameField, "."), objName) } var allRefs []string refNamespace := ns if ref.crossNamespaced { namespaceField := append(ref.field, "namespace") namespace, ok, err := unstructured.NestedString(obj.Object, namespaceField...) if err != nil { return nil, err } if ok { refNamespace = namespace } } allRefs = append(allRefs, fmt.Sprintf("%s/%s.%s", refKind, refName, refNamespace)) for _, ref := range ref.otherRefs { allRefs = append(allRefs, fmt.Sprintf("%s.%s", ref, refNamespace)) } return allRefs, nil } type refInfo struct { gv schema.GroupVersion kind string crossNamespaced bool otherRefs []string field []string } func getGroupVersionAndRef(kind, name, ns string) (refInfo, error) { switch kind { case kustomizev1.KustomizationKind: return refInfo{ gv: kustomizev1.GroupVersion, crossNamespaced: true, field: []string{"spec", "sourceRef"}, }, nil case helmv2.HelmReleaseKind: return refInfo{ gv: helmv2.GroupVersion, crossNamespaced: true, otherRefs: []string{fmt.Sprintf("HelmChart/%s-%s", ns, name)}, field: []string{"spec", "chart", "spec", "sourceRef"}, }, nil case notificationv1.AlertKind: return refInfo{ gv: notificationv1.GroupVersion, kind: notificationv1.ProviderKind, crossNamespaced: false, field: []string{"spec", "providerRef"}, }, nil case notificationv1.ReceiverKind, notificationv1.ProviderKind: return refInfo{ gv: notificationv1.GroupVersion, }, nil case imagev1.ImagePolicyKind: return refInfo{ gv: imagev1.GroupVersion, kind: imagev1.ImageRepositoryKind, crossNamespaced: true, field: []string{"spec", "imageRepositoryRef"}, }, nil case sourcev1.GitRepositoryKind, sourcev1.HelmChartKind, sourcev1.BucketKind, sourcev1.HelmRepositoryKind, sourcev1.OCIRepositoryKind: return refInfo{gv: sourcev1.GroupVersion}, nil case autov1.ImageUpdateAutomationKind: return refInfo{gv: autov1.GroupVersion}, nil case imagev1.ImageRepositoryKind: return refInfo{gv: imagev1.GroupVersion}, nil default: return refInfo{}, fmt.Errorf("'%s' is not a flux kind", kind) } } func filterEvent(e corev1.Event) bool { if !utils.ContainsItemString(fluxKinds, e.InvolvedObject.Kind) { return true } if len(eventArgs.filterTypes) > 0 { _, equal := utils.ContainsEqualFoldItemString(eventArgs.filterTypes, e.Type) if !equal { return true } } return false } // The functions below are copied from: https://github.com/kubernetes/kubectl/blob/master/pkg/cmd/events/events.go#L347 // SortableEvents implements sort.Interface for []api.Event by time type SortableEvents []corev1.Event func (list SortableEvents) Len() int { return len(list) } func (list SortableEvents) Swap(i, j int) { list[i], list[j] = list[j], list[i] } // Return the time that should be used for sorting, which can come from // various places in corev1.Event. func eventTime(event corev1.Event) time.Time { if event.Series != nil { return event.Series.LastObservedTime.Time } if !event.LastTimestamp.Time.IsZero() { return event.LastTimestamp.Time } return event.EventTime.Time } func (list SortableEvents) Less(i, j int) bool { return eventTime(list[i]).Before(eventTime(list[j])) } func getLastSeen(e corev1.Event) string { var interval string firstTimestampSince := translateMicroTimestampSince(e.EventTime) if e.EventTime.IsZero() { firstTimestampSince = translateTimestampSince(e.FirstTimestamp) } if e.Series != nil { interval = fmt.Sprintf("%s (x%d over %s)", translateMicroTimestampSince(e.Series.LastObservedTime), e.Series.Count, firstTimestampSince) } else if e.Count > 1 { interval = fmt.Sprintf("%s (x%d over %s)", translateTimestampSince(e.LastTimestamp), e.Count, firstTimestampSince) } else { interval = firstTimestampSince } return interval } // translateMicroTimestampSince returns the elapsed time since timestamp in // human-readable approximation. func translateMicroTimestampSince(timestamp metav1.MicroTime) string { if timestamp.IsZero() { return "" } return duration.HumanDuration(time.Since(timestamp.Time)) } // translateTimestampSince returns the elapsed time since timestamp in // human-readable approximation. func translateTimestampSince(timestamp metav1.Time) string { if timestamp.IsZero() { return "" } return duration.HumanDuration(time.Since(timestamp.Time)) }