Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add resource annotation and ignore recovery type #2817

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions charts/postgres-operator/crds/postgresqls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ spec:
type: string
batchSize:
type: integer
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
database:
type: string
enableRecovery:
Expand All @@ -522,6 +525,9 @@ spec:
type: object
additionalProperties:
type: string
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
tables:
type: object
additionalProperties:
Expand All @@ -533,6 +539,8 @@ spec:
type: string
idColumn:
type: string
ignoreRecovery:
type: boolean
payloadColumn:
type: string
recoveryEventType:
Expand Down
30 changes: 23 additions & 7 deletions docs/reference/cluster_manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -652,11 +652,11 @@ can have the following properties:

* **applicationId**
The application name to which the database and CDC belongs to. For each
set of streams with a distinct `applicationId` a separate stream CR as well
as a separate logical replication slot will be created. This means there can
be different streams in the same database and streams with the same
`applicationId` are bundled in one stream CR. The stream CR will be called
like the Postgres cluster plus "-<applicationId>" suffix. Required.
set of streams with a distinct `applicationId` a separate stream resource as
well as a separate logical replication slot will be created. This means there
can be different streams in the same database and streams with the same
`applicationId` are bundled in one stream resource. The stream resource will
be called like the Postgres cluster plus "-<applicationId>" suffix. Required.

* **database**
Name of the database from where events will be published via Postgres'
Expand All @@ -667,7 +667,8 @@ can have the following properties:

* **tables**
Defines a map of table names and their properties (`eventType`, `idColumn`
and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
and `payloadColumn`). Required.
The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
The application is responsible for putting events into a (JSON/B or VARCHAR)
payload column of the outbox table in the structure of the specified target
event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/16/logical-replication-publication.html)
Expand All @@ -676,12 +677,27 @@ can have the following properties:
committed to the outbox table. The `idColumn` will be used in telemetry for
the CDC operator. The names for `idColumn` and `payloadColumn` can be
configured. Defaults are `id` and `payload`. The target `eventType` has to
be defined. Required.
be defined. One can also specify a `recoveryEventType` that will be used
for a dead letter queue. By enabling `ignoreRecovery`, you can choose to
ignore failingn events.
idanovinda marked this conversation as resolved.
Show resolved Hide resolved

* **filter**
Streamed events can be filtered by a jsonpath expression for each table.
Optional.

* **enableRecovery**
Flag to enable a dead letter queue recovery for all streams tables.
Alternatively, recovery can also be enable for single outbox tables by only
specifying a `recoveryEventType` and no `enableRecovery` flag. When set to
false or missing, events will be retried until consuming succeeded. You can
use a `filter` expression to get rid of poison pills. Optional.

* **batchSize**
Defines the size of batches in which events are consumed. Optional.
Defaults to 1.

* **cpu**
CPU requests to be set as an annotation on the stream resource. Optional.

* **memory**
memory requests to be set as an annotation on the stream resource. Optional.
8 changes: 8 additions & 0 deletions manifests/postgresql.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ spec:
type: string
batchSize:
type: integer
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
database:
type: string
enableRecovery:
Expand All @@ -520,6 +523,9 @@ spec:
type: object
additionalProperties:
type: string
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
tables:
type: object
additionalProperties:
Expand All @@ -531,6 +537,8 @@ spec:
type: string
idColumn:
type: string
ignoreRecovery:
type: boolean
payloadColumn:
type: string
recoveryEventType:
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/acid.zalan.do/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,16 @@ type Stream struct {
Tables map[string]StreamTable `json:"tables"`
Filter map[string]*string `json:"filter,omitempty"`
BatchSize *uint32 `json:"batchSize,omitempty"`
CPU *string `json:"cpu,omitempty"`
Memory *string `json:"memory,omitempty"`
EnableRecovery *bool `json:"enableRecovery,omitempty"`
}

// StreamTable defines properties of outbox tables for FabricEventStreams
type StreamTable struct {
EventType string `json:"eventType"`
RecoveryEventType string `json:"recoveryEventType,omitempty"`
IgnoreRecovery *bool `json:"ignoreRecovery,omitempty"`
IdColumn *string `json:"idColumn,omitempty"`
PayloadColumn *string `json:"payloadColumn,omitempty"`
}
15 changes: 15 additions & 0 deletions pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 28 additions & 3 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,35 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
eventStreams := make([]zalandov1.EventStream, 0)
resourceAnnotations := map[string]string{}

for _, stream := range c.Spec.Streams {
if stream.ApplicationId != appId {
continue
}
if stream.CPU != nil {
cpu, exists := resourceAnnotations[constants.EventStreamCpuAnnotationKey]
if exists {
isSmaller, _ := util.IsSmallerQuantity(cpu, *stream.CPU)
if isSmaller {
resourceAnnotations[constants.EventStreamCpuAnnotationKey] = *stream.CPU
}
}
}
if stream.Memory != nil {
memory, exists := resourceAnnotations[constants.EventStreamMemoryAnnotationKey]
if exists {
isSmaller, _ := util.IsSmallerQuantity(memory, *stream.Memory)
if isSmaller {
resourceAnnotations[constants.EventStreamMemoryAnnotationKey] = *stream.Memory
}
}
}
for tableName, table := range stream.Tables {
streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
streamFlow := getEventStreamFlow(table.PayloadColumn)
streamSink := getEventStreamSink(stream, table.EventType)
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType)
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType, table.IgnoreRecovery)

eventStreams = append(eventStreams, zalandov1.EventStream{
EventStreamFlow: streamFlow,
Expand All @@ -207,7 +226,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
Name: fmt.Sprintf("%s-%s", c.Name, strings.ToLower(util.RandomPassword(5))),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)),
Annotations: c.AnnotationsToPropagate(c.annotationsSet(resourceAnnotations)),
OwnerReferences: c.ownerReferences(),
},
Spec: zalandov1.FabricEventStreamSpec{
Expand Down Expand Up @@ -247,14 +266,20 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS
}
}

func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery {
func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string, ignoreRecovery *bool) zalandov1.EventStreamRecovery {
if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
(stream.EnableRecovery == nil && recoveryEventType == "") {
return zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryNoneType,
}
}

if ignoreRecovery != nil && *ignoreRecovery {
return zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryIgnoreType,
}
}

if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
}
Expand Down
52 changes: 50 additions & 2 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ var (
EventType: "stream-type-b",
RecoveryEventType: "stream-type-b-dlq",
},
"data.foofoobar": {
EventType: "stream-type-c",
IgnoreRecovery: util.True(),
},
},
EnableRecovery: util.True(),
Filter: map[string]*string{
"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
},
BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
CPU: k8sutil.StringToPointer("250m"),
Memory: k8sutil.StringToPointer("500Mi"),
},
},
TeamID: "acid",
Expand All @@ -88,6 +94,10 @@ var (
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-12345", clusterName),
Namespace: namespace,
Annotations: map[string]string{
constants.EventStreamCpuAnnotationKey: "250m",
constants.EventStreamMemoryAnnotationKey: "500Mi",
},
Labels: map[string]string{
"application": "spilo",
"cluster-name": fmt.Sprintf("%s-2", clusterName),
Expand Down Expand Up @@ -180,6 +190,37 @@ var (
Type: constants.EventStreamSourcePGType,
},
},
{
EventStreamFlow: zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType,
},
EventStreamRecovery: zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryIgnoreType,
},
EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-c",
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
Type: constants.EventStreamSinkNakadiType,
},
EventStreamSource: zalandov1.EventStreamSource{
Connection: zalandov1.Connection{
DBAuth: zalandov1.DBAuth{
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
PasswordKey: "password",
Type: constants.EventStreamSourceAuthType,
UserKey: "username",
},
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
SlotName: slotName,
PluginType: constants.EventStreamSourcePluginType,
},
Schema: "data",
EventStreamTable: zalandov1.EventStreamTable{
Name: "foofoobar",
},
Type: constants.EventStreamSourcePGType,
},
},
},
},
}
Expand Down Expand Up @@ -545,8 +586,8 @@ func TestSyncStreams(t *testing.T) {

func TestSameStreams(t *testing.T) {
testName := "TestSameStreams"
annotationsA := map[string]string{"owned-by": "acid"}
annotationsB := map[string]string{"owned-by": "foo"}
annotationsA := map[string]string{constants.EventStreamMemoryAnnotationKey: "500Mi"}
annotationsB := map[string]string{constants.EventStreamMemoryAnnotationKey: "1Gi"}

stream1 := zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{},
Expand Down Expand Up @@ -638,6 +679,13 @@ func TestSameStreams(t *testing.T) {
match: false,
reason: "event stream specs differ",
},
{
subTest: "event stream annotations differ",
streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil),
streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsA),
match: false,
reason: "event stream specs differ",
},
{
subTest: "event stream annotations differ",
streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA),
Expand Down
27 changes: 15 additions & 12 deletions pkg/util/constants/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package constants

// PostgreSQL specific constants
const (
EventStreamCRDApiVersion = "zalando.org/v1"
EventStreamCRDKind = "FabricEventStream"
EventStreamCRDName = "fabriceventstreams.zalando.org"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes"
EventStreamSourcePluginType = "pgoutput"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamSinkNakadiType = "Nakadi"
EventStreamRecoveryNoneType = "None"
EventStreamRecoveryDLQType = "DeadLetter"
EventStreamRecoverySuffix = "dead-letter-queue"
EventStreamCRDApiVersion = "zalando.org/v1"
EventStreamCRDKind = "FabricEventStream"
EventStreamCRDName = "fabriceventstreams.zalando.org"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes"
EventStreamSourcePluginType = "pgoutput"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamSinkNakadiType = "Nakadi"
EventStreamRecoveryDLQType = "DeadLetter"
EventStreamRecoveryIgnoreType = "Ignore"
EventStreamRecoveryNoneType = "None"
EventStreamRecoverySuffix = "dead-letter-queue"
EventStreamCpuAnnotationKey = "fes.zalando.org/FES_CPU"
EventStreamMemoryAnnotationKey = "fes.zalando.org/FES_MEMORY"
)
Loading