Replace kubectl with Go server-side apply
Signed-off-by: Stefan Prodan <stefan.prodan@gmail.com>
This commit is contained in:
@@ -175,41 +175,14 @@ func (b *PlainGitBootstrapper) ReconcileComponents(ctx context.Context, manifest
|
||||
// Apply components using any existing customisations
|
||||
kfile := filepath.Join(filepath.Dir(componentsYAML), konfig.DefaultKustomizationFileName())
|
||||
if _, err := os.Stat(kfile); err == nil {
|
||||
tmpDir, err := os.MkdirTemp("", "gotk-crds")
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Extract the CRDs from the components manifest
|
||||
crdsYAML := filepath.Join(tmpDir, "gotk-crds.yaml")
|
||||
if err := utils.ExtractCRDs(componentsYAML, crdsYAML); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Apply the CRDs
|
||||
b.logger.Actionf("installing toolkit.fluxcd.io CRDs")
|
||||
kubectlArgs := []string{"apply", "-f", crdsYAML}
|
||||
if _, err = utils.ExecKubectlCommand(ctx, utils.ModeStderrOS, b.kubeconfig, b.kubecontext, kubectlArgs...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Wait for CRDs to be established
|
||||
b.logger.Waitingf("waiting for CRDs to be reconciled")
|
||||
kubectlArgs = []string{"wait", "--for", "condition=established", "-f", crdsYAML}
|
||||
if _, err = utils.ExecKubectlCommand(ctx, utils.ModeStderrOS, b.kubeconfig, b.kubecontext, kubectlArgs...); err != nil {
|
||||
return err
|
||||
}
|
||||
b.logger.Successf("CRDs reconciled successfully")
|
||||
|
||||
// Apply the components and their patches
|
||||
b.logger.Actionf("installing components in %q namespace", options.Namespace)
|
||||
kubectlArgs = []string{"apply", "-k", filepath.Dir(componentsYAML)}
|
||||
if _, err = utils.ExecKubectlCommand(ctx, utils.ModeStderrOS, b.kubeconfig, b.kubecontext, kubectlArgs...); err != nil {
|
||||
if _, err := utils.Apply(ctx, b.kubeconfig, b.kubecontext, kfile); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Apply the CRDs and controllers
|
||||
b.logger.Actionf("installing components in %q namespace", options.Namespace)
|
||||
kubectlArgs := []string{"apply", "-f", componentsYAML}
|
||||
if _, err = utils.ExecKubectlCommand(ctx, utils.ModeStderrOS, b.kubeconfig, b.kubecontext, kubectlArgs...); err != nil {
|
||||
if _, err := utils.Apply(ctx, b.kubeconfig, b.kubecontext, componentsYAML); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -336,10 +309,10 @@ func (b *PlainGitBootstrapper) ReconcileSyncConfig(ctx context.Context, options
|
||||
|
||||
// Apply to cluster
|
||||
b.logger.Actionf("applying sync manifests")
|
||||
kubectlArgs := []string{"apply", "-k", filepath.Join(b.git.Path(), filepath.Dir(kusManifests.Path))}
|
||||
if _, err = utils.ExecKubectlCommand(ctx, utils.ModeStderrOS, b.kubeconfig, b.kubecontext, kubectlArgs...); err != nil {
|
||||
if _, err := utils.Apply(ctx, b.kubeconfig, b.kubecontext, filepath.Join(b.git.Path(), kusManifests.Path)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.logger.Successf("reconciled sync configuration")
|
||||
|
||||
return nil
|
||||
|
||||
81
internal/utils/apply.go
Normal file
81
internal/utils/apply.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/fluxcd/pkg/ssa"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
|
||||
"sigs.k8s.io/kustomize/api/konfig"
|
||||
|
||||
"github.com/fluxcd/flux2/pkg/manifestgen/kustomization"
|
||||
)
|
||||
|
||||
// Apply is the equivalent of 'kubectl apply --server-side -f'.
|
||||
// If the given manifest is a kustomization.yaml, then apply performs the equivalent of 'kubectl apply --server-side -k'.
|
||||
func Apply(ctx context.Context, kubeConfigPath string, kubeContext string, manifestPath string) (string, error) {
|
||||
cfg, err := KubeConfig(kubeConfigPath, kubeContext)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
restMapper, err := apiutil.NewDynamicRESTMapper(cfg)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
kubeClient, err := client.New(cfg, client.Options{Mapper: restMapper})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
kubePoller := polling.NewStatusPoller(kubeClient, restMapper)
|
||||
|
||||
resourceManager := ssa.NewResourceManager(kubeClient, kubePoller, ssa.Owner{
|
||||
Field: "flux",
|
||||
Group: "fluxcd.io",
|
||||
})
|
||||
|
||||
objs, err := readObjects(manifestPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(objs) < 1 {
|
||||
return "", fmt.Errorf("no Kubernetes objects found at: %s", manifestPath)
|
||||
}
|
||||
|
||||
changeSet, err := resourceManager.ApplyAllStaged(ctx, objs, false, time.Minute)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return changeSet.String(), nil
|
||||
}
|
||||
|
||||
func readObjects(manifestPath string) ([]*unstructured.Unstructured, error) {
|
||||
if _, err := os.Stat(manifestPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if filepath.Base(manifestPath) == konfig.DefaultKustomizationFileName() {
|
||||
resources, err := kustomization.Build(filepath.Dir(manifestPath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ssa.ReadObjects(bytes.NewReader(resources))
|
||||
}
|
||||
|
||||
ms, err := os.Open(manifestPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer ms.Close()
|
||||
|
||||
return ssa.ReadObjects(bufio.NewReader(ms))
|
||||
}
|
||||
Reference in New Issue
Block a user