From 76c6ebbc2c3e5dad3d8fa8361b2d5336f62bb1a8 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 13 Feb 2025 20:30:44 +0000 Subject: [PATCH 1/3] Add horizon flags to enable ingestion load test ledger backend --- ingest/loadtest/ledger_backend.go | 4 +++ services/horizon/cmd/db.go | 3 +++ services/horizon/internal/config.go | 4 +++ services/horizon/internal/flags.go | 31 ++++++++++++++++++++++++ services/horizon/internal/ingest/main.go | 18 ++++++++++++++ services/horizon/internal/init.go | 3 +++ 6 files changed, 63 insertions(+) diff --git a/ingest/loadtest/ledger_backend.go b/ingest/loadtest/ledger_backend.go index b02188edb6..99a257abda 100644 --- a/ingest/loadtest/ledger_backend.go +++ b/ingest/loadtest/ledger_backend.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -192,6 +193,9 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback } cleanup = false + log.WithField("start", r.startLedgerSeq). + WithField("end", latestLedgerSeq). + Info("ingesting ledgers from loadtest ledger backend") r.mergedLedgersFilePath = mergedLedgersFile.Name() r.mergedLedgersStream = mergedLedgersStream // from this point, ledgers will be available at a rate of once diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 58c096a782..f970cfb23f 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -195,6 +195,9 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, SkipTxmeta: config.SkipTxmeta, LedgerBackendType: ledgerBackendType, StorageBackendConfig: storageBackendConfig, + LoadTestFixturesPath: config.IngestionLoadTestFixturesPath, + LoadTestLedgersPath: config.IngestionLoadTestLedgersPath, + LoadTestCloseDuration: config.IngestionLoadTestCloseDuration, } if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index b0e1ecb9df..547a74ddf4 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -28,6 +28,10 @@ type Config struct { CaptiveCoreConfigUseDB bool HistoryArchiveCaching bool + IngestionLoadTestFixturesPath string + IngestionLoadTestLedgersPath string + IngestionLoadTestCloseDuration time.Duration + StellarCoreURL string // MaxDBConnections has a priority over all 4 values below. diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 397e3342ad..a56fbcf669 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -186,6 +186,37 @@ func Flags() (*Config, support.ConfigOptions) { ConfigKey: &config.CaptiveCoreBinaryPath, UsedInCommands: IngestionCommands, }, + &support.ConfigOption{ + Name: "ingestion-load-test-fixtures-path", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.", + ConfigKey: &config.IngestionLoadTestFixturesPath, + UsedInCommands: IngestionCommands, + }, + &support.ConfigOption{ + Name: "ingestion-load-test-ledgers-path", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to ledgers file which will be replayed in the ingestion load test.", + ConfigKey: &config.IngestionLoadTestLedgersPath, + UsedInCommands: IngestionCommands, + }, + &support.ConfigOption{ + Name: "ingestion-load-test-close-duration", + OptType: types.Float64, + FlagDefault: 2.0, + Required: false, + CustomSetValue: func(co *support.ConfigOption) error { + *(co.ConfigKey.(*time.Duration)) = time.Duration(viper.GetFloat64(co.Name)) * time.Second + return nil + }, + Usage: "the time (in seconds) it takes to close ledgers in the ingestion load test.", + ConfigKey: &config.IngestionLoadTestCloseDuration, + UsedInCommands: IngestionCommands, + }, &support.ConfigOption{ Name: DisableTxSubFlagName, OptType: types.Bool, diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 8711684e59..258d6eca64 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -17,6 +17,7 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/ingest/loadtest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest/filters" apkg "github.com/stellar/go/support/app" @@ -151,6 +152,10 @@ type Config struct { LedgerBackendType LedgerBackendType StorageBackendConfig StorageBackendConfig + + LoadTestFixturesPath string + LoadTestLedgersPath string + LoadTestCloseDuration time.Duration } const ( @@ -335,6 +340,19 @@ func NewSystem(config Config) (System, error) { } } + if config.LoadTestLedgersPath != "" { + if !config.DisableStateVerification { + return nil, fmt.Errorf("state verication cannot be enabled during ingestion load tests") + } + ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ + NetworkPassphrase: config.NetworkPassphrase, + LedgerBackend: ledgerBackend, + LedgersFilePath: config.LoadTestLedgersPath, + LedgerEntriesFilePath: config.LoadTestFixturesPath, + LedgerCloseDuration: config.LoadTestCloseDuration, + }) + } + historyQ := &history.Q{config.HistorySession.Clone()} historyAdapter := newHistoryArchiveAdapter(archive) filters := filters.NewFilters() diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index c245970117..6529eeb31f 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -108,6 +108,9 @@ func initIngester(app *App) { RetentionCount: uint32(app.config.HistoryRetentionCount), BatchSize: uint32(app.config.HistoryRetentionReapCount), }, + LoadTestFixturesPath: app.config.IngestionLoadTestFixturesPath, + LoadTestLedgersPath: app.config.IngestionLoadTestLedgersPath, + LoadTestCloseDuration: app.config.IngestionLoadTestCloseDuration, }) if err != nil { From 4b477b42e4cbac385d23f7f52e92d0ed5524826f Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 13 Feb 2025 22:38:41 +0000 Subject: [PATCH 2/3] fix go vet --- services/horizon/internal/ingest/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 258d6eca64..26c0c2c83e 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -342,6 +342,7 @@ func NewSystem(config Config) (System, error) { if config.LoadTestLedgersPath != "" { if !config.DisableStateVerification { + cancel() return nil, fmt.Errorf("state verication cannot be enabled during ingestion load tests") } ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ From f6ad702aaea59919e86467736b1833e2938519b3 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 14 Feb 2025 10:00:28 +0000 Subject: [PATCH 3/3] add descriptive errors --- ingest/loadtest/ledger_backend.go | 51 ++++++++++++++++--------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/ingest/loadtest/ledger_backend.go b/ingest/loadtest/ledger_backend.go index 99a257abda..549d2725da 100644 --- a/ingest/loadtest/ledger_backend.go +++ b/ingest/loadtest/ledger_backend.go @@ -67,11 +67,11 @@ func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, er func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) { file, err := os.Open(path) if err != nil { - return nil, err + return nil, fmt.Errorf("could not open file: %w", err) } stream, err := xdr.NewZstdStream(file) if err != nil { - return nil, err + return nil, fmt.Errorf("could not open zstd read stream: %w", err) } var entries []xdr.LedgerEntry @@ -82,13 +82,13 @@ func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) { break } if err != nil { - return nil, err + return nil, fmt.Errorf("could not read from zstd stream: %w", err) } entries = append(entries, entry) } if err = stream.Close(); err != nil { - return nil, err + return nil, fmt.Errorf("could not close zstd stream: %w", err) } return entries, nil } @@ -102,25 +102,25 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback } generatedLedgerEntries, err := readLedgerEntries(r.config.LedgerEntriesFilePath) if err != nil { - return err + return fmt.Errorf("could not parse ledger entries file: %w", err) } generatedLedgersFile, err := os.Open(r.config.LedgersFilePath) if err != nil { - return err + return fmt.Errorf("could not open ledgers file: %w", err) } generatedLedgers, err := xdr.NewZstdStream(generatedLedgersFile) if err != nil { - return err + return fmt.Errorf("could not open zstd stream for ledgers file: %w", err) } err = r.config.LedgerBackend.PrepareRange(ctx, ledgerRange) if err != nil { - return err + return fmt.Errorf("could not prepare range using real ledger backend: %w", err) } cur := ledgerRange.From() firstLedger, err := r.config.LedgerBackend.GetLedger(ctx, cur) if err != nil { - return err + return fmt.Errorf("could not get ledger %v from real ledger backend: %w", cur, err) } var changes xdr.LedgerEntryChanges for i := 0; i < len(generatedLedgerEntries); i++ { @@ -140,8 +140,11 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback mergedLedgersFile, err := os.CreateTemp("", "merged-ledgers") if err != nil { - return err + return fmt.Errorf("could not create merged ledgers file: %w", err) } + log.WithField("path", mergedLedgersFile.Name()). + Info("creating temporary merged ledgers file") + cleanup := true defer func() { if cleanup { @@ -150,7 +153,7 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback }() writer, err := zstd.NewWriter(mergedLedgersFile) if err != nil { - return err + return fmt.Errorf("could not create zstd writer for merged ledgers file: %w", err) } var latestLedgerSeq uint32 @@ -158,38 +161,38 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback var ledger xdr.LedgerCloseMeta ledger, err = r.config.LedgerBackend.GetLedger(ctx, cur) if err != nil { - return err + return fmt.Errorf("could not get ledger %v from real ledger backend: %w", cur, err) } var generatedLedger xdr.LedgerCloseMeta if err = generatedLedgers.ReadOne(&generatedLedger); err == io.EOF { break } else if err != nil { - return err + return fmt.Errorf("could not get generated ledger: %w", err) } if err = MergeLedgers(r.config.NetworkPassphrase, &ledger, generatedLedger); err != nil { - return err + return fmt.Errorf("could not merge ledgers: %w", err) } if err = xdr.MarshalFramed(writer, ledger); err != nil { - return err + return fmt.Errorf("could not marshal ledger to stream: %w", err) } latestLedgerSeq = cur } if err = generatedLedgers.Close(); err != nil { - return err + return fmt.Errorf("could not close generated ledgers xdr stream: %w", err) } if err = writer.Close(); err != nil { - return err + return fmt.Errorf("could not close zstd writer: %w", err) } if err = mergedLedgersFile.Sync(); err != nil { - return err + return fmt.Errorf("could not sync merged ledgers file: %w", err) } if _, err = mergedLedgersFile.Seek(0, 0); err != nil { - return err + return fmt.Errorf("could not seek to begining of merged ledgers file: %w", err) } mergedLedgersStream, err := xdr.NewZstdStream(mergedLedgersFile) if err != nil { - return err + return fmt.Errorf("could not open zstd read stream for merged ledgers file: %w", err) } cleanup = false @@ -248,7 +251,7 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led sequence, ) } else if err != nil { - return ledger, err + return ledger, fmt.Errorf("could read ledger from merged ledgers stream: %w", err) } if ledger.LedgerSequence() != r.nextLedgerSeq { return ledger, fmt.Errorf( @@ -268,15 +271,15 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led func (r *LedgerBackend) Close() error { if err := r.config.LedgerBackend.Close(); err != nil { - return err + return fmt.Errorf("could not close real ledger backend: %w", err) } if r.mergedLedgersStream != nil { // closing the stream will also close the ledgers file if err := r.mergedLedgersStream.Close(); err != nil { - return err + return fmt.Errorf("could not close merged ledgers xdr stream: %w", err) } if err := os.Remove(r.mergedLedgersFilePath); err != nil { - return err + return fmt.Errorf("could not remove merged ledgers file: %w", err) } } return nil