mysql #2395

Draft: wants to merge 80 commits into base: main.

Changes from all commits (80):
06148f8 wip (serprex, Dec 13, 2024)
e0e005b wip2 (serprex, Dec 13, 2024)
27e9a12 some cdc loop sketch, fat checkpoint type (serprex, Dec 15, 2024)
2cd88bb filling out logic to take in row (serprex, Dec 16, 2024)
7edd0fe fix lints (serprex, Dec 25, 2024)
2f1d447 lift logic out of connector into activity, will make easier to implem… (serprex, Dec 25, 2024)
fb82b3e mysql: GetVersionConnector (serprex, Dec 25, 2024)
c7ff51d mysql fetched bytes counter (serprex, Dec 25, 2024)
b9d891d fix lints (serprex, Dec 26, 2024)
fbe1766 fix nexus (serprex, Dec 26, 2024)
9af8c58 mysql: GetTableSchemaConnector (serprex, Dec 26, 2024)
618d3f1 GTIDSet juggling (serprex, Dec 26, 2024)
d816599 skip validation if source mysql, don't require CDCSyncConnector imple… (serprex, Dec 27, 2024)
25c60e7 turns out MySQL defaults to ANSI_QUOTES being disabled, so use backti… (serprex, Dec 27, 2024)
9ba7f66 fix nil deref (serprex, Dec 27, 2024)
3186f3a fixes (serprex, Dec 27, 2024)
7744811 fail fast (serprex, Dec 27, 2024)
70fc2af fix updates, fix syncer already open error (serprex, Dec 28, 2024)
ead7c8b fix lints (serprex, Dec 28, 2024)
769c9ad qrep first draft (serprex, Dec 31, 2024)
f467e14 cleanup (serprex, Jan 2, 2025)
d2cea38 fix partition query with pg improvements, fix full table refresh chec… (serprex, Jan 3, 2025)
09a4f65 set sql_mode = ANSI. It was this or adding template parameter for tab… (serprex, Jan 3, 2025)
b338d68 fixes (serprex, Jan 3, 2025)
b28325f syncer gets recreated each batch (serprex, Jan 3, 2025)
02648fb Support file position streaming. Probably not supposed to mix these m… (serprex, Jan 3, 2025)
58193d0 ExecuteSelectStreaming (serprex, Jan 5, 2025)
4a67a78 Support gtid_mode not ON (serprex, Jan 6, 2025)
9d6d513 e2e wip (serprex, Jan 7, 2025)
103a3db import renames (serprex, Jan 7, 2025)
92e27e7 fix lints (serprex, Jan 8, 2025)
088a077 run mysql in ci (serprex, Jan 8, 2025)
8ad2979 mysql setup/teardown (serprex, Jan 8, 2025)
ff24e2d setup mysql source, skip when test relies on Connector (serprex, Jan 9, 2025)
3b5a906 GetMySqlRows (serprex, Jan 9, 2025)
1af056c SuiteSource Exec. Requires some care that SQL is compatible with pg &… (serprex, Jan 9, 2025)
c45fa8a looks like serial is unsigned (serprex, Jan 10, 2025)
4b141ee log date's bytes (serprex, Jan 10, 2025)
b70d03e date/time formats (serprex, Jan 10, 2025)
412d5ee sigh, mysql unquoted identifiers map to all caps, meanwhile postgres.. (serprex, Jan 10, 2025)
490af9a try new gtid code, also keep chipping away at date32 test (serprex, Jan 10, 2025)
01008b0 log mysql Execute (serprex, Jan 10, 2025)
b5b3568 share more connect logic (serprex, Jan 10, 2025)
58df5ca test ` (serprex, Jan 10, 2025)
7c1d600 need to close statement to not hit limits (serprex, Jan 10, 2025)
df2a8c8 show databases (serprex, Jan 10, 2025)
d6e621a force query (serprex, Jan 10, 2025)
672c5e1 show tables (serprex, Jan 10, 2025)
c5675d0 log exits (serprex, Jan 10, 2025)
db8d7e2 did I misread ordering of these callbacks (serprex, Jan 10, 2025)
f5cfddd less logging, crashed browser tab, but somehow we're seeing results i… (serprex, Jan 10, 2025)
ff1bb1d I'm an idiot (serprex, Jan 11, 2025)
648ba2a convert couple more tests to pg/mysql, remove Genericness (serprex, Jan 11, 2025)
31ab468 map blob to string since clickhouse string is binary target anyways (serprex, Jan 13, 2025)
09b8eca gtid_mode is mysql specific (serprex, Jan 13, 2025)
19b2518 fix update (serprex, Jan 13, 2025)
6261fde try mysql over maria (serprex, Jan 13, 2025)
126c8c4 more quoted key (serprex, Jan 13, 2025)
bdb6a3e cleanup while preparing demo (serprex, Jan 14, 2025)
4363077 lint (serprex, Jan 14, 2025)
e9da7ff is text blob with charset? (serprex, Jan 14, 2025)
4a03d8d use charset for blob (serprex, Jan 14, 2025)
99661ee don't trust mysql types (serprex, Jan 14, 2025)
b076396 e2e: snowflake does not like quoted columns, just avoid keyword in ge… (serprex, Jan 14, 2025)
56f8ccc disable gtid on maria, log binary log status (serprex, Jan 15, 2025)
6d766d9 run e2e vs both maria & mysql (serprex, Jan 15, 2025)
eb43083 I'm a fool (serprex, Jan 15, 2025)
ec248fd i give up (serprex, Jan 16, 2025)
eec16a3 remove comment (serprex, Jan 16, 2025)
327a59d remove postgres SlotSignal (serprex, Jan 17, 2025)
57b4c5f move mysql offset setup to SetupReplication (serprex, Jan 17, 2025)
8511f6c fix (serprex, Jan 17, 2025)
e884f6b need to init Conn (serprex, Jan 17, 2025)
8e7c85b fix generic, remove GeneratePostgresFlowConnectionConfigs (serprex, Jan 17, 2025)
7e0c7dd mysql logo (serprex, Jan 17, 2025)
ebf67f1 no need for param (serprex, Jan 17, 2025)
22e1106 mysql does not defer GetLastOffset to destination (serprex, Jan 17, 2025)
3ab0360 track position more accurately, avoid clearing it (serprex, Jan 18, 2025)
9927ae0 avoid segfault (serprex, Jan 18, 2025)
2d4c504 " (serprex, Jan 18, 2025)
12 changes: 12 additions & 0 deletions .github/workflows/flow.yml
@@ -24,6 +24,18 @@ jobs:
           POSTGRES_PASSWORD: postgres
           POSTGRES_DB: postgres
           POSTGRES_INITDB_ARGS: --locale=C.UTF-8
+      mysql:
+        image: mysql:oracle
+        ports:
+          - 3306:3306
+        env:
+          MYSQL_ROOT_PASSWORD: cipass
+      #mariadb:
+      #  image: mariadb:lts-ubi
+      #  ports:
+      #    - 3300:3306
+      #  env:
+      #    MARIADB_ROOT_PASSWORD: cipass
       redpanda:
         image: redpandadata/redpanda@sha256:7214ddaf8426d25936459cf77c1f905566a4483a97d2b13006120dcd98a5c846
         ports:
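Note: this adds a MySQL 8 service container (the mysql:oracle tag) to the e2e job on port 3306, with an equivalent MariaDB service left commented out; later commits run e2e against both engines. As a quick orientation, connecting to this service from a test could look like the sketch below. It assumes the github.com/go-mysql-org/go-mysql client, which the commit messages (ExecuteSelectStreaming, "syncer") suggest this connector wraps; it is not code from the PR.

```go
package main

import (
	"fmt"

	"github.com/go-mysql-org/go-mysql/client"
)

func main() {
	// root/cipass on 127.0.0.1:3306 matches the service definition above.
	conn, err := client.Connect("127.0.0.1:3306", "root", "cipass", "")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// gtid_mode gates which replication path the connector can use;
	// the variable does not exist on MariaDB (hence the gtid commits).
	res, err := conn.Execute("SELECT @@global.gtid_mode")
	if err != nil {
		panic(err)
	}
	mode, err := res.GetString(0, 0)
	if err != nil {
		panic(err)
	}
	fmt.Println("gtid_mode:", mode)
}
```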
86 changes: 62 additions & 24 deletions flow/activities/flowable.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"log/slog"
 	"sync/atomic"
+	"time"

 	"github.com/jackc/pgerrcode"
 	"github.com/jackc/pgx/v5"
@@ -31,8 +32,7 @@ import (
 	"github.com/PeerDB-io/peerdb/flow/shared"
 )

-// CheckConnectionResult is the result of a CheckConnection call.
-type CheckConnectionResult struct {
+type CheckMetadataTablesResult struct {
 	NeedsSetupMetadataTables bool
 }

@@ -54,18 +54,39 @@ type StreamCloser interface {
 func (a *FlowableActivity) CheckConnection(
 	ctx context.Context,
 	config *protos.SetupInput,
-) (*CheckConnectionResult, error) {
+) error {
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
-	dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
+	conn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
 	if err != nil {
+		if errors.Is(err, errors.ErrUnsupported) {
+			return nil
+		}
 		a.Alerter.LogFlowError(ctx, config.FlowName, err)
+		return fmt.Errorf("failed to get connector: %w", err)
+	}
+	defer connectors.CloseConnector(ctx, conn)
+
+	return conn.ConnectionActive(ctx)
+}
+
+func (a *FlowableActivity) CheckMetadataTables(
+	ctx context.Context,
+	config *protos.SetupInput,
+) (*CheckMetadataTablesResult, error) {
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
+	conn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
+	if err != nil {
+		a.Alerter.LogFlowError(ctx, config.FlowName, err)
 		return nil, fmt.Errorf("failed to get connector: %w", err)
 	}
-	defer connectors.CloseConnector(ctx, dstConn)
+	defer connectors.CloseConnector(ctx, conn)

-	needsSetup := dstConn.NeedsSetupMetadataTables(ctx)
+	needsSetup, err := conn.NeedsSetupMetadataTables(ctx)
+	if err != nil {
+		return nil, err
+	}

-	return &CheckConnectionResult{
+	return &CheckMetadataTablesResult{
 		NeedsSetupMetadataTables: needsSetup,
 	}, nil
 }
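Note: the old CheckConnection activity both pinged the peer and reported whether metadata tables were missing; it is now split in two, and CheckConnection treats errors.ErrUnsupported as a pass so peers without a CDCSyncConnector implementation (such as a MySQL source) skip validation. A sketch of how a setup workflow might call the pair follows; the workflow wiring and the SetupMetadataTables activity name are assumptions, only the two activity signatures come from this diff.

```go
import (
	"fmt"

	"go.temporal.io/sdk/workflow"

	"github.com/PeerDB-io/peerdb/flow/generated/protos"
)

// Hypothetical caller using the Temporal Go SDK; not part of this PR.
func setupPeer(ctx workflow.Context, a *FlowableActivity, input *protos.SetupInput) error {
	// Connectivity first: the activity now returns only an error.
	if err := workflow.ExecuteActivity(ctx, a.CheckConnection, input).Get(ctx, nil); err != nil {
		return fmt.Errorf("peer unreachable: %w", err)
	}

	// Then, separately, ask whether metadata tables must be created.
	var check CheckMetadataTablesResult
	if err := workflow.ExecuteActivity(ctx, a.CheckMetadataTables, input).Get(ctx, &check); err != nil {
		return err
	}
	if check.NeedsSetupMetadataTables {
		// Assumed follow-up activity; name not taken from this diff.
		return workflow.ExecuteActivity(ctx, a.SetupMetadataTables, input).Get(ctx, nil)
	}
	return nil
}
```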
@@ -467,15 +488,14 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 		return nil, fmt.Errorf("failed to get partitions from source: %w", err)
 	}
 	if len(partitions) > 0 {
-		err = monitoring.InitializeQRepRun(
+		if err := monitoring.InitializeQRepRun(
 			ctx,
 			a.CatalogPool,
 			config,
 			runUUID,
 			partitions,
 			config.ParentMirrorName,
-		)
-		if err != nil {
+		); err != nil {
 			return nil, err
 		}
 	}
@@ -806,12 +826,30 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,

 	logger.Info(fmt.Sprintf("current last partition value is %v", last))

-	result, err := srcConn.CheckForUpdatedMaxValue(ctx, config, last)
+	maxValue, err := srcConn.GetMaxValue(ctx, config, last)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 		return false, fmt.Errorf("failed to check for new rows: %w", err)
 	}
-	return result, nil
+
+	if maxValue == nil || last == nil || last.Range == nil {
+		return maxValue != nil, nil
+	}
+
+	switch x := last.Range.Range.(type) {
+	case *protos.PartitionRange_IntRange:
+		if maxValue.(int64) > x.IntRange.End {
+			return true, nil
+		}
+	case *protos.PartitionRange_TimestampRange:
+		if maxValue.(time.Time).After(x.TimestampRange.End.AsTime()) {
+			return true, nil
+		}
+	default:
+		return false, fmt.Errorf("unknown range type: %v", x)
+	}
+
+	return false, nil
 }

 func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
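Note: CheckForUpdatedMaxValue previously made each connector decide whether newer rows exist; GetMaxValue now just returns the source's max watermark value, and the comparison against the last partition's end happens once here, so a new source such as MySQL only implements value retrieval. A worked example of the int-range branch, with struct literals inferred from the field accesses in the diff rather than copied from the generated protos:

```go
// Last synced partition covered ids up to 100; source max is 150,
// so the mirror reports new rows.
last := &protos.QRepPartition{
	Range: &protos.PartitionRange{
		Range: &protos.PartitionRange_IntRange{
			IntRange: &protos.IntPartitionRange{Start: 1, End: 100},
		},
	},
}
var maxValue any = int64(150)

hasNew := false
switch x := last.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
	hasNew = maxValue.(int64) > x.IntRange.End // 150 > 100, so true
}
```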
@@ -932,15 +970,15 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot
 	}
 	defer connectors.CloseConnector(ctx, srcConn)

-	err = srcConn.AddTablesToPublication(ctx, &protos.AddTablesToPublicationInput{
+	if err := srcConn.AddTablesToPublication(ctx, &protos.AddTablesToPublicationInput{
 		FlowJobName:      cfg.FlowJobName,
 		PublicationName:  cfg.PublicationName,
 		AdditionalTables: additionalTableMappings,
-	})
-	if err != nil {
+	}); err != nil {
 		a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
 		return err
 	}
-	return err
+	return nil
 }

 func (a *FlowableActivity) RemoveTablesFromPublication(
@@ -955,15 +993,15 @@ func (a *FlowableActivity) RemoveTablesFromPublication(
 	}
 	defer connectors.CloseConnector(ctx, srcConn)

-	err = srcConn.RemoveTablesFromPublication(ctx, &protos.RemoveTablesFromPublicationInput{
+	if err := srcConn.RemoveTablesFromPublication(ctx, &protos.RemoveTablesFromPublicationInput{
 		FlowJobName:     cfg.FlowJobName,
 		PublicationName: cfg.PublicationName,
 		TablesToRemove:  removedTablesMapping,
-	})
-	if err != nil {
+	}); err != nil {
 		a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
 		return err
 	}
-	return err
+	return nil
 }

 func (a *FlowableActivity) RemoveTablesFromRawTable(
@@ -1001,16 +1039,16 @@ func (a *FlowableActivity) RemoveTablesFromRawTable(
 	for _, table := range tablesToRemove {
 		tableNames = append(tableNames, table.DestinationTableIdentifier)
 	}
-	err = dstConn.RemoveTableEntriesFromRawTable(ctx, &protos.RemoveTablesFromRawTableInput{
+	if err := dstConn.RemoveTableEntriesFromRawTable(ctx, &protos.RemoveTablesFromRawTableInput{
 		FlowJobName:           cfg.FlowJobName,
 		DestinationTableNames: tableNames,
 		SyncBatchId:           syncBatchID,
 		NormalizeBatchId:      normBatchID,
-	})
-	if err != nil {
+	}); err != nil {
 		a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
 		return err
 	}
-	return err
+	return nil
 }

 func (a *FlowableActivity) RemoveTablesFromCatalog(
38 changes: 23 additions & 15 deletions flow/activities/flowable_core.go
@@ -21,6 +21,7 @@ import (
 	"google.golang.org/protobuf/proto"

 	"github.com/PeerDB-io/peerdb/flow/connectors"
+	"github.com/PeerDB-io/peerdb/flow/connectors/mysql"
 	connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
 	"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -133,23 +134,27 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		batchSize = 250_000
 	}

-	lastOffset, err := func() (int64, error) {
-		dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
-		if err != nil {
-			return 0, fmt.Errorf("failed to get destination connector: %w", err)
-		}
-		defer connectors.CloseConnector(ctx, dstConn)
+	lastOffset, err := func() (model.CdcCheckpoint, error) {
+		if myConn, isMy := any(srcConn).(*connmysql.MySqlConnector); isMy {
+			return myConn.GetLastOffset(ctx, config.FlowJobName)
+		} else {
+			dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
+			if err != nil {
+				return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector: %w", err)
+			}
+			defer connectors.CloseConnector(ctx, dstConn)

-		return dstConn.GetLastOffset(ctx, config.FlowJobName)
+			return dstConn.GetLastOffset(ctx, config.FlowJobName)
+		}
 	}()
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, flowName, err)
 		return nil, err
 	}

-	logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
+	logger.Info("pulling records...", slog.Any("LastOffset", lastOffset))
 	consumedOffset := atomic.Int64{}
-	consumedOffset.Store(lastOffset)
+	consumedOffset.Store(lastOffset.ID)

 	channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env)
 	if err != nil {
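Note: GetLastOffset used to return a bare int64 LSN, which only fits Postgres; it now returns model.CdcCheckpoint (the "fat checkpoint type" from commit 27e9a12), and a MySQL source answers it itself rather than deferring to the destination (commit 22e1106). Below is a plausible shape for the type, inferred from the lastOffset.ID and slog.Any usage in this hunk; the real definition lives in flow/model and may differ.

```go
package model

// CdcCheckpoint, sketched from its usage in this diff: an int64 ID that
// feeds atomic counters and LSN-based monitoring, plus a textual component
// for sources whose position is not a single integer.
type CdcCheckpoint struct {
	ID   int64  // Postgres: LSN; also what consumedOffset.Store tracks
	Text string // MySQL: GTID set, or binlog file:pos when gtid_mode is not ON
}
```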
@@ -224,7 +229,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
 	}

-	var syncStartTime time.Time
 	var res *model.SyncResponse
 	errGroup.Go(func() error {
 		dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
@@ -251,7 +255,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 			return err
 		}

-		syncStartTime = time.Now()
 		res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
 			SyncBatchID: syncBatchID,
 			Records:     recordBatchSync,
@@ -271,6 +274,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		return nil
 	})

+	syncStartTime := time.Now()
 	if err := errGroup.Wait(); err != nil {
 		// don't log flow error for "replState changed" and "slot is already active"
 		if !(temporal.IsApplicationError(err) ||
@@ -280,14 +284,18 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		if temporal.IsApplicationError(err) {
 			return nil, err
 		} else {
-			return nil, fmt.Errorf("failed to pull records: %w", err)
+			return nil, fmt.Errorf("[cdc] failed to pull records: %w", err)
 		}
 	}
 	syncState.Store(shared.Ptr("bookkeeping"))

 	syncDuration := time.Since(syncStartTime)
 	lastCheckpoint := recordBatchSync.GetLastCheckpoint()
-	srcConn.UpdateReplStateLastOffset(lastCheckpoint)
+	logger.Info("batch synced", slog.Any("checkpoint", lastCheckpoint))
+	if err := srcConn.UpdateReplStateLastOffset(ctx, lastCheckpoint); err != nil {
+		a.Alerter.LogFlowError(ctx, flowName, err)
+		return nil, err
+	}

 	if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
 		ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint,
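Note: UpdateReplStateLastOffset previously just mutated in-memory replication state and could not fail; with a MySQL source the checkpoint has to be persisted at the end of each batch, hence the added ctx parameter and error return. The corresponding method change on the pull connector interface presumably looks like the sketch below; the interface definition itself is outside this diff, and the old parameter type is an assumption.

```go
// before: in-memory only, infallible (offset was presumably a plain int64)
UpdateReplStateLastOffset(lastOffset int64)

// after: may persist the checkpoint, so it takes a context and can fail
UpdateReplStateLastOffset(ctx context.Context, lastOffset model.CdcCheckpoint) error
```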
@@ -296,7 +304,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		a.Alerter.LogFlowError(ctx, flowName, err)
 		return nil, err
 	}
-	if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil {
+	if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint.ID); err != nil {
 		a.Alerter.LogFlowError(ctx, flowName, err)
 		return nil, err
 	}
@@ -453,7 +461,7 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe
 		tmp, err := pullRecords(srcConn, errCtx, config, partition, stream)
 		if err != nil {
 			a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
-			return fmt.Errorf("failed to pull records: %w", err)
+			return fmt.Errorf("[qrep] failed to pull records: %w", err)
 		}
 		numRecords := int64(tmp)
 		if err := monitoring.UpdatePullEndTimeAndRowsForPartition(