Skip to content

Commit

Permalink
add resource annotation and ignore recovery type
Browse files Browse the repository at this point in the history
  • Loading branch information
FxKu committed Dec 13, 2024
1 parent c206eb3 commit b6ea848
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 24 deletions.
8 changes: 8 additions & 0 deletions charts/postgres-operator/crds/postgresqls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ spec:
type: string
batchSize:
type: integer
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
database:
type: string
enableRecovery:
Expand All @@ -522,6 +525,9 @@ spec:
type: object
additionalProperties:
type: string
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
tables:
type: object
additionalProperties:
Expand All @@ -533,6 +539,8 @@ spec:
type: string
idColumn:
type: string
ignoreRecovery:
type: boolean
payloadColumn:
type: string
recoveryEventType:
Expand Down
30 changes: 23 additions & 7 deletions docs/reference/cluster_manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -652,11 +652,11 @@ can have the following properties:

* **applicationId**
The application name to which the database and CDC belongs to. For each
set of streams with a distinct `applicationId` a separate stream CR as well
as a separate logical replication slot will be created. This means there can
be different streams in the same database and streams with the same
`applicationId` are bundled in one stream CR. The stream CR will be called
like the Postgres cluster plus "-<applicationId>" suffix. Required.
set of streams with a distinct `applicationId` a separate stream resource as
well as a separate logical replication slot will be created. This means there
can be different streams in the same database and streams with the same
`applicationId` are bundled in one stream resource. The stream resource will
be called like the Postgres cluster plus "-<applicationId>" suffix. Required.

* **database**
Name of the database from where events will be published via Postgres'
Expand All @@ -667,7 +667,8 @@ can have the following properties:

* **tables**
Defines a map of table names and their properties (`eventType`, `idColumn`
and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
and `payloadColumn`). Required.
The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
The application is responsible for putting events into a (JSON/B or VARCHAR)
payload column of the outbox table in the structure of the specified target
event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/16/logical-replication-publication.html)
Expand All @@ -676,12 +677,27 @@ can have the following properties:
committed to the outbox table. The `idColumn` will be used in telemetry for
the CDC operator. The names for `idColumn` and `payloadColumn` can be
configured. Defaults are `id` and `payload`. The target `eventType` has to
be defined. Required.
be defined. One can also specify a `recoveryEventType` that will be used
for a dead letter queue. By enabling `ignoreRecovery`, you can choose to
ignore failingn events.

* **filter**
Streamed events can be filtered by a jsonpath expression for each table.
Optional.

* **enableRecovery**
Flag to enable a dead letter queue recovery for all streams tables.
Alternatively, recovery can also be enable for single outbox tables by only
specifying a `recoveryEventType` and no `enableRecovery` flag. When set to
false or missing, events will be retried until consuming succeeded. You can
use a `filter` expression to get rid of poison pills. Optional.

* **batchSize**
Defines the size of batches in which events are consumed. Optional.
Defaults to 1.

* **cpu**
CPU requests to be set as an annotation on the stream resource. Optional.

* **memory**
memory requests to be set as an annotation on the stream resource. Optional.
8 changes: 8 additions & 0 deletions manifests/postgresql.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ spec:
type: string
batchSize:
type: integer
cpu:
type: string
pattern: '^(\d+m|\d+(\.\d{1,3})?)$'
database:
type: string
enableRecovery:
Expand All @@ -520,6 +523,9 @@ spec:
type: object
additionalProperties:
type: string
memory:
type: string
pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$'
tables:
type: object
additionalProperties:
Expand All @@ -531,6 +537,8 @@ spec:
type: string
idColumn:
type: string
ignoreRecovery:
type: boolean
payloadColumn:
type: string
recoveryEventType:
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/acid.zalan.do/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,16 @@ type Stream struct {
Tables map[string]StreamTable `json:"tables"`
Filter map[string]*string `json:"filter,omitempty"`
BatchSize *uint32 `json:"batchSize,omitempty"`
CPU *string `json:"cpu,omitempty"`
Memory *string `json:"memory,omitempty"`
EnableRecovery *bool `json:"enableRecovery,omitempty"`
}

// StreamTable defines properties of outbox tables for FabricEventStreams
type StreamTable struct {
EventType string `json:"eventType"`
RecoveryEventType string `json:"recoveryEventType,omitempty"`
IgnoreRecovery *bool `json:"ignoreRecovery,omitempty"`
IdColumn *string `json:"idColumn,omitempty"`
PayloadColumn *string `json:"payloadColumn,omitempty"`
}
15 changes: 15 additions & 0 deletions pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 28 additions & 3 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,35 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
eventStreams := make([]zalandov1.EventStream, 0)
resourceAnnotations := map[string]string{}

for _, stream := range c.Spec.Streams {
if stream.ApplicationId != appId {
continue
}
if stream.CPU != nil {
cpu, exists := resourceAnnotations[constants.EventStreamCpuAnnotationKey]
if exists {
isSmaller, _ := util.IsSmallerQuantity(cpu, *stream.CPU)
if isSmaller {
resourceAnnotations[constants.EventStreamCpuAnnotationKey] = *stream.CPU
}
}
}
if stream.Memory != nil {
memory, exists := resourceAnnotations[constants.EventStreamMemoryAnnotationKey]
if exists {
isSmaller, _ := util.IsSmallerQuantity(memory, *stream.Memory)
if isSmaller {
resourceAnnotations[constants.EventStreamMemoryAnnotationKey] = *stream.Memory
}
}
}
for tableName, table := range stream.Tables {
streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
streamFlow := getEventStreamFlow(table.PayloadColumn)
streamSink := getEventStreamSink(stream, table.EventType)
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType)
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType, table.IgnoreRecovery)

eventStreams = append(eventStreams, zalandov1.EventStream{
EventStreamFlow: streamFlow,
Expand All @@ -207,7 +226,7 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
Name: fmt.Sprintf("%s-%s", c.Name, strings.ToLower(util.RandomPassword(5))),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)),
Annotations: c.AnnotationsToPropagate(c.annotationsSet(resourceAnnotations)),
OwnerReferences: c.ownerReferences(),
},
Spec: zalandov1.FabricEventStreamSpec{
Expand Down Expand Up @@ -247,14 +266,20 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS
}
}

func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery {
func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string, ignoreRecovery *bool) zalandov1.EventStreamRecovery {
if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
(stream.EnableRecovery == nil && recoveryEventType == "") {
return zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryNoneType,
}
}

if ignoreRecovery != nil && *ignoreRecovery {
return zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryIgnoreType,
}
}

if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
}
Expand Down
52 changes: 50 additions & 2 deletions pkg/cluster/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ var (
EventType: "stream-type-b",
RecoveryEventType: "stream-type-b-dlq",
},
"data.foofoobar": {
EventType: "stream-type-c",
IgnoreRecovery: util.True(),
},
},
EnableRecovery: util.True(),
Filter: map[string]*string{
"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
},
BatchSize: k8sutil.UInt32ToPointer(uint32(100)),
CPU: k8sutil.StringToPointer("250m"),
Memory: k8sutil.StringToPointer("500Mi"),
},
},
TeamID: "acid",
Expand All @@ -88,6 +94,10 @@ var (
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-12345", clusterName),
Namespace: namespace,
Annotations: map[string]string{
constants.EventStreamCpuAnnotationKey: "250m",
constants.EventStreamMemoryAnnotationKey: "500Mi",
},
Labels: map[string]string{
"application": "spilo",
"cluster-name": fmt.Sprintf("%s-2", clusterName),
Expand Down Expand Up @@ -180,6 +190,37 @@ var (
Type: constants.EventStreamSourcePGType,
},
},
{
EventStreamFlow: zalandov1.EventStreamFlow{
Type: constants.EventStreamFlowPgGenericType,
},
EventStreamRecovery: zalandov1.EventStreamRecovery{
Type: constants.EventStreamRecoveryIgnoreType,
},
EventStreamSink: zalandov1.EventStreamSink{
EventType: "stream-type-c",
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
Type: constants.EventStreamSinkNakadiType,
},
EventStreamSource: zalandov1.EventStreamSource{
Connection: zalandov1.Connection{
DBAuth: zalandov1.DBAuth{
Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName),
PasswordKey: "password",
Type: constants.EventStreamSourceAuthType,
UserKey: "username",
},
Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser),
SlotName: slotName,
PluginType: constants.EventStreamSourcePluginType,
},
Schema: "data",
EventStreamTable: zalandov1.EventStreamTable{
Name: "foofoobar",
},
Type: constants.EventStreamSourcePGType,
},
},
},
},
}
Expand Down Expand Up @@ -545,8 +586,8 @@ func TestSyncStreams(t *testing.T) {

func TestSameStreams(t *testing.T) {
testName := "TestSameStreams"
annotationsA := map[string]string{"owned-by": "acid"}
annotationsB := map[string]string{"owned-by": "foo"}
annotationsA := map[string]string{constants.EventStreamMemoryAnnotationKey: "500Mi"}
annotationsB := map[string]string{constants.EventStreamMemoryAnnotationKey: "1Gi"}

stream1 := zalandov1.EventStream{
EventStreamFlow: zalandov1.EventStreamFlow{},
Expand Down Expand Up @@ -638,6 +679,13 @@ func TestSameStreams(t *testing.T) {
match: false,
reason: "event stream specs differ",
},
{
subTest: "event stream annotations differ",
streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, nil),
streamsB: newFabricEventStream([]zalandov1.EventStream{stream3}, annotationsA),
match: false,
reason: "event stream specs differ",
},
{
subTest: "event stream annotations differ",
streamsA: newFabricEventStream([]zalandov1.EventStream{stream2}, annotationsA),
Expand Down
27 changes: 15 additions & 12 deletions pkg/util/constants/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package constants

// PostgreSQL specific constants
const (
EventStreamCRDApiVersion = "zalando.org/v1"
EventStreamCRDKind = "FabricEventStream"
EventStreamCRDName = "fabriceventstreams.zalando.org"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes"
EventStreamSourcePluginType = "pgoutput"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamSinkNakadiType = "Nakadi"
EventStreamRecoveryNoneType = "None"
EventStreamRecoveryDLQType = "DeadLetter"
EventStreamRecoverySuffix = "dead-letter-queue"
EventStreamCRDApiVersion = "zalando.org/v1"
EventStreamCRDKind = "FabricEventStream"
EventStreamCRDName = "fabriceventstreams.zalando.org"
EventStreamSourcePGType = "PostgresLogicalReplication"
EventStreamSourceSlotPrefix = "fes"
EventStreamSourcePluginType = "pgoutput"
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
EventStreamSinkNakadiType = "Nakadi"
EventStreamRecoveryDLQType = "DeadLetter"
EventStreamRecoveryIgnoreType = "Ignore"
EventStreamRecoveryNoneType = "None"
EventStreamRecoverySuffix = "dead-letter-queue"
EventStreamCpuAnnotationKey = "fes.zalando.org/FES_CPU"
EventStreamMemoryAnnotationKey = "fes.zalando.org/FES_MEMORY"
)

0 comments on commit b6ea848

Please sign in to comment.