Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove slot signal #2452

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import (
"github.com/PeerDB-io/peerdb/flow/shared"
)

// CheckConnectionResult is the result of a CheckConnection call.
type CheckConnectionResult struct {
type CheckMetadataTablesResult struct {
NeedsSetupMetadataTables bool
}

Expand All @@ -54,18 +53,36 @@ type StreamCloser interface {
func (a *FlowableActivity) CheckConnection(
ctx context.Context,
config *protos.SetupInput,
) (*CheckConnectionResult, error) {
) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
conn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
return nil
}
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(ctx, conn)

return conn.ConnectionActive(ctx)
}

func (a *FlowableActivity) CheckMetadataTables(
ctx context.Context,
config *protos.SetupInput,
) (*CheckMetadataTablesResult, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
conn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, config.Env, a.CatalogPool, config.PeerName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to get connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)
defer connectors.CloseConnector(ctx, conn)

needsSetup := dstConn.NeedsSetupMetadataTables(ctx)
needsSetup := conn.NeedsSetupMetadataTables(ctx)

return &CheckConnectionResult{
return &CheckMetadataTablesResult{
NeedsSetupMetadataTables: needsSetup,
}, nil
}
Expand Down Expand Up @@ -939,15 +956,15 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot
}
defer connectors.CloseConnector(ctx, srcConn)

err = srcConn.AddTablesToPublication(ctx, &protos.AddTablesToPublicationInput{
if err := srcConn.AddTablesToPublication(ctx, &protos.AddTablesToPublicationInput{
FlowJobName: cfg.FlowJobName,
PublicationName: cfg.PublicationName,
AdditionalTables: additionalTableMappings,
})
if err != nil {
}); err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
return err
}
return err
return nil
}

func (a *FlowableActivity) RemoveTablesFromPublication(
Expand All @@ -962,15 +979,15 @@ func (a *FlowableActivity) RemoveTablesFromPublication(
}
defer connectors.CloseConnector(ctx, srcConn)

err = srcConn.RemoveTablesFromPublication(ctx, &protos.RemoveTablesFromPublicationInput{
if err := srcConn.RemoveTablesFromPublication(ctx, &protos.RemoveTablesFromPublicationInput{
FlowJobName: cfg.FlowJobName,
PublicationName: cfg.PublicationName,
TablesToRemove: removedTablesMapping,
})
if err != nil {
}); err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
return err
}
return err
return nil
}

func (a *FlowableActivity) RemoveTablesFromRawTable(
Expand Down
38 changes: 15 additions & 23 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package activities

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
Expand All @@ -13,14 +12,13 @@ import (

"github.com/PeerDB-io/peerdb/flow/alerting"
"github.com/PeerDB-io/peerdb/flow/connectors"
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
"github.com/PeerDB-io/peerdb/flow/shared"
)

type SlotSnapshotState struct {
connector connectors.CDCPullConnector
signal connpostgres.SlotSignal
connector connectors.CDCPullConnectorCore
slotConn interface{ Close(context.Context) error }
snapshotName string
}

Expand All @@ -43,7 +41,9 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(ctx context.Context, flowJobName s
defer a.SnapshotStatesMutex.Unlock()

if s, ok := a.SlotSnapshotStates[flowJobName]; ok {
close(s.signal.CloneComplete)
if s.slotConn != nil {
s.slotConn.Close(ctx)
}
connectors.CloseConnector(ctx, s.connector)
delete(a.SlotSnapshotStates, flowJobName)
}
Expand All @@ -61,30 +61,22 @@ func (a *SnapshotActivity) SetupReplication(

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, config.PeerName)
conn, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, nil, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("setup replication is no-op for non-postgres source")
return nil, nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
}

closeConnectionForError := func(err error) {
logger.Info("waiting for slot to be created...")
slotInfo, err := conn.SetupReplication(ctx, config)

if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
// it is important to close the connection here as it is not closed in CloseSlotKeepAlive
connectors.CloseConnector(ctx, conn)
}

slotSignal := connpostgres.NewSlotSignal()
go conn.SetupReplication(ctx, slotSignal, config)

logger.Info("waiting for slot to be created...")
slotInfo := <-slotSignal.SlotCreated

if slotInfo.Err != nil {
closeConnectionForError(slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", slotInfo.Err)
return nil, fmt.Errorf("slot error: %w", err)
} else if slotInfo.Conn == nil && slotInfo.SlotName == "" {
logger.Info("replication setup without slot")
return nil, nil
} else {
logger.Info("slot created", slog.String("SlotName", slotInfo.SlotName))
}
Expand All @@ -93,7 +85,7 @@ func (a *SnapshotActivity) SetupReplication(
defer a.SnapshotStatesMutex.Unlock()

a.SlotSnapshotStates[config.FlowJobName] = SlotSnapshotState{
signal: slotSignal,
slotConn: slotInfo.Conn,
snapshotName: slotInfo.SnapshotName,
connector: conn,
}
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ValidationConnector interface {
type GetTableSchemaConnector interface {
Connector

// GetTableSchema returns the schema of a table in terms of QValueKind.
// GetTableSchema returns the schema of a table in terms of type system.
GetTableSchema(
ctx context.Context,
env map[string]string,
Expand All @@ -67,6 +67,9 @@ type CDCPullConnectorCore interface {
// `any` from ExportSnapshot passed here when done, allowing transaction to commit
FinishExport(any) error

// Setup replication in prep for initial copy
SetupReplication(context.Context, *protos.SetupReplicationInput) (model.SetupReplicationResult, error)

// Methods related to retrieving and pushing records for this connector as a source and destination.
SetupReplConn(context.Context) error

Expand Down
57 changes: 20 additions & 37 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,15 @@ func (c *PostgresConnector) getNullableColumns(ctx context.Context, relID uint32

func (c *PostgresConnector) tableExists(ctx context.Context, schemaTable *utils.SchemaTable) (bool, error) {
var exists pgtype.Bool
err := c.conn.QueryRow(ctx,
if err := c.conn.QueryRow(ctx,
`SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = $1
AND tablename = $2
)`,
schemaTable.Schema,
schemaTable.Table,
).Scan(&exists)
if err != nil {
).Scan(&exists); err != nil {
return false, fmt.Errorf("error checking if table exists: %w", err)
}

Expand Down Expand Up @@ -362,53 +361,46 @@ func (c *PostgresConnector) CreatePublication(
// createSlotAndPublication creates the replication slot and publication.
func (c *PostgresConnector) createSlotAndPublication(
ctx context.Context,
signal SlotSignal,
s SlotCheckResult,
slot string,
publication string,
tableNameMapping map[string]model.NameAndExclude,
doInitialCopy bool,
) {
) (model.SetupReplicationResult, error) {
// iterate through source tables and create publication,
// expecting tablenames to be schema qualified
if !s.PublicationExists {
srcTableNames := make([]string, 0, len(tableNameMapping))
for srcTableName := range tableNameMapping {
parsedSrcTableName, err := utils.ParseSchemaTable(srcTableName)
if err != nil {
signal.SlotCreated <- SlotCreationResult{
Err: fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName),
}
return
return model.SetupReplicationResult{}, fmt.Errorf("[publication-creation] source table identifier %s is invalid", srcTableName)
}
srcTableNames = append(srcTableNames, parsedSrcTableName.String())
}
if err := c.CreatePublication(ctx, srcTableNames, publication); err != nil {
signal.SlotCreated <- SlotCreationResult{Err: err}
return
return model.SetupReplicationResult{}, err
}
}

// create slot only after we succeeded in creating publication.
if !s.SlotExists {
conn, err := c.CreateReplConn(ctx)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error acquiring connection: %w", err)}
return
return model.SetupReplicationResult{}, fmt.Errorf("[slot] error acquiring connection: %w", err)
}
defer conn.Close(ctx)

c.logger.Warn(fmt.Sprintf("Creating replication slot '%s'", slot))

// THIS IS NOT IN A TX!
if _, err := conn.Exec(ctx, "SET idle_in_transaction_session_timeout=0"); err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)}
return
conn.Close(ctx)
return model.SetupReplicationResult{}, fmt.Errorf("[slot] error setting idle_in_transaction_session_timeout: %w", err)
}

if _, err := conn.Exec(ctx, "SET lock_timeout=0"); err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error setting lock_timeout: %w", err)}
return
conn.Close(ctx)
return model.SetupReplicationResult{}, fmt.Errorf("[slot] error setting lock_timeout: %w", err)
}

opts := pglogrepl.CreateReplicationSlotOptions{
Expand All @@ -417,39 +409,30 @@ func (c *PostgresConnector) createSlotAndPublication(
}
res, err := pglogrepl.CreateReplicationSlot(ctx, conn.PgConn(), slot, "pgoutput", opts)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error creating replication slot: %w", err)}
return
conn.Close(ctx)
return model.SetupReplicationResult{}, fmt.Errorf("[slot] error creating replication slot: %w", err)
}

pgversion, err := c.MajorVersion(ctx)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: fmt.Errorf("[slot] error getting PG version: %w", err)}
return
conn.Close(ctx)
return model.SetupReplicationResult{}, fmt.Errorf("[slot] error getting PG version: %w", err)
}

c.logger.Info(fmt.Sprintf("Created replication slot '%s'", slot))
slotDetails := SlotCreationResult{
return model.SetupReplicationResult{
Conn: conn,
SlotName: res.SlotName,
SnapshotName: res.SnapshotName,
Err: nil,
SupportsTIDScans: pgversion >= shared.POSTGRES_13,
}
signal.SlotCreated <- slotDetails
c.logger.Info("Waiting for clone to complete")
<-signal.CloneComplete
c.logger.Info("Clone complete")
}, nil
} else {
c.logger.Info(fmt.Sprintf("Replication slot '%s' already exists", slot))
slotDetails := SlotCreationResult{
SlotName: slot,
SnapshotName: "",
Err: nil,
SupportsTIDScans: false,
}
var err error
if doInitialCopy {
slotDetails.Err = ErrSlotAlreadyExists
err = ErrSlotAlreadyExists
}
signal.SlotCreated <- slotDetails
return model.SetupReplicationResult{SlotName: slot}, err
}
}

Expand Down
19 changes: 9 additions & 10 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *PostgresConnector) fetchCustomTypeMapping(ctx context.Context) (map[uin
}

func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, error) {
// create a separate connection pool for non-replication queries as replication connections cannot
// create a separate connection for non-replication queries as replication connections cannot
// be used for extended query protocol, i.e. prepared statements
replConfig, err := pgx.ParseConfig(c.connStr)
if err != nil {
Expand Down Expand Up @@ -1090,13 +1090,13 @@ func (c *PostgresConnector) FinishExport(tx any) error {
return pgtx.Commit(timeout)
}

// SetupReplication sets up replication for the source connector.
func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSignal, req *protos.SetupReplicationInput) {
// SetupReplication sets up replication for the source connector
func (c *PostgresConnector) SetupReplication(
ctx context.Context,
req *protos.SetupReplicationInput,
) (model.SetupReplicationResult, error) {
if !shared.IsValidReplicationName(req.FlowJobName) {
signal.SlotCreated <- SlotCreationResult{
Err: fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName),
}
return
return model.SetupReplicationResult{}, fmt.Errorf("invalid flow job name: `%s`, it should be ^[a-z_][a-z0-9_]*$", req.FlowJobName)
}

// Slotname would be the job name prefixed with "peerflow_slot_"
Expand All @@ -1113,8 +1113,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
// Check if the replication slot and publication exist
exists, err := c.checkSlotAndPublication(ctx, slotName, publicationName)
if err != nil {
signal.SlotCreated <- SlotCreationResult{Err: err}
return
return model.SetupReplicationResult{}, err
}

tableNameMapping := make(map[string]model.NameAndExclude, len(req.TableNameMapping))
Expand All @@ -1125,7 +1124,7 @@ func (c *PostgresConnector) SetupReplication(ctx context.Context, signal SlotSig
}
}
// Create the replication slot and publication
c.createSlotAndPublication(ctx, signal, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot)
return c.createSlotAndPublication(ctx, exists, slotName, publicationName, tableNameMapping, req.DoInitialSnapshot)
}

func (c *PostgresConnector) PullFlowCleanup(ctx context.Context, jobName string) error {
Expand Down
24 changes: 0 additions & 24 deletions flow/connectors/postgres/slot_signal.go

This file was deleted.

Loading
Loading