diff --git a/CHANGELOG.md b/CHANGELOG.md index a02514f..d52904f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,44 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## v4.0.0-rc.1 + +### Fixes + +* Fix an issue preventing the `setup` command from running on a clickhouse backend because of the reorg settings. + +## v4.0.0-beta + +### Highlights + +* This release brings support for managing reorgs in the Postgres database, enabled by default when `--undo-buffer-size` is set to 0. + +### Breaking changes + +* A change in your SQL schema may be required to keep existing substreams:SQL integrations working: + * The presence of a primary key (single key or composite) is now *MANDATORY* on every table. + * The `sf.substreams.sink.database.v1.TableChange` message, generated inside substreams, must now exactly match its primary key with the one in the SQL schema. + * You will need to re-run `setup` on your existing PostgreSQL databases to add the `substreams_history` table. You can use the new `--system-tables-only` flag to perform only that. + +* Since reorg management is not yet supported on Clickhouse, users will have to set `--undo-buffer-size` to a non-zero value (`12` was the previous default). + +## Protodefs v1.0.4 + +* Added support for `rest_frontend` field with `enabled` boolean flag, aimed at this backend implementation: https://github.com/semiotic-ai/sql-wrapper + +## v3.0.5 + +* Fixed regression: `run` command was incorrectly only processing blocks staying behind the "FinalBlocks" cliff. 
+ +## v3.0.4 + +* Fixed support for tables with primary keys misaligned with `database_changes`'s keys (fixing Clickhouse use case) + +## Protodefs v1.0.3 + +* Added support for selecting engine `postgres` or `clickhouse` to sinkconfig protobuf definition + +## v3.0.3 * Fixed missing uint8 and uint16 cast for clickhouse driver conversion * Bump version of `schema` dependency to fix errors with Clickhouse 23.9 enums being updated to String representation and not numbers. diff --git a/cmd/substreams-sink-sql/common_flags.go b/cmd/substreams-sink-sql/common_flags.go index 19e490c..5c63407 100644 --- a/cmd/substreams-sink-sql/common_flags.go +++ b/cmd/substreams-sink-sql/common_flags.go @@ -58,24 +58,22 @@ func newDBLoader( cmd *cobra.Command, psqlDSN string, flushInterval time.Duration, + handleReorgs bool, ) (*db.Loader, error) { moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag)) cli.NoError(err, "invalid mistmatch mode") - dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, zlog, tracer) + dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer) if err != nil { return nil, fmt.Errorf("new psql loader: %w", err) } if err := dbLoader.LoadTables(); err != nil { - var e *db.CursorError + var e *db.SystemTableError if errors.As(err, &e) { - fmt.Printf("Error validating the cursors table: %s\n", e) - fmt.Println("You can use the following sql schema to create a cursors table") - fmt.Println() - fmt.Println(dbLoader.GetCreateCursorsTableSQL(false)) - fmt.Println() - return nil, fmt.Errorf("invalid cursors table") + fmt.Printf("Error validating the system table: %s\n", e) + fmt.Println("Did you run setup ?") + return nil, e } return nil, fmt.Errorf("load psql table: %w", err) diff --git a/cmd/substreams-sink-sql/generate_csv.go b/cmd/substreams-sink-sql/generate_csv.go index 963b9af..8aec4a4 100644 --- a/cmd/substreams-sink-sql/generate_csv.go +++ 
b/cmd/substreams-sink-sql/generate_csv.go @@ -100,7 +100,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error { return fmt.Errorf("new base sinker: %w", err) } - dbLoader, err := newDBLoader(cmd, dsn, 0) // flush interval not used in CSV mode + dbLoader, err := newDBLoader(cmd, dsn, 0, false) // flush interval not used in CSV mode if err != nil { return fmt.Errorf("new db loader: %w", err) } diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index f60132f..701985f 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -13,14 +13,21 @@ import ( "github.com/streamingfast/substreams/manifest" ) +type ignoreUndoBufferSize struct{} + +func (i ignoreUndoBufferSize) IsIgnored(in string) bool { + return in == "undo-buffer-size" +} + var sinkRunCmd = Command(sinkRunE, "run [:]", "Runs SQL sink process", RangeArgs(2, 3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags) + sink.AddFlagsToSet(flags, ignoreUndoBufferSize{}) AddCommonSinkerFlags(flags) + flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. 
Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.") flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks") flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") }), @@ -54,7 +61,8 @@ func sinkRunE(cmd *cobra.Command, args []string) error { return err } - // "github.com/streamingfast/substreams/manifest" + handleReorgs := sflags.MustGetInt(cmd, "undo-buffer-size") == 0 + sink, err := sink.NewFromViper( cmd, supportedOutputTypes, @@ -64,13 +72,12 @@ func sinkRunE(cmd *cobra.Command, args []string) error { blockRange, zlog, tracer, - sink.WithFinalBlocksOnly(), ) if err != nil { return fmt.Errorf("new base sinker: %w", err) } - dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval")) + dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"), handleReorgs) if err != nil { return fmt.Errorf("new db loader: %w", err) } diff --git a/cmd/substreams-sink-sql/setup.go b/cmd/substreams-sink-sql/setup.go index 9a1f8d2..ba9c5d5 100644 --- a/cmd/substreams-sink-sql/setup.go +++ b/cmd/substreams-sink-sql/setup.go @@ -19,6 +19,7 @@ var sinkSetupCmd = Command(sinkSetupE, ExactArgs(2), Flags(func(flags *pflag.FlagSet) { flags.Bool("postgraphile", false, "Will append the necessary 'comments' on cursors table to fully support postgraphile") + flags.Bool("system-tables-only", false, "will only create/update the systems tables (cursors, substreams_history) and ignore the schema from the manifest") flags.Bool("ignore-duplicate-table-errors", false, "[Dev] Use this if you want to ignore duplicate table errors, take caution that this means the 'schemal.sql' file will not have run fully!") }), ) @@ -29,6 +30,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error { dsn := args[0] manifestPath := args[1] 
ignoreDuplicateTableErrors := sflags.MustGetBool(cmd, "ignore-duplicate-table-errors") + systemTableOnly := sflags.MustGetBool(cmd, "system-tables-only") reader, err := manifest.NewReader(manifestPath) if err != nil { @@ -44,12 +46,17 @@ func sinkSetupE(cmd *cobra.Command, args []string) error { return fmt.Errorf("extract sink config: %w", err) } - dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, zlog, tracer) + dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) if err != nil { return fmt.Errorf("new psql loader: %w", err) } - err = dbLoader.SetupFromBytes(ctx, []byte(sinkConfig.Schema), sflags.MustGetBool(cmd, "postgraphile")) + schema := sinkConfig.Schema + if systemTableOnly { + schema = "" + } + + err = dbLoader.Setup(ctx, schema, sflags.MustGetBool(cmd, "postgraphile")) if err != nil { if isDuplicateTableError(err) && ignoreDuplicateTableErrors { zlog.Info("received duplicate table error, script dit not executed succesfully completed") diff --git a/cmd/substreams-sink-sql/tools.go b/cmd/substreams-sink-sql/tools.go index ca38bf1..398228c 100644 --- a/cmd/substreams-sink-sql/tools.go +++ b/cmd/substreams-sink-sql/tools.go @@ -65,7 +65,7 @@ var sinkToolsCmd = Group( ) func toolsReadCursorE(cmd *cobra.Command, _ []string) error { - loader := toolsCreateLoader(true) + loader := toolsCreateLoader() out, err := loader.GetAllCursors(cmd.Context()) cli.NoError(err, "Unable to get all cursors") @@ -83,7 +83,7 @@ func toolsReadCursorE(cmd *cobra.Command, _ []string) error { } func toolsWriteCursorE(cmd *cobra.Command, args []string) error { - loader := toolsCreateLoader(true) + loader := toolsCreateLoader() moduleHash := args[0] opaqueCursor := args[1] @@ -114,7 +114,7 @@ func toolsWriteCursorE(cmd *cobra.Command, args []string) error { } func toolsDeleteCursorE(cmd *cobra.Command, args []string) error { - loader := toolsCreateLoader(true) + loader := toolsCreateLoader() moduleHash := "" if 
!viper.GetBool("tools-cursor-delete-all") { @@ -143,18 +143,17 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error { return nil } -func toolsCreateLoader(enforceCursorTable bool) *db.Loader { +func toolsCreateLoader() *db.Loader { dsn := viper.GetString("tools-global-dsn") - loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, zlog, tracer) + loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer) cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn) if err := loader.LoadTables(); err != nil { - var cursorError *db.CursorError - if errors.As(err, &cursorError) { - if enforceCursorTable { - fmt.Println("It seems the 'cursors' table does not exit on this database, unable to retrieve DB loader") - os.Exit(1) - } + var systemTableError *db.SystemTableError + if errors.As(err, &systemTableError) { + fmt.Printf("Error validating the system table: %s\n", systemTableError) + fmt.Println("Did you run setup ?") + os.Exit(1) } cli.NoError(err, "Unable to load table information from database") diff --git a/db/cursor.go b/db/cursor.go index 439054a..44a1a62 100644 --- a/db/cursor.go +++ b/db/cursor.go @@ -118,7 +118,8 @@ func (l *Loader) InsertCursor(ctx context.Context, moduleHash string, c *sink.Cu // UpdateCursor updates the active cursor. If no cursor is active and no update occurred, returns // ErrCursorNotFound. If the update was not successful on the database, returns an error. // You can use tx=nil to run the query outside of a transaction. 
-func (l *Loader) UpdateCursor(ctx context.Context, tx *sql.Tx, moduleHash string, c *sink.Cursor) error { +func (l *Loader) UpdateCursor(ctx context.Context, tx Tx, moduleHash string, c *sink.Cursor) error { + l.logger.Debug("updating cursor", zap.String("module_hash", moduleHash), zap.Stringer("cursor", c)) _, err := l.runModifiyQuery(ctx, tx, "update", l.getDialect().GetUpdateCursorQuery( l.cursorTable.identifier, moduleHash, c, c.Block().Num(), c.Block().ID(), )) @@ -152,7 +153,7 @@ type sqlExecutor interface { // // If `tx` is nil, we use `l.DB` as the execution context, so an operations happening outside // a transaction. Otherwise, tx is the execution context. -func (l *Loader) runModifiyQuery(ctx context.Context, tx *sql.Tx, action string, query string) (rowsAffected int64, err error) { +func (l *Loader) runModifiyQuery(ctx context.Context, tx Tx, action string, query string) (rowsAffected int64, err error) { var executor sqlExecutor = l.DB if tx != nil { executor = tx diff --git a/db/db.go b/db/db.go index fd5dcd7..891e36f 100644 --- a/db/db.go +++ b/db/db.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "os" "time" "github.com/jimsmart/schema" @@ -15,6 +14,7 @@ import ( ) const CURSORS_TABLE = "cursors" +const HISTORY_TABLE = "substreams_history" // Make the typing a bit easier type OrderedMap[K comparable, V any] struct { @@ -25,7 +25,7 @@ func NewOrderedMap[K comparable, V any]() *OrderedMap[K, V] { return &OrderedMap[K, V]{OrderedMap: orderedmap.New[K, V]()} } -type CursorError struct { +type SystemTableError struct { error } @@ -39,17 +39,21 @@ type Loader struct { tables map[string]*TableInfo cursorTable *TableInfo + handleReorgs bool flushInterval time.Duration moduleMismatchMode OnModuleHashMismatch logger *zap.Logger tracer logging.Tracer + + testTx *TestTx // used for testing: if non-nil, 'loader.BeginTx()' will return this object instead of a real *sql.Tx } func NewLoader( psqlDsn string, flushInterval time.Duration, 
moduleMismatchMode OnModuleHashMismatch, + handleReorgs *bool, logger *zap.Logger, tracer logging.Tracer, ) (*Loader, error) { @@ -63,15 +67,6 @@ func NewLoader( return nil, fmt.Errorf("open db connection: %w", err) } - logger.Debug("created new DB loader", - zap.Duration("flush_interval", flushInterval), - zap.String("database", dsn.database), - zap.String("schema", dsn.schema), - zap.String("host", dsn.host), - zap.Int64("port", dsn.port), - zap.Stringer("on_module_hash_mismatch", moduleMismatchMode), - ) - l := &Loader{ DB: db, database: dsn.database, @@ -83,15 +78,55 @@ func NewLoader( logger: logger, tracer: tracer, } - _, err = l.tryDialect() if err != nil { return nil, fmt.Errorf("dialect not found: %s", err) } + if handleReorgs == nil { + // automatic detection + l.handleReorgs = !l.getDialect().OnlyInserts() + } else { + l.handleReorgs = *handleReorgs + } + + if l.handleReorgs && l.getDialect().OnlyInserts() { + return nil, fmt.Errorf("driver %s does not support reorg handling. 
You must use set a non-zero undo-buffer-size", dsn.driver) + } + + logger.Info("created new DB loader", + zap.Duration("flush_interval", flushInterval), + zap.String("driver", dsn.driver), + zap.String("database", dsn.database), + zap.String("schema", dsn.schema), + zap.String("host", dsn.host), + zap.Int64("port", dsn.port), + zap.Stringer("on_module_hash_mismatch", moduleMismatchMode), + zap.Bool("handle_reorgs", l.handleReorgs), + zap.String("dialect", fmt.Sprintf("%t", l.getDialect())), + ) + return l, nil } +type Tx interface { + Rollback() error + Commit() error + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) +} + +func (l *Loader) Begin() (Tx, error) { + return l.BeginTx(context.Background(), nil) +} + +func (l *Loader) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) { + if l.testTx != nil { + return l.testTx, nil + } + return l.DB.BeginTx(ctx, opts) +} + func (l *Loader) FlushInterval() time.Duration { return l.flushInterval } @@ -103,6 +138,7 @@ func (l *Loader) LoadTables() error { } seenCursorTable := false + seenHistoryTable := false for schemaTableName, columns := range schemaTables { schemaName := schemaTableName[0] tableName := schemaTableName[1] @@ -122,6 +158,9 @@ func (l *Loader) LoadTables() error { seenCursorTable = true } + if tableName == HISTORY_TABLE { + seenHistoryTable = true + } columnByName := make(map[string]*ColumnInfo, len(columns)) for _, f := range columns { @@ -145,8 +184,12 @@ func (l *Loader) LoadTables() error { } if !seenCursorTable { - return &CursorError{fmt.Errorf(`%s.%s table is not found`, EscapeIdentifier(l.schema), CURSORS_TABLE)} + return &SystemTableError{fmt.Errorf(`%s.%s table is not found`, EscapeIdentifier(l.schema), CURSORS_TABLE)} + } + if l.handleReorgs && !seenHistoryTable { + return &SystemTableError{fmt.Errorf("%s.%s table is not found and reorgs handling is enabled.", 
EscapeIdentifier(l.schema), HISTORY_TABLE)} } + l.cursorTable = l.tables[CURSORS_TABLE] return nil @@ -154,7 +197,7 @@ func (l *Loader) LoadTables() error { func (l *Loader) validateCursorTables(columns []*sql.ColumnType) (err error) { if len(columns) != 4 { - return &CursorError{fmt.Errorf("table requires 4 columns ('id', 'cursor', 'block_num', 'block_id')")} + return &SystemTableError{fmt.Errorf("table requires 4 columns ('id', 'cursor', 'block_num', 'block_id')")} } columnsCheck := map[string]string{ "block_num": "int64", @@ -165,29 +208,29 @@ func (l *Loader) validateCursorTables(columns []*sql.ColumnType) (err error) { for _, f := range columns { columnName := f.Name() if _, found := columnsCheck[columnName]; !found { - return &CursorError{fmt.Errorf("unexpected column %q in cursors table", columnName)} + return &SystemTableError{fmt.Errorf("unexpected column %q in cursors table", columnName)} } expectedType := columnsCheck[columnName] actualType := f.ScanType().Kind().String() if expectedType != actualType { - return &CursorError{fmt.Errorf("column %q has invalid type, expected %q has %q", columnName, expectedType, actualType)} + return &SystemTableError{fmt.Errorf("column %q has invalid type, expected %q has %q", columnName, expectedType, actualType)} } delete(columnsCheck, columnName) } if len(columnsCheck) != 0 { for k := range columnsCheck { - return &CursorError{fmt.Errorf("missing column %q from cursors", k)} + return &SystemTableError{fmt.Errorf("missing column %q from cursors", k)} } } key, err := schema.PrimaryKey(l.DB, l.schema, CURSORS_TABLE) if err != nil { - return &CursorError{fmt.Errorf("failed getting primary key: %w", err)} + return &SystemTableError{fmt.Errorf("failed getting primary key: %w", err)} } if len(key) == 0 { - return &CursorError{fmt.Errorf("primary key not found: %w", err)} + return &SystemTableError{fmt.Errorf("primary key not found: %w", err)} } if key[0] != "id" { - return &CursorError{fmt.Errorf("column 'id' should be 
primary key not %q", key[0])} + return &SystemTableError{fmt.Errorf("column 'id' should be primary key not %q", key[0])} } return nil } @@ -238,44 +281,39 @@ func (l *Loader) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } -// Setup creates the schema and the cursors table where the is a local file -// on disk. -func (l *Loader) Setup(ctx context.Context, schemaFile string, withPostgraphile bool) error { - b, err := os.ReadFile(schemaFile) - if err != nil { - return fmt.Errorf("read schema file: %w", err) - } - - return l.SetupFromBytes(ctx, b, withPostgraphile) -} - -// SetupFromBytes creates the schema and the cursors table where the is a byte array +// Setup creates the schema, cursors and history table where the is a byte array // taken from somewhere. -func (l *Loader) SetupFromBytes(ctx context.Context, schemaBytes []byte, withPostgraphile bool) error { - schemaSql := string(schemaBytes) - if err := l.getDialect().ExecuteSetupScript(ctx, l, schemaSql); err != nil { - return fmt.Errorf("exec schema: %w", err) +func (l *Loader) Setup(ctx context.Context, schemaSql string, withPostgraphile bool) error { + if schemaSql != "" { + if err := l.getDialect().ExecuteSetupScript(ctx, l, schemaSql); err != nil { + return fmt.Errorf("exec schema: %w", err) + } } if err := l.setupCursorTable(ctx, withPostgraphile); err != nil { return fmt.Errorf("setup cursor table: %w", err) } + if err := l.setupHistoryTable(ctx, withPostgraphile); err != nil { + return fmt.Errorf("setup history table: %w", err) + } + return nil } func (l *Loader) setupCursorTable(ctx context.Context, withPostgraphile bool) error { - _, err := l.ExecContext(ctx, l.GetCreateCursorsTableSQL(withPostgraphile)) - - if err != nil { - return fmt.Errorf("creating cursor table: %w", err) - } - - return nil + query := l.getDialect().GetCreateCursorQuery(l.schema, withPostgraphile) + _, err := l.ExecContext(ctx, query) + return err } -func (l *Loader) GetCreateCursorsTableSQL(withPostgraphile 
bool) string { - return l.getDialect().GetCreateCursorQuery(l.schema, withPostgraphile) +func (l *Loader) setupHistoryTable(ctx context.Context, withPostgraphile bool) error { + if l.getDialect().OnlyInserts() { + return nil + } + query := l.getDialect().GetCreateHistoryQuery(l.schema, withPostgraphile) + _, err := l.ExecContext(ctx, query) + return err } func (l *Loader) getDialect() dialect { diff --git a/db/dialect.go b/db/dialect.go index 1014c5b..ea1dae0 100644 --- a/db/dialect.go +++ b/db/dialect.go @@ -2,7 +2,6 @@ package db import ( "context" - "database/sql" "fmt" sink "github.com/streamingfast/substreams-sink" @@ -19,11 +18,13 @@ func (e UnknownDriverError) Error() string { type dialect interface { GetCreateCursorQuery(schema string, withPostgraphile bool) string + GetCreateHistoryQuery(schema string, withPostgraphile bool) string ExecuteSetupScript(ctx context.Context, l *Loader, schemaSql string) error DriverSupportRowsAffected() bool GetUpdateCursorQuery(table, moduleHash string, cursor *sink.Cursor, block_num uint64, block_id string) string ParseDatetimeNormalization(value string) string - Flush(tx *sql.Tx, ctx context.Context, l *Loader, outputModuleHash string, cursor *sink.Cursor) (int, error) + Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error) + Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error OnlyInserts() bool } diff --git a/db/dialect_clickhouse.go b/db/dialect_clickhouse.go index 7070dcc..cccad24 100644 --- a/db/dialect_clickhouse.go +++ b/db/dialect_clickhouse.go @@ -2,7 +2,6 @@ package db import ( "context" - "database/sql" "fmt" "math/big" "reflect" @@ -23,7 +22,7 @@ type clickhouseDialect struct{} // Clickhouse should be used to insert a lot of data in batches. The current official clickhouse // driver doesn't support Transactions for multiple tables. 
The only way to add in batches is // creating a transaction for a table, adding all rows and commiting it. -func (d clickhouseDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, outputModuleHash string, cursor *sink.Cursor) (int, error) { +func (d clickhouseDialect) Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error) { var entryCount int for entriesPair := l.entries.Oldest(); entriesPair != nil; entriesPair = entriesPair.Next() { tableName := entriesPair.Key @@ -81,6 +80,10 @@ func (d clickhouseDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, out return entryCount, nil } +func (d clickhouseDialect) Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error { + return fmt.Errorf("clickhouse driver does not support reorg management.") +} + func (d clickhouseDialect) GetCreateCursorQuery(schema string, withPostgraphile bool) string { _ = withPostgraphile // TODO: see if this can work return fmt.Sprintf(cli.Dedent(` @@ -91,7 +94,11 @@ func (d clickhouseDialect) GetCreateCursorQuery(schema string, withPostgraphile block_num Int64, block_id String ) Engine = ReplacingMergeTree() ORDER BY id; - `), EscapeIdentifier(schema), EscapeIdentifier("cursors")) + `), EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE)) +} + +func (d clickhouseDialect) GetCreateHistoryQuery(schema string, withPostgraphile bool) string { + panic("clickhouse does not support reorg management") } func (d clickhouseDialect) ExecuteSetupScript(ctx context.Context, l *Loader, schemaSql string) error { @@ -136,7 +143,7 @@ func convertOpToClickhouseValues(o *Operation) ([]any, error) { for i, v := range columns { convertedType, err := convertToType(o.data[v], o.table.columnsByName[v].scanType) if err != nil { - return nil, fmt.Errorf("converting value %q to type %q: %w", o.data[v], o.table.columnsByName[v].scanType, err) + return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], 
o.table.columnsByName[v].scanType, v, err) } values[i] = convertedType } diff --git a/db/dialect_postgres.go b/db/dialect_postgres.go index 3b2599a..426817d 100644 --- a/db/dialect_postgres.go +++ b/db/dialect_postgres.go @@ -3,8 +3,10 @@ package db import ( "context" "database/sql" + "encoding/json" "fmt" "reflect" + "sort" "strconv" "strings" "time" @@ -17,7 +19,53 @@ import ( type postgresDialect struct{} -func (d postgresDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, outputModuleHash string, cursor *sink.Cursor) (int, error) { +func (d postgresDialect) Revert(tx Tx, ctx context.Context, l *Loader, lastValidFinalBlock uint64) error { + query := fmt.Sprintf(`SELECT op,table_name,pk,prev_value,block_num FROM %s WHERE "block_num" > %d ORDER BY "block_num" DESC`, + d.historyTable(l.schema), + lastValidFinalBlock, + ) + + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return err + } + + l.logger.Info("reverting forked block block(s)", zap.Uint64("last_valid_final_block", lastValidFinalBlock)) + if rows != nil { // rows will be nil with no error only in testing scenarios + defer rows.Close() + for rows.Next() { + var op string + var table_name string + var pk string + var prev_value_nullable sql.NullString + var block_num uint64 + if err := rows.Scan(&op, &table_name, &pk, &prev_value_nullable, &block_num); err != nil { + return fmt.Errorf("scanning row: %w", err) + } + l.logger.Debug("reverting", zap.String("operation", op), zap.String("table_name", table_name), zap.String("pk", pk), zap.Uint64("block_num", block_num)) + prev_value := prev_value_nullable.String + + if err := d.revertOp(tx, ctx, op, table_name, pk, prev_value, block_num); err != nil { + return fmt.Errorf("revertOp: %w", err) + } + } + if err := rows.Err(); err != nil { + return fmt.Errorf("iterating on rows from query %q: %w", query, err) + } + } + pruneHistory := fmt.Sprintf(`DELETE FROM %s WHERE "block_num" > %d;`, + d.historyTable(l.schema), + lastValidFinalBlock, + ) + + 
_, err = tx.ExecContext(ctx, pruneHistory) + if err != nil { + return fmt.Errorf("executing pruneHistory: %w", err) + } + return nil +} + +func (d postgresDialect) Flush(tx Tx, ctx context.Context, l *Loader, outputModuleHash string, lastFinalBlock uint64) (int, error) { var rowCount int for entriesPair := l.entries.Oldest(); entriesPair != nil; entriesPair = entriesPair.Next() { tableName := entriesPair.Key @@ -29,7 +77,7 @@ func (d postgresDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, outpu for entryPair := entries.Oldest(); entryPair != nil; entryPair = entryPair.Next() { entry := entryPair.Value - query, err := d.prepareStatement(entry) + query, err := d.prepareStatement(l.schema, entry) if err != nil { return 0, fmt.Errorf("failed to prepare statement: %w", err) } @@ -45,9 +93,85 @@ func (d postgresDialect) Flush(tx *sql.Tx, ctx context.Context, l *Loader, outpu rowCount += entries.Len() } + if err := d.pruneReversibleSegment(tx, ctx, l.schema, lastFinalBlock); err != nil { + return 0, err + } + return rowCount, nil } +func (d postgresDialect) revertOp(tx Tx, ctx context.Context, op, escaped_table_name, pk, prev_value string, block_num uint64) error { + + pkmap := make(map[string]string) + if err := json.Unmarshal([]byte(pk), &pkmap); err != nil { + return fmt.Errorf("revertOp: unmarshalling %q: %w", pk, err) + } + switch op { + case "I": + query := fmt.Sprintf(`DELETE FROM %s WHERE %s;`, + escaped_table_name, + getPrimaryKeyWhereClause(pkmap), + ) + if _, err := tx.ExecContext(ctx, query); err != nil { + return fmt.Errorf("executing revert query %q: %w", query, err) + } + case "D": + query := fmt.Sprintf(`INSERT INTO %s SELECT * FROM json_populate_record(null::%s,%s);`, + escaped_table_name, + escaped_table_name, + escapeStringValue(prev_value), + ) + if _, err := tx.ExecContext(ctx, query); err != nil { + return fmt.Errorf("executing revert query %q: %w", query, err) + } + + case "U": + columns, err := sqlColumnNamesFromJSON(prev_value) + if err 
!= nil { + return err + } + + query := fmt.Sprintf(`UPDATE %s SET(%s)=((SELECT %s FROM json_populate_record(null::%s,%s))) WHERE %s;`, + escaped_table_name, + columns, + columns, + escaped_table_name, + escapeStringValue(prev_value), + getPrimaryKeyWhereClause(pkmap), + ) + if _, err := tx.ExecContext(ctx, query); err != nil { + return fmt.Errorf("executing revert query %q: %w", query, err) + } + default: + panic("invalid op in revert command") + } + return nil +} + +func sqlColumnNamesFromJSON(in string) (string, error) { + valueMap := make(map[string]interface{}) + if err := json.Unmarshal([]byte(in), &valueMap); err != nil { + return "", fmt.Errorf("unmarshalling %q into valueMap: %w", in, err) + } + escapedNames := make([]string, len(valueMap)) + i := 0 + for k := range valueMap { + escapedNames[i] = EscapeIdentifier(k) + i++ + } + sort.Strings(escapedNames) + + return strings.Join(escapedNames, ","), nil +} + +func (d postgresDialect) pruneReversibleSegment(tx Tx, ctx context.Context, schema string, highestFinalBlock uint64) error { + query := fmt.Sprintf(`DELETE FROM %s WHERE block_num <= %d;`, d.historyTable(schema), highestFinalBlock) + if _, err := tx.ExecContext(ctx, query); err != nil { + return fmt.Errorf("executing prune query %q: %w", query, err) + } + return nil +} + func (d postgresDialect) GetCreateCursorQuery(schema string, withPostgraphile bool) string { out := fmt.Sprintf(cli.Dedent(` create table if not exists %s.%s @@ -57,10 +181,31 @@ func (d postgresDialect) GetCreateCursorQuery(schema string, withPostgraphile bo block_num bigint, block_id text ); - `), EscapeIdentifier(schema), EscapeIdentifier("cursors")) + `), EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE)) + if withPostgraphile { + out += fmt.Sprintf("COMMENT ON TABLE %s.%s IS E'@omit';", + EscapeIdentifier(schema), EscapeIdentifier(CURSORS_TABLE)) + } + return out +} + +func (d postgresDialect) GetCreateHistoryQuery(schema string, withPostgraphile bool) string { + out := 
fmt.Sprintf(cli.Dedent(` + create table if not exists %s + ( + id SERIAL PRIMARY KEY, + op char, + table_name text, + pk text, + prev_value text, + block_num bigint + ); + `), + d.historyTable(schema), + ) if withPostgraphile { out += fmt.Sprintf("COMMENT ON TABLE %s.%s IS E'@omit';", - EscapeIdentifier(schema), EscapeIdentifier("cursors")) + EscapeIdentifier(schema), EscapeIdentifier(HISTORY_TABLE)) } return out } @@ -90,7 +235,40 @@ func (d postgresDialect) OnlyInserts() bool { return false } -func (d *postgresDialect) prepareStatement(o *Operation) (string, error) { +func (d postgresDialect) historyTable(schema string) string { + return fmt.Sprintf("%s.%s", EscapeIdentifier(schema), EscapeIdentifier("substreams_history")) +} + +func (d postgresDialect) saveInsert(schema string, table string, primaryKey map[string]string, blockNum uint64) string { + return fmt.Sprintf(`INSERT INTO %s (op,table_name,pk,block_num) values (%s,%s,%s,%d);`, + d.historyTable(schema), + escapeStringValue("I"), + escapeStringValue(table), + escapeStringValue(primaryKeyToJSON(primaryKey)), + blockNum, + ) +} + +func (d postgresDialect) saveUpdate(schema string, escapedTableName string, primaryKey map[string]string, blockNum uint64) string { + return d.saveRow("U", schema, escapedTableName, primaryKey, blockNum) +} + +func (d postgresDialect) saveDelete(schema string, escapedTableName string, primaryKey map[string]string, blockNum uint64) string { + return d.saveRow("D", schema, escapedTableName, primaryKey, blockNum) +} + +func (d postgresDialect) saveRow(op, schema, escapedTableName string, primaryKey map[string]string, blockNum uint64) string { + schemaAndTable := fmt.Sprintf("%s.%s", EscapeIdentifier(schema), escapedTableName) + return fmt.Sprintf(`INSERT INTO %s (op,table_name,pk,prev_value,block_num) SELECT %s,%s,%s,row_to_json(%s),%d FROM %s.%s WHERE %s;`, + d.historyTable(schema), + escapeStringValue(op), escapeStringValue(schemaAndTable), 
escapeStringValue(primaryKeyToJSON(primaryKey)), escapedTableName, blockNum, + EscapeIdentifier(schema), escapedTableName, + getPrimaryKeyWhereClause(primaryKey), + ) + +} + +func (d *postgresDialect) prepareStatement(schema string, o *Operation) (string, error) { var columns, values []string if o.opType == OperationTypeInsert || o.opType == OperationTypeUpdate { var err error @@ -109,11 +287,15 @@ func (d *postgresDialect) prepareStatement(o *Operation) (string, error) { switch o.opType { case OperationTypeInsert: - return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", + insertQuery := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s);", o.table.identifier, strings.Join(columns, ","), strings.Join(values, ","), - ), nil + ) + if o.reversibleBlockNum != nil { + return d.saveInsert(schema, o.table.identifier, o.primaryKey, *o.reversibleBlockNum) + insertQuery, nil + } + return insertQuery, nil case OperationTypeUpdate: updates := make([]string, len(columns)) @@ -122,18 +304,28 @@ func (d *postgresDialect) prepareStatement(o *Operation) (string, error) { } primaryKeySelector := getPrimaryKeyWhereClause(o.primaryKey) - return fmt.Sprintf("UPDATE %s SET %s WHERE %s", + + updateQuery := fmt.Sprintf("UPDATE %s SET %s WHERE %s", o.table.identifier, strings.Join(updates, ", "), primaryKeySelector, - ), nil + ) + + if o.reversibleBlockNum != nil { + return d.saveUpdate(schema, o.table.nameEscaped, o.primaryKey, *o.reversibleBlockNum) + updateQuery, nil + } + return updateQuery, nil case OperationTypeDelete: primaryKeyWhereClause := getPrimaryKeyWhereClause(o.primaryKey) - return fmt.Sprintf("DELETE FROM %s WHERE %s", + deleteQuery := fmt.Sprintf("DELETE FROM %s WHERE %s", o.table.identifier, primaryKeyWhereClause, - ), nil + ) + if o.reversibleBlockNum != nil { + return d.saveDelete(schema, o.table.nameEscaped, o.primaryKey, *o.reversibleBlockNum) + deleteQuery, nil + } + return deleteQuery, nil default: panic(fmt.Errorf("unknown operation type %q", o.opType)) @@ -149,7 +341,14 
@@ func (d *postgresDialect) prepareColValues(table *TableInfo, colValues map[strin values = make([]string, len(colValues)) i := 0 - for columnName, value := range colValues { + for colName := range colValues { + columns[i] = colName + i++ + } + sort.Strings(columns) // sorted for determinism in tests + + for i, columnName := range columns { + value := colValues[columnName] columnInfo, found := table.columnsByName[columnName] if !found { return nil, nil, fmt.Errorf("cannot find column %q for table %q (valid columns are %q)", columnName, table.identifier, strings.Join(maps.Keys(table.columnsByName), ", ")) @@ -160,10 +359,8 @@ func (d *postgresDialect) prepareColValues(table *TableInfo, colValues map[strin return nil, nil, fmt.Errorf("getting sql value from table %s for column %q raw value %q: %w", table.identifier, columnName, value, err) } - columns[i] = columnInfo.escapedName values[i] = normalizedValue - - i++ + columns[i] = columnInfo.escapedName // escape the column name } return } @@ -180,6 +377,7 @@ func getPrimaryKeyWhereClause(primaryKey map[string]string) string { for key, value := range primaryKey { reg = append(reg, EscapeIdentifier(key)+" = "+escapeStringValue(value)) } + sort.Strings(reg) return strings.Join(reg[:], " AND ") } diff --git a/db/dialect_postgres_test.go b/db/dialect_postgres_test.go new file mode 100644 index 0000000..07a82a1 --- /dev/null +++ b/db/dialect_postgres_test.go @@ -0,0 +1,156 @@ +package db + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPrimaryKeyToJSON(t *testing.T) { + + tests := []struct { + name string + keys map[string]string + expect string + }{ + { + name: "single key", + keys: map[string]string{ + "id": "0xdeadbeef", + }, + expect: `{"id":"0xdeadbeef"}`, + }, + { + name: "two keys", + keys: map[string]string{ + "hash": "0xdeadbeef", + "idx": "5", + }, + expect: `{"hash":"0xdeadbeef","idx":"5"}`, + }, + { + name: "determinism", + keys: 
map[string]string{ + "bbb": "1", + "ccc": "2", + "aaa": "3", + "ddd": "4", + }, + expect: `{"aaa":"3","bbb":"1","ccc":"2","ddd":"4"}`, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + jsonKey := primaryKeyToJSON(test.keys) + assert.Equal(t, test.expect, jsonKey) + }) + } + +} + +func TestJSONToPrimaryKey(t *testing.T) { + + tests := []struct { + name string + in string + expect map[string]string + }{ + { + name: "single key", + in: `{"id":"0xdeadbeef"}`, + expect: map[string]string{ + "id": "0xdeadbeef", + }, + }, + { + name: "two keys", + in: `{"hash":"0xdeadbeef","idx":"5"}`, + expect: map[string]string{ + "hash": "0xdeadbeef", + "idx": "5", + }, + }, + { + name: "determinism", + in: `{"aaa":"3","bbb":"1","ccc":"2","ddd":"4"}`, + expect: map[string]string{ + "bbb": "1", + "ccc": "2", + "aaa": "3", + "ddd": "4", + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + out, err := jsonToPrimaryKey(test.in) + require.NoError(t, err) + assert.Equal(t, test.expect, out) + }) + } + +} + +func TestRevertOp(t *testing.T) { + + type row struct { + op string + table_name string + pk string + prev_value string + } + + tests := []struct { + name string + row row + expect string + }{ + { + name: "rollback insert row", + row: row{ + op: "I", + table_name: `"testschema"."xfer"`, + pk: `{"id":"2345"}`, + prev_value: "", // unused + }, + expect: `DELETE FROM "testschema"."xfer" WHERE "id" = '2345';`, + }, + { + name: "rollback delete row", + row: row{ + op: "D", + table_name: `"testschema"."xfer"`, + pk: `{"id":"2345"}`, + prev_value: `{"id":"2345","sender":"0xdead","receiver":"0xbeef"}`, + }, + expect: `INSERT INTO "testschema"."xfer" SELECT * FROM json_populate_record(null::"testschema"."xfer",` + + `'{"id":"2345","sender":"0xdead","receiver":"0xbeef"}');`, + }, + { + name: "rollback update row", + row: row{ + op: "U", + table_name: `"testschema"."xfer"`, + pk: `{"id":"2345"}`, + prev_value: 
`{"id":"2345","sender":"0xdead","receiver":"0xbeef"}`, + }, + expect: `UPDATE "testschema"."xfer" SET("id","receiver","sender")=((SELECT "id","receiver","sender" FROM json_populate_record(null::"testschema"."xfer",` + + `'{"id":"2345","sender":"0xdead","receiver":"0xbeef"}'))) WHERE "id" = '2345';`, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tx := &TestTx{} + ctx := context.Background() + pd := postgresDialect{} + + row := test.row + err := pd.revertOp(tx, ctx, row.op, row.table_name, row.pk, row.prev_value, 9999) + require.NoError(t, err) + assert.Equal(t, []string{test.expect}, tx.Results()) + }) + } + +} diff --git a/db/flush.go b/db/flush.go index 3c2b5b4..1da37f8 100644 --- a/db/flush.go +++ b/db/flush.go @@ -10,11 +10,11 @@ import ( "go.uber.org/zap" ) -func (l *Loader) Flush(ctx context.Context, outputModuleHash string, cursor *sink.Cursor) (rowFlushedCount int, err error) { +func (l *Loader) Flush(ctx context.Context, outputModuleHash string, cursor *sink.Cursor, lastFinalBlock uint64) (rowFlushedCount int, err error) { ctx = clickhouse.Context(context.Background(), clickhouse.WithStdAsync(false)) startAt := time.Now() - tx, err := l.DB.BeginTx(ctx, nil) + tx, err := l.BeginTx(ctx, nil) if err != nil { return 0, fmt.Errorf("failed to being db transaction: %w", err) } @@ -26,7 +26,7 @@ func (l *Loader) Flush(ctx context.Context, outputModuleHash string, cursor *sin } }() - rowFlushedCount, err = l.getDialect().Flush(tx, ctx, l, outputModuleHash, cursor) + rowFlushedCount, err = l.getDialect().Flush(tx, ctx, l, outputModuleHash, lastFinalBlock) if err != nil { return 0, fmt.Errorf("dialect flush: %w", err) } @@ -46,6 +46,35 @@ func (l *Loader) Flush(ctx context.Context, outputModuleHash string, cursor *sin return rowFlushedCount, nil } +func (l *Loader) Revert(ctx context.Context, outputModuleHash string, cursor *sink.Cursor, lastValidBlock uint64) error { + tx, err := l.BeginTx(ctx, nil) + if err != nil { + return 
fmt.Errorf("failed to being db transaction: %w", err) + } + defer func() { + if err != nil { + if err := tx.Rollback(); err != nil { + l.logger.Warn("failed to rollback transaction", zap.Error(err)) + } + } + }() + + if err := l.getDialect().Revert(tx, ctx, l, lastValidBlock); err != nil { + return err + } + + if err := l.UpdateCursor(ctx, tx, outputModuleHash, cursor); err != nil { + return fmt.Errorf("update cursor after revert: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit db transaction: %w", err) + } + + l.logger.Debug("reverted changes to database", zap.Uint64("last_valid_block", lastValidBlock)) + return nil +} + func (l *Loader) reset() { for entriesPair := l.entries.Oldest(); entriesPair != nil; entriesPair = entriesPair.Next() { l.entries.Set(entriesPair.Key, NewOrderedMap[string, *Operation]()) diff --git a/db/operations.go b/db/operations.go index 5acb356..34302c7 100644 --- a/db/operations.go +++ b/db/operations.go @@ -1,6 +1,7 @@ package db import ( + "encoding/json" "fmt" "reflect" "regexp" @@ -23,39 +24,43 @@ const ( ) type Operation struct { - table *TableInfo - opType OperationType - primaryKey map[string]string - data map[string]string + table *TableInfo + opType OperationType + primaryKey map[string]string + data map[string]string + reversibleBlockNum *uint64 // nil if that block is known to be irreversible } func (o *Operation) String() string { return fmt.Sprintf("%s/%s (%s)", o.table.identifier, createRowUniqueID(o.primaryKey), strings.ToLower(string(o.opType))) } -func (l *Loader) newInsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string) *Operation { +func (l *Loader) newInsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) *Operation { return &Operation{ - table: table, - opType: OperationTypeInsert, - primaryKey: primaryKey, - data: data, + table: table, + opType: OperationTypeInsert, + primaryKey: 
primaryKey, + data: data, + reversibleBlockNum: reversibleBlockNum, } } -func (l *Loader) newUpdateOperation(table *TableInfo, primaryKey map[string]string, data map[string]string) *Operation { +func (l *Loader) newUpdateOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) *Operation { return &Operation{ - table: table, - opType: OperationTypeUpdate, - primaryKey: primaryKey, - data: data, + table: table, + opType: OperationTypeUpdate, + primaryKey: primaryKey, + data: data, + reversibleBlockNum: reversibleBlockNum, } } -func (l *Loader) newDeleteOperation(table *TableInfo, primaryKey map[string]string) *Operation { +func (l *Loader) newDeleteOperation(table *TableInfo, primaryKey map[string]string, reversibleBlockNum *uint64) *Operation { return &Operation{ - table: table, - opType: OperationTypeDelete, - primaryKey: primaryKey, + table: table, + opType: OperationTypeDelete, + primaryKey: primaryKey, + reversibleBlockNum: reversibleBlockNum, } } @@ -88,3 +93,22 @@ func escapeStringValue(valueToEscape string) string { return `'` + valueToEscape + `'` } + +// to store in an history table +func primaryKeyToJSON(primaryKey map[string]string) string { + m, err := json.Marshal(primaryKey) + if err != nil { + panic(err) // should never happen with map[string]string + } + return string(m) +} + +// to store in an history table +func jsonToPrimaryKey(in string) (map[string]string, error) { + out := make(map[string]string) + err := json.Unmarshal([]byte(in), &out) + if err != nil { + return nil, err + } + return out, nil +} diff --git a/db/operations_test.go b/db/operations_test.go index 268df7a..7fde5b0 100644 --- a/db/operations_test.go +++ b/db/operations_test.go @@ -19,7 +19,7 @@ func TestEscapeColumns(t *testing.T) { t.Skip(`PG_DSN not set, please specify PG_DSN to run this test, example: PG_DSN="psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?enable_incremental_sort=off&sslmode=disable"`) } - 
dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, zlog, tracer) + dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer) require.NoError(t, err) tx, err := dbLoader.DB.Begin() @@ -68,7 +68,7 @@ func TestEscapeValues(t *testing.T) { t.Skip(`PG_DSN not set, please specify PG_DSN to run this test, example: PG_DSN="psql://dev-node:insecure-change-me-in-prod@localhost:5432/dev-node?enable_incremental_sort=off&sslmode=disable"`) } - dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, zlog, tracer) + dbLoader, err := NewLoader(dsn, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer) require.NoError(t, err) tx, err := dbLoader.DB.Begin() diff --git a/db/ops.go b/db/ops.go index c4e9416..2922ef0 100644 --- a/db/ops.go +++ b/db/ops.go @@ -10,7 +10,7 @@ import ( // Insert a row in the DB, it is assumed the table exists, you can do a // check before with HasTable() -func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]string) error { +func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error { uniqueID := createRowUniqueID(primaryKey) if l.tracer.Enabled() { @@ -40,12 +40,14 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map l.logger.Debug("primary key entry never existed for table, adding insert operation", zap.String("primary_key", uniqueID), zap.String("table_name", tableName)) } - // We need to make sure to add the primary key(s) in the data so that those column get created correctly + // We need to make sure to add the primary key(s) in the data so that those column get created correctly, but only if there is data for _, primary := range l.tables[tableName].primaryColumns { - data[primary.name] = primaryKey[primary.name] + if dataFromPrimaryKey, ok := primaryKey[primary.name]; ok { + data[primary.name] = dataFromPrimaryKey + } } - entry.Set(uniqueID, l.newInsertOperation(table, 
primaryKey, data)) + entry.Set(uniqueID, l.newInsertOperation(table, primaryKey, data, reversibleBlockNum)) l.entriesCount++ return nil } @@ -78,23 +80,24 @@ func createRowUniqueID(m map[string]string) string { func (l *Loader) GetPrimaryKey(tableName string, pk string) (map[string]string, error) { primaryKeyColumns := l.tables[tableName].primaryColumns - if len(primaryKeyColumns) > 1 { - return nil, fmt.Errorf("your Substreams sent a primary key but your database definition for table %q is using a composite primary key", tableName) - } - // There can be only 0 or 1 column here as we check above for > 1 and return. - // If there is 0, there is no primary key column in which case we return the - // received primary key as is under and "" (empty) column name. - if len(primaryKeyColumns) == 0 { - return map[string]string{"": pk}, nil + switch len(primaryKeyColumns) { + case 0: + return nil, fmt.Errorf("substreams sent a single primary key, but our sql table has none. This is unsupported.") + case 1: + return map[string]string{primaryKeyColumns[0].name: pk}, nil } - return map[string]string{primaryKeyColumns[0].name: pk}, nil + cols := make([]string, len(primaryKeyColumns)) + for i := range primaryKeyColumns { + cols[i] = primaryKeyColumns[i].name + } + return nil, fmt.Errorf("substreams sent a single primary key, but our sql table has a composite primary key (columns: %s). 
This is unsupported.", strings.Join(cols, ",")) } // Update a row in the DB, it is assumed the table exists, you can do a // check before with HasTable() -func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]string) error { +func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error { if l.getDialect().OnlyInserts() { return fmt.Errorf("update operation is not supported by the current database") } @@ -143,13 +146,13 @@ func (l *Loader) Update(tableName string, primaryKey map[string]string, data map l.logger.Debug("primary key entry never existed for table, adding update operation", zap.String("primary_key", uniqueID), zap.String("table_name", tableName)) } - entry.Set(uniqueID, l.newUpdateOperation(table, primaryKey, data)) + entry.Set(uniqueID, l.newUpdateOperation(table, primaryKey, data, reversibleBlockNum)) return nil } // Delete a row in the DB, it is assumed the table exists, you can do a // check before with HasTable() -func (l *Loader) Delete(tableName string, primaryKey map[string]string) error { +func (l *Loader) Delete(tableName string, primaryKey map[string]string, reversibleBlockNum *uint64) error { if l.getDialect().OnlyInserts() { return fmt.Errorf("delete operation is not supported by the current database") } @@ -164,7 +167,7 @@ func (l *Loader) Delete(tableName string, primaryKey map[string]string) error { return fmt.Errorf("unknown table %q", tableName) } - if len(table.primaryColumns) == 0 { + if len(table.primaryColumns) != 1 { return fmt.Errorf("trying to perform a DELETE operation but table %q don't have a primary key(s) set, this is not accepted", tableName) } @@ -190,6 +193,6 @@ func (l *Loader) Delete(tableName string, primaryKey map[string]string) error { l.logger.Debug("adding deleting operation", zap.String("primary_key", uniqueID), zap.String("table_name", tableName)) } - entry.Set(uniqueID, l.newDeleteOperation(table, primaryKey)) 
+ entry.Set(uniqueID, l.newDeleteOperation(table, primaryKey, reversibleBlockNum)) return nil } diff --git a/db/ops_test.go b/db/ops_test.go new file mode 100644 index 0000000..5c92729 --- /dev/null +++ b/db/ops_test.go @@ -0,0 +1,65 @@ +package db + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGetPrimaryKey(t *testing.T) { + tests := []struct { + name string + in []*ColumnInfo + expectOut map[string]string + expectError bool + }{ + { + name: "no primkey error", + expectError: true, + }, + { + name: "more than one primkey error", + in: []*ColumnInfo{ + { + name: "one", + }, + { + name: "two", + }, + }, + expectError: true, + }, + { + name: "single than primkey ok", + in: []*ColumnInfo{ + { + name: "id", + }, + }, + expectOut: map[string]string{ + "id": "testval", + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + l := &Loader{ + tables: map[string]*TableInfo{ + "test": { + primaryColumns: test.in, + }, + }, + } + out, err := l.GetPrimaryKey("test", "testval") + if test.expectError { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expectOut, out) + } + + }) + } + +} diff --git a/db/testing.go b/db/testing.go new file mode 100644 index 0000000..252207a --- /dev/null +++ b/db/testing.go @@ -0,0 +1,91 @@ +package db + +import ( + "context" + "database/sql" + + "github.com/streamingfast/logging" + "go.uber.org/zap" +) + +func NewTestLoader( + zlog *zap.Logger, + tracer logging.Tracer, + schema string, + tables map[string]*TableInfo, +) (*Loader, *TestTx) { + + loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, nil, zlog, tracer) + if err != nil { + panic(err) + } + loader.testTx = &TestTx{} + loader.tables = tables + loader.schema = schema + loader.cursorTable = tables[CURSORS_TABLE] + return loader, loader.testTx + +} + +func TestTables(schema string) map[string]*TableInfo { + return 
map[string]*TableInfo{ + "xfer": mustNewTableInfo(schema, "xfer", []string{"id"}, map[string]*ColumnInfo{ + "id": NewColumnInfo("id", "text", ""), + "from": NewColumnInfo("from", "text", ""), + "to": NewColumnInfo("to", "text", ""), + }), + CURSORS_TABLE: mustNewTableInfo(schema, CURSORS_TABLE, []string{"id"}, map[string]*ColumnInfo{ + "block_num": NewColumnInfo("id", "int64", ""), + "block_id": NewColumnInfo("from", "text", ""), + "cursor": NewColumnInfo("cursor", "text", ""), + "id": NewColumnInfo("id", "text", ""), + }), + } +} + +func mustNewTableInfo(schema, name string, pkList []string, columnsByName map[string]*ColumnInfo) *TableInfo { + ti, err := NewTableInfo(schema, name, pkList, columnsByName) + if err != nil { + panic(err) + } + return ti +} + +type TestTx struct { + queries []string + next []*sql.Rows +} + +func (t *TestTx) Rollback() error { + t.queries = append(t.queries, "ROLLBACK") + return nil +} + +func (t *TestTx) Commit() error { + t.queries = append(t.queries, "COMMIT") + return nil +} + +func (t *TestTx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + t.queries = append(t.queries, query) + return &testResult{}, nil +} + +func (t *TestTx) Results() []string { + return t.queries +} + +func (t *TestTx) QueryContext(ctx context.Context, query string, args ...any) (out *sql.Rows, err error) { + t.queries = append(t.queries, query) + return nil, nil +} + +type testResult struct{} + +func (t *testResult) LastInsertId() (int64, error) { + return 0, nil +} + +func (t *testResult) RowsAffected() (int64, error) { + return 1, nil +} diff --git a/db/types.go b/db/types.go index 42cea85..a05d18c 100644 --- a/db/types.go +++ b/db/types.go @@ -17,11 +17,11 @@ import ( type OnModuleHashMismatch uint type TableInfo struct { - schema string - schemaEscaped string - name string - nameEscaped string - columnsByName map[string]*ColumnInfo + schema string + schemaEscaped string + name string + nameEscaped string + columnsByName 
map[string]*ColumnInfo primaryColumns []*ColumnInfo // Identifier is equivalent to 'escape().escape()' but pre-computed @@ -34,7 +34,7 @@ func NewTableInfo(schema, name string, pkList []string, columnsByName map[string nameEscaped := EscapeIdentifier(name) primaryColumns := make([]*ColumnInfo, len(pkList)) - for i, primaryKeyColumnName := range(pkList) { + for i, primaryKeyColumnName := range pkList { primaryColumn, found := columnsByName[primaryKeyColumnName] if !found { return nil, fmt.Errorf("primary key column %q not found", primaryKeyColumnName) @@ -42,15 +42,18 @@ func NewTableInfo(schema, name string, pkList []string, columnsByName map[string primaryColumns[i] = primaryColumn } + if len(primaryColumns) == 0 { + return nil, fmt.Errorf("sql sink requires a primary key in every table, none was found in table %s.%s", schema, name) + } return &TableInfo{ - schema: schema, - schemaEscaped: schemaEscaped, - name: name, - nameEscaped: nameEscaped, - identifier: schemaEscaped + "." + nameEscaped, + schema: schema, + schemaEscaped: schemaEscaped, + name: name, + nameEscaped: nameEscaped, + identifier: schemaEscaped + "." 
+ nameEscaped, primaryColumns: primaryColumns, - columnsByName: columnsByName, + columnsByName: columnsByName, }, nil } diff --git a/devel/eth-block-meta/schema.sql b/devel/eth-block-meta/schema.sql index 9b67175..0efd184 100644 --- a/devel/eth-block-meta/schema.sql +++ b/devel/eth-block-meta/schema.sql @@ -7,12 +7,3 @@ create table block_meta parent_hash text, timestamp text ); - -create table if not exists "public"."cursors" -( - id text not null constraint cursor_pk primary key, - cursor text, - block_num bigint, - block_id text -); - diff --git a/docs/tutorial/schema-clickhouse.sql b/docs/tutorial/schema-clickhouse.sql new file mode 100644 index 0000000..6fcd402 --- /dev/null +++ b/docs/tutorial/schema-clickhouse.sql @@ -0,0 +1,17 @@ +create table block_meta +( + id text not null constraint block_meta_pk primary key, + at timestamp, + number bigint, + hash text, + parent_hash text, + timestamp timestamp +); + +create table cursors +( + id text not null constraint cursor_pk primary key, + cursor text, + block_num bigint, + block_id text +); \ No newline at end of file diff --git a/docs/tutorial/substreams-clickhouse.yaml b/docs/tutorial/substreams-clickhouse.yaml new file mode 100644 index 0000000..da34169 --- /dev/null +++ b/docs/tutorial/substreams-clickhouse.yaml @@ -0,0 +1,49 @@ +specVersion: v0.1.0 +package: + name: 'substreams_postgresql_sink_tutorial' + version: v0.1.0 + +protobuf: + files: + - block_meta.proto + importPaths: + - ./proto + +imports: + sql: https://github.com/streamingfast/substreams-sink-sql/releases/download/protodefs-v1.0.3/substreams-sink-sql-protodefs-v1.0.3.spkg + blockmeta: https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.5.1/substreams-eth-block-meta-v0.5.1.spkg + +binaries: + default: + type: wasm/rust-v1 + file: target/wasm32-unknown-unknown/release/substreams_postgresql_sink_tutorial.wasm + +modules: + - name: store_block_meta_start + kind: store + updatePolicy: set_if_not_exists + valueType: 
proto:eth.block_meta.v1.BlockMeta + inputs: + - source: sf.ethereum.type.v2.Block + + - name: db_out + kind: map + inputs: + - store: store_block_meta_start + mode: deltas + output: + type: proto:sf.substreams.sink.database.v1.DatabaseChanges + +network: mainnet + +sink: + module: db_out + type: sf.substreams.sink.sql.v1.Service + config: + schema: "./schema.sql" + wire_protocol_access: true + engine: clickhouse + postgraphile_frontend: + enabled: false + pgweb_frontend: + enabled: false diff --git a/docs/tutorial/substreams.yaml b/docs/tutorial/substreams.yaml index 3e36046..22e5df3 100644 --- a/docs/tutorial/substreams.yaml +++ b/docs/tutorial/substreams.yaml @@ -10,7 +10,7 @@ protobuf: - ./proto imports: - sql: https://github.com/streamingfast/substreams-sink-sql/releases/download/protodefs-v1.0.2/substreams-sink-sql-protodefs-v1.0.2.spkg + sql: https://github.com/streamingfast/substreams-sink-sql/releases/download/protodefs-v1.0.3/substreams-sink-sql-protodefs-v1.0.3.spkg blockmeta: https://github.com/streamingfast/substreams-eth-block-meta/releases/download/v0.5.1/substreams-eth-block-meta-v0.5.1.spkg binaries: @@ -42,6 +42,7 @@ sink: config: schema: "./schema.sql" wire_protocol_access: true + engine: postgres postgraphile_frontend: enabled: true pgweb_frontend: diff --git a/go.mod b/go.mod index 026f664..dc6a74e 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 - github.com/streamingfast/substreams v1.1.15-0.20231005155216-0f07427759df + github.com/streamingfast/substreams v1.1.18 github.com/streamingfast/substreams-sink v0.3.3-0.20230901183759-218c1d9ec645 github.com/streamingfast/substreams-sink-database-changes v1.1.3 github.com/stretchr/testify v1.8.4 @@ -139,7 +139,7 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.0 // indirect github.com/streamingfast/atm 
v0.0.0-20220131151839-18c87005e680 // indirect - github.com/streamingfast/bstream v0.0.2-0.20230731165201-639b4f347707 + github.com/streamingfast/bstream v0.0.2-0.20230829131224-b9272048dc6a github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 github.com/streamingfast/dbin v0.0.0-20210809205249-73d5eca35dc5 // indirect github.com/streamingfast/dgrpc v0.0.0-20230929132851-893fc52687fa // indirect diff --git a/go.sum b/go.sum index 07d4fed..5869a4d 100644 --- a/go.sum +++ b/go.sum @@ -1172,8 +1172,8 @@ github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streamingfast/atm v0.0.0-20220131151839-18c87005e680 h1:fGJnUx0shX9Y312QOlz+/+yLquihXRhNqctJ26jtZZM= github.com/streamingfast/atm v0.0.0-20220131151839-18c87005e680/go.mod h1:iISPGAstbUsPgyC3auLLi7PYUTi9lHv5z0COam0OPOY= -github.com/streamingfast/bstream v0.0.2-0.20230731165201-639b4f347707 h1:hJW+QNNJrR1boQuoEaajlMFjWh0XKt4Fcg33h9hT7Eo= -github.com/streamingfast/bstream v0.0.2-0.20230731165201-639b4f347707/go.mod h1:Njkx972HcZiz0djWBylxqO/eq686eDGr+egQ1lePj3Q= +github.com/streamingfast/bstream v0.0.2-0.20230829131224-b9272048dc6a h1:NeCO5JLz38HRK1uaV1Emo9u5gUSRtmtZZGNK8BKyLIE= +github.com/streamingfast/bstream v0.0.2-0.20230829131224-b9272048dc6a/go.mod h1:Njkx972HcZiz0djWBylxqO/eq686eDGr+egQ1lePj3Q= github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 h1:UxJUTcEVkdZy8N77E3exz0iNlgQuxl4m220GPvzdZ2s= github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80/go.mod h1:QxjVH73Lkqk+mP8bndvhMuQDUINfkgsYhdCH/5TJFKI= github.com/streamingfast/dbin v0.0.0-20210809205249-73d5eca35dc5 h1:m/3aIPNXCwZ9m/dfYdOs8ftrS7GJl82ipVr6K2aZiBs= @@ -1200,8 +1200,8 @@ github.com/streamingfast/pbgo v0.0.6-0.20221020131607-255008258d28 h1:wmQg8T0rIF github.com/streamingfast/pbgo v0.0.6-0.20221020131607-255008258d28/go.mod h1:huKwfgTGFIFZMKSVbD5TywClM7zAeBUG/zePZMqvXQQ= 
github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAtyaTOgs= github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= -github.com/streamingfast/substreams v1.1.15-0.20231005155216-0f07427759df h1:GPV+0Nn3PMoHzIPKAB5W1l4Nabvo/nmWxHq1UDoHVcQ= -github.com/streamingfast/substreams v1.1.15-0.20231005155216-0f07427759df/go.mod h1:fFJ8YYBXhzKTKBcC7vRQU6xZl/9KAfVfzuEB8C9hUVw= +github.com/streamingfast/substreams v1.1.18 h1:XRASHrXeWMOe5D7NXVbi+c9IcFB8hbbtI2oqdivNpE8= +github.com/streamingfast/substreams v1.1.18/go.mod h1:fFJ8YYBXhzKTKBcC7vRQU6xZl/9KAfVfzuEB8C9hUVw= github.com/streamingfast/substreams-sink v0.3.3-0.20230901183759-218c1d9ec645 h1:ZbYLft0R5hJBLhMFAdp7noAD9YrKC+r0nsU7Z5IDcfM= github.com/streamingfast/substreams-sink v0.3.3-0.20230901183759-218c1d9ec645/go.mod h1:nBPwmsjz+CV0HT5Vmp0XTiu+RjP8CbdhD5u+uC3lo84= github.com/streamingfast/substreams-sink-database-changes v1.1.3 h1:rXeGb/V2mjC8FftumRkMQxG2jtdLfHdLx9UQVUtAqS8= diff --git a/pb/last_generate.txt b/pb/last_generate.txt index db55e5b..becb9ed 100644 --- a/pb/last_generate.txt +++ b/pb/last_generate.txt @@ -1 +1 @@ -generate.sh - Thu 21 Sep 2023 16:03:41 EDT - stepd +generate.sh - Tue Nov 7 13:48:37 EST 2023 - colindickson diff --git a/pb/sf/substreams/sink/sql/v1/services.pb.go b/pb/sf/substreams/sink/sql/v1/services.pb.go index c333683..cde7bb3 100644 --- a/pb/sf/substreams/sink/sql/v1/services.pb.go +++ b/pb/sf/substreams/sink/sql/v1/services.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.27.0 // protoc (unknown) // source: sf/substreams/sink/sql/v1/services.proto @@ -21,6 +21,55 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type Service_Engine int32 + +const ( + Service_unset Service_Engine = 0 + Service_postgres Service_Engine = 1 + Service_clickhouse Service_Engine = 2 +) + +// Enum value maps for Service_Engine. 
+var ( + Service_Engine_name = map[int32]string{ + 0: "unset", + 1: "postgres", + 2: "clickhouse", + } + Service_Engine_value = map[string]int32{ + "unset": 0, + "postgres": 1, + "clickhouse": 2, + } +) + +func (x Service_Engine) Enum() *Service_Engine { + p := new(Service_Engine) + *p = x + return p +} + +func (x Service_Engine) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Service_Engine) Descriptor() protoreflect.EnumDescriptor { + return file_sf_substreams_sink_sql_v1_services_proto_enumTypes[0].Descriptor() +} + +func (Service_Engine) Type() protoreflect.EnumType { + return &file_sf_substreams_sink_sql_v1_services_proto_enumTypes[0] +} + +func (x Service_Engine) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Service_Engine.Descriptor instead. +func (Service_Engine) EnumDescriptor() ([]byte, []int) { + return file_sf_substreams_sink_sql_v1_services_proto_rawDescGZIP(), []int{0, 0} +} + type Service struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -29,10 +78,10 @@ type Service struct { // Containing both create table statements and index creation statements. 
Schema string `protobuf:"bytes,1,opt,name=schema,proto3" json:"schema,omitempty"` DbtConfig *DBTConfig `protobuf:"bytes,2,opt,name=dbt_config,json=dbtConfig,proto3,oneof" json:"dbt_config,omitempty"` - WireProtocolAccess bool `protobuf:"varint,3,opt,name=wire_protocol_access,json=wireProtocolAccess,proto3" json:"wire_protocol_access,omitempty"` HasuraFrontend *HasuraFrontend `protobuf:"bytes,4,opt,name=hasura_frontend,json=hasuraFrontend,proto3" json:"hasura_frontend,omitempty"` PostgraphileFrontend *PostgraphileFrontend `protobuf:"bytes,5,opt,name=postgraphile_frontend,json=postgraphileFrontend,proto3" json:"postgraphile_frontend,omitempty"` - PgwebFrontend *PGWebFrontend `protobuf:"bytes,6,opt,name=pgweb_frontend,json=pgwebFrontend,proto3" json:"pgweb_frontend,omitempty"` + Engine Service_Engine `protobuf:"varint,7,opt,name=engine,proto3,enum=sf.substreams.sink.sql.v1.Service_Engine" json:"engine,omitempty"` + RestFrontend *RESTFrontend `protobuf:"bytes,8,opt,name=rest_frontend,json=restFrontend,proto3" json:"rest_frontend,omitempty"` } func (x *Service) Reset() { @@ -81,13 +130,6 @@ func (x *Service) GetDbtConfig() *DBTConfig { return nil } -func (x *Service) GetWireProtocolAccess() bool { - if x != nil { - return x.WireProtocolAccess - } - return false -} - func (x *Service) GetHasuraFrontend() *HasuraFrontend { if x != nil { return x.HasuraFrontend @@ -102,19 +144,29 @@ func (x *Service) GetPostgraphileFrontend() *PostgraphileFrontend { return nil } -func (x *Service) GetPgwebFrontend() *PGWebFrontend { +func (x *Service) GetEngine() Service_Engine { if x != nil { - return x.PgwebFrontend + return x.Engine + } + return Service_unset +} + +func (x *Service) GetRestFrontend() *RESTFrontend { + if x != nil { + return x.RestFrontend } return nil } +// https://www.getdbt.com/product/what-is-dbt type DBTConfig struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Files []byte 
`protobuf:"bytes,1,opt,name=files,proto3" json:"files,omitempty"` + Files []byte `protobuf:"bytes,1,opt,name=files,proto3" json:"files,omitempty"` + RunIntervalSeconds int32 `protobuf:"varint,2,opt,name=run_interval_seconds,json=runIntervalSeconds,proto3" json:"run_interval_seconds,omitempty"` + Enabled bool `protobuf:"varint,3,opt,name=enabled,proto3" json:"enabled,omitempty"` } func (x *DBTConfig) Reset() { @@ -156,6 +208,21 @@ func (x *DBTConfig) GetFiles() []byte { return nil } +func (x *DBTConfig) GetRunIntervalSeconds() int32 { + if x != nil { + return x.RunIntervalSeconds + } + return 0 +} + +func (x *DBTConfig) GetEnabled() bool { + if x != nil { + return x.Enabled + } + return false +} + +// https://hasura.io/docs/latest/index/ type HasuraFrontend struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -203,6 +270,7 @@ func (x *HasuraFrontend) GetEnabled() bool { return false } +// https://www.graphile.org/postgraphile/ type PostgraphileFrontend struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -250,6 +318,7 @@ func (x *PostgraphileFrontend) GetEnabled() bool { return false } +// https://github.com/sosedoff/pgweb type PGWebFrontend struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -297,6 +366,54 @@ func (x *PGWebFrontend) GetEnabled() bool { return false } +// https://github.com/semiotic-ai/sql-wrapper +type RESTFrontend struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"` +} + +func (x *RESTFrontend) Reset() { + *x = RESTFrontend{} + if protoimpl.UnsafeEnabled { + mi := &file_sf_substreams_sink_sql_v1_services_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RESTFrontend) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RESTFrontend) ProtoMessage() {} + +func (x 
*RESTFrontend) ProtoReflect() protoreflect.Message { + mi := &file_sf_substreams_sink_sql_v1_services_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RESTFrontend.ProtoReflect.Descriptor instead. +func (*RESTFrontend) Descriptor() ([]byte, []int) { + return file_sf_substreams_sink_sql_v1_services_proto_rawDescGZIP(), []int{5} +} + +func (x *RESTFrontend) GetEnabled() bool { + if x != nil { + return x.Enabled + } + return false +} + var File_sf_substreams_sink_sql_v1_services_proto protoreflect.FileDescriptor var file_sf_substreams_sink_sql_v1_services_proto_rawDesc = []byte{ @@ -306,62 +423,73 @@ var file_sf_substreams_sink_sql_v1_services_proto_rawDesc = []byte{ 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x73, 0x66, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x22, 0xbf, 0x03, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x1e, + 0x74, 0x6f, 0x22, 0x80, 0x04, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x1e, 0x0a, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x06, 0xc2, 0x89, 0x01, 0x02, 0x08, 0x01, 0x52, 0x06, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, 0x48, 0x0a, 0x0a, 0x64, 0x62, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x42, 0x54, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x48, 0x00, 0x52, 0x09, 0x64, 0x62, 0x74, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x88, 0x01, 0x01, 0x12, 0x30, 
0x0a, 0x14, 0x77, 0x69, 0x72, 0x65, - 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x5f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x77, 0x69, 0x72, 0x65, 0x50, 0x72, 0x6f, 0x74, - 0x6f, 0x63, 0x6f, 0x6c, 0x41, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x52, 0x0a, 0x0f, 0x68, 0x61, - 0x73, 0x75, 0x72, 0x61, 0x5f, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, - 0x48, 0x61, 0x73, 0x75, 0x72, 0x61, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x0e, - 0x68, 0x61, 0x73, 0x75, 0x72, 0x61, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x64, - 0x0a, 0x15, 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x61, 0x70, 0x68, 0x69, 0x6c, 0x65, 0x5f, 0x66, - 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, - 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, - 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, - 0x61, 0x70, 0x68, 0x69, 0x6c, 0x65, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x14, - 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x61, 0x70, 0x68, 0x69, 0x6c, 0x65, 0x46, 0x72, 0x6f, 0x6e, - 0x74, 0x65, 0x6e, 0x64, 0x12, 0x4f, 0x0a, 0x0e, 0x70, 0x67, 0x77, 0x65, 0x62, 0x5f, 0x66, 0x72, - 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x73, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x88, 0x01, 0x01, 0x12, 0x52, 0x0a, 0x0f, 0x68, 0x61, 0x73, 0x75, + 0x72, 0x61, 0x5f, 0x66, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x29, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, + 
0x73, 0x75, 0x72, 0x61, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x0e, 0x68, 0x61, + 0x73, 0x75, 0x72, 0x61, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x64, 0x0a, 0x15, + 0x70, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x61, 0x70, 0x68, 0x69, 0x6c, 0x65, 0x5f, 0x66, 0x72, 0x6f, + 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x73, 0x66, + 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, + 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x6f, 0x73, 0x74, 0x67, 0x72, 0x61, 0x70, + 0x68, 0x69, 0x6c, 0x65, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x14, 0x70, 0x6f, + 0x73, 0x74, 0x67, 0x72, 0x61, 0x70, 0x68, 0x69, 0x6c, 0x65, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, + 0x6e, 0x64, 0x12, 0x41, 0x0a, 0x06, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x18, 0x07, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x29, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x52, 0x06, 0x65, + 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x12, 0x4c, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x74, 0x5f, 0x66, 0x72, + 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, - 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x47, 0x57, 0x65, 0x62, 0x46, 0x72, - 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x0d, 0x70, 0x67, 0x77, 0x65, 0x62, 0x46, 0x72, 0x6f, - 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x64, 0x62, 0x74, 0x5f, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x22, 0x29, 0x0a, 0x09, 0x44, 0x42, 0x54, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x12, 0x1c, 0x0a, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, - 0x42, 0x06, 0xc2, 0x89, 0x01, 0x02, 0x10, 
0x01, 0x52, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x22, - 0x2a, 0x0a, 0x0e, 0x48, 0x61, 0x73, 0x75, 0x72, 0x61, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, - 0x64, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x22, 0x30, 0x0a, 0x14, 0x50, - 0x6f, 0x73, 0x74, 0x67, 0x72, 0x61, 0x70, 0x68, 0x69, 0x6c, 0x65, 0x46, 0x72, 0x6f, 0x6e, 0x74, - 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x22, 0x29, 0x0a, - 0x0d, 0x50, 0x47, 0x57, 0x65, 0x62, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x18, + 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x45, 0x53, 0x54, 0x46, 0x72, 0x6f, + 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x74, 0x46, 0x72, 0x6f, 0x6e, 0x74, + 0x65, 0x6e, 0x64, 0x22, 0x31, 0x0a, 0x06, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x12, 0x09, 0x0a, + 0x05, 0x75, 0x6e, 0x73, 0x65, 0x74, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x70, 0x6f, 0x73, 0x74, + 0x67, 0x72, 0x65, 0x73, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x63, 0x6c, 0x69, 0x63, 0x6b, 0x68, + 0x6f, 0x75, 0x73, 0x65, 0x10, 0x02, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x64, 0x62, 0x74, 0x5f, 0x63, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x75, 0x0a, 0x09, 0x44, 0x42, 0x54, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x12, 0x1c, 0x0a, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x42, 0x06, 0xc2, 0x89, 0x01, 0x02, 0x10, 0x01, 0x52, 0x05, 0x66, 0x69, 0x6c, 0x65, 0x73, + 0x12, 0x30, 0x0a, 0x14, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, + 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x12, + 0x72, 0x75, 0x6e, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x53, 0x65, 0x63, 0x6f, 0x6e, + 0x64, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 
0x18, 0x03, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x22, 0x2a, 0x0a, 0x0e, + 0x48, 0x61, 0x73, 0x75, 0x72, 0x61, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x42, 0xee, 0x01, 0x0a, 0x1d, 0x63, 0x6f, 0x6d, - 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, - 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, 0x31, 0x42, 0x0d, 0x53, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x35, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, - 0x67, 0x66, 0x61, 0x73, 0x74, 0x2f, 0x73, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, - 0x2d, 0x73, 0x69, 0x6e, 0x6b, 0x2d, 0x73, 0x71, 0x6c, 0x2f, 0x70, 0x62, 0x3b, 0x70, 0x62, 0x73, - 0x71, 0x6c, 0xa2, 0x02, 0x04, 0x53, 0x53, 0x53, 0x53, 0xaa, 0x02, 0x19, 0x53, 0x66, 0x2e, 0x53, - 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x2e, 0x53, - 0x71, 0x6c, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x19, 0x53, 0x66, 0x5c, 0x53, 0x75, 0x62, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5c, 0x53, 0x69, 0x6e, 0x6b, 0x5c, 0x53, 0x71, 0x6c, 0x5c, 0x56, - 0x31, 0xe2, 0x02, 0x25, 0x53, 0x66, 0x5c, 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x73, 0x5c, 0x53, 0x69, 0x6e, 0x6b, 0x5c, 0x53, 0x71, 0x6c, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, - 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x1d, 0x53, 0x66, 0x3a, 0x3a, - 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x3a, 0x3a, 0x53, 0x69, 0x6e, 0x6b, - 0x3a, 0x3a, 0x53, 0x71, 0x6c, 0x3a, 0x3a, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x22, 0x30, 0x0a, 0x14, 0x50, 0x6f, 0x73, 0x74, + 0x67, 0x72, 
0x61, 0x70, 0x68, 0x69, 0x6c, 0x65, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, + 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x22, 0x29, 0x0a, 0x0d, 0x50, 0x47, + 0x57, 0x65, 0x62, 0x46, 0x72, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x65, + 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, + 0x61, 0x62, 0x6c, 0x65, 0x64, 0x22, 0x28, 0x0a, 0x0c, 0x52, 0x45, 0x53, 0x54, 0x46, 0x72, 0x6f, + 0x6e, 0x74, 0x65, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x42, + 0xee, 0x01, 0x0a, 0x1d, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x66, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x73, 0x71, 0x6c, 0x2e, 0x76, + 0x31, 0x42, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x50, 0x01, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x66, 0x61, 0x73, 0x74, 0x2f, 0x73, 0x75, 0x62, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2d, 0x73, 0x69, 0x6e, 0x6b, 0x2d, 0x73, 0x71, 0x6c, + 0x2f, 0x70, 0x62, 0x3b, 0x70, 0x62, 0x73, 0x71, 0x6c, 0xa2, 0x02, 0x04, 0x53, 0x53, 0x53, 0x53, + 0xaa, 0x02, 0x19, 0x53, 0x66, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, + 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x2e, 0x53, 0x71, 0x6c, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x19, 0x53, + 0x66, 0x5c, 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5c, 0x53, 0x69, 0x6e, + 0x6b, 0x5c, 0x53, 0x71, 0x6c, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x25, 0x53, 0x66, 0x5c, 0x53, 0x75, + 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5c, 0x53, 0x69, 0x6e, 0x6b, 0x5c, 0x53, 0x71, + 0x6c, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 
0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0xea, 0x02, 0x1d, 0x53, 0x66, 0x3a, 0x3a, 0x53, 0x75, 0x62, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x73, 0x3a, 0x3a, 0x53, 0x69, 0x6e, 0x6b, 0x3a, 0x3a, 0x53, 0x71, 0x6c, 0x3a, 0x3a, 0x56, 0x31, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -376,24 +504,28 @@ func file_sf_substreams_sink_sql_v1_services_proto_rawDescGZIP() []byte { return file_sf_substreams_sink_sql_v1_services_proto_rawDescData } -var file_sf_substreams_sink_sql_v1_services_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_sf_substreams_sink_sql_v1_services_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_sf_substreams_sink_sql_v1_services_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_sf_substreams_sink_sql_v1_services_proto_goTypes = []interface{}{ - (*Service)(nil), // 0: sf.substreams.sink.sql.v1.Service - (*DBTConfig)(nil), // 1: sf.substreams.sink.sql.v1.DBTConfig - (*HasuraFrontend)(nil), // 2: sf.substreams.sink.sql.v1.HasuraFrontend - (*PostgraphileFrontend)(nil), // 3: sf.substreams.sink.sql.v1.PostgraphileFrontend - (*PGWebFrontend)(nil), // 4: sf.substreams.sink.sql.v1.PGWebFrontend + (Service_Engine)(0), // 0: sf.substreams.sink.sql.v1.Service.Engine + (*Service)(nil), // 1: sf.substreams.sink.sql.v1.Service + (*DBTConfig)(nil), // 2: sf.substreams.sink.sql.v1.DBTConfig + (*HasuraFrontend)(nil), // 3: sf.substreams.sink.sql.v1.HasuraFrontend + (*PostgraphileFrontend)(nil), // 4: sf.substreams.sink.sql.v1.PostgraphileFrontend + (*PGWebFrontend)(nil), // 5: sf.substreams.sink.sql.v1.PGWebFrontend + (*RESTFrontend)(nil), // 6: sf.substreams.sink.sql.v1.RESTFrontend } var file_sf_substreams_sink_sql_v1_services_proto_depIdxs = []int32{ - 1, // 0: sf.substreams.sink.sql.v1.Service.dbt_config:type_name -> sf.substreams.sink.sql.v1.DBTConfig - 2, // 1: sf.substreams.sink.sql.v1.Service.hasura_frontend:type_name -> sf.substreams.sink.sql.v1.HasuraFrontend - 3, // 2: 
sf.substreams.sink.sql.v1.Service.postgraphile_frontend:type_name -> sf.substreams.sink.sql.v1.PostgraphileFrontend - 4, // 3: sf.substreams.sink.sql.v1.Service.pgweb_frontend:type_name -> sf.substreams.sink.sql.v1.PGWebFrontend - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 2, // 0: sf.substreams.sink.sql.v1.Service.dbt_config:type_name -> sf.substreams.sink.sql.v1.DBTConfig + 3, // 1: sf.substreams.sink.sql.v1.Service.hasura_frontend:type_name -> sf.substreams.sink.sql.v1.HasuraFrontend + 4, // 2: sf.substreams.sink.sql.v1.Service.postgraphile_frontend:type_name -> sf.substreams.sink.sql.v1.PostgraphileFrontend + 0, // 3: sf.substreams.sink.sql.v1.Service.engine:type_name -> sf.substreams.sink.sql.v1.Service.Engine + 6, // 4: sf.substreams.sink.sql.v1.Service.rest_frontend:type_name -> sf.substreams.sink.sql.v1.RESTFrontend + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_sf_substreams_sink_sql_v1_services_proto_init() } @@ -462,6 +594,18 @@ func file_sf_substreams_sink_sql_v1_services_proto_init() { return nil } } + file_sf_substreams_sink_sql_v1_services_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RESTFrontend); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_sf_substreams_sink_sql_v1_services_proto_msgTypes[0].OneofWrappers = []interface{}{} type x struct{} @@ -469,13 +613,14 @@ func file_sf_substreams_sink_sql_v1_services_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: 
reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_sf_substreams_sink_sql_v1_services_proto_rawDesc, - NumEnums: 0, - NumMessages: 5, + NumEnums: 1, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, GoTypes: file_sf_substreams_sink_sql_v1_services_proto_goTypes, DependencyIndexes: file_sf_substreams_sink_sql_v1_services_proto_depIdxs, + EnumInfos: file_sf_substreams_sink_sql_v1_services_proto_enumTypes, MessageInfos: file_sf_substreams_sink_sql_v1_services_proto_msgTypes, }.Build() File_sf_substreams_sink_sql_v1_services_proto = out.File diff --git a/proto/sf/substreams/sink/sql/v1/services.proto b/proto/sf/substreams/sink/sql/v1/services.proto index 4ccb038..b7de741 100644 --- a/proto/sf/substreams/sink/sql/v1/services.proto +++ b/proto/sf/substreams/sink/sql/v1/services.proto @@ -10,24 +10,43 @@ message Service { // Containing both create table statements and index creation statements. string schema = 1 [ (sf.substreams.options).load_from_file = true ]; optional DBTConfig dbt_config = 2; - bool wire_protocol_access = 3; HasuraFrontend hasura_frontend = 4; PostgraphileFrontend postgraphile_frontend = 5; - PGWebFrontend pgweb_frontend = 6; + + enum Engine { + unset = 0; + postgres = 1; + clickhouse = 2; + } + + Engine engine = 7; + + RESTFrontend rest_frontend = 8; } +// https://www.getdbt.com/product/what-is-dbt message DBTConfig { bytes files = 1 [ (sf.substreams.options).zip_from_folder = true ]; + int32 run_interval_seconds = 2; + bool enabled = 3; } +// https://hasura.io/docs/latest/index/ message HasuraFrontend { bool enabled = 1; } +// https://www.graphile.org/postgraphile/ message PostgraphileFrontend { bool enabled = 1; } +// https://github.com/sosedoff/pgweb message PGWebFrontend { bool enabled = 1; } + +// https://github.com/semiotic-ai/sql-wrapper +message RESTFrontend { + bool enabled = 1; +} diff --git a/sinker/sinker.go b/sinker/sinker.go index 9cc2d31..24b0639 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -114,13 +114,15 @@ func (s 
*SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream return fmt.Errorf("unmarshal database changes: %w", err) } - if err := s.applyDatabaseChanges(dbChanges); err != nil { + if err := s.applyDatabaseChanges(dbChanges, data.Clock.Number, data.FinalBlockHeight); err != nil { return fmt.Errorf("apply database changes: %w", err) } - if cursor.Block().Num()%s.batchBlockModulo(data, isLive) == 0 { + if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 { + s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive)) + flushStart := time.Now() - rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor) + rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight) if err != nil { return fmt.Errorf("failed to flush at block %s: %w", cursor.Block(), err) } @@ -146,7 +148,7 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream return nil } -func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges) error { +func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges, blockNum, finalBlockNum uint64) error { for _, change := range dbChanges.TableChanges { if !s.loader.HasTable(change.Table) { return fmt.Errorf( @@ -176,19 +178,24 @@ func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges) changes[field.Name] = field.NewValue } + var reversibleBlockNum *uint64 + if blockNum > finalBlockNum { + reversibleBlockNum = &blockNum + } + switch change.Operation { case pbdatabase.TableChange_CREATE: - err := s.loader.Insert(change.Table, primaryKeys, changes) + err := s.loader.Insert(change.Table, primaryKeys, changes, reversibleBlockNum) if err != nil { return fmt.Errorf("database insert: %w", err) } case pbdatabase.TableChange_UPDATE: - err := s.loader.Update(change.Table, primaryKeys, changes) + err := s.loader.Update(change.Table, primaryKeys, changes, 
reversibleBlockNum) if err != nil { return fmt.Errorf("database update: %w", err) } case pbdatabase.TableChange_DELETE: - err := s.loader.Delete(change.Table, primaryKeys) + err := s.loader.Delete(change.Table, primaryKeys, reversibleBlockNum) if err != nil { return fmt.Errorf("database delete: %w", err) } @@ -200,7 +207,7 @@ func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges) } func (s *SQLSinker) HandleBlockUndoSignal(ctx context.Context, data *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error { - return fmt.Errorf("received undo signal but there is no handling of undo, this is because you used `--undo-buffer-size=0` which is invalid right now") + return s.loader.Revert(ctx, s.OutputModuleHash(), cursor, data.LastValidBlock.Number) } func (s *SQLSinker) batchBlockModulo(blockData *pbsubstreamsrpc.BlockScopedData, isLive *bool) uint64 { diff --git a/sinker/sinker_test.go b/sinker/sinker_test.go new file mode 100644 index 0000000..a60459e --- /dev/null +++ b/sinker/sinker_test.go @@ -0,0 +1,384 @@ +package sinker + +import ( + "context" + "database/sql" + "fmt" + "testing" + + "github.com/streamingfast/bstream" + "github.com/streamingfast/logging" + sink "github.com/streamingfast/substreams-sink" + pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" + "github.com/streamingfast/substreams-sink-sql/db" + "github.com/streamingfast/substreams/client" + pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/anypb" + + _ "github.com/lib/pq" +) + +var logger *zap.Logger +var tracer logging.Tracer + +func init() { + logger, tracer = logging.ApplicationLogger("test", "test") +} + +func TestInserts(t *testing.T) { + + type event struct { + blockNum 
uint64 + libNum uint64 + tableChanges []*pbdatabase.TableChange + undoSignal bool + } + + tests := []struct { + name string + events []event + expectSQL []string + queryResponses []*sql.Rows + }{ + { + name: "insert final block", + events: []event{ + { + blockNum: 10, + libNum: 10, + tableChanges: []*pbdatabase.TableChange{insertRowSinglePK("xfer", "1234", "from", "sender1", "to", "receiver1")}, + }, + }, + expectSQL: []string{ + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender1','1234','receiver1');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 10;`, + `UPDATE "testschema"."cursors" set cursor = 'bN7dsAhRyo44yl_ykkjA36WwLpc_DFtvXwrlIBBBj4r2', block_num = 10, block_id = '10' WHERE id = '756e75736564';`, + `COMMIT`, + }, + }, + { + name: "insert two final blocks", + events: []event{ + { + blockNum: 10, + libNum: 10, + tableChanges: []*pbdatabase.TableChange{insertRowSinglePK("xfer", "1234", "from", "sender1", "to", "receiver1")}, + }, + { + blockNum: 11, + libNum: 11, + tableChanges: []*pbdatabase.TableChange{insertRowSinglePK("xfer", "2345", "from", "sender2", "to", "receiver2")}, + }, + }, + expectSQL: []string{ + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender1','1234','receiver1');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 10;`, + `UPDATE "testschema"."cursors" set cursor = 'bN7dsAhRyo44yl_ykkjA36WwLpc_DFtvXwrlIBBBj4r2', block_num = 10, block_id = '10' WHERE id = '756e75736564';`, + `COMMIT`, + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender2','2345','receiver2');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 11;`, + `UPDATE "testschema"."cursors" set cursor = 'dR5-m-1v1TQvlVRfIM9SXaWwLpc_DFtuXwrkIBBAj4r3', block_num = 11, block_id = '11' WHERE id = '756e75736564';`, + `COMMIT`, + }, + }, + { + name: "insert a reversible blocks", + events: []event{ + { + blockNum: 10, + libNum: 5, + tableChanges: 
[]*pbdatabase.TableChange{insertRowSinglePK("xfer", "1234", "from", "sender1", "to", "receiver1")}, + }, + }, + expectSQL: []string{ + `INSERT INTO "testschema"."substreams_history" (op,table_name,pk,block_num) values ('I','"testschema"."xfer"','{"id":"1234"}',10);` + + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender1','1234','receiver1');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 5;`, + `UPDATE "testschema"."cursors" set cursor = 'i4tY9gOcWnhKoGjRCl2VUKWwLpcyB1plVAvvLxtE', block_num = 10, block_id = '10' WHERE id = '756e75736564';`, + `COMMIT`, + }, + }, + { + name: "insert, then update", + events: []event{ + { + blockNum: 10, + libNum: 5, + tableChanges: []*pbdatabase.TableChange{insertRowMultiplePK("xfer", map[string]string{"id": "1234", "idx": "3"}, "from", "sender1", "to", "receiver1")}, + }, + { + blockNum: 11, + libNum: 6, + tableChanges: []*pbdatabase.TableChange{ + updateRowMultiplePK("xfer", map[string]string{"id": "2345", "idx": "3"}, "from", "sender2", "to", "receiver2"), + }, + }, + }, + expectSQL: []string{ + `INSERT INTO "testschema"."substreams_history" (op,table_name,pk,block_num) values ('I','"testschema"."xfer"','{"id":"1234","idx":"3"}',10);` + + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender1','1234','receiver1');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 5;`, + `UPDATE "testschema"."cursors" set cursor = 'i4tY9gOcWnhKoGjRCl2VUKWwLpcyB1plVAvvLxtE', block_num = 10, block_id = '10' WHERE id = '756e75736564';`, + `COMMIT`, + `INSERT INTO "testschema"."substreams_history" (op,table_name,pk,prev_value,block_num) SELECT 'U','"testschema"."xfer"','{"id":"2345","idx":"3"}',row_to_json("xfer"),11 FROM "testschema"."xfer" WHERE "id" = '2345' AND "idx" = '3';` + + `UPDATE "testschema"."xfer" SET "from"='sender2', "to"='receiver2' WHERE "id" = '2345' AND "idx" = '3'`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 6;`, + `UPDATE 
"testschema"."cursors" set cursor = 'LamYQ1PoEJyzLTRd7kdEiKWwLpcyB1tlVArvLBtH', block_num = 11, block_id = '11' WHERE id = '756e75736564';`, + `COMMIT`, + }, + }, + + { + name: "insert, then update, then delete (update disappears)", + events: []event{ + { + blockNum: 10, + libNum: 5, + tableChanges: []*pbdatabase.TableChange{insertRowMultiplePK("xfer", map[string]string{"id": "1234", "idx": "3"}, "from", "sender1", "to", "receiver1")}, + }, + { + blockNum: 11, + libNum: 6, + tableChanges: []*pbdatabase.TableChange{ + updateRowMultiplePK("xfer", map[string]string{"id": "2345", "idx": "3"}, "from", "sender2", "to", "receiver2"), + deleteRowMultiplePK("xfer", map[string]string{"id": "2345", "idx": "3"}), + }, + }, + }, + expectSQL: []string{ + `INSERT INTO "testschema"."substreams_history" (op,table_name,pk,block_num) values ('I','"testschema"."xfer"','{"id":"1234","idx":"3"}',10);` + + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender1','1234','receiver1');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 5;`, + `UPDATE "testschema"."cursors" set cursor = 'i4tY9gOcWnhKoGjRCl2VUKWwLpcyB1plVAvvLxtE', block_num = 10, block_id = '10' WHERE id = '756e75736564';`, + `COMMIT`, + // the following gets deduped + //`INSERT INTO "testschema"."substreams_history" (op,table_name,pk,prev_value,block_num) SELECT 'U','"testschema"."xfer"','{"id":"2345","idx":"3"}',row_to_json("xfer"),11 FROM "testschema"."xfer" WHERE "id" = '2345' AND "idx" = '3';` + + // `UPDATE "testschema"."xfer" SET "from"='sender2', "to"='receiver2' WHERE "id" = '2345' AND "idx" = '3'`, + `INSERT INTO "testschema"."substreams_history" (op,table_name,pk,prev_value,block_num) SELECT 'D','"testschema"."xfer"','{"id":"2345","idx":"3"}',row_to_json("xfer"),11 FROM "testschema"."xfer" WHERE "id" = '2345' AND "idx" = '3';` + + `DELETE FROM "testschema"."xfer" WHERE "id" = '2345' AND "idx" = '3'`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 6;`, + `UPDATE 
"testschema"."cursors" set cursor = 'LamYQ1PoEJyzLTRd7kdEiKWwLpcyB1tlVArvLBtH', block_num = 11, block_id = '11' WHERE id = '756e75736564';`, + `COMMIT`, + }, + }, + + { + name: "insert two reversible blocks, then UNDO last", + events: []event{ + { + blockNum: 10, + libNum: 5, + tableChanges: []*pbdatabase.TableChange{insertRowSinglePK("xfer", "1234", "from", "sender1", "to", "receiver1")}, + }, + { + blockNum: 11, + libNum: 5, + tableChanges: []*pbdatabase.TableChange{insertRowSinglePK("xfer", "2345", "from", "sender2", "to", "receiver2")}, + }, + { + blockNum: 10, // undo everything above 10 + libNum: 5, + undoSignal: true, + }, + }, + expectSQL: []string{ + `INSERT INTO "testschema"."substreams_history" (op,table_name,pk,block_num) values ('I','"testschema"."xfer"','{"id":"1234"}',10);` + + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender1','1234','receiver1');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 5;`, + `UPDATE "testschema"."cursors" set cursor = 'i4tY9gOcWnhKoGjRCl2VUKWwLpcyB1plVAvvLxtE', block_num = 10, block_id = '10' WHERE id = '756e75736564';`, + `COMMIT`, + `INSERT INTO "testschema"."substreams_history" (op,table_name,pk,block_num) values ('I','"testschema"."xfer"','{"id":"2345"}',11);` + + `INSERT INTO "testschema"."xfer" ("from","id","to") VALUES ('sender2','2345','receiver2');`, + `DELETE FROM "testschema"."substreams_history" WHERE block_num <= 5;`, + `UPDATE "testschema"."cursors" set cursor = 'Euaqz6R-ylLG0gbdej7Me6WwLpcyB1tlVArvLxtE', block_num = 11, block_id = '11' WHERE id = '756e75736564';`, + `COMMIT`, + `SELECT op,table_name,pk,prev_value,block_num FROM "testschema"."substreams_history" WHERE "block_num" > 10 ORDER BY "block_num" DESC`, + + //`DELETE FROM "testschema"."xfer" WHERE "id" = "2345";`, // this mechanism is tested in db.revertOp + `DELETE FROM "testschema"."substreams_history" WHERE "block_num" > 10;`, + `UPDATE "testschema"."cursors" set cursor = 
'i4tY9gOcWnhKoGjRCl2VUKWwLpcyB1plVAvvLxtE', block_num = 10, block_id = '10' WHERE id = '756e75736564';`, + `COMMIT`, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + l, tx := db.NewTestLoader( + logger, + tracer, + "testschema", + db.TestTables("testschema"), + ) + s, err := sink.New(sink.SubstreamsModeDevelopment, testPackage, testPackage.Modules.Modules[0], []byte("unused"), testClientConfig, logger, nil) + require.NoError(t, err) + sinker, _ := New(s, l, logger, nil) + + for _, evt := range test.events { + if evt.undoSignal { + cursor := simpleCursor(evt.blockNum, evt.libNum) + err := sinker.HandleBlockUndoSignal(ctx, &pbsubstreamsrpc.BlockUndoSignal{ + LastValidBlock: &pbsubstreams.BlockRef{Id: fmt.Sprintf("%d", evt.blockNum), Number: evt.blockNum}, + LastValidCursor: cursor, + }, sink.MustNewCursor(cursor)) + require.NoError(t, err) + continue + } + + err := sinker.HandleBlockScopedData( + ctx, + blockScopedData("db_out", evt.tableChanges, evt.blockNum, evt.libNum), + flushEveryBlock, sink.MustNewCursor(simpleCursor(evt.blockNum, evt.libNum)), + ) + require.NoError(t, err) + } + + results := tx.Results() + assert.Equal(t, test.expectSQL, results) + + }) + } + +} + +var T = true +var flushEveryBlock = &T + +var testPackage = &pbsubstreams.Package{ + Modules: &pbsubstreams.Modules{ + Modules: []*pbsubstreams.Module{ + { + Name: "db_out", + Kind: &pbsubstreams.Module_KindMap_{}, + Output: &pbsubstreams.Module_Output{ + Type: "proto:sf.substreams.sink.database.v1.DatabaseChanges", + }, + }, + }, + }, +} + +var testClientConfig = &client.SubstreamsClientConfig{} + +func pruneAbove(blockNum uint64) string { + return fmt.Sprintf(`DELETE FROM "testschema"."inserts_history" WHERE block_num > %d;DELETE FROM "testschema"."updates_history" WHERE block_num > %d;DELETE FROM "testschema"."deletes_history" WHERE block_num > %d;`, + blockNum, blockNum, blockNum) +} + +func pruneBelow(blockNum uint64) string { 
+ return fmt.Sprintf(`DELETE FROM "testschema"."inserts_history" WHERE block_num <= %d;DELETE FROM "testschema"."updates_history" WHERE block_num <= %d;DELETE FROM "testschema"."deletes_history" WHERE block_num <= %d;`, + blockNum, blockNum, blockNum) +} + +func getFields(fieldsAndValues ...string) (out []*pbdatabase.Field) { + if len(fieldsAndValues)%2 != 0 { + panic("tableChangeSinglePK needs even number of fieldsAndValues") + } + for i := 0; i < len(fieldsAndValues); i += 2 { + out = append(out, &pbdatabase.Field{ + Name: fieldsAndValues[i], + NewValue: fieldsAndValues[i+1], + }) + } + return +} + +func insertRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbdatabase.TableChange { + return &pbdatabase.TableChange{ + Table: table, + PrimaryKey: &pbdatabase.TableChange_Pk{ + Pk: pk, + }, + Operation: pbdatabase.TableChange_CREATE, + Fields: getFields(fieldsAndValues...), + } +} + +func insertRowMultiplePK(table string, pk map[string]string, fieldsAndValues ...string) *pbdatabase.TableChange { + return &pbdatabase.TableChange{ + Table: table, + PrimaryKey: &pbdatabase.TableChange_CompositePk{ + CompositePk: &pbdatabase.CompositePrimaryKey{ + Keys: pk, + }, + }, + Operation: pbdatabase.TableChange_CREATE, + Fields: getFields(fieldsAndValues...), + } +} + +func updateRowMultiplePK(table string, pk map[string]string, fieldsAndValues ...string) *pbdatabase.TableChange { + return &pbdatabase.TableChange{ + Table: table, + PrimaryKey: &pbdatabase.TableChange_CompositePk{ + CompositePk: &pbdatabase.CompositePrimaryKey{ + Keys: pk, + }, + }, + Operation: pbdatabase.TableChange_UPDATE, + Fields: getFields(fieldsAndValues...), + } +} +func deleteRowMultiplePK(table string, pk map[string]string) *pbdatabase.TableChange { + return &pbdatabase.TableChange{ + Table: table, + PrimaryKey: &pbdatabase.TableChange_CompositePk{ + CompositePk: &pbdatabase.CompositePrimaryKey{ + Keys: pk, + }, + }, + Operation: pbdatabase.TableChange_DELETE, + } +} + +func 
blockScopedData(module string, changes []*pbdatabase.TableChange, blockNum uint64, finalBlockNum uint64) *pbsubstreamsrpc.BlockScopedData { + mapOutput, err := anypb.New(&pbdatabase.DatabaseChanges{ + TableChanges: changes, + }) + if err != nil { + panic(err) + } + + return &pbsubstreamsrpc.BlockScopedData{ + Output: &pbsubstreamsrpc.MapModuleOutput{ + Name: module, + MapOutput: mapOutput, + }, + Clock: clock(fmt.Sprintf("%d", blockNum), blockNum), + Cursor: simpleCursor(blockNum, finalBlockNum), + FinalBlockHeight: finalBlockNum, + } +} +func mustNewTableInfo(schema, name string, pkList []string, columnsByName map[string]*db.ColumnInfo) *db.TableInfo { + ti, err := db.NewTableInfo(schema, name, pkList, columnsByName) + if err != nil { + panic(err) + } + return ti +} + +func clock(id string, num uint64) *pbsubstreams.Clock { + return &pbsubstreams.Clock{Id: id, Number: num} +} + +func simpleCursor(num, finalNum uint64) string { + id := fmt.Sprintf("%d", num) + finalID := fmt.Sprintf("%d", finalNum) + blk := bstream.NewBlockRef(id, num) + lib := bstream.NewBlockRef(finalID, finalNum) + step := bstream.StepNew + if id == finalID { + step = bstream.StepNewIrreversible + } + + return (&bstream.Cursor{ + Step: step, + Block: blk, + LIB: lib, + HeadBlock: blk, + }).ToOpaque() +} diff --git a/substreams.yaml b/substreams.yaml index 7ff3022..c958d75 100644 --- a/substreams.yaml +++ b/substreams.yaml @@ -1,7 +1,7 @@ specVersion: v0.1.0 package: name: substreams_sink_sql_protodefs - version: v1.0.2 + version: v1.0.7 url: https://github.com/streamingfast/substreams-sink-sql doc: | Protobuf definitions for Substreams SQL Sink modules.