Skip to content

services/horizon: Add horizon flags to enable ingestion load test ledger backend #5602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions ingest/loadtest/ledger_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -66,11 +67,11 @@ func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, er
func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not open file: %w", err)
}
stream, err := xdr.NewZstdStream(file)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not open zstd read stream: %w", err)
}

var entries []xdr.LedgerEntry
Expand All @@ -81,13 +82,13 @@ func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) {
break
}
if err != nil {
return nil, err
return nil, fmt.Errorf("could not read from zstd stream: %w", err)
}
entries = append(entries, entry)
}

if err = stream.Close(); err != nil {
return nil, err
return nil, fmt.Errorf("could not close zstd stream: %w", err)
}
return entries, nil
}
Expand All @@ -101,25 +102,25 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback
}
generatedLedgerEntries, err := readLedgerEntries(r.config.LedgerEntriesFilePath)
if err != nil {
return err
return fmt.Errorf("could not parse ledger entries file: %w", err)
}
generatedLedgersFile, err := os.Open(r.config.LedgersFilePath)
if err != nil {
return err
return fmt.Errorf("could not open ledgers file: %w", err)
}
generatedLedgers, err := xdr.NewZstdStream(generatedLedgersFile)
if err != nil {
return err
return fmt.Errorf("could not open zstd stream for ledgers file: %w", err)
}

err = r.config.LedgerBackend.PrepareRange(ctx, ledgerRange)
if err != nil {
return err
return fmt.Errorf("could not prepare range using real ledger backend: %w", err)
}
cur := ledgerRange.From()
firstLedger, err := r.config.LedgerBackend.GetLedger(ctx, cur)
if err != nil {
return err
return fmt.Errorf("could not get ledger %v from real ledger backend: %w", cur, err)
}
var changes xdr.LedgerEntryChanges
for i := 0; i < len(generatedLedgerEntries); i++ {
Expand All @@ -139,8 +140,11 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback

mergedLedgersFile, err := os.CreateTemp("", "merged-ledgers")
if err != nil {
return err
return fmt.Errorf("could not create merged ledgers file: %w", err)
}
log.WithField("path", mergedLedgersFile.Name()).
Info("creating temporary merged ledgers file")

cleanup := true
defer func() {
if cleanup {
Expand All @@ -149,49 +153,52 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback
}()
writer, err := zstd.NewWriter(mergedLedgersFile)
if err != nil {
return err
return fmt.Errorf("could not create zstd writer for merged ledgers file: %w", err)
}

var latestLedgerSeq uint32
for cur = cur + 1; !ledgerRange.Bounded() || cur <= ledgerRange.To(); cur++ {
var ledger xdr.LedgerCloseMeta
ledger, err = r.config.LedgerBackend.GetLedger(ctx, cur)
if err != nil {
return err
return fmt.Errorf("could not get ledger %v from real ledger backend: %w", cur, err)
}
var generatedLedger xdr.LedgerCloseMeta
if err = generatedLedgers.ReadOne(&generatedLedger); err == io.EOF {
break
} else if err != nil {
return err
return fmt.Errorf("could not get generated ledger: %w", err)
}
if err = MergeLedgers(r.config.NetworkPassphrase, &ledger, generatedLedger); err != nil {
return err
return fmt.Errorf("could not merge ledgers: %w", err)
}
if err = xdr.MarshalFramed(writer, ledger); err != nil {
return err
return fmt.Errorf("could not marshal ledger to stream: %w", err)
}
latestLedgerSeq = cur
}
if err = generatedLedgers.Close(); err != nil {
return err
return fmt.Errorf("could not close generated ledgers xdr stream: %w", err)
}
if err = writer.Close(); err != nil {
return err
return fmt.Errorf("could not close zstd writer: %w", err)
}
if err = mergedLedgersFile.Sync(); err != nil {
return err
return fmt.Errorf("could not sync merged ledgers file: %w", err)
}

if _, err = mergedLedgersFile.Seek(0, 0); err != nil {
return err
return fmt.Errorf("could not seek to begining of merged ledgers file: %w", err)
}
mergedLedgersStream, err := xdr.NewZstdStream(mergedLedgersFile)
if err != nil {
return err
return fmt.Errorf("could not open zstd read stream for merged ledgers file: %w", err)
}
cleanup = false

log.WithField("start", r.startLedgerSeq).
WithField("end", latestLedgerSeq).
Info("ingesting ledgers from loadtest ledger backend")
r.mergedLedgersFilePath = mergedLedgersFile.Name()
r.mergedLedgersStream = mergedLedgersStream
// from this point, ledgers will be available at a rate of once
Expand Down Expand Up @@ -244,7 +251,7 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led
sequence,
)
} else if err != nil {
return ledger, err
return ledger, fmt.Errorf("could read ledger from merged ledgers stream: %w", err)
}
if ledger.LedgerSequence() != r.nextLedgerSeq {
return ledger, fmt.Errorf(
Expand All @@ -264,15 +271,15 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led

func (r *LedgerBackend) Close() error {
if err := r.config.LedgerBackend.Close(); err != nil {
return err
return fmt.Errorf("could not close real ledger backend: %w", err)
}
if r.mergedLedgersStream != nil {
// closing the stream will also close the ledgers file
if err := r.mergedLedgersStream.Close(); err != nil {
return err
return fmt.Errorf("could not close merged ledgers xdr stream: %w", err)
}
if err := os.Remove(r.mergedLedgersFilePath); err != nil {
return err
return fmt.Errorf("could not remove merged ledgers file: %w", err)
}
}
return nil
Expand Down
3 changes: 3 additions & 0 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
SkipTxmeta: config.SkipTxmeta,
LedgerBackendType: ledgerBackendType,
StorageBackendConfig: storageBackendConfig,
LoadTestFixturesPath: config.IngestionLoadTestFixturesPath,
LoadTestLedgersPath: config.IngestionLoadTestLedgersPath,
LoadTestCloseDuration: config.IngestionLoadTestCloseDuration,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type Config struct {
CaptiveCoreConfigUseDB bool
HistoryArchiveCaching bool

IngestionLoadTestFixturesPath string
IngestionLoadTestLedgersPath string
IngestionLoadTestCloseDuration time.Duration

StellarCoreURL string

// MaxDBConnections has a priority over all 4 values below.
Expand Down
31 changes: 31 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,37 @@ func Flags() (*Config, support.ConfigOptions) {
ConfigKey: &config.CaptiveCoreBinaryPath,
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: "ingestion-load-test-fixtures-path",
OptType: types.String,
FlagDefault: "",
Required: false,
Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.",
ConfigKey: &config.IngestionLoadTestFixturesPath,
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: "ingestion-load-test-ledgers-path",
OptType: types.String,
FlagDefault: "",
Required: false,
Usage: "path to ledgers file which will be replayed in the ingestion load test.",
ConfigKey: &config.IngestionLoadTestLedgersPath,
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: "ingestion-load-test-close-duration",
OptType: types.Float64,
FlagDefault: 2.0,
Required: false,
CustomSetValue: func(co *support.ConfigOption) error {
*(co.ConfigKey.(*time.Duration)) = time.Duration(viper.GetFloat64(co.Name)) * time.Second
return nil
},
Usage: "the time (in seconds) it takes to close ledgers in the ingestion load test.",
ConfigKey: &config.IngestionLoadTestCloseDuration,
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: DisableTxSubFlagName,
OptType: types.Bool,
Expand Down
19 changes: 19 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/ingest/loadtest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
apkg "github.com/stellar/go/support/app"
Expand Down Expand Up @@ -151,6 +152,10 @@ type Config struct {

LedgerBackendType LedgerBackendType
StorageBackendConfig StorageBackendConfig

LoadTestFixturesPath string
LoadTestLedgersPath string
LoadTestCloseDuration time.Duration
}

const (
Expand Down Expand Up @@ -335,6 +340,20 @@ func NewSystem(config Config) (System, error) {
}
}

if config.LoadTestLedgersPath != "" {
if !config.DisableStateVerification {
cancel()
return nil, fmt.Errorf("state verication cannot be enabled during ingestion load tests")
}
ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{
NetworkPassphrase: config.NetworkPassphrase,
LedgerBackend: ledgerBackend,
LedgersFilePath: config.LoadTestLedgersPath,
LedgerEntriesFilePath: config.LoadTestFixturesPath,
LedgerCloseDuration: config.LoadTestCloseDuration,
})
}

historyQ := &history.Q{config.HistorySession.Clone()}
historyAdapter := newHistoryArchiveAdapter(archive)
filters := filters.NewFilters()
Expand Down
3 changes: 3 additions & 0 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func initIngester(app *App) {
RetentionCount: uint32(app.config.HistoryRetentionCount),
BatchSize: uint32(app.config.HistoryRetentionReapCount),
},
LoadTestFixturesPath: app.config.IngestionLoadTestFixturesPath,
LoadTestLedgersPath: app.config.IngestionLoadTestLedgersPath,
LoadTestCloseDuration: app.config.IngestionLoadTestCloseDuration,
})

if err != nil {
Expand Down
Loading