RFC: Graceful termination of the operator #2662

Merged
main.go: 162 changes (140 additions, 22 deletions)
@@ -22,28 +22,13 @@ import (
"os"
goruntime "runtime"
"strings"

"github.com/tigera/operator/pkg/render/common/networkpolicy"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/cache"
"time"

"github.com/cloudflare/cfssl/log"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/yaml"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
operatorv1 "github.com/tigera/operator/api/v1"
v1 "github.com/tigera/operator/api/v1"
operatorv1beta1 "github.com/tigera/operator/api/v1beta1"
"github.com/tigera/operator/controllers"
"github.com/tigera/operator/pkg/active"
@@ -55,7 +40,25 @@ import (
"github.com/tigera/operator/pkg/controller/utils"
"github.com/tigera/operator/pkg/crds"
"github.com/tigera/operator/pkg/dns"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"github.com/tigera/operator/version"

"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/yaml"
// +kubebuilder:scaffold:imports
)

@@ -66,11 +69,11 @@ var (
)

func init() {
// +kubebuilder:scaffold:scheme
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(apiextensions.AddToScheme(scheme))
utilruntime.Must(operatorv1.AddToScheme(scheme))
utilruntime.Must(operatorv1beta1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
utilruntime.Must(apis.AddToScheme(scheme))
}

@@ -93,6 +96,8 @@ func main() {
var printEnterpriseCRDs string
var sgSetup bool
var manageCRDs bool
var preDelete bool

flag.BoolVar(&enableLeaderElection, "enable-leader-election", true,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
@@ -110,6 +115,9 @@
"Setup Security Groups in AWS (should only be used on OpenShift).")
flag.BoolVar(&manageCRDs, "manage-crds", false,
"Operator should manage the projectcalico.org and operator.tigera.io CRDs.")
flag.BoolVar(&preDelete, "pre-delete", false,
"Run helm pre-deletion hook logic, then exit.")

opts := zap.Options{}
opts.BindFlags(flag.CommandLine)
flag.Parse()
@@ -163,15 +171,15 @@ func main() {

printVersion()

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())

cfg, err := config.GetConfig()
if err != nil {
log.Error(err, "")
os.Exit(1)
}

c, err := client.New(cfg, client.Options{})
c, err := client.New(cfg, client.Options{Scheme: scheme})
if err != nil {
log.Error(err, "")
os.Exit(1)
@@ -209,6 +217,19 @@ func main() {
os.Exit(1)
}

if preDelete {
// We've built a client - we can use it to clean up.
if err := executePreDeleteHook(ctx, c); err != nil {
log.Error(err, "Failed to complete pre-delete hook")
os.Exit(1)
}
os.Exit(0)
}

// sigHandler is a context that is canceled when we receive a termination
// signal. We don't want to immediately terminate upon receipt of such a signal since
// there may be cleanup required. So, we will pass a separate context to our controllers.
// That context will be canceled after a successful cleanup.
sigHandler := ctrl.SetupSignalHandler()
active.WaitUntilActive(cs, c, sigHandler, setupLog)
log.Info("Active operator: proceeding")
@@ -253,6 +274,68 @@ func main() {
os.Exit(1)
}

// Start a goroutine to handle termination.
go func() {
// Cancel the main context when we are done.
defer cancel()

// Wait for a signal.
<-sigHandler.Done()

// Check if we need to do any cleanup.
client := mgr.GetClient()
instance := &v1.Installation{}
retries := 0
for {
if err := client.Get(ctx, utils.DefaultInstanceKey, instance); errors.IsNotFound(err) {
// No installation - we can exit immediately.
return
} else if err != nil {
// Error querying - retry after a small sleep.
if retries >= 5 {
log.Errorf("Too many retries, exiting with error: %s", err)
return
}
log.Errorf("Error querying Installation, will retry: %s", err)
retries++
time.Sleep(1 * time.Second)
continue
}

// Success
break
}

if instance.DeletionTimestamp == nil {
// Installation isn't terminating, so we can exit immediately.
return
}

// We need to wait for termination to complete. We can do this by checking if the Installation
// resource has been cleaned up or not.
to := 60 * time.Second
Reviewer comment (Member): Should we set terminationGracePeriodSeconds to 60s to match this since the default is 30s?
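For reference, that would be a one-line change in the operator's Deployment spec. A minimal sketch, assuming the stock tigera-operator Deployment (the manifest itself is not part of this diff, so the surrounding fields are illustrative only):

    # Hypothetical excerpt; only terminationGracePeriodSeconds is the point here.
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: tigera-operator
      namespace: tigera-operator
    spec:
      template:
        spec:
          # Kubernetes defaults to 30s; 60s would match the operator's
          # graceful-termination timeout used in the shutdown goroutine above.
          terminationGracePeriodSeconds: 60
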
log.Infof("Waiting up to %s for graceful termination to complete", to)
timeout := time.After(to)
for {
select {
case <-timeout:
// Timeout. Continue with shutdown.
log.Warning("Timed out waiting for graceful shutdown to complete")
return
default:
err := client.Get(ctx, utils.DefaultInstanceKey, instance)
if errors.IsNotFound(err) {
// Installation has been cleaned up, we can terminate.
log.Info("Graceful termination complete")
return
} else if err != nil {
log.Errorf("Error querying Installation: %s", err)
}
time.Sleep(1 * time.Second)
}
}
}()

clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
log.Error(err, "Failed to get client for auto provider discovery")
@@ -324,7 +407,7 @@ func main() {
ClusterDomain: clusterDomain,
KubernetesVersion: kubernetesVersion,
ManageCRDs: manageCRDs,
ShutdownContext: sigHandler,
ShutdownContext: ctx,
MultiTenant: multiTenant,
}

@@ -335,7 +418,7 @@
}

setupLog.Info("starting manager")
if err := mgr.Start(sigHandler); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
@@ -416,3 +499,38 @@ func showCRDs(variant operatorv1.ProductVariant, outputType string) error {

return nil
}

func executePreDeleteHook(ctx context.Context, c client.Client) error {
defer log.Info("preDelete hook exiting")

// Clean up any custom resources first - this will trigger teardown of pods deployed
// by the operator, and give the operator a chance to clean up gracefully.
installation := &operatorv1.Installation{}
installation.Name = utils.DefaultInstanceKey.Name
apiserver := &operatorv1.APIServer{}
apiserver.Name = utils.DefaultInstanceKey.Name
for _, o := range []client.Object{installation, apiserver} {
if err := c.Delete(ctx, o); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
}

// Wait for the Installation to be deleted.
to := time.After(5 * time.Minute)
for {
select {
case <-to:
return fmt.Errorf("Timeout waiting for pre-delete hook")
default:
if err := c.Get(ctx, utils.DefaultInstanceKey, installation); errors.IsNotFound(err) {
// It's gone! We can return.
return nil
}
}
log.Info("Waiting for Installation to be fully deleted")
time.Sleep(5 * time.Second)
}
}
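
For context, the new -pre-delete flag is intended to be invoked from a Helm pre-delete hook, so the hook logic above runs before the chart's own resources are removed. The chart wiring is not part of this diff; a hypothetical hook Job might look like the following (names, image, and service account are assumptions):

    # Hypothetical Helm pre-delete hook Job; not part of this PR.
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: tigera-operator-pre-delete
      namespace: tigera-operator
      annotations:
        # Run before the chart's resources are deleted; remove the Job once it succeeds.
        "helm.sh/hook": pre-delete
        "helm.sh/hook-delete-policy": hook-succeeded
    spec:
      template:
        spec:
          serviceAccountName: tigera-operator
          restartPolicy: Never
          containers:
            - name: pre-delete
              image: quay.io/tigera/operator
              # Assumed entrypoint; runs executePreDeleteHook and exits.
              args: ["-pre-delete"]
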
pkg/controller/installation/core_controller.go: 29 changes (22 additions, 7 deletions)
@@ -76,6 +76,7 @@ import (
"github.com/tigera/operator/pkg/dns"
"github.com/tigera/operator/pkg/render"
rcertificatemanagement "github.com/tigera/operator/pkg/render/certificatemanagement"
rmeta "github.com/tigera/operator/pkg/render/common/meta"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"github.com/tigera/operator/pkg/render/common/resourcequota"
"github.com/tigera/operator/pkg/render/kubecontrollers"
@@ -896,22 +897,36 @@ func (r *ReconcileInstallation) Reconcile(ctx context.Context, request reconcile

// See the section 'Node and Installation finalizer' at the top of this file for details.
if terminating {
// Keep the finalizer on the Installation until the ClusterRoleBinding for calico-node
// (and ClusterRole and ServiceAccount) is removed.
// Keep a finalizer on the Installation object until all necessary dependencies have been cleaned up.
// This ensures we don't delete the CNI plugin and calico-node too early, as they are a pre-requisite for tearing
// down networking for other pods deployed by this operator.
doneTerminating := true

// Wait until the calico-node cluster role binding has been cleaned up.
crb := rbacv1.ClusterRoleBinding{}
crbKey := types.NamespacedName{Name: "calico-node"}
err := r.client.Get(ctx, crbKey, &crb)
key := types.NamespacedName{Name: "calico-node"}
err := r.client.Get(ctx, key, &crb)
if err != nil && !apierrors.IsNotFound(err) {
r.status.SetDegraded(operator.ResourceNotFound, "Unable to get ClusterRoleBinding", err, reqLogger)
return reconcile.Result{}, err
}
found := false
for _, x := range crb.Finalizers {
if x == render.NodeFinalizer {
found = true
doneTerminating = false
}
}
if !found {

// Wait until the apiserver namespace has been deleted.
ns := corev1.Namespace{}
key = types.NamespacedName{Name: rmeta.APIServerNamespace(instance.Spec.Variant)}
err = r.client.Get(ctx, key, &ns)
if !apierrors.IsNotFound(err) {
// We're not ready to terminate if the apiserver namespace hasn't been deleted.
doneTerminating = false
}

// If all of the above checks passed, we can clear the finalizer.
if doneTerminating {
reqLogger.Info("Removing installation finalizer")
removeInstallationFinalizer(instance)
}
Expand Down
pkg/render/namespaces.go: 7 changes (7 additions, 0 deletions)
@@ -55,6 +55,13 @@ func (c *namespaceComponent) Objects() ([]client.Object, []client.Object) {
ns := []client.Object{
CreateNamespace(common.CalicoNamespace, c.cfg.Installation.KubernetesProvider, PSSPrivileged),
}

// If we're terminating, we don't want to delete the namespace right away.
// It will be cleaned up by Kubernetes when the Installation object is finally released.
if c.cfg.Terminating {
ns = []client.Object{}
}

if c.cfg.Installation.Variant == operatorv1.TigeraSecureEnterprise {
// We need to always have ns tigera-dex even when the Authentication CR is not present, so policies can be added to this namespace.
ns = append(ns, CreateNamespace(DexObjectName, c.cfg.Installation.KubernetesProvider, PSSRestricted))