Skip to content

Commit

Permalink
remove redundant code
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 20, 2023
1 parent fd503d8 commit 0794778
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 63 deletions.
22 changes: 3 additions & 19 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,31 +405,15 @@ func GenerateMergeCommand(
FROM %s
QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s DESC) = 1
`, tempTableName, strings.Join(partitionKeyCols, ","), partitionKeyCols[0])
var mergeCmd string
if watermarkCol == "xmin" { // we don't want to depend on wmc
mergeCmd = fmt.Sprintf(`

mergeCmd := fmt.Sprintf(`
MERGE INTO %s dst
USING (%s) src
ON %s
WHEN MATCHED THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)
`, dstTable, selectCmd, upsertKeyClause,
updateSetClause, insertColumnsClause, insertValuesClause)
} else {
watermarkCol, ok := caseMatchedCols[strings.ToLower(watermarkCol)]
if !ok {
return "", fmt.Errorf("watermark column '%s' not found in destination table", watermarkCol)
}
quotedWMC := utils.QuoteIdentifier(watermarkCol)
mergeCmd = fmt.Sprintf(`
MERGE INTO %s dst
USING (%s) src
ON %s
WHEN MATCHED AND src.%s > dst.%s THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)
`, dstTable, selectCmd, upsertKeyClause, quotedWMC, quotedWMC,
updateSetClause, insertColumnsClause, insertValuesClause)
}
updateSetClause, insertColumnsClause, insertValuesClause)

return mergeCmd, nil
}
Expand Down
44 changes: 0 additions & 44 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,50 +211,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_XMIN_SF_Append() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

numRows := 10

tblName := "test_qrep_flow_avro_sf_xmin_append"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}",
snowflakeSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
"test_qrep_flow_avro_sf_xmin",
fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName),
dstSchemaQualified,
query,
protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
s.sfHelper.Peer,
"",
)

qrepConfig.WatermarkColumn = "xmin"
s.NoError(err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

// assert that error contains "invalid connection configs"
err = env.GetWorkflowError()
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)
Expand Down

0 comments on commit 0794778

Please sign in to comment.