Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reader-node-firehose #63

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@
.envrc
.env
.DS_Store
firehose-data*
firehose-data*
/firecore
/firehose.yaml
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* Add `reader-node-firehose` which creates one-blocks by consuming blocks from an already existing Firehose endpoint. This can be used to set up an indexer stack without having to run an instrumented blockchain node.

## v1.6.1

* Bump substreams to v1.10.0: Version 1.10.0 adds a new `EndpointInfo/Info` endpoint, introduces a 3-minute default execution timeout per block, updates metering metrics with a deprecation warning, enhances `substreams init` commands, and improves wasm module caching and Prometheus tool flexibility. Full changelog: https://github.com/streamingfast/substreams/releases/tag/v1.10.0
Expand Down
80 changes: 80 additions & 0 deletions cmd/apps/reader_node_firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apps

import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
"github.com/streamingfast/firehose-core/node-manager/app/firehose_reader"
"github.com/streamingfast/firehose-core/node-manager/metrics"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"os"
)

func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
appLogger, appTracer := logging.PackageLogger("reader-node-firehose", chain.LoggerPackageID("reader-node-firehose"))

launcher.RegisterApp(rootLog, &launcher.AppDef{
ID: "reader-node-firehose",
Title: "Reader Node (Firehose)",
Description: "Blocks reading node, consumes blocks from an already existing Firehose endpoint. This can be used to set up an indexer stack without having to run an instrumented blockchain node.",
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("reader-node-firehose-endpoint", "", "Firehose endpoint to connect to.")
cmd.Flags().String("reader-node-firehose-state", "{data-dir}/reader/state", "State file to store the cursor from the Firehose connection in.")
cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.")
cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.")
cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.")
cmd.Flags().String("reader-node-firehose-api-key-env-var", "FIREHOSE_API_KEY", "Look for an API key directly in this environment variable to authenticate against endpoint (alternative to api-token-env-var)")
cmd.Flags().String("reader-node-firehose-api-token-env-var", "FIREHOSE_API_TOKEN", "Look for a JWT in this environment variable to authenticate against endpoint (alternative to api-key-env-var)")

return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
sfDataDir := runtime.AbsDataDir
archiveStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url"))

metricID := "reader-node-firehose"
headBlockTimeDrift := metrics.NewHeadBlockTimeDrift(metricID)
headBlockNumber := metrics.NewHeadBlockNumber(metricID)
appReadiness := metrics.NewAppReadiness(metricID)
metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency"))
return firehose_reader.New(&firehose_reader.Config{
GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"),
OneBlocksStoreURL: archiveStoreURL,
MindReadBlocksChanCapacity: viper.GetInt("reader-node-blocks-chan-capacity"),
StartBlockNum: viper.GetUint64("reader-node-start-block-num"),
StopBlockNum: viper.GetUint64("reader-node-stop-block-num"),
WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")),
OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"),

FirehoseConfig: firehose_reader.FirehoseConfig{
Endpoint: viper.GetString("reader-node-firehose-endpoint"),
StateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")),
InsecureConn: viper.GetBool("reader-node-firehose-insecure"),
PlaintextConn: viper.GetBool("reader-node-firehose-plaintext"),
Compression: viper.GetString("reader-node-firehose-compression"),
ApiKey: os.Getenv(viper.GetString("reader-node-firehose-api-key-env-var")),
Jwt: os.Getenv(viper.GetString("reader-node-firehose-api-token-env-var")),
},
}, &firehose_reader.Modules{
MetricsAndReadinessManager: metricsAndReadinessManager,
}, appLogger, appTracer), nil
},
})
}
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) {
registerCommonFlags(chain)
apps.RegisterReaderNodeApp(chain, rootLog)
apps.RegisterReaderNodeStdinApp(chain, rootLog)
apps.RegisterReaderNodeFirehoseApp(chain, rootLog)
apps.RegisterMergerApp(rootLog)
apps.RegisterRelayerApp(rootLog)
apps.RegisterFirehoseApp(chain, rootLog)
Expand Down
151 changes: 151 additions & 0 deletions node-manager/app/firehose_reader/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2019 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firehose_reader

import (
"fmt"
"github.com/streamingfast/bstream/blockstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
dgrpcserver "github.com/streamingfast/dgrpc/server"
dgrpcfactory "github.com/streamingfast/dgrpc/server/factory"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/logging"
pbheadinfo "github.com/streamingfast/pbgo/sf/headinfo/v1"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type Config struct {
GRPCAddr string
OneBlocksStoreURL string
OneBlockSuffix string
MindReadBlocksChanCapacity int
StartBlockNum uint64
StopBlockNum uint64
WorkingDir string
LogToZap bool
DebugDeepMind bool
FirehoseConfig FirehoseConfig
}

type FirehoseConfig struct {
Endpoint string
StateFile string
PlaintextConn bool
InsecureConn bool
ApiKey string
Jwt string
Compression string
}

type Modules struct {
MetricsAndReadinessManager *nodeManager.MetricsAndReadinessManager
RegisterGRPCService func(server grpc.ServiceRegistrar) error
}

type App struct {
*shutter.Shutter
Config *Config
ReadyFunc func()
modules *Modules
zlogger *zap.Logger
tracer logging.Tracer
}

func New(c *Config, modules *Modules, zlogger *zap.Logger, tracer logging.Tracer) *App {
n := &App{
Shutter: shutter.New(),
Config: c,
ReadyFunc: func() {},
modules: modules,
zlogger: zlogger,
tracer: tracer,
}
return n
}

func (a *App) Run() error {
a.zlogger.Info("launching reader-node-firehose app (reading from firehose)", zap.Reflect("config", a.Config))

gs := dgrpcfactory.ServerFromOptions(dgrpcserver.WithLogger(a.zlogger))

blockStreamServer := blockstream.NewUnmanagedServer(
blockstream.ServerOptionWithLogger(a.zlogger),
blockstream.ServerOptionWithBuffer(1),
)

firehoseReader, err := NewFirehoseReader(a.Config.FirehoseConfig, a.zlogger)
if err != nil {
return err
}

a.zlogger.Info("launching reader log plugin")
mindreaderLogPlugin, err := mindreader.NewMindReaderPlugin(
a.Config.OneBlocksStoreURL,
a.Config.WorkingDir,
firehoseReader.NoopConsoleReader,
a.Config.StartBlockNum,
a.Config.StopBlockNum,
a.Config.MindReadBlocksChanCapacity,
a.modules.MetricsAndReadinessManager.UpdateHeadBlock,
func(_ error) {},
a.Config.OneBlockSuffix,
blockStreamServer,
a.zlogger,
a.tracer,
)
if err != nil {
return err
}

a.zlogger.Debug("configuring shutter")
mindreaderLogPlugin.OnTerminated(a.Shutdown)
a.OnTerminating(mindreaderLogPlugin.Shutdown)
Comment on lines +96 to +117
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we would avoid completely the mindreader plugin, there is overall little reason to use outside avoiding re-implementing two things:

  • Save received one block to one-blocks-store
  • Emit block on block stream server

I understand it make the implementation easier. But I have the overall feeling that it will make a much simpler implementation since we will move away from the node-manager.

We should also move to something else than node-manager as the top-level package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Should I move it into its own top-level package or is there another package to include it in?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new top-level reader-node-firehose seems good, the package <name> could be shorter if needed.


serviceRegistrar := gs.ServiceRegistrar()
pbheadinfo.RegisterHeadInfoServer(serviceRegistrar, blockStreamServer)
pbbstream.RegisterBlockStreamServer(serviceRegistrar, blockStreamServer)

if a.modules.RegisterGRPCService != nil {
err := a.modules.RegisterGRPCService(gs.ServiceRegistrar())
if err != nil {
return fmt.Errorf("register extra grpc service: %w", err)
}
}
gs.OnTerminated(a.Shutdown)
go gs.Launch(a.Config.GRPCAddr)

a.zlogger.Debug("launching firehose reader")
err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseConfig.StateFile)
if err != nil {
return err
}

a.zlogger.Debug("running reader log plugin")
mindreaderLogPlugin.Launch()
go a.modules.MetricsAndReadinessManager.Launch()

return nil
}

func (a *App) OnReady(f func()) {
a.ReadyFunc = f
}

func (a *App) IsReady() bool {
return true
}
Loading