From 45837d2d1bc0a294c580cd49d144ffe885443815 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Fri, 8 Aug 2025 13:23:00 +0300 Subject: [PATCH] Implement `flux migrate` command The migrate command must be run before a Flux minor version upgrade. The command migrates the Flux custom resources stored in Kubernetes etcd to their latest API version, ensuring the Flux components can continue to function correctly after the upgrade. Signed-off-by: Stefan Prodan --- .github/workflows/e2e.yaml | 3 + cmd/flux/migrate.go | 184 +++++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+) create mode 100644 cmd/flux/migrate.go diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index bcf08f43..0c046f40 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -238,6 +238,9 @@ jobs: - name: flux check run: | ./bin/flux check + - name: flux migrate + run: | + ./bin/flux migrate - name: flux version run: | ./bin/flux version diff --git a/cmd/flux/migrate.go b/cmd/flux/migrate.go new file mode 100644 index 00000000..4c304a72 --- /dev/null +++ b/cmd/flux/migrate.go @@ -0,0 +1,184 @@ +/* +Copyright 2025 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + + "github.com/spf13/cobra" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/fluxcd/flux2/v2/internal/utils" +) + +var migrateCmd = &cobra.Command{ + Use: "migrate", + Args: cobra.NoArgs, + Short: "Migrate the Flux custom resources to their latest API version", + Long: `The migrate command must be run before a Flux minor version upgrade. +The command migrates the Flux custom resources stored in Kubernetes etcd to their latest API version, +ensuring the Flux components can continue to function correctly after the upgrade. +`, + RunE: runMigrateCmd, +} + +func init() { + rootCmd.AddCommand(migrateCmd) +} + +func runMigrateCmd(cmd *cobra.Command, args []string) error { + logger.Actionf("starting migration of custom resources") + ctx, cancel := context.WithTimeout(context.Background(), rootArgs.timeout) + defer cancel() + + cfg, err := utils.KubeConfig(kubeconfigArgs, kubeclientOptions) + if err != nil { + return fmt.Errorf("Kubernetes client initialization failed: %s", err.Error()) + } + + kubeClient, err := client.New(cfg, client.Options{Scheme: utils.NewScheme()}) + if err != nil { + return err + } + + migrator := NewMigrator(kubeClient, client.MatchingLabels{ + "app.kubernetes.io/part-of": "flux", + }) + + if err := migrator.Run(ctx); err != nil { + return err + } + + logger.Successf("custom resources migrated successfully") + return nil +} + +type Migrator struct { + labelSelector client.MatchingLabels + kubeClient client.Client +} + +// NewMigrator creates a new Migrator instance with the specified label selector. +func NewMigrator(kubeClient client.Client, labelSelector client.MatchingLabels) *Migrator { + return &Migrator{ + labelSelector: labelSelector, + kubeClient: kubeClient, + } +} + +func (m *Migrator) Run(ctx context.Context) error { + crdList := &apiextensionsv1.CustomResourceDefinitionList{} + + if err := m.kubeClient.List(ctx, crdList, m.labelSelector); err != nil { + return fmt.Errorf("failed to list CRDs: %w", err) + } + + for _, crd := range crdList.Items { + if err := m.migrateCRD(ctx, crd.Name); err != nil { + return err + } + } + + return nil +} + +func (m *Migrator) migrateCRD(ctx context.Context, name string) error { + crd := &apiextensionsv1.CustomResourceDefinition{} + + if err := m.kubeClient.Get(ctx, client.ObjectKey{Name: name}, crd); err != nil { + return fmt.Errorf("failed to get CRD %s: %w", name, err) + } + + // get the latest storage version for the CRD + storageVersion := m.getStorageVersion(crd) + if storageVersion == "" { + return fmt.Errorf("no storage version found for CRD %s", name) + } + + // migrate all the resources for the CRD + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + return m.migrateCR(ctx, crd, storageVersion) + }) + if err != nil { + return fmt.Errorf("failed to migrate resources for CRD %s: %w", name, err) + } + + // set the CRD status to contain only the latest storage version + if len(crd.Status.StoredVersions) > 1 || crd.Status.StoredVersions[0] != storageVersion { + crd.Status.StoredVersions = []string{storageVersion} + if err := m.kubeClient.Status().Update(ctx, crd); err != nil { + return fmt.Errorf("failed to update CRD %s status: %w", crd.Name, err) + } + logger.Successf("%s migrated to storage version %s", crd.Name, storageVersion) + } + return nil +} + +// migrateCR migrates all CRs for the given CRD to the specified version by patching them with an empty patch. +func (m *Migrator) migrateCR(ctx context.Context, crd *apiextensionsv1.CustomResourceDefinition, version string) error { + list := &unstructured.UnstructuredList{} + + apiVersion := crd.Spec.Group + "/" + version + listKind := crd.Spec.Names.ListKind + + list.SetAPIVersion(apiVersion) + list.SetKind(listKind) + + err := m.kubeClient.List(ctx, list, client.InNamespace("")) + if err != nil { + return fmt.Errorf("failed to list resources for CRD %s: %w", crd.Name, err) + } + + if len(list.Items) == 0 { + return nil + } + + for _, item := range list.Items { + // patch the resource with an empty patch to update the version + if err := m.kubeClient.Patch( + ctx, + &item, + client.RawPatch(client.Merge.Type(), []byte("{}")), + ); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf(" %s/%s/%s failed to migrate: %w", + item.GetKind(), item.GetNamespace(), item.GetName(), err) + } + + logger.Successf("%s/%s/%s migrated to version %s", + item.GetKind(), item.GetNamespace(), item.GetName(), version) + } + + return nil +} + +// getStorageVersion retrieves the storage version of a CustomResourceDefinition. +func (m *Migrator) getStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) string { + var version string + for _, v := range crd.Spec.Versions { + if v.Storage { + version = v.Name + break + } + } + + return version +}