From 63c31e20047f26edb75ffad7cb53bb60d9eab17f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Tue, 30 Jul 2024 12:01:15 +0200 Subject: [PATCH 1/4] add reader-node-firehose --- .gitignore | 4 +- cmd/apps/reader_node_firehose.go | 74 +++++++++ cmd/main.go | 1 + node-manager/app/firehose_reader/app.go | 146 ++++++++++++++++++ .../app/firehose_reader/console_reader.go | 114 ++++++++++++++ 5 files changed, 338 insertions(+), 1 deletion(-) create mode 100644 cmd/apps/reader_node_firehose.go create mode 100644 node-manager/app/firehose_reader/app.go create mode 100644 node-manager/app/firehose_reader/console_reader.go diff --git a/.gitignore b/.gitignore index c6d0140..44db5e9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ .envrc .env .DS_Store -firehose-data* \ No newline at end of file +firehose-data* +/firecore +/firehose.yaml diff --git a/cmd/apps/reader_node_firehose.go b/cmd/apps/reader_node_firehose.go new file mode 100644 index 0000000..42a0241 --- /dev/null +++ b/cmd/apps/reader_node_firehose.go @@ -0,0 +1,74 @@ +// Copyright 2021 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apps + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/launcher" + nodeManager "github.com/streamingfast/firehose-core/node-manager" + "github.com/streamingfast/firehose-core/node-manager/app/firehose_reader" + "github.com/streamingfast/firehose-core/node-manager/metrics" + "github.com/streamingfast/logging" + "go.uber.org/zap" +) + +func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) { + appLogger, appTracer := logging.PackageLogger("reader-node-firehose", chain.LoggerPackageID("reader-node-firehose")) + + launcher.RegisterApp(rootLog, &launcher.AppDef{ + ID: "reader-node-firehose", + Title: "Reader Node (Firehose)", + Description: "Blocks reading node, unmanaged, reads Firehose logs from standard input and transform them into Firehose chain specific blocks", + RegisterFlags: func(cmd *cobra.Command) error { + cmd.Flags().String("reader-node-firehose-endpoint", "", "Firehose endpoint to connect to.") + cmd.Flags().String("reader-node-firehose-state", "{data-dir}/reader/state", "State file to store the cursor from the Firehose connection in.") + cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.") + cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.") + cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.") + + return nil + }, + FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) { + sfDataDir := runtime.AbsDataDir + archiveStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")) + + metricID := "reader-node-firehose" + headBlockTimeDrift := metrics.NewHeadBlockTimeDrift(metricID) + headBlockNumber := metrics.NewHeadBlockNumber(metricID) + appReadiness := metrics.NewAppReadiness(metricID) + metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency")) + + return firehose_reader.New(&firehose_reader.Config{ + GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"), + OneBlocksStoreURL: archiveStoreURL, + MindReadBlocksChanCapacity: viper.GetInt("reader-node-blocks-chan-capacity"), + StartBlockNum: viper.GetUint64("reader-node-start-block-num"), + StopBlockNum: viper.GetUint64("reader-node-stop-block-num"), + WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")), + OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"), + + FirehoseEndpoint: viper.GetString("reader-node-firehose-endpoint"), + FirehoseStateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")), + FirehoseInsecureConn: viper.GetBool("reader-node-firehose-insecure"), + FirehosePlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), + FirehoseCompression: viper.GetString("reader-node-firehose-compression"), + }, &firehose_reader.Modules{ + MetricsAndReadinessManager: metricsAndReadinessManager, + }, appLogger, appTracer), nil + }, + }) +} diff --git a/cmd/main.go b/cmd/main.go index d0774a0..1fd3953 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -101,6 +101,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) { registerCommonFlags(chain) apps.RegisterReaderNodeApp(chain, rootLog) apps.RegisterReaderNodeStdinApp(chain, rootLog) + apps.RegisterReaderNodeFirehoseApp(chain, rootLog) apps.RegisterMergerApp(rootLog) apps.RegisterRelayerApp(rootLog) apps.RegisterFirehoseApp(chain, rootLog) diff --git a/node-manager/app/firehose_reader/app.go b/node-manager/app/firehose_reader/app.go new file mode 100644 index 0000000..fe43509 --- /dev/null +++ b/node-manager/app/firehose_reader/app.go @@ -0,0 +1,146 @@ +// Copyright 2019 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package firehose_reader + +import ( + "fmt" + "github.com/streamingfast/bstream/blockstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + dgrpcserver "github.com/streamingfast/dgrpc/server" + dgrpcfactory "github.com/streamingfast/dgrpc/server/factory" + nodeManager "github.com/streamingfast/firehose-core/node-manager" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + "github.com/streamingfast/logging" + pbheadinfo "github.com/streamingfast/pbgo/sf/headinfo/v1" + "github.com/streamingfast/shutter" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type Config struct { + GRPCAddr string + OneBlocksStoreURL string + OneBlockSuffix string + MindReadBlocksChanCapacity int + StartBlockNum uint64 + StopBlockNum uint64 + WorkingDir string + LogToZap bool + DebugDeepMind bool + + FirehoseEndpoint string + FirehoseStateFile string + FirehosePlaintextConn bool + FirehoseInsecureConn bool + FirehoseCompression string +} + +type Modules struct { + MetricsAndReadinessManager *nodeManager.MetricsAndReadinessManager + RegisterGRPCService func(server grpc.ServiceRegistrar) error +} + +type App struct { + *shutter.Shutter + Config *Config + ReadyFunc func() + modules *Modules + zlogger *zap.Logger + tracer logging.Tracer +} + +func New(c *Config, modules *Modules, zlogger *zap.Logger, tracer logging.Tracer) *App { + n := &App{ + Shutter: shutter.New(), + Config: c, + ReadyFunc: func() {}, + modules: modules, + zlogger: zlogger, + tracer: tracer, + } + return n +} + +func (a *App) Run() error { + a.zlogger.Info("launching reader-node-firehose app (reading from firehose)", zap.Reflect("config", a.Config)) + + gs := dgrpcfactory.ServerFromOptions(dgrpcserver.WithLogger(a.zlogger)) + + blockStreamServer := blockstream.NewUnmanagedServer( + blockstream.ServerOptionWithLogger(a.zlogger), + blockstream.ServerOptionWithBuffer(1), + ) + + firehoseReader, err := NewFirehoseReader(a.Config.FirehoseEndpoint, a.Config.FirehoseCompression, a.Config.FirehoseInsecureConn, a.Config.FirehosePlaintextConn, a.zlogger) + if err != nil { + return err + } + + a.zlogger.Info("launching reader log plugin") + mindreaderLogPlugin, err := mindreader.NewMindReaderPlugin( + a.Config.OneBlocksStoreURL, + a.Config.WorkingDir, + firehoseReader.NoopConsoleReader, + a.Config.StartBlockNum, + a.Config.StopBlockNum, + a.Config.MindReadBlocksChanCapacity, + a.modules.MetricsAndReadinessManager.UpdateHeadBlock, + func(_ error) {}, + a.Config.OneBlockSuffix, + blockStreamServer, + a.zlogger, + a.tracer, + ) + if err != nil { + return err + } + + a.zlogger.Debug("configuring shutter") + mindreaderLogPlugin.OnTerminated(a.Shutdown) + a.OnTerminating(mindreaderLogPlugin.Shutdown) + + serviceRegistrar := gs.ServiceRegistrar() + pbheadinfo.RegisterHeadInfoServer(serviceRegistrar, blockStreamServer) + pbbstream.RegisterBlockStreamServer(serviceRegistrar, blockStreamServer) + + if a.modules.RegisterGRPCService != nil { + err := a.modules.RegisterGRPCService(gs.ServiceRegistrar()) + if err != nil { + return fmt.Errorf("register extra grpc service: %w", err) + } + } + gs.OnTerminated(a.Shutdown) + go gs.Launch(a.Config.GRPCAddr) + + a.zlogger.Debug("launching firehose reader") + err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseStateFile) + if err != nil { + return err + } + + a.zlogger.Debug("running reader log plugin") + mindreaderLogPlugin.Launch() + go a.modules.MetricsAndReadinessManager.Launch() + + return nil +} + +func (a *App) OnReady(f func()) { + a.ReadyFunc = f +} + +func (a *App) IsReady() bool { + return true +} diff --git a/node-manager/app/firehose_reader/console_reader.go b/node-manager/app/firehose_reader/console_reader.go new file mode 100644 index 0000000..1b69dce --- /dev/null +++ b/node-manager/app/firehose_reader/console_reader.go @@ -0,0 +1,114 @@ +package firehose_reader + +import ( + "context" + "errors" + "fmt" + "github.com/mostynb/go-grpc-compression/zstd" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/firehose-core/firehose/client" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + "os" +) + +type FirehoseReader struct { + firehoseClient pbfirehose.StreamClient + firehoseStream pbfirehose.Stream_BlocksClient + closeFunc func() error + callOpts []grpc.CallOption + zlogger *zap.Logger + cursorStateFile string +} + +func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) { + firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(endpoint, "", "", insecure, plaintext) + if err != nil { + return nil, err + } + + switch compression { + case "gzip": + callOpts = append(callOpts, grpc.UseCompressor(gzip.Name)) + case "zstd": + callOpts = append(callOpts, grpc.UseCompressor(zstd.Name)) + case "none": + default: + return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", compression) + } + + res := &FirehoseReader{ + firehoseClient: firehoseClient, + closeFunc: closeFunc, + callOpts: callOpts, + zlogger: zlogger, + } + + return res, nil +} + +func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) error { + + cursor, err := os.ReadFile(cursorFile) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("unable to read cursor file: %w", err) + } + + if len(cursor) > 0 { + f.zlogger.Info("found cursor file, ignoring start block number", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile)) + } + + stream, err := f.firehoseClient.Blocks(context.Background(), &pbfirehose.Request{ + StartBlockNum: int64(startBlock), + Cursor: string(cursor), + StopBlockNum: stopBlock, + FinalBlocksOnly: false, + }, f.callOpts...) + if err != nil { + return fmt.Errorf("failed to request block stream from Firehose: %w", err) + } + + f.firehoseStream = stream + f.cursorStateFile = cursorFile + + return nil +} + +func (f *FirehoseReader) NoopConsoleReader(_ chan string) (mindreader.ConsolerReader, error) { + return f, nil +} + +func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) { + + res, err := f.firehoseStream.Recv() + if err != nil { + return nil, err + } + + err = os.WriteFile(f.cursorStateFile, []byte(res.Cursor), 0644) + if err != nil { + return nil, fmt.Errorf("failed to write cursor to state file: %w", err) + } + + return &pbbstream.Block{ + Number: res.Metadata.Num, + Id: res.Metadata.Id, + ParentId: res.Metadata.ParentId, + Timestamp: res.Metadata.Time, + LibNum: res.Metadata.LibNum, + ParentNum: res.Metadata.ParentNum, + Payload: res.Block, + }, nil +} + +func (f *FirehoseReader) Done() <-chan interface{} { + //TODO implement me + panic("implement me") +} + +func (f *FirehoseReader) Close() error { + return f.closeFunc() +} From 52fa732a74bdeec30225411e910945e12eedebc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Tue, 30 Jul 2024 12:16:25 +0200 Subject: [PATCH 2/4] print stats --- .../app/firehose_reader/console_reader.go | 11 ++++ node-manager/app/firehose_reader/metrics.go | 11 ++++ .../app/firehose_reader/reader_stats.go | 55 +++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 node-manager/app/firehose_reader/metrics.go create mode 100644 node-manager/app/firehose_reader/reader_stats.go diff --git a/node-manager/app/firehose_reader/console_reader.go b/node-manager/app/firehose_reader/console_reader.go index 1b69dce..514c4b0 100644 --- a/node-manager/app/firehose_reader/console_reader.go +++ b/node-manager/app/firehose_reader/console_reader.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" "os" + "time" ) type FirehoseReader struct { @@ -22,6 +23,7 @@ type FirehoseReader struct { callOpts []grpc.CallOption zlogger *zap.Logger cursorStateFile string + stats *firehoseReaderStats } func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) { @@ -45,6 +47,7 @@ func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, z closeFunc: closeFunc, callOpts: callOpts, zlogger: zlogger, + stats: newFirehoseReaderStats(), } return res, nil @@ -73,6 +76,7 @@ func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) f.firehoseStream = stream f.cursorStateFile = cursorFile + f.stats.StartPeriodicLogToZap(context.Background(), f.zlogger, 10*time.Second) return nil } @@ -93,6 +97,12 @@ func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) { return nil, fmt.Errorf("failed to write cursor to state file: %w", err) } + BlockReadCount.Inc() + f.stats.lastBlock = pbbstream.BlockRef{ + Num: res.Metadata.Num, + Id: res.Metadata.Id, + } + return &pbbstream.Block{ Number: res.Metadata.Num, Id: res.Metadata.Id, @@ -110,5 +120,6 @@ func (f *FirehoseReader) Done() <-chan interface{} { } func (f *FirehoseReader) Close() error { + f.stats.StopPeriodicLogToZap() return f.closeFunc() } diff --git a/node-manager/app/firehose_reader/metrics.go b/node-manager/app/firehose_reader/metrics.go new file mode 100644 index 0000000..68677cf --- /dev/null +++ b/node-manager/app/firehose_reader/metrics.go @@ -0,0 +1,11 @@ +package firehose_reader + +import "github.com/streamingfast/dmetrics" + +var metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("reader_node_firehose")) + +func init() { + metrics.Register() +} + +var BlockReadCount = metrics.NewCounter("block_read_count", "The number of blocks read by the Firehose reader") diff --git a/node-manager/app/firehose_reader/reader_stats.go b/node-manager/app/firehose_reader/reader_stats.go new file mode 100644 index 0000000..2df596f --- /dev/null +++ b/node-manager/app/firehose_reader/reader_stats.go @@ -0,0 +1,55 @@ +package firehose_reader + +import ( + "context" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetrics" + "go.uber.org/zap" + "time" +) + +type firehoseReaderStats struct { + lastBlock pbbstream.BlockRef + blockRate *dmetrics.AvgRatePromCounter + + cancelPeriodicLogger context.CancelFunc +} + +func newFirehoseReaderStats() *firehoseReaderStats { + return &firehoseReaderStats{ + lastBlock: pbbstream.BlockRef{}, + blockRate: dmetrics.MustNewAvgRateFromPromCounter(BlockReadCount, 1*time.Second, 30*time.Second, "blocks"), + } +} + +func (s *firehoseReaderStats) StartPeriodicLogToZap(ctx context.Context, logger *zap.Logger, logEach time.Duration) { + ctx, s.cancelPeriodicLogger = context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(logEach) + for { + select { + case <-ticker.C: + logger.Info("reader node statistics", s.ZapFields()...) + case <-ctx.Done(): + return + } + } + }() +} + +func (s *firehoseReaderStats) StopPeriodicLogToZap() { + if s.cancelPeriodicLogger != nil { + s.cancelPeriodicLogger() + } +} + +func (s *firehoseReaderStats) ZapFields() []zap.Field { + fields := []zap.Field{ + zap.Stringer("block_rate", s.blockRate), + zap.Uint64("last_block_num", s.lastBlock.Num), + zap.String("last_block_id", s.lastBlock.Id), + } + + return fields +} From 9067cc18c5949b9669cd7d1fe388662e248a034e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Wed, 4 Sep 2024 16:01:03 +0200 Subject: [PATCH 3/4] add firehose authentication --- cmd/apps/reader_node_firehose.go | 18 ++++++---- node-manager/app/firehose_reader/app.go | 19 +++++++---- .../app/firehose_reader/console_reader.go | 33 +++++++++++++++---- 3 files changed, 50 insertions(+), 20 deletions(-) diff --git a/cmd/apps/reader_node_firehose.go b/cmd/apps/reader_node_firehose.go index 42a0241..d8c6e2d 100644 --- a/cmd/apps/reader_node_firehose.go +++ b/cmd/apps/reader_node_firehose.go @@ -24,6 +24,7 @@ import ( "github.com/streamingfast/firehose-core/node-manager/metrics" "github.com/streamingfast/logging" "go.uber.org/zap" + "os" ) func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) { @@ -39,6 +40,8 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.") cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.") cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.") + cmd.Flags().String("reader-node-firehose-api-key-env-var", "FIREHOSE_API_KEY", "Look for an API key directly in this environment variable to authenticate against endpoint (alternative to api-token-env-var)") + cmd.Flags().String("reader-node-firehose-api-token-env-var", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint (alternative to api-key-env-var)") return nil }, @@ -51,7 +54,6 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r headBlockNumber := metrics.NewHeadBlockNumber(metricID) appReadiness := metrics.NewAppReadiness(metricID) metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency")) - return firehose_reader.New(&firehose_reader.Config{ GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"), OneBlocksStoreURL: archiveStoreURL, @@ -61,11 +63,15 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")), OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"), - FirehoseEndpoint: viper.GetString("reader-node-firehose-endpoint"), - FirehoseStateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")), - FirehoseInsecureConn: viper.GetBool("reader-node-firehose-insecure"), - FirehosePlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), - FirehoseCompression: viper.GetString("reader-node-firehose-compression"), + FirehoseConfig: firehose_reader.FirehoseConfig{ + Endpoint: viper.GetString("reader-node-firehose-endpoint"), + StateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")), + InsecureConn: viper.GetBool("reader-node-firehose-insecure"), + PlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), + Compression: viper.GetString("reader-node-firehose-compression"), + ApiKey: os.Getenv(viper.GetString("reader-node-firehose-api-token-env-var")), + Jwt: os.Getenv(viper.GetString("reader-node-firehose-api-key-env-var")), + }, }, &firehose_reader.Modules{ MetricsAndReadinessManager: metricsAndReadinessManager, }, appLogger, appTracer), nil diff --git a/node-manager/app/firehose_reader/app.go b/node-manager/app/firehose_reader/app.go index fe43509..4f1c044 100644 --- a/node-manager/app/firehose_reader/app.go +++ b/node-manager/app/firehose_reader/app.go @@ -39,12 +39,17 @@ type Config struct { WorkingDir string LogToZap bool DebugDeepMind bool + FirehoseConfig FirehoseConfig +} - FirehoseEndpoint string - FirehoseStateFile string - FirehosePlaintextConn bool - FirehoseInsecureConn bool - FirehoseCompression string +type FirehoseConfig struct { + Endpoint string + StateFile string + PlaintextConn bool + InsecureConn bool + ApiKey string + Jwt string + Compression string } type Modules struct { @@ -83,7 +88,7 @@ func (a *App) Run() error { blockstream.ServerOptionWithBuffer(1), ) - firehoseReader, err := NewFirehoseReader(a.Config.FirehoseEndpoint, a.Config.FirehoseCompression, a.Config.FirehoseInsecureConn, a.Config.FirehosePlaintextConn, a.zlogger) + firehoseReader, err := NewFirehoseReader(a.Config.FirehoseConfig, a.zlogger) if err != nil { return err } @@ -125,7 +130,7 @@ func (a *App) Run() error { go gs.Launch(a.Config.GRPCAddr) a.zlogger.Debug("launching firehose reader") - err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseStateFile) + err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseConfig.StateFile) if err != nil { return err } diff --git a/node-manager/app/firehose_reader/console_reader.go b/node-manager/app/firehose_reader/console_reader.go index 514c4b0..7c153a2 100644 --- a/node-manager/app/firehose_reader/console_reader.go +++ b/node-manager/app/firehose_reader/console_reader.go @@ -23,23 +23,25 @@ type FirehoseReader struct { callOpts []grpc.CallOption zlogger *zap.Logger cursorStateFile string + cursor string stats *firehoseReaderStats } -func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) { - firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(endpoint, "", "", insecure, plaintext) +func NewFirehoseReader(config FirehoseConfig, zlogger *zap.Logger) (*FirehoseReader, error) { + + firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(config.Endpoint, config.Jwt, config.ApiKey, config.InsecureConn, config.PlaintextConn) if err != nil { return nil, err } - switch compression { + switch config.Compression { case "gzip": callOpts = append(callOpts, grpc.UseCompressor(gzip.Name)) case "zstd": callOpts = append(callOpts, grpc.UseCompressor(zstd.Name)) case "none": default: - return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", compression) + return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", config.Compression) } res := &FirehoseReader{ @@ -61,7 +63,7 @@ func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) } if len(cursor) > 0 { - f.zlogger.Info("found cursor file, ignoring start block number", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile)) + f.zlogger.Info("found state file, continuing previous run", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile)) } stream, err := f.firehoseClient.Blocks(context.Background(), &pbfirehose.Request{ @@ -92,10 +94,13 @@ func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) { return nil, err } - err = os.WriteFile(f.cursorStateFile, []byte(res.Cursor), 0644) + // We don't write the current cursor here, but the one from the previous block. In case an error happens downstream, + // we need to ensure that the current block is included after a restart. + err = f.writeCursor() if err != nil { - return nil, fmt.Errorf("failed to write cursor to state file: %w", err) + return nil, err } + f.cursor = res.Cursor BlockReadCount.Inc() f.stats.lastBlock = pbbstream.BlockRef{ @@ -120,6 +125,20 @@ func (f *FirehoseReader) Done() <-chan interface{} { } func (f *FirehoseReader) Close() error { + _ = f.writeCursor() f.stats.StopPeriodicLogToZap() return f.closeFunc() } + +func (f *FirehoseReader) writeCursor() error { + if f.cursor == "" { + return nil + } + + err := os.WriteFile(f.cursorStateFile, []byte(f.cursor), 0644) + if err != nil { + return fmt.Errorf("failed to write cursor to state file: %w", err) + } + + return nil +} From d5ec1077a4e951bbee4b5d719cba87f7ea02dfe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Wed, 4 Sep 2024 16:20:38 +0200 Subject: [PATCH 4/4] add Changelog, minor fixes --- CHANGELOG.md | 4 ++++ cmd/apps/reader_node_firehose.go | 6 +++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f827a67..d19b81c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* Add `reader-node-firehose` which creates one-blocks by consuming blocks from an already existing Firehose endpoint. This can be used to set up an indexer stack without having to run an instrumented blockchain node. + ## v1.6.1 * Bump substreams to v1.10.0: Version 1.10.0 adds a new `EndpointInfo/Info` endpoint, introduces a 3-minute default execution timeout per block, updates metering metrics with a deprecation warning, enhances `substreams init` commands, and improves wasm module caching and Prometheus tool flexibility. Full changelog: https://github.com/streamingfast/substreams/releases/tag/v1.10.0 diff --git a/cmd/apps/reader_node_firehose.go b/cmd/apps/reader_node_firehose.go index d8c6e2d..f2b2f73 100644 --- a/cmd/apps/reader_node_firehose.go +++ b/cmd/apps/reader_node_firehose.go @@ -33,7 +33,7 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r launcher.RegisterApp(rootLog, &launcher.AppDef{ ID: "reader-node-firehose", Title: "Reader Node (Firehose)", - Description: "Blocks reading node, unmanaged, reads Firehose logs from standard input and transform them into Firehose chain specific blocks", + Description: "Blocks reading node, consumes blocks from an already existing Firehose endpoint. This can be used to set up an indexer stack without having to run an instrumented blockchain node.", RegisterFlags: func(cmd *cobra.Command) error { cmd.Flags().String("reader-node-firehose-endpoint", "", "Firehose endpoint to connect to.") cmd.Flags().String("reader-node-firehose-state", "{data-dir}/reader/state", "State file to store the cursor from the Firehose connection in.") @@ -69,8 +69,8 @@ func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], r InsecureConn: viper.GetBool("reader-node-firehose-insecure"), PlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), Compression: viper.GetString("reader-node-firehose-compression"), - ApiKey: os.Getenv(viper.GetString("reader-node-firehose-api-token-env-var")), - Jwt: os.Getenv(viper.GetString("reader-node-firehose-api-key-env-var")), + ApiKey: os.Getenv(viper.GetString("reader-node-firehose-api-key-env-var")), + Jwt: os.Getenv(viper.GetString("reader-node-firehose-api-token-env-var")), }, }, &firehose_reader.Modules{ MetricsAndReadinessManager: metricsAndReadinessManager,