Add support for partition by in sinks
bobbyiliev committed Oct 2, 2024
1 parent 73c08fe commit d13b2a7
Showing 6 changed files with 59 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/resources/sink_kafka.md
@@ -74,6 +74,7 @@ resource "materialize_sink_kafka" "example_sink_kafka" {
- `key` (List of String) An optional list of columns to use for the Kafka key. If unspecified, the Kafka key is left unset.
- `key_not_enforced` (Boolean) Disable Materialize's validation of the key's uniqueness.
- `ownership_role` (String) The ownership role of the object.
- `partition_by` (String) A SQL expression used to partition the data in the Kafka sink. Can only be used with `ENVELOPE UPSERT` (see the sketch after this list).
- `region` (String) The region to use for the resource connection. If not set, the default region is used.
- `schema_name` (String) The identifier for the sink schema in Materialize. Defaults to `public`.
- `snapshot` (Boolean) Whether the sink should emit, at startup, the consolidated results of the query as of the time the sink was created.
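For reference, a minimal configuration using the new attribute might look like the sketch below. The resource name, topic, and `customer_id` expression are hypothetical, and required arguments such as the source relation, the Kafka connection, and the format are elided:

```hcl
resource "materialize_sink_kafka" "partitioned_sink" {
  name  = "partitioned_sink"
  topic = "example_topic"

  # A SQL expression over the sink's columns; valid only with ENVELOPE UPSERT.
  partition_by = "customer_id"

  envelope {
    upsert = true
  }

  # Required arguments (source relation, Kafka connection, format)
  # are omitted from this sketch.
}
```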
11 changes: 11 additions & 0 deletions pkg/materialize/sink_kafka.go
@@ -40,6 +40,7 @@ type SinkKafkaBuilder struct {
snapshot bool
headers string
keyNotEnforced bool
partitionBy string
}

func NewSinkKafkaBuilder(conn *sqlx.DB, obj MaterializeObject) *SinkKafkaBuilder {
@@ -124,6 +125,11 @@ func (b *SinkKafkaBuilder) KeyNotEnforced(s bool) *SinkKafkaBuilder {
return b
}

func (b *SinkKafkaBuilder) PartitionBy(expr string) *SinkKafkaBuilder {
b.partitionBy = expr
return b
}

func (b *SinkKafkaBuilder) Create() error {
q := strings.Builder{}
q.WriteString(fmt.Sprintf(`CREATE SINK %s`, b.QualifiedName()))
@@ -157,6 +163,11 @@ func (b *SinkKafkaBuilder) Create() error {
}
q.WriteString(fmt.Sprintf(`, TOPIC CONFIG MAP[%s]`, strings.Join(configItems, ", ")))
}

if b.partitionBy != "" {
q.WriteString(fmt.Sprintf(`, PARTITION BY %s`, b.partitionBy))
}

q.WriteString(")")
}

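Note that the `PARTITION BY` clause is appended inside the connection-options parentheses, after any `TOPIC CONFIG` map, which is the ordering the unit and resource tests below assert against.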
26 changes: 26 additions & 0 deletions pkg/materialize/sink_kafka_test.go
@@ -512,3 +512,29 @@ func TestSinkKafkaAvroCompatibilityLevelsCreate(t *testing.T) {
}
})
}

func TestSinkKafkaPartitionByCreate(t *testing.T) {
testhelpers.WithMockDb(t, func(db *sqlx.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`CREATE SINK "database"."schema"."sink"
FROM "database"."schema"."src"
INTO KAFKA CONNECTION "database"."schema"."kafka_conn"
\(TOPIC 'testdrive-snk1-seed', PARTITION BY customer_id\)
FORMAT JSON
ENVELOPE UPSERT;`,
).WillReturnResult(sqlmock.NewResult(1, 1))

o := MaterializeObject{Name: "sink", SchemaName: "schema", DatabaseName: "database"}
b := NewSinkKafkaBuilder(db, o)
b.From(IdentifierSchemaStruct{Name: "src", SchemaName: "schema", DatabaseName: "database"})
b.KafkaConnection(IdentifierSchemaStruct{Name: "kafka_conn", SchemaName: "schema", DatabaseName: "database"})
b.Topic("testdrive-snk1-seed")
b.PartitionBy("customer_id")
b.Format(SinkFormatSpecStruct{Json: true})
b.Envelope(KafkaSinkEnvelopeStruct{Upsert: true})

if err := b.Create(); err != nil {
t.Fatal(err)
}
})
}
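This unit test can be run on its own with `go test ./pkg/materialize -run TestSinkKafkaPartitionByCreate` from the repository root.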
7 changes: 7 additions & 0 deletions pkg/provider/acceptance_sink_kafka_test.go
@@ -48,6 +48,7 @@ func TestAccSinkKafka_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "snapshot", "true"),
resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "headers", "column_1"),
resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "envelope.0.upsert", "true"),
resource.TestCheckResourceAttr("materialize_sink_kafka.sink_kafka_headers", "partition_by", "column_2"),
),
},
{
@@ -302,6 +303,10 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
name = "column_1"
type = "map[text => text]"
}
column {
name = "column_2"
type = "int"
}
lifecycle {
ignore_changes = [column]
}
@@ -410,6 +415,8 @@ func testAccSinkKafkaResource(roleName, connName, tableName, sinkName, sink2Name
envelope {
upsert = true
}
partition_by = "column_2"
}
`, roleName, connName, tableName, sinkName, sink2Name, sinkOwner, comment)
}
10 changes: 10 additions & 0 deletions pkg/resources/resource_sink_kafka.go
@@ -121,6 +121,12 @@ var sinkKafkaSchema = map[string]*schema.Schema{
Optional: true,
ForceNew: true,
},
"partition_by": {
Description: "A SQL expression used to partition the data in the Kafka sink. Can only be used with `ENVELOPE UPSERT`.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"region": RegionSchema(),
}
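Because the field is marked `ForceNew`, changing `partition_by` on an existing sink forces Terraform to destroy and recreate the sink rather than update it in place.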

@@ -221,6 +227,10 @@ func sinkKafkaCreate(ctx context.Context, d *schema.ResourceData, meta any) diag
b.Headers(v.(string))
}

if v, ok := d.GetOk("partition_by"); ok {
b.PartitionBy(v.(string))
}

// create resource
if err := b.Create(); err != nil {
return diag.FromErr(err)
7 changes: 4 additions & 3 deletions pkg/resources/resource_sink_kafka_test.go
@@ -93,8 +93,9 @@ var inSinkKafka = map[string]interface{}{
},
},
},
"envelope": []interface{}{map[string]interface{}{"upsert": true}},
"snapshot": false,
"envelope": []interface{}{map[string]interface{}{"upsert": true}},
"partition_by": "partition_by",
"snapshot": false,
}

func TestResourceSinkKafkaCreate(t *testing.T) {
@@ -109,7 +110,7 @@ func TestResourceSinkKafkaCreate(t *testing.T) {
IN CLUSTER "cluster" FROM "database"."public"."item"
INTO KAFKA CONNECTION "materialize"."public"."kafka_conn"
\(TOPIC 'topic', COMPRESSION TYPE = gzip, TOPIC REPLICATION FACTOR = 3, TOPIC PARTITION COUNT = 6,
TOPIC CONFIG MAP\[('cleanup.policy' => 'compact'|'retention.ms' => '86400000'),\s*('cleanup.policy' => 'compact'|'retention.ms' => '86400000')\]\)
TOPIC CONFIG MAP\[('cleanup.policy' => 'compact'|'retention.ms' => '86400000'),\s*('cleanup.policy' => 'compact'|'retention.ms' => '86400000')\], PARTITION BY partition_by\)
KEY \(key_1, key_2\)
NOT ENFORCED HEADERS headers FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION "database"."schema"."csr_conn"
\(AVRO KEY FULLNAME 'avro_key_fullname' AVRO VALUE FULLNAME 'avro_value_fullname',
