diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 2a7be9519e..6f2aec7f8f 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -2,6 +2,7 @@ package connpostgres import ( "context" + "encoding/binary" "fmt" "log/slog" "time" @@ -25,9 +26,9 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -type MessageLSN struct { - msg pglogrepl.Message - lsn pglogrepl.LSN +type TxBuffer struct { + Lsn pglogrepl.LSN + Streams [][]byte } type PostgresCDCSource struct { @@ -35,7 +36,7 @@ type PostgresCDCSource struct { *PostgresCDCConfig typeMap *pgtype.Map commitLock *pglogrepl.BeginMessage - txBuffer map[uint32][]MessageLSN + txBuffer map[uint32][]TxBuffer inStream bool } @@ -61,9 +62,9 @@ type startReplicationOpts struct { } func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { - var txBuffer map[uint32][]MessageLSN + var txBuffer map[uint32][]TxBuffer if cdcConfig.Version >= 2 { - txBuffer = make(map[uint32][]MessageLSN) + txBuffer = make(map[uint32][]TxBuffer) } return &PostgresCDCSource{ PostgresConnector: c, @@ -378,7 +379,7 @@ func PullCdcRecords[Items model.Items]( if p.commitLock == nil { cdclen := cdcRecordStore.Len() if cdclen >= 0 && uint32(cdclen) >= req.MaxBatchSize { - return nil + break } if waitingForCommit { @@ -387,7 +388,7 @@ func PullCdcRecords[Items model.Items]( p.FlowJobName, cdcRecordStore.Len()), ) - return nil + break } } @@ -433,7 +434,7 @@ func PullCdcRecords[Items model.Items]( if pgconn.Timeout(err) { logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d", cdcRecordStore.Len())) - return nil + break } else { return fmt.Errorf("ReceiveMessage failed: %w", err) } @@ -476,7 +477,7 @@ func PullCdcRecords[Items model.Items]( logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n", xld.WALStart, xld.ServerWALEnd, xld.ServerTime)) - if err := recordProcessor.processXLogData(ctx, p, xld, clientXLogPos); err != nil { + if err := recordProcessor.processXLogData(ctx, p, xld, msg.Data[1:], clientXLogPos); err != nil { return fmt.Errorf("error processing message: %w", err) } @@ -485,6 +486,23 @@ func PullCdcRecords[Items model.Items]( } } } + + for xid, txbufs := range p.txBuffer { + for _, txbuf := range txbufs { + if _, err := p.CatalogPool.Exec( + ctx, + "insert into v2cdc (flow_name, xid, lsn, stream) values ($1, $2, $3, $4) on conflict do nothing", + p.FlowJobName, + xid, + txbuf.Lsn, + txbuf.Streams, + ); err != nil { + return err + } + } + } + + return nil } func (p *PostgresCDCSource) baseRecord(lsn pglogrepl.LSN) model.BaseRecord { @@ -502,6 +520,7 @@ func (rp *cdcRecordProcessor[Items]) processXLogData( ctx context.Context, p *PostgresCDCSource, xld pglogrepl.XLogData, + xldbytes []byte, currentClientXlogPos pglogrepl.LSN, ) error { var logicalMsg pglogrepl.Message @@ -509,7 +528,18 @@ func (rp *cdcRecordProcessor[Items]) processXLogData( if p.Version < 2 { logicalMsg, err = pglogrepl.Parse(xld.WALData) } else { - logicalMsg, err = pglogrepl.ParseV2(xld.WALData, p.inStream) + if p.inStream && + (xld.WALData[0] == byte(pglogrepl.MessageTypeUpdate) || + xld.WALData[0] == byte(pglogrepl.MessageTypeInsert) || + xld.WALData[0] == byte(pglogrepl.MessageTypeDelete) || + xld.WALData[0] == byte(pglogrepl.MessageTypeRelation)) { + xid := binary.BigEndian.Uint32(xld.WALData[1:]) + txbufs := p.txBuffer[xid] + idx := len(txbufs) - 1 + txbufs[idx].Streams = append(txbufs[idx].Streams, xldbytes) + } else { + logicalMsg, err = pglogrepl.ParseV2(xld.WALData, p.inStream) + } } if err != nil { return fmt.Errorf("error parsing logical message: %w", err) @@ -534,15 +564,15 @@ func (rp *cdcRecordProcessor[Items]) processMessage( case *pglogrepl.InsertMessage: return rp.processInsertMessage(p, lsn, msg) case *pglogrepl.InsertMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.InsertMessage, lsn: lsn}) + return rp.processInsertMessage(p, lsn, &msg.InsertMessage) case *pglogrepl.UpdateMessage: return rp.processUpdateMessage(p, lsn, msg) case *pglogrepl.UpdateMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.UpdateMessage, lsn: lsn}) + return rp.processUpdateMessage(p, lsn, &msg.UpdateMessage) case *pglogrepl.DeleteMessage: return rp.processDeleteMessage(p, lsn, msg) case *pglogrepl.DeleteMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.DeleteMessage, lsn: lsn}) + return rp.processDeleteMessage(p, lsn, &msg.DeleteMessage) case *pglogrepl.CommitMessage: // for a commit message, update the last checkpoint id for the record batch. logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v", @@ -550,9 +580,22 @@ func (rp *cdcRecordProcessor[Items]) processMessage( rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN)) p.commitLock = nil case *pglogrepl.StreamCommitMessageV2: - for _, m := range p.txBuffer[msg.Xid] { - if err := rp.processMessage(ctx, p, m.lsn, m.msg, currentClientXlogPos); err != nil { - return err + // TODO first replay streams from catalog + // TODO select streams from v2cdc where flow_name = $1 and xid = $2 order by lsn + txbufs := p.txBuffer[msg.Xid] + for _, txbuf := range txbufs { + for _, m := range txbuf.Streams { + mxld, err := pglogrepl.ParseXLogData(m) + if err != nil { + return err + } + logicalMsg, err = pglogrepl.ParseV2(mxld.WALData, p.inStream) + if err != nil { + return err + } + if err := rp.processMessage(ctx, p, mxld.WALStart, logicalMsg, currentClientXlogPos); err != nil { + return err + } } } rp.records.UpdateLatestCheckpoint(int64(msg.CommitLSN)) @@ -560,20 +603,11 @@ func (rp *cdcRecordProcessor[Items]) processMessage( case *pglogrepl.StreamAbortMessageV2: delete(p.txBuffer, msg.Xid) case *pglogrepl.RelationMessage: - // treat all relation messages as corresponding to parent if partitioned. - msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID) - - if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists { - return nil - } - - logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v", - msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns)) - return rp.processRelationMessage(ctx, p, currentClientXlogPos, msg) case *pglogrepl.RelationMessageV2: - p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], MessageLSN{msg: &msg.RelationMessage, lsn: lsn}) + return rp.processRelationMessage(ctx, p, currentClientXlogPos, &msg.RelationMessage) case *pglogrepl.StreamStartMessageV2: + p.txBuffer[msg.Xid] = append(p.txBuffer[msg.Xid], TxBuffer{Lsn: lsn}) p.inStream = true case *pglogrepl.StreamStopMessageV2: p.inStream = false @@ -748,8 +782,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage( } isFullReplica := rp.pullRequest.TableNameSchemaMapping[tableName].IsReplicaIdentityFull if isFullReplica { - err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec) - if err != nil { + if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil { return err } } else { @@ -778,8 +811,7 @@ func (rp *cdcRecordProcessor[Items]) processDeleteMessage( // A delete can only be followed by an INSERT, which does not need backfilling // No need to store DeleteRecords in memory or disk. - err = rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec) - if err != nil { + if err := rp.addRecordWithKey(p.logger, model.TableWithPkey{}, rec); err != nil { return err } } @@ -805,25 +837,35 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage( ctx context.Context, p *PostgresCDCSource, lsn pglogrepl.LSN, - currRel *pglogrepl.RelationMessage, + msg *pglogrepl.RelationMessage, ) error { + // treat all relation messages as corresponding to parent if partitioned. + msg.RelationID = p.getParentRelIDIfPartitioned(msg.RelationID) + + if _, exists := p.SrcTableIDNameMapping[msg.RelationID]; !exists { + return nil + } + + p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v", + msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns)) + // not present in tables to sync, return immediately - if _, ok := p.SrcTableIDNameMapping[currRel.RelationID]; !ok { + if _, ok := p.SrcTableIDNameMapping[msg.RelationID]; !ok { p.logger.Info("relid not present in srcTableIDNameMapping, skipping relation message", - slog.Uint64("relId", uint64(currRel.RelationID))) + slog.Uint64("relId", uint64(msg.RelationID))) return nil } // retrieve current TableSchema for table changed // tableNameSchemaMapping uses dst table name as the key, so annoying lookup - prevSchema := p.TableNameSchemaMapping[p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationID]].Name] + prevSchema := p.TableNameSchemaMapping[p.TableNameMapping[p.SrcTableIDNameMapping[msg.RelationID]].Name] // creating maps for lookup later prevRelMap := make(map[string]string) currRelMap := make(map[string]string) for _, column := range prevSchema.Columns { prevRelMap[column.Name] = column.Type } - for _, column := range currRel.Columns { + for _, column := range msg.Columns { switch prevSchema.System { case protos.TypeSystem_Q: qKind := p.postgresOIDToQValueKind(column.DataType) @@ -842,16 +884,16 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage( } schemaDelta := &protos.TableSchemaDelta{ - SrcTableName: p.SrcTableIDNameMapping[currRel.RelationID], - DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationID]].Name, + SrcTableName: p.SrcTableIDNameMapping[msg.RelationID], + DstTableName: p.TableNameMapping[p.SrcTableIDNameMapping[msg.RelationID]].Name, AddedColumns: make([]*protos.FieldDescription, 0), System: prevSchema.System, } - for _, column := range currRel.Columns { + for _, column := range msg.Columns { // not present in previous relation message, but in current one, so added. if _, ok := prevRelMap[column.Name]; !ok { // only add to delta if not excluded - if _, ok := p.TableNameMapping[p.SrcTableIDNameMapping[currRel.RelationID]].Exclude[column.Name]; !ok { + if _, ok := p.TableNameMapping[p.SrcTableIDNameMapping[msg.RelationID]].Exclude[column.Name]; !ok { schemaDelta.AddedColumns = append(schemaDelta.AddedColumns, &protos.FieldDescription{ Name: column.Name, Type: currRelMap[column.Name], @@ -874,7 +916,7 @@ func (rp *cdcRecordProcessor[Items]) processRelationMessage( } } - p.relationMessageMapping[currRel.RelationID] = currRel + p.relationMessageMapping[msg.RelationID] = msg // only log audit if there is actionable delta if len(schemaDelta.AddedColumns) > 0 { rec := &model.RelationRecord[Items]{ diff --git a/nexus/catalog/migrations/V26__v2.sql b/nexus/catalog/migrations/V26__v2.sql new file mode 100644 index 0000000000..4dbd34b173 --- /dev/null +++ b/nexus/catalog/migrations/V26__v2.sql @@ -0,0 +1,7 @@ +create table v2cdc ( + flow_name text, + xid xid, + lsn pg_lsn, + stream bytea[], + primary key (flow_name, xid, lsn) +);