Skip to content


Ensure podAnnotations are removed from pods if reset in the config
Browse files Browse the repository at this point in the history
  • Loading branch information
hughcapet committed Dec 23, 2024
1 parent 6035fdd commit 93d718e
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
return false, fmt.Sprint("new job's pod template metadata annotations do not match " + reason)

newPgVersion := getPgVersion(new)
Expand Down
60 changes: 46 additions & 14 deletions pkg/cluster/connection_pooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,10 +1037,44 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
syncReason = append(syncReason, specReason...)

listOptions := metav1.ListOptions{
LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(),
pods, err = c.listPoolerPods(listOptions)
newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed {
specSync = true
syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...)

if strings.Contains(reason, "Removed") {
for anno := range deployment.Spec.Template.Annotations {
if _, ok := newPodAnnotations[anno]; !ok {
// template annotation was removed
for _, ignore := range c.OpConfig.IgnoredAnnotations {
if anno == ignore {
annotationToRemove := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, anno))
annotationToRemoveTemplate := []byte(fmt.Sprintf(`{"spec":{"template":{"metadata":{"annotations":{"%s":null}}}}}`, anno))
deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(),
deployment.Name, types.StrategicMergePatchType, annotationToRemoveTemplate, metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to remove annotation %s from %s connection pooler's pod template: %v",
anno, role, err)
return nil, err
for _, pod := range pods {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.TODO(), pod.Name,
types.StrategicMergePatchType, annotationToRemove, metav1.PatchOptions{})
if err != nil {
c.logger.Errorf("failed to remove annotation %s from pod %s: %v", anno, pod.Name, err)
return nil, err
deployment.Spec.Template.Annotations = newPodAnnotations

Expand All @@ -1060,7 +1094,6 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return syncReason, err
c.ConnectionPooler[role].Deployment = deployment

newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
Expand All @@ -1069,15 +1102,10 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return nil, err
c.ConnectionPooler[role].Deployment = deployment

// check if pooler pods must be replaced due to secret update
listOptions := metav1.ListOptions{
LabelSelector: labels.Set(c.connectionPoolerLabels(role, true).MatchLabels).String(),
pods, err = c.listPoolerPods(listOptions)
if err != nil {
return nil, err
Expand All @@ -1098,18 +1126,22 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return nil, fmt.Errorf("could not delete pooler pod: %v", err)
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)
} else {
if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)

c.ConnectionPooler[role].Deployment = deployment

if service, err = c.KubeClient.Services(c.Namespace).Get(context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}); err == nil {
c.ConnectionPooler[role].Service = service
desiredSvc := c.generateConnectionPoolerService(c.ConnectionPooler[role])
Expand Down
39 changes: 39 additions & 0 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,25 @@ func (c *Cluster) syncStatefulSet() error {
for anno := range c.Statefulset.Spec.Template.Annotations {
if _, ok := desiredSts.Spec.Template.Annotations[anno]; !ok {
// template annotation was removed
for _, ignore := range c.OpConfig.IgnoredAnnotations {
if anno == ignore {
annotationToRemove := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":null}}}`, anno))
for _, pod := range pods {
_, err = c.KubeClient.Pods(c.Namespace).Patch(context.Background(), pod.Name,
types.StrategicMergePatchType, annotationToRemove, metav1.PatchOptions{})
if err != nil {
c.logger.Errorf("failed to remove annotation %s from pod %s: %v", anno, pod.Name, err)
return err
if !cmp.match {
if cmp.rollingUpdate {
Expand Down Expand Up @@ -1594,6 +1613,26 @@ func (c *Cluster) syncLogicalBackupJob() error {
if reason != "" {
c.logger.Infof("reason: %s", reason)
if strings.Contains(reason, "annotations do not match") {
for anno := range job.Spec.JobTemplate.Spec.Template.Annotations {
if _, ok := desiredJob.Spec.JobTemplate.Spec.Template.Annotations[anno]; !ok {
// template annotation was removed
for _, ignore := range c.OpConfig.IgnoredAnnotations {
if anno == ignore {
annotationToRemoveTemplate := []byte(fmt.Sprintf(
`{"spec":{"jobTemplate":{"spec":{"template":{"metadata":{"annotations":{"%s":null}}}}}}}`, anno))
job, err = c.KubeClient.CronJobs(c.Namespace).Patch(context.TODO(),
jobName, types.StrategicMergePatchType, annotationToRemoveTemplate, metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to remove annotation %s from the logical backup job %q pod template: %v", anno, jobName, err)
return err
if err = c.patchLogicalBackupJob(desiredJob); err != nil {
return fmt.Errorf("could not update logical backup job to match desired state: %v", err)
Expand Down
157 changes: 157 additions & 0 deletions pkg/cluster/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,163 @@ func TestSyncStatefulSetsAnnotations(t *testing.T) {

func TestPodAnnotationsSync(t *testing.T) {
clusterName := "acid-test-cluster-2"
namespace := "default"
podAnnotation := "no-scale-down"
podAnnotations := map[string]string{"no-scale-down": "true"}

ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockClient := mocks.NewMockHTTPClient(ctrl)
client, _ := newFakeK8sAnnotationsClient()

pg := acidv1.Postgresql{
ObjectMeta: metav1.ObjectMeta{
Name: clusterName,
Namespace: namespace,
Spec: acidv1.PostgresSpec{
Volume: acidv1.Volume{
Size: "1Gi",
EnableConnectionPooler: boolToPointer(true),
EnableLogicalBackup: true,
EnableReplicaConnectionPooler: boolToPointer(true),
PodAnnotations: podAnnotations,
NumberOfInstances: 2,

var cluster = New(
OpConfig: config.Config{
PatroniAPICheckInterval: time.Duration(1),
PatroniAPICheckTimeout: time.Duration(5),
PodManagementPolicy: "ordered_ready",
ConnectionPooler: config.ConnectionPooler{
ConnectionPoolerDefaultCPURequest: "100m",
ConnectionPoolerDefaultCPULimit: "100m",
ConnectionPoolerDefaultMemoryRequest: "100Mi",
ConnectionPoolerDefaultMemoryLimit: "100Mi",
NumberOfInstances: k8sutil.Int32ToPointer(1),
Resources: config.Resources{
ClusterLabels: map[string]string{"application": "spilo"},
ClusterNameLabel: "cluster-name",
DefaultCPURequest: "300m",
DefaultCPULimit: "300m",
DefaultMemoryRequest: "300Mi",
DefaultMemoryLimit: "300Mi",
PodRoleLabel: "spilo-role",
ResourceCheckInterval: time.Duration(3),
ResourceCheckTimeout: time.Duration(10),
}, client, pg, logger, eventRecorder)

configJson := `{"postgresql": {"parameters": {"log_min_duration_statement": 200, "max_connections": 50}}}, "ttl": 20}`
response := http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte(configJson))),

mockClient.EXPECT().Do(gomock.Any()).Return(&response, nil).AnyTimes()
cluster.patroni = patroni.New(patroniLogger, mockClient)
cluster.Name = clusterName
cluster.Namespace = namespace
clusterOptions := clusterLabelsOptions(cluster)

// create a statefulset
_, err := cluster.createStatefulSet()
assert.NoError(t, err)
// create a pods
podsList := createPods(cluster)
for _, pod := range podsList {
_, err = cluster.KubeClient.Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{})
assert.NoError(t, err)
// create connection pooler
_, err = cluster.createConnectionPooler(mockInstallLookupFunction)
assert.NoError(t, err)

// create cron job
err = cluster.createLogicalBackupJob()
assert.NoError(t, err)

err = cluster.Sync(&cluster.Postgresql)
assert.NoError(t, err)

// 1. PodAnnotations set
stsList, err := cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
assert.Contains(t, sts.Spec.Template.Annotations, podAnnotation)

for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
assert.Contains(t, deploy.Spec.Template.Annotations, podAnnotation,
fmt.Sprintf("pooler deployment pod template %s should contain annotation %s, found %#v",
deploy.Name, podAnnotation, deploy.Spec.Template.Annotations))
assert.NoError(t, err)

podList, err := cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
assert.Contains(t, pod.Annotations, podAnnotation,
fmt.Sprintf("pod %s should contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations))
assert.NoError(t, err)

cronJobList, err := cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
assert.Contains(t, cronJob.Spec.JobTemplate.Spec.Template.Annotations, podAnnotation,
fmt.Sprintf("logical backup cron job's pod template should contain annotation %s, found %#v",
podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))

// 2 PodAnnotations removed
newSpec := cluster.Postgresql.DeepCopy()
newSpec.Spec.PodAnnotations = nil
err = cluster.Sync(newSpec)
assert.NoError(t, err)

stsList, err = cluster.KubeClient.StatefulSets(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, sts := range stsList.Items {
assert.NotContains(t, sts.Spec.Template.Annotations, "no-scale-down")

for _, role := range []PostgresRole{Master, Replica} {
deploy, err := cluster.KubeClient.Deployments(namespace).Get(context.TODO(), cluster.connectionPoolerName(role), metav1.GetOptions{})
assert.NoError(t, err)
assert.NotContains(t, deploy.Spec.Template.Annotations, podAnnotation,
fmt.Sprintf("pooler deployment pod template %s should not contain annotation %s, found %#v",
deploy.Name, podAnnotation, deploy.Spec.Template.Annotations))
assert.NoError(t, err)

podList, err = cluster.KubeClient.Pods(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, pod := range podList.Items {
assert.NotContains(t, pod.Annotations, "no-scale-down",
fmt.Sprintf("pod %s should not contain annotation %s, found %#v", pod.Name, podAnnotation, pod.Annotations))

cronJobList, err = cluster.KubeClient.CronJobs(namespace).List(context.TODO(), clusterOptions)
assert.NoError(t, err)
for _, cronJob := range cronJobList.Items {
assert.NotContains(t, cronJob.Annotations, podAnnotation,
fmt.Sprintf("logical backup cron job's pod template should not contain annotation %s, found %#v",
podAnnotation, cronJob.Spec.JobTemplate.Spec.Template.Annotations))

func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
testName := "test config comparison"
client, _ := newFakeK8sSyncClient()
Expand Down

0 comments on commit 93d718e

Please sign in to comment.