diff --git a/go.mod b/go.mod index c03829fe..3dc6d7b8 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 github.com/smallnest/weighted v0.0.0-20230419055410-36b780e40a7a + github.com/smira/go-statsd v1.3.3 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 8da4151b..97bec667 100644 --- a/go.sum +++ b/go.sum @@ -703,6 +703,8 @@ github.com/smallnest/weighted v0.0.0-20230419055410-36b780e40a7a h1:eieNTZmrnPzI github.com/smallnest/weighted v0.0.0-20230419055410-36b780e40a7a/go.mod h1:xc9CoZ+ZBGwajnWto5Aqw/wWg8euy4HtOr6K9Fxp9iw= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/smira/go-statsd v1.3.3 h1:WnMlmGTyMpzto+HvOJWRPoLaLlk5EGfzsnlQBcvj4yI= +github.com/smira/go-statsd v1.3.3/go.mod h1:RjdsESPgDODtg1VpVVf9MJrEW2Hw0wtRNbmB1CAhu6A= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= diff --git a/internal/config/config.go b/internal/config/config.go index b05e0bbe..0c2b3b5b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -19,7 +19,6 @@ import ( "golang.org/x/xerrors" "github.com/coinbase/chainstorage/config" - "github.com/coinbase/chainstorage/internal/utils/consts" "github.com/coinbase/chainstorage/internal/utils/utils" "github.com/coinbase/chainstorage/protos/coinbase/c3/common" api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage" @@ -40,6 +39,7 @@ type ( Cron CronConfig `mapstructure:"cron"` SLA SLAConfig `mapstructure:"sla"` FunctionalTest FunctionalTestConfig `mapstructure:"functional_test"` + StatsD *StatsDConfig `mapstructure:"statsd"` namespace string env Env @@ -396,6 +396,11 @@ type ( PerClientRPS int `mapstructure:"per_client_rps"` } + StatsDConfig struct { + Address string `mapstructure:"address" validate:"required"` + Prefix string `mapstructure:"prefix"` + } + ConfigOption func(options *configOptions) Env string @@ -460,6 +465,8 @@ const ( EnvVarNamespace = "CHAINSTORAGE_NAMESPACE" EnvVarConfigName = "CHAINSTORAGE_CONFIG" EnvVarEnvironment = "CHAINSTORAGE_ENVIRONMENT" + EnvVarConfigRoot = "CHAINSTORAGE_CONFIG_ROOT" + EnvVarConfigPath = "CHAINSTORAGE_CONFIG_PATH" EnvVarTestType = "TEST_TYPE" EnvVarCI = "CI" @@ -614,12 +621,12 @@ func getConfigName() string { return configName } -func GetProjectName() string { - projectName, ok := os.LookupEnv("CODEFLOW_PROJECT_NAME") - if !ok { - projectName = consts.ProjectName - } - return projectName +func GetConfigRoot() string { + return os.Getenv(EnvVarConfigRoot) +} + +func GetConfigPath() string { + return os.Getenv(EnvVarConfigPath) } func mergeInConfig(v *viper.Viper, configOpts *configOptions, env Env) error { @@ -854,17 +861,22 @@ func getConfigData(namespace string, env Env, blockchain common.Blockchain, netw networkName := strings.TrimPrefix(network.GetName(), blockchainName+"-") sidechainName := strings.TrimPrefix(sidechain.GetName(), blockchainName+"-"+networkName+"-") + configRoot := GetConfigRoot() if env == envSecrets { // .secrets.yml is intentionally not embedded in config.Store. // Read it from the file system instead. - _, filename, _, ok := runtime.Caller(0) - if !ok { - return nil, xerrors.Errorf("failed to recover the filename information") + // If configRoot is not set, use the default path. + if len(configRoot) == 0 { + _, filename, _, ok := runtime.Caller(0) + if !ok { + return nil, xerrors.Errorf("failed to recover the filename information") + } + rootDir := strings.TrimSuffix(filename, CurrentFileName) + configRoot = fmt.Sprintf("%v/config", rootDir) } - rootDir := strings.TrimSuffix(filename, CurrentFileName) - configPath := fmt.Sprintf("%v/config/%v/%v/%v/.secrets.yml", rootDir, namespace, blockchainName, networkName) + configPath := fmt.Sprintf("%v/%v/%v/%v/.secrets.yml", configRoot, namespace, blockchainName, networkName) if sidechain != api.SideChain_SIDECHAIN_NONE { - configPath = fmt.Sprintf("%v/config/%v/%v/%v/%v/.secrets.yml", rootDir, namespace, blockchainName, networkName, sidechainName) + configPath = fmt.Sprintf("%v/%v/%v/%v/%v/.secrets.yml", configRoot, namespace, blockchainName, networkName, sidechainName) } reader, err := os.Open(configPath) if err != nil { @@ -873,7 +885,26 @@ func getConfigData(namespace string, env Env, blockchain common.Blockchain, netw return reader, nil } - configPath := fmt.Sprintf("%v/%v/%v/%v.yml", namespace, blockchainName, networkName, env) + configPath := GetConfigPath() + // If configPath is not set, try to construct the file system path from configRoot. + if len(configPath) == 0 && len(configRoot) > 0 { + configPath = fmt.Sprintf("%v/%v/%v/%v/%v.yml", configRoot, namespace, blockchainName, networkName, env) + if sidechain != api.SideChain_SIDECHAIN_NONE { + configPath = fmt.Sprintf("%v/%v/%v/%v/%v/%v.yml", configRoot, namespace, blockchainName, networkName, sidechainName, env) + } + } + + // If either configRoot or configPath is set, read the config from the file system. + if len(configPath) > 0 { + reader, err := os.Open(configPath) + if err != nil { + return nil, xerrors.Errorf("failed to read config file %v: %w", configPath, err) + } + return reader, nil + } + + // Read the config from the embedded config.Store. + configPath = fmt.Sprintf("%v/%v/%v/%v.yml", namespace, blockchainName, networkName, env) if sidechain != api.SideChain_SIDECHAIN_NONE { configPath = fmt.Sprintf("%v/%v/%v/%v/%v.yml", namespace, blockchainName, networkName, sidechainName, env) } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 7c84d391..8c3d2a3a 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -398,6 +398,19 @@ func TestConfigOverridingByEnvSettings(t *testing.T) { }) } +func TestConfigOverrideConfigPath(t *testing.T) { + require := testutil.Require(t) + err := os.Setenv(config.EnvVarConfigPath, "../../config/chainstorage/ethereum/mainnet/base.yml") + require.NoError(err) + defer os.Unsetenv(config.EnvVarConfigPath) + + cfg, err := config.New() + require.NoError(err) + + require.Equal(common.Blockchain_BLOCKCHAIN_ETHEREUM, cfg.Blockchain()) + require.Equal(common.Network_NETWORK_ETHEREUM_MAINNET, cfg.Network()) +} + func TestEndpointParsing(t *testing.T) { require := testutil.Require(t) diff --git a/internal/dlq/sqs/dlq.go b/internal/dlq/sqs/dlq.go index 68f29004..0f07011e 100644 --- a/internal/dlq/sqs/dlq.go +++ b/internal/dlq/sqs/dlq.go @@ -69,10 +69,12 @@ var ( func New(params DLQParams) (DLQ, error) { client := sqs.New(params.Session) - metrics := params.Metrics.SubScope("dlq") + metrics := params.Metrics.SubScope("dlq").Tagged(map[string]string{ + "storage_type": "sqs", + }) impl := &dlqImpl{ config: params.Config, - logger: log.WithPackage(params.Logger), + logger: log.WithPackage(params.Logger).With(zap.String("storage_type", "sqs")), client: client, instrumentSendMessage: instrument.New(metrics, "send_message"), instrumentResendMessage: instrument.New(metrics, "resend_message"), diff --git a/internal/gateway/chainstorage_client.go b/internal/gateway/chainstorage_client.go index dd53273c..08db3e89 100644 --- a/internal/gateway/chainstorage_client.go +++ b/internal/gateway/chainstorage_client.go @@ -71,7 +71,6 @@ var ( ) func NewChainstorageClient(params Params) (Client, error) { - address := params.Config.SDK.ChainstorageAddress authHeader := params.Config.SDK.AuthHeader authToken := params.Config.SDK.AuthToken restful := params.Config.SDK.Restful @@ -128,7 +127,10 @@ func NewChainstorageClient(params Params) (Client, error) { grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(sendMsgSize), grpc.MaxCallRecvMsgSize(recvMsgSize)), } + address := cfg.serverAddress if strings.HasPrefix(address, "http://") || strings.Contains(address, "localhost") { + // Remove http:// prefix since grpc.DialContext does not support it. + address = strings.Replace(address, "http://", "", 1) opts = append(opts, grpc.WithInsecure()) } else { opts = append(opts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, ""))) @@ -155,7 +157,6 @@ func NewChainstorageClient(params Params) (Client, error) { zap.String("env", string(params.Config.Env())), zap.String("blockchain", params.Config.Chain.Blockchain.String()), zap.String("network", params.Config.Chain.Network.String()), - zap.String("address", address), zap.String("sidechain", params.Config.Chain.Sidechain.String()), zap.String("address", cfg.serverAddress), zap.String("client_id", cfg.clientID), diff --git a/internal/storage/blobstorage/gcs/blob_storage_integration_test.go b/internal/storage/blobstorage/gcs/blob_storage_integration_test.go index 288b2290..8fe90af5 100644 --- a/internal/storage/blobstorage/gcs/blob_storage_integration_test.go +++ b/internal/storage/blobstorage/gcs/blob_storage_integration_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/stretchr/testify/suite" "go.uber.org/fx" "google.golang.org/protobuf/proto" diff --git a/internal/storage/blobstorage/module.go b/internal/storage/blobstorage/module.go index 3ec55b3c..5e0ccf7f 100644 --- a/internal/storage/blobstorage/module.go +++ b/internal/storage/blobstorage/module.go @@ -3,6 +3,7 @@ package blobstorage import ( "go.uber.org/fx" + "github.com/coinbase/chainstorage/internal/storage/blobstorage/gcs" "github.com/coinbase/chainstorage/internal/storage/blobstorage/internal" "github.com/coinbase/chainstorage/internal/storage/blobstorage/s3" ) @@ -16,4 +17,5 @@ type ( var Module = fx.Options( fx.Provide(internal.WithBlobStorageFactory), s3.Module, + gcs.Module, ) diff --git a/internal/storage/blobstorage/s3/blob_storage.go b/internal/storage/blobstorage/s3/blob_storage.go index 4a8d7e6e..b11c9cc8 100644 --- a/internal/storage/blobstorage/s3/blob_storage.go +++ b/internal/storage/blobstorage/s3/blob_storage.go @@ -84,7 +84,9 @@ func (f *blobStorageFactory) Create() (internal.BlobStorage, error) { } func New(params BlobStorageParams) (internal.BlobStorage, error) { - metrics := params.Metrics.SubScope("blob_storage") + metrics := params.Metrics.SubScope("blob_storage").Tagged(map[string]string{ + "storage_type": "s3", + }) return &blobStorageImpl{ logger: log.WithPackage(params.Logger), config: params.Config, @@ -235,6 +237,7 @@ func (s *blobStorageImpl) PreSign(ctx context.Context, objectKey string) (string func (s *blobStorageImpl) logDuration(method string, start time.Time) { s.logger.Debug( "blob_storage", + zap.String("storage_type", "s3"), zap.String("method", method), zap.Duration("duration", time.Since(start)), ) diff --git a/internal/storage/metastorage/dynamodb/block_storage.go b/internal/storage/metastorage/dynamodb/block_storage.go index 917e021b..a298fc75 100644 --- a/internal/storage/metastorage/dynamodb/block_storage.go +++ b/internal/storage/metastorage/dynamodb/block_storage.go @@ -73,7 +73,9 @@ func newBlockStorage(params Params) (internal.BlockStorage, error) { return nil, xerrors.Errorf("failed to create metadata accessor: %w", err) } - metrics := params.Metrics.SubScope("block_storage") + metrics := params.Metrics.SubScope("block_storage").Tagged(map[string]string{ + "storage_type": "dynamodb", + }) accessor := blockStorageImpl{ blockTable: metadataTable, blockStartHeight: params.Config.Chain.BlockStartHeight, diff --git a/internal/storage/metastorage/dynamodb/event_storage.go b/internal/storage/metastorage/dynamodb/event_storage.go index 6c1aaa22..20783249 100644 --- a/internal/storage/metastorage/dynamodb/event_storage.go +++ b/internal/storage/metastorage/dynamodb/event_storage.go @@ -73,7 +73,9 @@ func newEventStorage(params Params) (internal.EventStorage, error) { return nil, xerrors.Errorf("failed to create versioned event table: %w", err) } - metrics := params.Metrics.SubScope("event_storage") + metrics := params.Metrics.SubScope("event_storage").Tagged(map[string]string{ + "storage_type": "dynamodb", + }) storage := eventStorageImpl{ eventTable: eventTable, heightIndexName: heightIndexName, diff --git a/internal/storage/metastorage/dynamodb/transaction_storage.go b/internal/storage/metastorage/dynamodb/transaction_storage.go index 77d9f4f6..0af445d7 100644 --- a/internal/storage/metastorage/dynamodb/transaction_storage.go +++ b/internal/storage/metastorage/dynamodb/transaction_storage.go @@ -57,7 +57,9 @@ func newTransactionStorage(params Params) (internal.TransactionStorage, error) { return nil, xerrors.Errorf("failed to create transaction table accessor: %w", err) } - metrics := params.Metrics.SubScope("transaction_storage") + metrics := params.Metrics.SubScope("transaction_storage").Tagged(map[string]string{ + "storage_type": "dynamodb", + }) return &transactionStorageImpl{ transactionTable: transactionTable, diff --git a/internal/storage/metastorage/firestore/block_storage.go b/internal/storage/metastorage/firestore/block_storage.go index d90dde36..b36572aa 100644 --- a/internal/storage/metastorage/firestore/block_storage.go +++ b/internal/storage/metastorage/firestore/block_storage.go @@ -36,7 +36,9 @@ type ( ) func newBlockStorage(params Params, client *firestore.Client) (internal.BlockStorage, error) { - metrics := params.Metrics.SubScope("block_storage_firestore") + metrics := params.Metrics.SubScope("block_storage").Tagged(map[string]string{ + "storage_type": "firestore", + }) accessor := blockStorageImpl{ client: client, projectId: params.Config.GCP.Project, diff --git a/internal/storage/metastorage/firestore/event_storage.go b/internal/storage/metastorage/firestore/event_storage.go index a1b373a8..1c11f7f2 100644 --- a/internal/storage/metastorage/firestore/event_storage.go +++ b/internal/storage/metastorage/firestore/event_storage.go @@ -39,7 +39,9 @@ type ( ) func newEventStorage(params Params, client *firestore.Client) (internal.EventStorage, error) { - metrics := params.Metrics.SubScope("event_storage_firestore") + metrics := params.Metrics.SubScope("event_storage").Tagged(map[string]string{ + "storage_type": "firestore", + }) storage := eventStorageImpl{ client: client, env: params.Config.ConfigName, diff --git a/internal/storage/metastorage/firestore/transaction_storage.go b/internal/storage/metastorage/firestore/transaction_storage.go index 6e7bf7c1..375f1bfc 100644 --- a/internal/storage/metastorage/firestore/transaction_storage.go +++ b/internal/storage/metastorage/firestore/transaction_storage.go @@ -21,7 +21,9 @@ type ( var _ internal.TransactionStorage = (*transactionStorageImpl)(nil) func newTransactionStorage(params Params, client *firestore.Client) (internal.TransactionStorage, error) { - metrics := params.Metrics.SubScope("transaction_storage_firestore") + metrics := params.Metrics.SubScope("transaction_storage").Tagged(map[string]string{ + "storage_type": "firestore", + }) return &transactionStorageImpl{ client: client, instrumentAddOrUpdateTransaction: instrument.New(metrics, "add_transactions"), diff --git a/internal/tally/module.go b/internal/tally/module.go index e1d2415f..551975d9 100644 --- a/internal/tally/module.go +++ b/internal/tally/module.go @@ -6,4 +6,5 @@ import ( var Module = fx.Options( fx.Provide(NewRootScope), + fx.Provide(NewStatsReporter), ) diff --git a/internal/tally/stats_reporter.go b/internal/tally/stats_reporter.go new file mode 100644 index 00000000..734d2a21 --- /dev/null +++ b/internal/tally/stats_reporter.go @@ -0,0 +1,115 @@ +package tally + +import ( + "context" + "time" + + smirastatsd "github.com/smira/go-statsd" + "github.com/uber-go/tally/v4" + "go.uber.org/fx" + "go.uber.org/zap" + + "github.com/coinbase/chainstorage/internal/config" +) + +type ( + StatsReporterParams struct { + fx.In + Lifecycle fx.Lifecycle + Logger *zap.Logger + Config *config.Config + } + + reporter struct { + client *smirastatsd.Client + } +) + +const ( + reportingInterval = time.Second +) + +var ( + // hardcoding this to be datadog format + // we need think about whats the best way to set it up in config such that + // when we switch reporter impl, config will still be backward compatible + tagFormat = smirastatsd.TagFormatDatadog +) + +func NewStatsReporter(params StatsReporterParams) tally.StatsReporter { + if params.Config.StatsD == nil { + return tally.NullStatsReporter + } + cfg := params.Config.StatsD + client := smirastatsd.NewClient( + cfg.Address, + smirastatsd.MetricPrefix(cfg.Prefix), + smirastatsd.TagStyle(tagFormat), + smirastatsd.ReportInterval(reportingInterval), + ) + params.Logger.Info("initialized statsd client") + params.Lifecycle.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return client.Close() + }, + }) + return &reporter{ + client: client, + } +} + +func convertTags(tagsMap map[string]string) []smirastatsd.Tag { + tags := make([]smirastatsd.Tag, 0, len(tagsMap)) + for key, value := range tagsMap { + tags = append(tags, smirastatsd.StringTag(key, value)) + } + return tags +} + +func (r *reporter) ReportCounter(name string, tags map[string]string, value int64) { + r.client.Incr(name, value, convertTags(tags)...) +} + +func (r *reporter) ReportGauge(name string, tags map[string]string, value float64) { + r.client.FGauge(name, value, convertTags(tags)...) +} + +func (r *reporter) ReportTimer(name string, tags map[string]string, value time.Duration) { + r.client.PrecisionTiming(name, value, convertTags(tags)...) +} + +func (r *reporter) ReportHistogramValueSamples( + name string, + tags map[string]string, + buckets tally.Buckets, + bucketLowerBound, + bucketUpperBound float64, + samples int64) { + panic("no implemented") +} + +func (r *reporter) ReportHistogramDurationSamples( + name string, + tags map[string]string, + buckets tally.Buckets, + bucketLowerBound, + bucketUpperBound time.Duration, + samples int64) { + panic("no implemented") +} + +func (r *reporter) Capabilities() tally.Capabilities { + return r +} + +func (r *reporter) Reporting() bool { + return true +} + +func (r *reporter) Tagging() bool { + return true +} + +func (r *reporter) Flush() { + // no-op +} diff --git a/internal/tally/stats_reporter_test.go b/internal/tally/stats_reporter_test.go new file mode 100644 index 00000000..834a1cfa --- /dev/null +++ b/internal/tally/stats_reporter_test.go @@ -0,0 +1,49 @@ +package tally + +import ( + "testing" + + "github.com/uber-go/tally/v4" + "go.uber.org/fx" + + "github.com/coinbase/chainstorage/internal/config" + "github.com/coinbase/chainstorage/internal/utils/testapp" + "github.com/coinbase/chainstorage/internal/utils/testutil" +) + +func TestNewReporterDefaultNoStatsD(t *testing.T) { + testapp.TestAllConfigs(t, func(t *testing.T, cfg *config.Config) { + require := testutil.Require(t) + + var reporter tally.StatsReporter + testapp.New( + t, + testapp.WithConfig(cfg), + fx.Provide(NewStatsReporter), + fx.Populate(&reporter), + ) + + require.Equal(tally.NullStatsReporter, reporter) + require.Equal(false, reporter.Capabilities().Reporting()) + require.Equal(false, reporter.Capabilities().Tagging()) + }) +} + +func TestNewReporterDefaultWithStatsD(t *testing.T) { + testapp.TestAllConfigs(t, func(t *testing.T, cfg *config.Config) { + require := testutil.Require(t) + cfg.StatsD = &config.StatsDConfig{ + Address: "localhost:8125", + } + var reporter tally.StatsReporter + testapp.New( + t, + testapp.WithConfig(cfg), + fx.Provide(NewStatsReporter), + fx.Populate(&reporter), + ) + require.NotEqual(tally.NullStatsReporter, reporter) + require.Equal(true, reporter.Capabilities().Reporting()) + require.Equal(true, reporter.Capabilities().Tagging()) + }) +} diff --git a/internal/tally/tally.go b/internal/tally/tally.go index ff6b2c59..9d065cb1 100644 --- a/internal/tally/tally.go +++ b/internal/tally/tally.go @@ -2,7 +2,6 @@ package tally import ( "context" - "time" "github.com/uber-go/tally/v4" "go.uber.org/fx" @@ -16,27 +15,18 @@ type ( fx.In Lifecycle fx.Lifecycle Config *config.Config - Reporter tally.StatsReporter `optional:"true"` + Reporter tally.StatsReporter } ) -const ( - reportingInterval = time.Second -) - func NewRootScope(params MetricParams) tally.Scope { - // XXX: Inject your own reporter here. - reporter := params.Reporter - if reporter == nil { - reporter = tally.NullStatsReporter - } - opts := tally.ScopeOptions{ Prefix: consts.ServiceName, - Reporter: reporter, + Reporter: params.Reporter, Tags: params.Config.GetCommonTags(), } - scope, closer := tally.NewRootScope(opts, reportingInterval) + //report interval will be set on reporter + scope, closer := tally.NewRootScope(opts, 0) params.Lifecycle.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return closer.Close()