Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 27, 2024
1 parent 1579cda commit 4ac03f0
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
22 changes: 12 additions & 10 deletions flow/connectors/mysql/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,7 @@ func (c *MySqlConnector) PullRecords(
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[model.RecordItems],
) error {
defer func() {
req.RecordStream.Close()
}()
defer req.RecordStream.Close()
gset, err := mysql.ParseGTIDSet(c.config.Flavor, req.LastOffset.Text)
if err != nil {
return err
Expand All @@ -323,11 +321,16 @@ func (c *MySqlConnector) PullRecords(
}
}

timeoutCtx, cancelTimeout := context.WithTimeout(ctx, req.IdleTimeout)
defer cancelTimeout()

var recordCount uint32
for {
// TODO put req.IdleTimeout timer on this
event, err := mystream.GetEvent(ctx)
event, err := mystream.GetEvent(timeoutCtx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil
}
return err
}

Expand Down Expand Up @@ -355,13 +358,12 @@ func (c *MySqlConnector) PullRecords(
}
c.replState = newset
case *replication.RowsEvent:
sourceTableName := string(ev.Table.Table) // TODO need ev.Table.Schema?
sourceTableName := string(ev.Table.Schema) + "." + string(ev.Table.Table) // TODO this is fragile
destinationTableName := req.TableNameMapping[sourceTableName].Name
schema := req.TableNameSchemaMapping[destinationTableName]
for _, row := range ev.Rows {
var record model.Record[model.RecordItems]
// TODO need mapping of column index to column name
var items model.RecordItems
items := model.NewRecordItems(len(row))
switch event.Header.EventType {
case replication.WRITE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv0:
return errors.New("mysql v0 replication protocol not supported")
Expand All @@ -377,10 +379,10 @@ func (c *MySqlConnector) PullRecords(
DestinationTableName: destinationTableName,
}
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
var oldItems model.RecordItems
oldItems := model.NewRecordItems(len(row) / 2)
for idx, val := range row {
fd := schema.Columns[idx>>1]
qv := qvalueFromMysql(ev.Table.ColumnType[idx], qvalue.QValueKind(fd.Type), val)
qv := qvalueFromMysql(ev.Table.ColumnType[idx>>1], qvalue.QValueKind(fd.Type), val)
if (idx & 1) == 0 { // TODO test that it isn't other way around
oldItems.AddColumn(fd.Name, qv)
} else {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ const (
getLastNormalizeBatchID_SQL = "SELECT normalize_batch_id FROM %s.%s WHERE mirror_job_name=$1"
createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)"
checkTableExistsSQL = "SELECT EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = $1 AND tablename = $2)"
upsertJobMetadataForSyncSQL = `INSERT INTO %s.%s (mirror_job_name, lsn_offset, lsn_text, sync_batch_id) AS j VALUES ($1,$2,$3,$4)
upsertJobMetadataForSyncSQL = `INSERT INTO %s.%s AS j (mirror_job_name,lsn_offset,lsn_text,sync_batch_id,normalize_batch_id) VALUES ($1,$2,$3,$4,0)
ON CONFLICT(mirror_job_name) DO UPDATE SET lsn_offset=GREATEST(j.lsn_offset, EXCLUDED.lsn_offset),
lsn_text = EXCLUDED.lsn_text, sync_batch_id=EXCLUDED.sync_batch_id`
lsn_text=EXCLUDED.lsn_text, sync_batch_id=EXCLUDED.sync_batch_id`
checkIfJobMetadataExistsSQL = "SELECT COUNT(1)::TEXT::BOOL FROM %s.%s WHERE mirror_job_name=$1"
updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET normalize_batch_id=$1 WHERE mirror_job_name=$2"

Expand Down
1 change: 1 addition & 0 deletions nexus/catalog/migrations/V42__mysql_metadata.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
ALTER TABLE metadata_last_sync_state ADD COLUMN IF NOT EXISTS last_text text NOT NULL DEFAULT '';
ALTER TABLE peerdb_stats.cdc_batches ADD COLUMN IF NOT EXISTS batch_end_lsn_text text NOT NULL DEFAULT '';

0 comments on commit 4ac03f0

Please sign in to comment.