Skip to content

Commit

Permalink
support for upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 20, 2023
1 parent c26c9a7 commit fd503d8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 18 deletions.
44 changes: 26 additions & 18 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,6 @@ func GenerateMergeCommand(
upsertKeyCols[i] = caseMatchedCols[strings.ToLower(col)]
}

watermarkCol, ok := caseMatchedCols[strings.ToLower(watermarkCol)]
if !ok {
return "", fmt.Errorf("watermark column '%s' not found in destination table", watermarkCol)
}

upsertKeys := []string{}
partitionKeyCols := []string{}
for _, key := range upsertKeyCols {
Expand All @@ -405,23 +400,36 @@ func GenerateMergeCommand(
updateSetClause := strings.Join(updateSetClauses, ", ")
insertColumnsClause := strings.Join(insertColumnsClauses, ", ")
insertValuesClause := strings.Join(insertValuesClauses, ", ")

quotedWMC := utils.QuoteIdentifier(watermarkCol)

selectCmd := fmt.Sprintf(`
SELECT *
FROM %s
QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s DESC) = 1
`, tempTableName, strings.Join(partitionKeyCols, ","), quotedWMC)

mergeCmd := fmt.Sprintf(`
MERGE INTO %s dst
USING (%s) src
ON %s
WHEN MATCHED AND src.%s > dst.%s THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)
`, dstTable, selectCmd, upsertKeyClause, quotedWMC, quotedWMC,
updateSetClause, insertColumnsClause, insertValuesClause)
`, tempTableName, strings.Join(partitionKeyCols, ","), partitionKeyCols[0])
var mergeCmd string
if watermarkCol == "xmin" { // we don't want to depend on wmc
mergeCmd = fmt.Sprintf(`
MERGE INTO %s dst
USING (%s) src
ON %s
WHEN MATCHED THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)
`, dstTable, selectCmd, upsertKeyClause,
updateSetClause, insertColumnsClause, insertValuesClause)
} else {
watermarkCol, ok := caseMatchedCols[strings.ToLower(watermarkCol)]
if !ok {
return "", fmt.Errorf("watermark column '%s' not found in destination table", watermarkCol)
}
quotedWMC := utils.QuoteIdentifier(watermarkCol)
mergeCmd = fmt.Sprintf(`
MERGE INTO %s dst
USING (%s) src
ON %s
WHEN MATCHED AND src.%s > dst.%s THEN UPDATE SET %s
WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)
`, dstTable, selectCmd, upsertKeyClause, quotedWMC, quotedWMC,
updateSetClause, insertColumnsClause, insertValuesClause)
}

return mergeCmd, nil
}
Expand Down
46 changes: 46 additions & 0 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,52 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_XMIN_SF_Append() {
env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

numRows := 10

tblName := "test_qrep_flow_avro_sf_ups_xmin"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}",
snowflakeSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
"test_qrep_flow_avro_sf_xmin",
fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName),
dstSchemaQualified,
query,
protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
s.sfHelper.Peer,
"",
)
qrepConfig.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT,
UpsertKeyColumns: []string{"id"},
}
qrepConfig.WatermarkColumn = "xmin"
s.NoError(err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

// assert that error contains "invalid connection configs"
err = env.GetWorkflowError()
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)
Expand Down

0 comments on commit fd503d8

Please sign in to comment.