Skip to content

Commit

Permalink
generic e2e test for partitioned tables (#2383)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 20, 2024
1 parent 8aee197 commit b87d0a3
Showing 1 changed file with 51 additions and 0 deletions.
51 changes: 51 additions & 0 deletions flow/e2e/generic/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,54 @@ func (s Generic) Test_Simple_Schema_Changes() {

e2e.RequireEnvCanceled(t, env)
}

func (s Generic) Test_Partitioned_Table() {
t := s.T()
srcTable := "test_partition"
dstTable := "test_partition_dst"
srcSchemaTable := e2e.AttachSchema(s, srcTable)

_, err := s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE %[1]s(
id SERIAL NOT NULL,
name TEXT,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT now(),
PRIMARY KEY (created_at, id)
) PARTITION BY RANGE(created_at);
CREATE TABLE %[1]s_2024q1
PARTITION OF %[1]s
FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');
CREATE TABLE %[1]s_2024q2
PARTITION OF %[1]s
FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');
CREATE TABLE %[1]s_2024q3
PARTITION OF %[1]s
FOR VALUES FROM ('2024-07-01') TO ('2024-10-01');
`, srcSchemaTable))
require.NoError(t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: e2e.AddSuffix(s, "test_partition"),
TableMappings: e2e.TableMappings(s, srcTable, dstTable),
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(t)

tc := e2e.NewTemporalClient(t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)

e2e.SetupCDCFlowStatusQuery(t, env, flowConnConfig)
// insert 10 rows into the source table
for i := range 10 {
testName := fmt.Sprintf("test_name_%d", i)
_, err = s.Connector().Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(name, created_at) VALUES ($1, '2024-%d-01')
`, srcSchemaTable, max(1, i)), testName)
e2e.EnvNoError(t, env, err)
}
t.Log("Inserted 10 rows into the source table")

e2e.EnvWaitForEqualTablesWithNames(env, s, "normalizing 10 rows", srcTable, dstTable, `id,name,created_at`)
env.Cancel()
e2e.RequireEnvCanceled(t, env)
}

0 comments on commit b87d0a3

Please sign in to comment.