1
0
mirror of synced 2026-02-06 19:05:55 +00:00

Add graceful shutdown when interrupted

If implemented this permit restoring a clean state in case of signal
interruption.

Signed-off-by: Soule BA <soule@weave.works>
This commit is contained in:
Soule BA
2022-01-12 11:50:19 +01:00
parent f7d9ee90cd
commit 306f8f5715
23 changed files with 134 additions and 800 deletions

297
internal/build/build.go Normal file
View File

@@ -0,0 +1,297 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package build
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"sync"
"time"
"github.com/fluxcd/flux2/internal/utils"
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2"
"github.com/fluxcd/pkg/kustomize"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/genericclioptions"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/kustomize/api/resmap"
"sigs.k8s.io/kustomize/api/resource"
"sigs.k8s.io/kustomize/kyaml/filesys"
)
const (
controllerName = "kustomize-controller"
controllerGroup = "kustomize.toolkit.fluxcd.io"
mask = "**SOPS**"
)
var defaultTimeout = 80 * time.Second
// Builder builds yaml manifests
// It retrieves the kustomization object from the k8s cluster
// and overlays the manifests with the resources specified in the resourcesPath
type Builder struct {
client client.WithWatch
restMapper meta.RESTMapper
name string
namespace string
resourcesPath string
// mu is used to synchronize access to the kustomization file
mu sync.Mutex
action kustomize.Action
kustomization *kustomizev1.Kustomization
timeout time.Duration
}
type BuilderOptionFunc func(b *Builder) error
func WithTimeout(timeout time.Duration) BuilderOptionFunc {
return func(b *Builder) error {
b.timeout = timeout
return nil
}
}
// NewBuilder returns a new Builder
// to dp : create functional options
func NewBuilder(rcg *genericclioptions.ConfigFlags, name, resources string, opts ...BuilderOptionFunc) (*Builder, error) {
kubeClient, err := utils.KubeClient(rcg)
if err != nil {
return nil, err
}
restMapper, err := rcg.ToRESTMapper()
if err != nil {
return nil, err
}
b := &Builder{
client: kubeClient,
restMapper: restMapper,
name: name,
namespace: *rcg.Namespace,
resourcesPath: resources,
}
for _, opt := range opts {
if err := opt(b); err != nil {
return nil, err
}
}
if b.timeout == 0 {
b.timeout = defaultTimeout
}
return b, nil
}
func (b *Builder) getKustomization(ctx context.Context) (*kustomizev1.Kustomization, error) {
namespacedName := types.NamespacedName{
Namespace: b.namespace,
Name: b.name,
}
k := &kustomizev1.Kustomization{}
err := b.client.Get(ctx, namespacedName, k)
if err != nil {
return nil, err
}
return k, nil
}
// Build builds the yaml manifests from the kustomization object
// and overlays the manifests with the resources specified in the resourcesPath
// It expects a kustomization.yaml file in the resourcesPath, and it will
// generate a kustomization.yaml file if it doesn't exist
func (b *Builder) Build() ([]byte, error) {
m, err := b.build()
if err != nil {
return nil, err
}
resources, err := m.AsYaml()
if err != nil {
return nil, fmt.Errorf("kustomize build failed: %w", err)
}
return resources, nil
}
func (b *Builder) build() (m resmap.ResMap, err error) {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
// Get the kustomization object
k, err := b.getKustomization(ctx)
if err != nil {
return
}
// store the kustomization object
b.kustomization = k
// generate kustomization.yaml if needed
action, er := b.generate(*k, b.resourcesPath)
if er != nil {
errf := kustomize.CleanDirectory(b.resourcesPath, action)
err = fmt.Errorf("failed to generate kustomization.yaml: %w", fmt.Errorf("%v %v", er, errf))
return
}
b.action = action
defer func() {
errf := b.Cancel()
if err == nil {
err = errf
}
}()
// build the kustomization
m, err = b.do(ctx, *k, b.resourcesPath)
if err != nil {
return
}
for _, res := range m.Resources() {
// set owner labels
err = b.setOwnerLabels(res)
if err != nil {
return
}
// make sure secrets are masked
err = trimSopsData(res)
if err != nil {
return
}
}
return
}
func (b *Builder) generate(kustomization kustomizev1.Kustomization, dirPath string) (kustomize.Action, error) {
data, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&kustomization)
if err != nil {
return "", err
}
gen := kustomize.NewGenerator(unstructured.Unstructured{Object: data})
// acuire the lock
b.mu.Lock()
defer b.mu.Unlock()
return gen.WriteFile(dirPath, kustomize.WithSaveOriginalKustomization())
}
func (b *Builder) do(ctx context.Context, kustomization kustomizev1.Kustomization, dirPath string) (resmap.ResMap, error) {
fs := filesys.MakeFsOnDisk()
// acuire the lock
b.mu.Lock()
defer b.mu.Unlock()
m, err := kustomize.BuildKustomization(fs, dirPath)
if err != nil {
return nil, fmt.Errorf("kustomize build failed: %w", err)
}
for _, res := range m.Resources() {
// run variable substitutions
if kustomization.Spec.PostBuild != nil {
data, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&kustomization)
if err != nil {
return nil, err
}
outRes, err := kustomize.SubstituteVariables(ctx, b.client, unstructured.Unstructured{Object: data}, res)
if err != nil {
return nil, fmt.Errorf("var substitution failed for '%s': %w", res.GetName(), err)
}
if outRes != nil {
_, err = m.Replace(res)
if err != nil {
return nil, err
}
}
}
}
return m, nil
}
func (b *Builder) setOwnerLabels(res *resource.Resource) error {
labels := res.GetLabels()
labels[controllerGroup+"/name"] = b.kustomization.GetName()
labels[controllerGroup+"/namespace"] = b.kustomization.GetNamespace()
err := res.SetLabels(labels)
if err != nil {
return err
}
return nil
}
func trimSopsData(res *resource.Resource) error {
// sopsMess is the base64 encoded mask
sopsMess := base64.StdEncoding.EncodeToString([]byte(mask))
if res.GetKind() == "Secret" {
dataMap := res.GetDataMap()
for k, v := range dataMap {
data, err := base64.StdEncoding.DecodeString(v)
if err != nil {
if _, ok := err.(base64.CorruptInputError); ok {
return fmt.Errorf("failed to decode secret data: %w", err)
}
}
if bytes.Contains(data, []byte("sops")) && bytes.Contains(data, []byte("ENC[")) {
dataMap[k] = sopsMess
}
}
res.SetDataMap(dataMap)
}
return nil
}
// Cancel cancels the build
// It restores a clean reprository
func (b *Builder) Cancel() error {
// acuire the lock
b.mu.Lock()
defer b.mu.Unlock()
err := kustomize.CleanDirectory(b.resourcesPath, b.action)
if err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,120 @@
/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package build
import (
"testing"
"github.com/google/go-cmp/cmp"
"sigs.k8s.io/kustomize/api/resource"
"sigs.k8s.io/kustomize/kyaml/yaml"
)
func TestTrimSopsData(t *testing.T) {
testCases := []struct {
name string
yamlStr string
expected string
}{
{
name: "secret with sops token",
yamlStr: `apiVersion: v1
kind: Secret
metadata:
name: my-secret
type: Opaque
data:
token: |
ewoJImRhdGEiOiAiRU5DW0FFUzI1Nl9HQ00sZGF0YTpvQmU1UGxQbWZRQ1VVYzRzcUtJbW
p3PT0saXY6TUxMRVcxNVFDOWtSZFZWYWdKbnpMQ1NrMHhaR1dJcEFlVGZIenl4VDEwZz0s
dGFnOkszR2tCQ0dTK3V0NFRwazZuZGIwQ0E9PSx0eXBlOnN0cl0iLAoJInNvcHMiOiB7Cg
kJImttcyI6IG51bGwsCgkJImdjcF9rbXMiOiBudWxsLAoJCSJhenVyZV9rdiI6IG51bGws
CgkJImhjX3ZhdWx0IjogbnVsbCwKCQkiYWdlIjogWwoJCQl7CgkJCQkicmVjaXBpZW50Ij
ogImFnZTEwbGEyZ2Uwd3R2eDNxcjdkYXRxZjdyczR5bmd4c3pkYWw5MjdmczlydWthbXI4
dTJwc2hzdnR6N2NlIiwKCQkJCSJlbmMiOiAiLS0tLS1CRUdJTiBBR0UgRU5DUllQVEVEIE
ZJTEUtLS0tLVxuWVdkbExXVnVZM0o1Y0hScGIyNHViM0puTDNZeENpMCtJRmd5TlRVeE9T
QTFMMlJwWkhScksxRlNWbVlyZDFWYVxuWTBoeFdGUXpTREJzVDFrM1dqTnRZbVUxUW1saW
FESnljWGxOQ25GMVlqZE5PVGhWYlZOdk1HOXJOUzlaVVhad1xuTW5WMGJuUlVNR050ZWpG
UGJ6TTRVMlV6V2tzemVWa0tMUzB0SUdKNlVHaHhNVVYzWW1WSlRIbEpTVUpwUlZSWlxuVm
pkMFJWUmFkVTh3ZWt4WFRISXJZVXBsWWtOMmFFRUswSS9NQ0V0WFJrK2IvTjJHMUpGM3ZI
UVQyNGRTaFdZRFxudytKSVVTQTNhTGYyc3YwenIyTWRVRWRWV0JKb004blQ0RDR4VmJCT1
JEKzY2OVcrOW5EZVN3PT1cbi0tLS0tRU5EIEFHRSBFTkNSWVBURUQgRklMRS0tLS0tXG4i
CgkJCX0KCQldLAoJCSJsYXN0bW9kaWZpZWQiOiAiMjAyMS0xMS0yNlQxNjozNDo1MVoiLA
oJCSJtYWMiOiAiRU5DW0FFUzI1Nl9HQ00sZGF0YTpDT0d6ZjVZQ0hOTlA2ejRKYUVLcmpO
M004ZjUrUTF1S1VLVE1Id2ozODgvSUNtTHlpMnNTclRtajdQUCtYN005alRWd2E4d1ZnWV
RwTkxpVkp4K0xjeHF2SVhNMFR5bysvQ3UxenJmYW85OGFpQUNQOCtUU0VEaUZRTnRFdXMy
M0grZC9YMWhxTXdSSERJM2tRKzZzY2dFR25xWTU3cjNSRFNBM0U4RWhIcjQ9LGl2Okx4aX
RWSVltOHNyWlZxRnVlSmg5bG9DbEE0NFkyWjNYQVZZbXhlc01tT2c9LHRhZzpZOHFGRDhV
R2xEZndOU3Y3eGxjbjZBPT0sdHlwZTpzdHJdIiwKCQkicGdwIjogbnVsbCwKCQkidW5lbm
NyeXB0ZWRfc3VmZml4IjogIl91bmVuY3J5cHRlZCIsCgkJInZlcnNpb24iOiAiMy43LjEi
Cgl9Cn0=
`,
expected: `apiVersion: v1
data:
token: KipTT1BTKio=
kind: Secret
metadata:
name: my-secret
type: Opaque
`,
},
{
name: "secret with basic auth",
yamlStr: `apiVersion: v1
data:
password: cGFzc3dvcmQK
username: YWRtaW4K
kind: Secret
metadata:
name: secret-basic-auth
type: kubernetes.io/basic-auth
`,
expected: `apiVersion: v1
data:
password: cGFzc3dvcmQK
username: YWRtaW4K
kind: Secret
metadata:
name: secret-basic-auth
type: kubernetes.io/basic-auth
`,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
r, err := yaml.Parse(tc.yamlStr)
if err != nil {
t.Fatalf("unable to parse yaml: %v", err)
}
resource := &resource.Resource{RNode: *r}
err = trimSopsData(resource)
if err != nil {
t.Fatalf("unable to trim sops data: %v", err)
}
sYaml, err := resource.AsYAML()
if err != nil {
t.Fatalf("unable to convert sanitized resources to yaml: %v", err)
}
if diff := cmp.Diff(string(sYaml), tc.expected); diff != "" {
t.Errorf("unexpected sanitized resources: (-got +want)%v", diff)
}
})
}
}

287
internal/build/diff.go Normal file
View File

@@ -0,0 +1,287 @@
package build
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2"
"github.com/fluxcd/pkg/ssa"
"github.com/gonvenience/bunt"
"github.com/gonvenience/ytbx"
"github.com/google/go-cmp/cmp"
"github.com/homeport/dyff/pkg/dyff"
"github.com/lucasb-eyer/go-colorful"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/yaml"
)
func (b *Builder) Manager() (*ssa.ResourceManager, error) {
statusPoller := polling.NewStatusPoller(b.client, b.restMapper, nil)
owner := ssa.Owner{
Field: controllerName,
Group: controllerGroup,
}
return ssa.NewResourceManager(b.client, statusPoller, owner), nil
}
func (b *Builder) Diff() (string, error) {
output := strings.Builder{}
res, err := b.Build()
if err != nil {
return "", err
}
// convert the build result into Kubernetes unstructured objects
objects, err := ssa.ReadObjects(bytes.NewReader(res))
if err != nil {
return "", err
}
resourceManager, err := b.Manager()
if err != nil {
return "", err
}
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
if err := ssa.SetNativeKindsDefaults(objects); err != nil {
return "", err
}
// create an inventory of objects to be reconciled
newInventory := newInventory()
for _, obj := range objects {
diffOptions := ssa.DiffOptions{
Exclusions: map[string]string{
"kustomize.toolkit.fluxcd.io/reconcile": "disabled",
},
}
change, liveObject, mergedObject, err := resourceManager.Diff(ctx, obj, diffOptions)
if err != nil {
if b.kustomization.Spec.Force && ssa.IsImmutableError(err) {
output.WriteString(writeString(fmt.Sprintf("► %s created\n", obj.GetName()), bunt.Green))
} else {
output.WriteString(writeString(fmt.Sprintf("✗ %v\n", err), bunt.Red))
}
continue
}
// if the object is a sops secret, we need to
// make sure we diff only if the keys are different
if obj.GetKind() == "Secret" && change.Action == string(ssa.ConfiguredAction) {
diffSopsSecret(obj, liveObject, mergedObject, change)
}
if change.Action == string(ssa.CreatedAction) {
output.WriteString(writeString(fmt.Sprintf("► %s created\n", change.Subject), bunt.Green))
}
if change.Action == string(ssa.ConfiguredAction) {
output.WriteString(writeString(fmt.Sprintf("► %s drifted\n", change.Subject), bunt.WhiteSmoke))
liveFile, mergedFile, tmpDir, err := writeYamls(liveObject, mergedObject)
if err != nil {
return "", err
}
defer cleanupDir(tmpDir)
err = diff(liveFile, mergedFile, &output)
if err != nil {
return "", err
}
}
addObjectsToInventory(newInventory, change)
}
if b.kustomization.Spec.Prune {
oldStatus := b.kustomization.Status.DeepCopy()
if oldStatus.Inventory != nil {
diffObjects, err := diffInventory(oldStatus.Inventory, newInventory)
if err != nil {
return "", err
}
for _, object := range diffObjects {
output.WriteString(writeString(fmt.Sprintf("► %s deleted\n", ssa.FmtUnstructured(object)), bunt.OrangeRed))
}
}
}
return output.String(), nil
}
func writeYamls(liveObject, mergedObject *unstructured.Unstructured) (string, string, string, error) {
tmpDir, err := os.MkdirTemp("", "")
if err != nil {
return "", "", "", err
}
liveYAML, _ := yaml.Marshal(liveObject)
liveFile := filepath.Join(tmpDir, "live.yaml")
if err := os.WriteFile(liveFile, liveYAML, 0644); err != nil {
return "", "", "", err
}
mergedYAML, _ := yaml.Marshal(mergedObject)
mergedFile := filepath.Join(tmpDir, "merged.yaml")
if err := os.WriteFile(mergedFile, mergedYAML, 0644); err != nil {
return "", "", "", err
}
return liveFile, mergedFile, tmpDir, nil
}
func writeString(t string, color colorful.Color) string {
return bunt.Style(
t,
bunt.EachLine(),
bunt.Foreground(color),
)
}
func cleanupDir(dir string) error {
return os.RemoveAll(dir)
}
func diff(liveFile, mergedFile string, output io.Writer) error {
from, to, err := ytbx.LoadFiles(liveFile, mergedFile)
if err != nil {
return fmt.Errorf("failed to load input files: %w", err)
}
report, err := dyff.CompareInputFiles(from, to,
dyff.IgnoreOrderChanges(false),
dyff.KubernetesEntityDetection(true),
)
if err != nil {
return fmt.Errorf("failed to compare input files: %w", err)
}
reportWriter := &dyff.HumanReport{
Report: report,
OmitHeader: true,
}
if err := reportWriter.WriteReport(output); err != nil {
return fmt.Errorf("failed to print report: %w", err)
}
return nil
}
func diffSopsSecret(obj, liveObject, mergedObject *unstructured.Unstructured, change *ssa.ChangeSetEntry) {
data := obj.Object["data"]
for _, v := range data.(map[string]interface{}) {
v, err := base64.StdEncoding.DecodeString(v.(string))
if err != nil {
fmt.Println(err)
}
if bytes.Contains(v, []byte(mask)) {
if liveObject != nil && mergedObject != nil {
change.Action = string(ssa.UnchangedAction)
dataLive := liveObject.Object["data"].(map[string]interface{})
dataMerged := mergedObject.Object["data"].(map[string]interface{})
if cmp.Diff(keys(dataLive), keys(dataMerged)) != "" {
change.Action = string(ssa.ConfiguredAction)
}
}
}
}
}
func keys(m map[string]interface{}) []string {
keys := make([]string, len(m))
i := 0
for k := range m {
keys[i] = k
i++
}
return keys
}
// diffInventory returns the slice of objects that do not exist in the target inventory.
func diffInventory(inv *kustomizev1.ResourceInventory, target *kustomizev1.ResourceInventory) ([]*unstructured.Unstructured, error) {
versionOf := func(i *kustomizev1.ResourceInventory, objMetadata object.ObjMetadata) string {
for _, entry := range i.Entries {
if entry.ID == objMetadata.String() {
return entry.Version
}
}
return ""
}
objects := make([]*unstructured.Unstructured, 0)
aList, err := listMetaInInventory(inv)
if err != nil {
return nil, err
}
bList, err := listMetaInInventory(target)
if err != nil {
return nil, err
}
list := aList.Diff(bList)
if len(list) == 0 {
return objects, nil
}
for _, metadata := range list {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(schema.GroupVersionKind{
Group: metadata.GroupKind.Group,
Kind: metadata.GroupKind.Kind,
Version: versionOf(inv, metadata),
})
u.SetName(metadata.Name)
u.SetNamespace(metadata.Namespace)
objects = append(objects, u)
}
sort.Sort(ssa.SortableUnstructureds(objects))
return objects, nil
}
// listMetaInInventory returns the inventory entries as object.ObjMetadata objects.
func listMetaInInventory(inv *kustomizev1.ResourceInventory) (object.ObjMetadataSet, error) {
var metas []object.ObjMetadata
for _, e := range inv.Entries {
m, err := object.ParseObjMetadata(e.ID)
if err != nil {
return metas, err
}
metas = append(metas, m)
}
return metas, nil
}
func newInventory() *kustomizev1.ResourceInventory {
return &kustomizev1.ResourceInventory{
Entries: []kustomizev1.ResourceRef{},
}
}
// addObjectsToInventory extracts the metadata from the given objects and adds it to the inventory.
func addObjectsToInventory(inv *kustomizev1.ResourceInventory, entry *ssa.ChangeSetEntry) error {
if entry == nil {
return nil
}
inv.Entries = append(inv.Entries, kustomizev1.ResourceRef{
ID: entry.ObjMetadata.String(),
Version: entry.GroupVersion,
})
return nil
}