Skip to content

Commit

Permalink
remove postgres SlotSignal
Browse files Browse the repository at this point in the history
This avoids an unnecessary goroutine:
instead of having a goroutine wait for a channel to be closed and then close the connection, just close the connection directly.

The larger reason for this change, though, is so that this logic can move behind a connector interface,
where MySQL will use this as an opportunity to run SHOW BINARY LOG STATUS.
  • Loading branch information
serprex committed Jan 17, 2025
1 parent 4eefa08 commit 81191b0
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 85 deletions.
18 changes: 8 additions & 10 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"

Expand All @@ -20,7 +21,7 @@ import (

type SlotSnapshotState struct {
connector connectors.CDCPullConnector
signal connpostgres.SlotSignal
slotConn *pgx.Conn
snapshotName string
}

Expand All @@ -43,7 +44,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
defer a.SnapshotStatesMutex.Unlock()

if s, ok := a.SlotSnapshotStates[flowJobName]; ok {
close(s.signal.CloneComplete)
s.slotConn.Close(ctx)
connectors.CloseConnector(ctx, s.connector)
delete(a.SlotSnapshotStates, flowJobName)
}
Expand Down Expand Up @@ -76,15 +77,12 @@ func (a *SnapshotActivity) SetupReplication(
connectors.CloseConnector(ctx, conn)
}

slotSignal := connpostgres.NewSlotSignal()
go conn.SetupReplication(ctx, slotSignal, config)

logger.Info("waiting for slot to be created...")
slotInfo := <-slotSignal.SlotCreated
slotInfo, err := conn.SetupReplication(ctx, config)

if slotInfo.Err != nil {
closeConnectionForError(slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
if err != nil {
closeConnectionForError(err)
return nil, fmt.Errorf("slot error: %w", err)
} else {
logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
}
Expand All @@ -93,7 +91,7 @@ func (a *SnapshotActivity) SetupReplication(
defer a.SnapshotStatesMutex.Unlock()

a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{
signal: slotSignal,
slotConn: slotInfo.Conn,
snapshotName: slotInfo.SnapshotName,
connector: conn,
}
Expand Down
53 changes: 20 additions & 33 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,53 +356,46 @@ func (c *PostgresConnector) CreatePublication(
// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
ctx context.Context,
signal SlotSignal,
s SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]model.NameAndExclude,
doInitialCopy bool,
) {
) (SlotCreationResult, error) {
// iterate through source tables and create publication,
// expecting tablenames to be schema qualified
if !s.PublicationExists {
srcTableNames := make([]string, 0, len(tableNameMapping))
for srcTableName := range tableNameMapping {
parsedSrcTableName, err := utils.ParseSchemaTable(srcTableName)
if err != nil {
signal.SlotCreated <- SlotCreationResult{
Err: fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName),
}
return
return SlotCreationResult{}, fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName)
}
srcTableNames = append(srcTableNames, parsedSrcTableName.String())
}
if err := c.CreatePublication(ctx, srcTableNames, publication); err != nil {
signal.SlotCreated <- SlotCreationResult{Err: err}
return
return SlotCreationResult{}, err
}
}

// create slot only after we succeeded in creating publication.
if !s.SlotExists {
conn, err := c.CreateReplConn(ctx)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error acquiring connection: %w", err)}
return
return SlotCreationResult{}, fmt.Errorf("[slot] error acquiring connection: %w", err)
}
defer conn.Close(ctx)

c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot))

// THIS IS NOT IN A TX!
if _, err := conn.Exec(ctx, "SET idle_in_transaction_session_timeout=0"); err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)}
return
conn.Close(ctx)
return SlotCreationResult{}, fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)
}

if _, err := conn.Exec(ctx, "SET lock_timeout=0"); err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting lock_timeout: %w", err)}
return
conn.Close(ctx)
return SlotCreationResult{}, fmt.Errorf("[slot] error setting lock_timeout: %w", err)
}

opts := pglogrepl.CreateReplicationSlotOptions{
Expand All @@ -411,39 +404,33 @@ func (c *PostgresConnector) createSlotAndPublication(
}
res, err := pglogrepl.CreateReplicationSlot(ctx, conn.PgConn(), slot, "pgoutput", opts)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error creating replication slot: %w", err)}
return
conn.Close(ctx)
return SlotCreationResult{}, fmt.Errorf("[slot] error creating replication slot: %w", err)
}

pgversion, err := c.MajorVersion(ctx)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error getting PG version: %w", err)}
return
conn.Close(ctx)
return SlotCreationResult{}, fmt.Errorf("[slot] error getting PG version: %w", err)
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
slotDetails := SlotCreationResult{
return SlotCreationResult{
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
SupportsTIDScans: pgversion >= shared.POSTGRES_13,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
<-signal.CloneComplete
c.logger.Info("Clone complete")
}, nil
} else {
c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot))
slotDetails := SlotCreationResult{
var err error
if doInitialCopy {
err = ErrSlotAlreadyExists
}
return SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: nil,
SupportsTIDScans: false,
}
if doInitialCopy {
slotDetails.Err = ErrSlotAlreadyExists
}
signal.SlotCreated <- slotDetails
}, err
}
}

Expand Down
24 changes: 15 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ type ReplState struct {
LastOffset atomic.Int64
}

type SlotCreationResult struct {

Check failure on line 60 in flow/connectors/postgres/postgres.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct with 48 pointer bytes could be 32 (govet)
SlotName string
SnapshotName string
SupportsTIDScans bool
Conn *pgx.Conn
}

func NewPostgresConnector(ctx context.Context, env map[string]string, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
logger := shared.LoggerFromCtx(ctx)
flowNameInApplicationName, err := peerdbenv.PeerDBApplicationNamePerMirrorName(ctx, nil)
Expand Down Expand Up @@ -1088,13 +1095,13 @@ func (c *PostgresConnector) FinishExport(tx any) error {
return pgtx.Commit(timeout)
}

// SetupReplication sets up replication for the source connector.
func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSignal, req *protos.SetupReplicationInput) {
// SetupReplication sets up replication for the source connector
func (c *PostgresConnector) SetupReplication(
ctx context.Context,
req *protos.SetupReplicationInput,
) (SlotCreationResult, error) {
if !shared.IsValidReplicationName(req.FlowJobName) {
signal.SlotCreated <- SlotCreationResult{
Err: fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName),
}
return
return SlotCreationResult{}, fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName)
}

// Slotname would be the job name prefixed with "peerflow_slot_"
Expand All @@ -1111,8 +1118,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
// Check if the replication slot and publication exist
exists, err := c.checkSlotAndPublication(ctx, slotName, publicationName)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: err}
return
return SlotCreationResult{}, err
}

tableNameMapping := make(map[string]model.NameAndExclude, len(req.TableNameMapping))
Expand All @@ -1123,7 +1129,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
}
}
// Create the replication slot and publication
c.createSlotAndPublication(ctx, signal, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot)
return c.createSlotAndPublication(ctx, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot)
}

func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error {
Expand Down
24 changes: 0 additions & 24 deletions flow/connectors/postgres/slot_signal.go

This file was deleted.

14 changes: 5 additions & 9 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/stretchr/testify/require"

connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/e2e"
"github.com/PeerDB-io/peerdb/flow/e2eshared"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand Down Expand Up @@ -159,16 +158,13 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
},
}

signal := connpostgres.NewSlotSignal()
go s.conn.SetupReplication(context.Background(), signal, setupReplicationInput)

s.t.Log("waiting for slot creation to complete: " + flowJobName)
slotInfo := <-signal.SlotCreated
s.t.Logf("slot creation complete: %v. Signaling clone complete in 2 seconds", slotInfo)
time.Sleep(2 * time.Second)
close(signal.CloneComplete)
slotInfo, err := s.conn.SetupReplication(context.Background(), setupReplicationInput)
require.NoError(s.t, err)

s.t.Logf("slot creation complete: %v", slotInfo)
slotInfo.Conn.Close(nil)

Check failure on line 166 in flow/e2e/postgres/qrep_flow_pg_test.go

View workflow job for this annotation

GitHub Actions / lint

SA1012: do not pass a nil Context, even if a function permits it; pass context.TODO if you are unsure about which Context to use (staticcheck)

require.NoError(s.t, slotInfo.Err)
s.t.Logf("successfully setup replication: %s", flowJobName)
}

Expand Down

0 comments on commit 81191b0

Please sign in to comment.