Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Rename Singular Attributes #419

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/resources/source_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ resource "materialize_source_kafka" "example_source_kafka" {
- `ownership_role` (String) The owernship role of the object.
- `schema_name` (String) The identifier for the source schema. Defaults to `public`.
- `size` (String) The size of the source. If not specified, the `cluster_name` option must be specified.
- `start_offset` (List of Number) Read partitions from the specified offset.
- `start_offsets` (List of Number) Read partitions from the specified offset.
- `start_timestamp` (Number) Use the specified value to set `START OFFSET` based on the Kafka timestamp.
- `value_format` (Block List, Max: 1) Set the value format explicitly. (see [below for nested schema](#nestedblock--value_format))

Expand Down
2 changes: 1 addition & 1 deletion docs/resources/source_postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ resource "materialize_source_postgres" "example_source_postgres" {
- `database_name` (String) The identifier for the source database. Defaults to `MZ_DATABASE` environment variable if set or `materialize` if environment variable is not set.
- `expose_progress` (Block List, Max: 1) The name of the progress subsource for the source. If this is not specified, the subsource will be named `<src_name>_progress`. (see [below for nested schema](#nestedblock--expose_progress))
- `ownership_role` (String) The owernship role of the object.
- `schema` (List of String) Creates subsources for specific schemas. If neither table or schema is specified, will default to ALL TABLES
- `schema_name` (String) The identifier for the source schema. Defaults to `public`.
- `schemas` (List of String) Creates subsources for specific schemas. If neither table or schema is specified, will default to ALL TABLES
- `size` (String) The size of the source. If not specified, the `cluster_name` option must be specified.
- `table` (Block List) Creates subsources for specific tables. If neither table or schema is specified, will default to ALL TABLES (see [below for nested schema](#nestedblock--table))
- `text_columns` (List of String) Decode data as text for specific columns that contain PostgreSQL types that are unsupported in Materialize. Can only be updated in place when also updating a corresponding `table` attribute.
Expand Down
2 changes: 1 addition & 1 deletion integration/source.tf
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ resource "materialize_source_postgres" "example_source_postgres_schema" {
name = "source_postgres_schema"
size = "3xsmall"
publication = "mz_source"
schema = ["PUBLIC"]
schemas = ["PUBLIC"]

postgres_connection {
name = materialize_connection_postgres.postgres_connection.name
Expand Down
111 changes: 111 additions & 0 deletions pkg/provider/acceptance_source_kafka_migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package provider

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
)

func TestAccSourceKafkaMigration_basic(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still working on this. Still working on finding better examples of doing the migration tests.

addTestTopic()
sourceName := acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
CheckDestroy: nil,
Steps: []resource.TestStep{
{
ExternalProviders: map[string]resource.ExternalProvider{
"materialize": {
VersionConstraint: "0.4.1",
Source: "MaterializeInc/materialize",
},
},
Config: testAccSourceKafkaMigrationV0Resource(sourceName),
Check: resource.ComposeTestCheckFunc(
testAccCheckSourceKafkaExists("materialize_source_kafka.test"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offset.#", "1"),
),
},
{
ProviderFactories: testAccProviderFactories,
Config: testAccSourceKafkaMigrationV1Resource(sourceName),
Check: resource.ComposeTestCheckFunc(
testAccCheckSourceKafkaExists("materialize_source_kafka.test"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offsets.#", "1"),
),
},
{
ProviderFactories: testAccProviderFactories,
ResourceName: "materialize_source_kafka.test",
ImportState: true,
ImportStateVerify: false,
},
},
})
}

func testAccSourceKafkaMigrationV0Resource(sourceName string) string {
return fmt.Sprintf(`
resource "materialize_connection_kafka" "test" {
name = "%[1]s"
kafka_broker {
broker = "redpanda:9092"
}
security_protocol = "PLAINTEXT"
}

resource "materialize_source_kafka" "test" {
name = "%[1]s"
kafka_connection {
name = materialize_connection_kafka.test.name
}

size = "3xsmall"
topic = "terraform"
key_format {
text = true
}
value_format {
text = true
}
envelope {
none = true
}
start_offset = [0]
}
`, sourceName)
}

func testAccSourceKafkaMigrationV1Resource(sourceName string) string {
return fmt.Sprintf(`
resource "materialize_connection_kafka" "test" {
name = "%[1]s"
kafka_broker {
broker = "redpanda:9092"
}
security_protocol = "PLAINTEXT"
}

resource "materialize_source_kafka" "test" {
name = "%[1]s"
kafka_connection {
name = materialize_connection_kafka.test.name
}

size = "3xsmall"
topic = "terraform"
key_format {
text = true
}
value_format {
text = true
}
envelope {
none = true
}
start_offset = [0]
}
`, sourceName)
}
4 changes: 2 additions & 2 deletions pkg/provider/acceptance_source_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestAccSourceKafka_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.name", connName),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.database_name", "materialize"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "kafka_connection.0.schema_name", "public"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offset.#", "1"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "start_offsets.#", "1"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_timestamp_alias", "timestamp_alias"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_offset", "true"),
resource.TestCheckResourceAttr("materialize_source_kafka.test", "include_offset_alias", "offset_alias"),
Expand Down Expand Up @@ -212,7 +212,7 @@ func testAccSourceKafkaResource(roleName, connName, sourceName, source2Name, sou
none = true
}

start_offset = [0]
start_offsets = [0]
include_timestamp_alias = "timestamp_alias"
include_offset = true
include_offset_alias = "offset_alias"
Expand Down
6 changes: 3 additions & 3 deletions pkg/provider/acceptance_source_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func TestAccSourcePostgresSchema_basic(t *testing.T) {
resource.TestCheckResourceAttr("materialize_source_postgres.test", "qualified_sql_name", fmt.Sprintf(`"materialize"."public"."%s"`, sourceName+"_source")),
resource.TestCheckResourceAttr("materialize_source_postgres.test", "cluster_name", sourceName+"_cluster"),
resource.TestCheckResourceAttr("materialize_source_postgres.test", "publication", "mz_source"),
resource.TestCheckResourceAttr("materialize_source_postgres.test", "schema.#", "1"),
resource.TestCheckResourceAttr("materialize_source_postgres.test", "schema.0", "PUBLIC"),
resource.TestCheckResourceAttr("materialize_source_postgres.test", "schemas.#", "1"),
resource.TestCheckResourceAttr("materialize_source_postgres.test", "schemas.0", "PUBLIC"),
),
},
{
Expand Down Expand Up @@ -353,7 +353,7 @@ func testAccSourcePostgresResourceSchema(sourceName string) string {
database_name = materialize_connection_postgres.test.database_name
}
publication = "mz_source"
schema = ["PUBLIC"]
schemas = ["PUBLIC"]
}
`, sourceName)
}
Expand Down
159 changes: 155 additions & 4 deletions pkg/resources/resource_source_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package resources

import (
"context"
"fmt"
"log"

"github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize"
Expand All @@ -12,6 +13,140 @@ import (
"github.com/jmoiron/sqlx"
)

func sourceKafkaSchemaV0() *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{"name": ObjectNameSchema("source", true, false),
"schema_name": SchemaNameSchema("source", false),
"database_name": DatabaseNameSchema("source", false),
"qualified_sql_name": QualifiedNameSchema("source"),
"comment": CommentSchema(false),
"cluster_name": ObjectClusterNameSchema("source"),
"size": ObjectSizeSchema("source"),
"kafka_connection": IdentifierSchema("kafka_connection", "The Kafka connection to use in the source.", true),
"topic": {
Description: "The Kafka topic you want to subscribe to.",
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"include_key": {
Description: "Include a column containing the Kafka message key.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
},
"include_key_alias": {
Description: "Provide an alias for the key column.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"include_headers": {
Description: "Include message headers.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Default: false,
},
"include_headers_alias": {
Description: "Provide an alias for the headers column.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"include_partition": {
Description: "Include a partition column containing the Kafka message partition",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
},
"include_partition_alias": {
Description: "Provide an alias for the partition column.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"include_offset": {
Description: "Include an offset column containing the Kafka message offset.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
},
"include_offset_alias": {
Description: "Provide an alias for the offset column.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"include_timestamp": {
Description: "Include a timestamp column containing the Kafka message timestamp.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
},
"include_timestamp_alias": {
Description: "Provide an alias for the timestamp column.",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"format": FormatSpecSchema("format", "How to decode raw bytes from different formats into data structures Materialize can understand at runtime.", false),
"key_format": FormatSpecSchema("key_format", "Set the key format explicitly.", false),
"value_format": FormatSpecSchema("value_format", "Set the value format explicitly.", false),
"envelope": {
Description: "How Materialize should interpret records (e.g. append-only, upsert)..",
Type: schema.TypeList,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"upsert": {
Description: "Use the upsert envelope, which uses message keys to handle CRUD operations.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
ConflictsWith: []string{"envelope.0.debezium", "envelope.0.none"},
},
"debezium": {
Description: "Use the Debezium envelope, which uses a diff envelope to handle CRUD operations.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
ConflictsWith: []string{"envelope.0.upsert", "envelope.0.none"},
},
"none": {
Description: "Use an append-only envelope. This means that records will only be appended and cannot be updated or deleted.",
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
ConflictsWith: []string{"envelope.0.upsert", "envelope.0.debezium"},
},
},
},
Optional: true,
ForceNew: true,
},
"start_offset": {
Description: "Read partitions from the specified offset.",
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeInt},
Optional: true,
ForceNew: true,
ConflictsWith: []string{"start_timestamp"},
},
"start_timestamp": {
Description: "Use the specified value to set `START OFFSET` based on the Kafka timestamp.",
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
ConflictsWith: []string{"start_offset"},
},
"expose_progress": IdentifierSchema("expose_progress", "The name of the progress subsource for the source. If this is not specified, the subsource will be named `<src_name>_progress`.", false),
"subsource": SubsourceSchema(),
"ownership_role": OwnershipRoleSchema(),
},
}
}

var sourceKafkaSchema = map[string]*schema.Schema{
"name": ObjectNameSchema("source", true, false),
"schema_name": SchemaNameSchema("source", false),
Expand Down Expand Up @@ -123,7 +258,7 @@ var sourceKafkaSchema = map[string]*schema.Schema{
Optional: true,
ForceNew: true,
},
"start_offset": {
"start_offsets": {
Description: "Read partitions from the specified offset.",
Type: schema.TypeList,
Elem: &schema.Schema{Type: schema.TypeInt},
Expand All @@ -136,13 +271,21 @@ var sourceKafkaSchema = map[string]*schema.Schema{
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
ConflictsWith: []string{"start_offset"},
ConflictsWith: []string{"start_offsets"},
},
"expose_progress": IdentifierSchema("expose_progress", "The name of the progress subsource for the source. If this is not specified, the subsource will be named `<src_name>_progress`.", false),
"subsource": SubsourceSchema(),
"ownership_role": OwnershipRoleSchema(),
}

func sourceKafkaStateUpgradeV0(_ context.Context, rawState map[string]interface{}, _ interface{}) (map[string]interface{}, error) {
if rawState == nil {
return nil, fmt.Errorf("SourcePostgres resource state upgrade failed, state is nil")
}
rawState["start_offsets"] = rawState["start_offset"]
return rawState, nil
}

func SourceKafka() *schema.Resource {
return &schema.Resource{
Description: "A Kafka source describes a Kafka cluster you want Materialize to read data from.",
Expand All @@ -156,7 +299,15 @@ func SourceKafka() *schema.Resource {
StateContext: schema.ImportStatePassthroughContext,
},

Schema: sourceKafkaSchema,
Schema: sourceKafkaSchema,
SchemaVersion: 1,
StateUpgraders: []schema.StateUpgrader{
{
Version: 0,
Type: sourceKafkaSchemaV0().CoreConfigSchema().ImpliedType(),
Upgrade: sourceKafkaStateUpgradeV0,
},
},
}
}

Expand Down Expand Up @@ -245,7 +396,7 @@ func sourceKafkaCreate(ctx context.Context, d *schema.ResourceData, meta any) di
b.Envelope(envelope)
}

if v, ok := d.GetOk("start_offset"); ok {
if v, ok := d.GetOk("start_offsets"); ok {
so := materialize.GetSliceValueInt(v.([]interface{}))
b.StartOffset(so)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/resource_source_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var inSourceKafka = map[string]interface{}{
},
},
"envelope": []interface{}{map[string]interface{}{"upsert": true}},
"start_offset": []interface{}{1, 2, 3},
"start_offsets": []interface{}{1, 2, 3},
"start_timestamp": -1000,
}

Expand Down
Loading