Skip to content

Commit

Permalink
wip persist
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 28, 2024
1 parent 1e60e1d commit a7cd919
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 42 deletions.
126 changes: 84 additions & 42 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connpostgres

import (
"context"
"encoding/binary"
"fmt"
"log/slog"
"time"
Expand All @@ -25,17 +26,17 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

type MessageLSN struct {
msg pglogrepl.Message
lsn pglogrepl.LSN
type TxBuffer struct {

Check failure on line 29 in flow/connectors/postgres/cdc.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct with 16 pointer bytes could be 8 (govet)
Lsn pglogrepl.LSN
Streams [][]byte
}

type PostgresCDCSource struct {
*PostgresConnector
*PostgresCDCConfig
typeMap *pgtype.Map
commitLock *pglogrepl.BeginMessage
txBuffer map[uint32][]MessageLSN
txBuffer map[uint32][]TxBuffer
inStream bool
}

Expand All @@ -61,9 +62,9 @@ type startReplicationOpts struct {
}

func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
var txBuffer map[uint32][]MessageLSN
var txBuffer map[uint32][]TxBuffer
if cdcConfig.Version >= 2 {
txBuffer = make(map[uint32][]MessageLSN)
txBuffer = make(map[uint32][]TxBuffer)
}
return &PostgresCDCSource{
PostgresConnector: c,
Expand Down Expand Up @@ -378,7 +379,7 @@ func PullCdcRecords[Items model.Items](
if p.commitLock == nil {
cdclen := cdcRecordStore.Len()
if cdclen >= 0 && uint32(cdclen) >= req.MaxBatchSize {
return nil
break
}

if waitingForCommit {
Expand All @@ -387,7 +388,7 @@ func PullCdcRecords[Items model.Items](
p.FlowJobName,
cdcRecordStore.Len()),
)
return nil
break
}
}

Expand Down Expand Up @@ -433,7 +434,7 @@ func PullCdcRecords[Items model.Items](
if pgconn.Timeout(err) {
logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",
cdcRecordStore.Len()))
return nil
break
} else {
return fmt.Errorf("ReceiveMessage failed: %w", err)
}
Expand Down Expand Up @@ -476,7 +477,7 @@ func PullCdcRecords[Items model.Items](

logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
if err := recordProcessor.processXLogData(ctx, p, xld, clientXLogPos); err != nil {
if err := recordProcessor.processXLogData(ctx, p, xld, msg.Data[1:], clientXLogPos); err != nil {
return fmt.Errorf("error processing message: %w", err)
}

Expand All @@ -485,6 +486,23 @@ func PullCdcRecords[Items model.Items](
}
}
}

for xid, txbufs := range p.txBuffer {
for _, txbuf := range txbufs {
if _, err := p.CatalogPool.Exec(
ctx,
"insert into v2cdc (flow_name, xid, lsn, stream) values ($1, $2, $3, $4) on conflict do nothing",
p.FlowJobName,
xid,
txbuf.Lsn,
txbuf.Streams,
); err != nil {
return err
}
}
}

return nil
}

func (p *PostgresCDCSource) baseRecord(lsn pglogrepl.LSN) model.BaseRecord {
Expand All @@ -502,14 +520,26 @@ func (rp *cdcRecordProcessor[Items]) processXLogData(
ctx context.Context,
p *PostgresCDCSource,
xld pglogrepl.XLogData,
xldbytes []byte,
currentClientXlogPos pglogrepl.LSN,
) error {
var logicalMsg pglogrepl.Message
var err error
if p.Version < 2 {
logicalMsg, err = pglogrepl.Parse(xld.WALData)
} else {
logicalMsg, err = pglogrepl.ParseV2(xld.WALData, p.inStream)
if p.inStream &&
(xld.WALData[0] == byte(pglogrepl.MessageTypeUpdate) ||
xld.WALData[0] == byte(pglogrepl.MessageTypeInsert) ||
xld.WALData[0] == byte(pglogrepl.MessageTypeDelete) ||
xld.WALData[0] == byte(pglogrepl.MessageTypeRelation)) {
xid := binary.BigEndian.Uint32(xld.WALData[1:])
txbufs := p.txBuffer[xid]
idx := len(txbufs) - 1
txbufs[idx].Streams = append(txbufs[idx].Streams, xldbytes)
} else {
logicalMsg, err = pglogrepl.ParseV2(xld.WALData, p.inStream)
}
}
if err != nil {
return fmt.Errorf("error parsing logical message: %w", err)
Expand All @@ -534,46 +564,50 @@ func (rp *cdcRecordProcessor[Items]) processMessage(
case *pglogrepl.InsertMessage:
return rp.processInsertMessage(p, lsn, msg)
case *pglogrepl.InsertMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.InsertMessage, lsn: lsn})
return rp.processInsertMessage(p, lsn, &msg.InsertMessage)
case *pglogrepl.UpdateMessage:
return rp.processUpdateMessage(p, lsn, msg)
case *pglogrepl.UpdateMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.UpdateMessage, lsn: lsn})
return rp.processUpdateMessage(p, lsn, &msg.UpdateMessage)
case *pglogrepl.DeleteMessage:
return rp.processDeleteMessage(p, lsn, msg)
case *pglogrepl.DeleteMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.DeleteMessage, lsn: lsn})
return rp.processDeleteMessage(p, lsn, &msg.DeleteMessage)
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN))
rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = nil
case *pglogrepl.StreamCommitMessageV2:
for _, m := range p.txBuffer[msg.Xid] {
if err := rp.processMessage(ctx, p, m.lsn, m.msg, currentClientXlogPos); err != nil {
return err
// TODO first replay streams from catalog
// TODO select streams from v2cdc where flow_name = $1 and xid = $2 order by lsn
txbufs := p.txBuffer[msg.Xid]
for _, txbuf := range txbufs {
for _, m := range txbuf.Streams {
mxld, err := pglogrepl.ParseXLogData(m)
if err != nil {
return err
}
logicalMsg, err = pglogrepl.ParseV2(mxld.WALData, p.inStream)
if err != nil {
return err
}
if err := rp.processMessage(ctx, p, mxld.WALStart, logicalMsg, currentClientXlogPos); err != nil {
return err
}
}
}
rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN))
delete(p.txBuffer, msg.Xid)
case *pglogrepl.StreamAbortMessageV2:
delete(p.txBuffer, msg.Xid)
case *pglogrepl.RelationMessage:
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)

if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists {
return nil
}

logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg)
case *pglogrepl.RelationMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.RelationMessage, lsn: lsn})
return rp.processRelationMessage(ctx, p, currentClientXlogPos, &msg.RelationMessage)
case *pglogrepl.StreamStartMessageV2:
p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], TxBuffer{Lsn: lsn})
p.inStream = true
case *pglogrepl.StreamStopMessageV2:
p.inStream = false
Expand Down Expand Up @@ -748,8 +782,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage(
}
isFullReplica := rp.pullRequest.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
if isFullReplica {
err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec)
if err != nil {
if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -778,8 +811,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage(

// A delete can only be followed by an INSERT, which does not need backfilling
// No need to store DeleteRecords in memory or disk.
err = rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec)
if err != nil {
if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil {
return err
}
}
Expand All @@ -805,25 +837,35 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage(
ctx context.Context,
p *PostgresCDCSource,
lsn pglogrepl.LSN,
currRel *pglogrepl.RelationMessage,
msg *pglogrepl.RelationMessage,
) error {
// treat all relation messages as corresponding to parent if partitioned.
msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID)

if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists {
return nil
}

p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

// not present in tables to sync, return immediately
if _, ok := p.SrcTableIDNameMapping[currRel.RelationID]; !ok {
if _, ok := p.SrcTableIDNameMapping[msg.RelationID]; !ok {
p.logger.Info("relid not present in srcTableIDNameMapping, skipping relation message",
slog.Uint64("relId", uint64(currRel.RelationID)))
slog.Uint64("relId", uint64(msg.RelationID)))
return nil
}

// retrieve current TableSchema for table changed
// tableNameSchemaMapping uses dst table name as the key, so annoying lookup
prevSchema := p.TableNameSchemaMapping[p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationID]].Name]
prevSchema := p.TableNameSchemaMapping[p.TableNameMapping[p.SrcTableIDNameMapping[msg.RelationID]].Name]
// creating maps for lookup later
prevRelMap := make(map[string]string)
currRelMap := make(map[string]string)
for _, column := range prevSchema.Columns {
prevRelMap[column.Name] = column.Type
}
for _, column := range currRel.Columns {
for _, column := range msg.Columns {
switch prevSchema.System {
case protos.TypeSystem_Q:
qKind := p.postgresOIDToQValueKind(column.DataType)
Expand All @@ -842,16 +884,16 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage(
}

schemaDelta := &protos.TableSchemaDelta{
SrcTableName: p.SrcTableIDNameMapping[currRel.RelationID],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationID]].Name,
SrcTableName: p.SrcTableIDNameMapping[msg.RelationID],
DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[msg.RelationID]].Name,
AddedColumns: make([]*protos.FieldDescription, 0),
System: prevSchema.System,
}
for _, column := range currRel.Columns {
for _, column := range msg.Columns {
// not present in previous relation message, but in current one, so added.
if _, ok := prevRelMap[column.Name]; !ok {
// only add to delta if not excluded
if _, ok := p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationID]].Exclude[column.Name]; !ok {
if _, ok := p.TableNameMapping[p.SrcTableIDNameMapping[msg.RelationID]].Exclude[column.Name]; !ok {
schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.FieldDescription{
Name: column.Name,
Type: currRelMap[column.Name],
Expand All @@ -874,7 +916,7 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage(
}
}

p.relationMessageMapping[currRel.RelationID] = currRel
p.relationMessageMapping[msg.RelationID] = msg
// only log audit if there is actionable delta
if len(schemaDelta.AddedColumns) > 0 {
rec := &model.RelationRecord[Items]{
Expand Down
7 changes: 7 additions & 0 deletions nexus/catalog/migrations/V26__v2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
create table v2cdc (
flow_name text,
xid xid,
lsn pg_lsn,
stream bytea[],
primary key (flow_name, xid, lsn)
);

0 comments on commit a7cd919

Please sign in to comment.