Skip to content

Commit

Permalink
remove genericness
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 13, 2025
1 parent 4d666ac commit 9456d69
Show file tree
Hide file tree
Showing 14 changed files with 62 additions and 51 deletions.
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func (s PeerFlowE2ETestSuiteBQ) Conn() *pgx.Conn {
return s.conn.Conn()
}

func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s PeerFlowE2ETestSuiteBQ) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector {
Expand Down
4 changes: 4 additions & 0 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (s ClickHouseSuite) Connector() *connpostgres.PostgresConnector {
return c
}

func (s ClickHouseSuite) Source() e2e.SuiteSource {
return s.source
}

func (s ClickHouseSuite) DestinationConnector() connectors.Connector {
// TODO have CH connector
return nil
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type SuiteSource interface {
Exec(sql string) error
}

func TableMappings[TSource connectors.Connector](s GenericSuite[TSource], tables ...string) []*protos.TableMapping {
func TableMappings(s GenericSuite, tables ...string) []*protos.TableMapping {
if len(tables)&1 != 0 {
panic("must receive even number of table names")
}
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (s elasticsearchSuite) T() *testing.T {
return s.t
}

func (s elasticsearchSuite) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s elasticsearchSuite) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s elasticsearchSuite) Suffix() string {
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/eventhub/peer_flow_eh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func (s EventhubsSuite) T() *testing.T {
return s.t
}

func (s EventhubsSuite) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s EventhubsSuite) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s EventhubsSuite) Conn() *pgx.Conn {
return s.Connector().Conn()
return s.conn.Conn()
}

func (s EventhubsSuite) Suffix() string {
Expand Down
12 changes: 10 additions & 2 deletions flow/e2e/generic/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,24 @@ func TestGenericCH(t *testing.T) {
}

type Generic struct {
e2e.GenericSuite[*connpostgres.PostgresConnector]
e2e.GenericSuite
}

func SetupGenericSuite[T e2e.GenericSuite[*connpostgres.PostgresConnector]](f func(t *testing.T) T) func(t *testing.T) Generic {
func SetupGenericSuite[T e2e.GenericSuite](f func(t *testing.T) T) func(t *testing.T) Generic {
return func(t *testing.T) Generic {
t.Helper()
return Generic{f(t)}
}
}

func (s Generic) Connector() *connpostgres.PostgresConnector {
if connector, ok := s.Source().Connector().(*connpostgres.PostgresConnector); ok {
return connector
}
s.T().SkipNow()
return nil
}

func (s Generic) Test_Simple_Flow() {
t := s.T()
srcTable := "test_simple"
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ func (s KafkaSuite) T() *testing.T {
return s.t
}

func (s KafkaSuite) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s KafkaSuite) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s KafkaSuite) Conn() *pgx.Conn {
return s.Connector().Conn()
return s.conn.Conn()
}

func (s KafkaSuite) Suffix() string {
Expand Down
7 changes: 3 additions & 4 deletions flow/e2e/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,12 @@ func (s *PostgresSource) Teardown(t *testing.T, suffix string) {
}
}

func TearDownPostgres(s Suite[*connpostgres.PostgresConnector]) {
func TearDownPostgres(s Suite) {
t := s.T()
t.Helper()

conn := s.Connector()
if conn != nil {
conn := s.Connector().Conn()
if connector, ok := s.Source().Connector().(*connpostgres.PostgresConnector); ok {
conn := connector.Conn()
t.Log("begin tearing down postgres schema", s.Suffix())
deadline := time.Now().Add(2 * time.Minute)
for {
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (s PeerFlowE2ETestSuitePG) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuitePG) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s PeerFlowE2ETestSuitePG) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s PeerFlowE2ETestSuitePG) DestinationConnector() connectors.Connector {
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ func (s PubSubSuite) T() *testing.T {
return s.t
}

func (s PubSubSuite) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s PubSubSuite) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s PubSubSuite) Conn() *pgx.Conn {
return s.Connector().Conn()
return s.conn.Conn()
}

func (s PubSubSuite) Suffix() string {
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (s PeerFlowE2ETestSuiteS3) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuiteS3) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s PeerFlowE2ETestSuiteS3) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s PeerFlowE2ETestSuiteS3) Suffix() string {
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ func (s PeerFlowE2ETestSuiteSF) T() *testing.T {
return s.t
}

func (s PeerFlowE2ETestSuiteSF) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s PeerFlowE2ETestSuiteSF) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s PeerFlowE2ETestSuiteSF) DestinationConnector() connectors.Connector {
return s.connector
}

func (s PeerFlowE2ETestSuiteSF) Conn() *pgx.Conn {
return s.Connector().Conn()
return s.conn.Conn()
}

func (s PeerFlowE2ETestSuiteSF) Suffix() string {
Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (s PeerFlowE2ETestSuiteSQLServer) Conn() *pgx.Conn {
return s.conn.Conn()
}

func (s PeerFlowE2ETestSuiteSQLServer) Connector() *connpostgres.PostgresConnector {
return s.conn
func (s PeerFlowE2ETestSuiteSQLServer) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s PeerFlowE2ETestSuiteSQLServer) Suffix() string {
Expand Down
44 changes: 22 additions & 22 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,30 @@ func init() {
_ = godotenv.Load()
}

type Suite[TSource connectors.Connector] interface {
type Suite interface {
e2eshared.Suite
T() *testing.T
Connector() TSource
Source() SuiteSource
Suffix() string
}

type RowSource[TSource connectors.Connector] interface {
Suite[TSource]
type RowSource interface {
Suite
GetRows(table, cols string) (*model.QRecordBatch, error)
}

type GenericSuite[TSource connectors.Connector] interface {
RowSource[TSource]
type GenericSuite interface {
RowSource
Peer() *protos.Peer
DestinationConnector() connectors.Connector
DestinationTable(table string) string
}

func AttachSchema(s interface{ Suffix() string }, table string) string {
func AttachSchema(s Suite, table string) string {
return fmt.Sprintf("e2e_test_%s.%s", s.Suffix(), table)
}

func AddSuffix[T connectors.Connector](s Suite[T], str string) string {
func AddSuffix(s Suite, str string) string {
return fmt.Sprintf("%s_%s", str, s.Suffix())
}

Expand Down Expand Up @@ -135,9 +135,9 @@ func GetMySqlRows(conn *connmysql.MySqlConnector, suffix string, table string, c
return batch, nil
}

func GetSuiteSourceRows[TSource connectors.Connector](suite Suite[TSource], table string, cols string) (*model.QRecordBatch, error) {
func GetSuiteSourceRows(suite Suite, table string, cols string) (*model.QRecordBatch, error) {
// TODO move to SuiteSource
switch conn := any(suite.Connector()).(type) {
switch conn := any(suite.Source().Connector()).(type) {
case *connpostgres.PostgresConnector:
return GetPgRows(conn, suite.Suffix(), table, cols)
case *connmysql.MySqlConnector:
Expand All @@ -147,7 +147,7 @@ func GetSuiteSourceRows[TSource connectors.Connector](suite Suite[TSource], tabl
}
}

func RequireEqualTables[TSource connectors.Connector](suite RowSource[TSource], table string, cols string) {
func RequireEqualTables(suite RowSource, table string, cols string) {
t := suite.T()
t.Helper()

Expand All @@ -160,12 +160,12 @@ func RequireEqualTables[TSource connectors.Connector](suite RowSource[TSource],
require.True(t, e2eshared.CheckEqualRecordBatches(t, sourceRows, rows))
}

func EnvEqualTables[TSource connectors.Connector](env WorkflowRun, suite RowSource[TSource], table string, cols string) {
func EnvEqualTables(env WorkflowRun, suite RowSource, table string, cols string) {
EnvEqualTablesWithNames(env, suite, table, table, cols)
}

func EnvEqualTablesWithNames[TSource connectors.Connector](
env WorkflowRun, suite RowSource[TSource], srcTable string, dstTable string, cols string,
func EnvEqualTablesWithNames(
env WorkflowRun, suite RowSource, srcTable string, dstTable string, cols string,
) {
t := suite.T()
t.Helper()
Expand All @@ -179,9 +179,9 @@ func EnvEqualTablesWithNames[TSource connectors.Connector](
EnvEqualRecordBatches(t, env, sourceRows, rows)
}

func EnvWaitForEqualTables[TSource connectors.Connector](
func EnvWaitForEqualTables(
env WorkflowRun,
suite RowSource[TSource],
suite RowSource,
reason string,
table string,
cols string,
Expand All @@ -190,9 +190,9 @@ func EnvWaitForEqualTables[TSource connectors.Connector](
EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols)
}

func EnvWaitForEqualTablesWithNames[TSource connectors.Connector](
func EnvWaitForEqualTablesWithNames(
env WorkflowRun,
suite RowSource[TSource],
suite RowSource,
reason string,
srcTable string,
dstTable string,
Expand Down Expand Up @@ -220,9 +220,9 @@ func EnvWaitForEqualTablesWithNames[TSource connectors.Connector](
})
}

func EnvWaitForCount[TSource connectors.Connector](
func EnvWaitForCount(
env WorkflowRun,
suite RowSource[TSource],
suite RowSource,
reason string,
dstTable string,
cols string,
Expand Down Expand Up @@ -591,7 +591,7 @@ func GetOwnersSelectorStringsSF() [2]string {
return [2]string{strings.Join(pgFields, ","), strings.Join(sfFields, ",")}
}

func ExpectedDestinationIdentifier[T connectors.Connector](s GenericSuite[T], ident string) string {
func ExpectedDestinationIdentifier(s GenericSuite, ident string) string {
switch s.DestinationConnector().(type) {
case *connsnowflake.SnowflakeConnector:
return strings.ToUpper(ident)
Expand All @@ -600,7 +600,7 @@ func ExpectedDestinationIdentifier[T connectors.Connector](s GenericSuite[T], id
}
}

func ExpectedDestinationTableName[T connectors.Connector](s GenericSuite[T], table string) string {
func ExpectedDestinationTableName(s GenericSuite, table string) string {
return ExpectedDestinationIdentifier(s, s.DestinationTable(table))
}

Expand Down

0 comments on commit 9456d69

Please sign in to comment.