Skip to content

Commit

Permalink
Merge branch 'coinbase:master' into feat/presign-url-in-blob-storage
Browse files Browse the repository at this point in the history
  • Loading branch information
bestmike007 authored Jan 23, 2024
2 parents b8a3b02 + e19ba5f commit a748241
Show file tree
Hide file tree
Showing 19 changed files with 262 additions and 39 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
github.com/smallnest/weighted v0.0.0-20230419055410-36b780e40a7a
github.com/smira/go-statsd v1.3.3
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ github.com/smallnest/weighted v0.0.0-20230419055410-36b780e40a7a h1:eieNTZmrnPzI
github.com/smallnest/weighted v0.0.0-20230419055410-36b780e40a7a/go.mod h1:xc9CoZ+ZBGwajnWto5Aqw/wWg8euy4HtOr6K9Fxp9iw=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smira/go-statsd v1.3.3 h1:WnMlmGTyMpzto+HvOJWRPoLaLlk5EGfzsnlQBcvj4yI=
github.com/smira/go-statsd v1.3.3/go.mod h1:RjdsESPgDODtg1VpVVf9MJrEW2Hw0wtRNbmB1CAhu6A=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
Expand Down
59 changes: 45 additions & 14 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"golang.org/x/xerrors"

"github.com/coinbase/chainstorage/config"
"github.com/coinbase/chainstorage/internal/utils/consts"
"github.com/coinbase/chainstorage/internal/utils/utils"
"github.com/coinbase/chainstorage/protos/coinbase/c3/common"
api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage"
Expand All @@ -40,6 +39,7 @@ type (
Cron CronConfig `mapstructure:"cron"`
SLA SLAConfig `mapstructure:"sla"`
FunctionalTest FunctionalTestConfig `mapstructure:"functional_test"`
StatsD *StatsDConfig `mapstructure:"statsd"`

namespace string
env Env
Expand Down Expand Up @@ -396,6 +396,11 @@ type (
PerClientRPS int `mapstructure:"per_client_rps"`
}

StatsDConfig struct {
Address string `mapstructure:"address" validate:"required"`
Prefix string `mapstructure:"prefix"`
}

ConfigOption func(options *configOptions)

Env string
Expand Down Expand Up @@ -460,6 +465,8 @@ const (
EnvVarNamespace = "CHAINSTORAGE_NAMESPACE"
EnvVarConfigName = "CHAINSTORAGE_CONFIG"
EnvVarEnvironment = "CHAINSTORAGE_ENVIRONMENT"
EnvVarConfigRoot = "CHAINSTORAGE_CONFIG_ROOT"
EnvVarConfigPath = "CHAINSTORAGE_CONFIG_PATH"
EnvVarTestType = "TEST_TYPE"
EnvVarCI = "CI"

Expand Down Expand Up @@ -614,12 +621,12 @@ func getConfigName() string {
return configName
}

func GetProjectName() string {
projectName, ok := os.LookupEnv("CODEFLOW_PROJECT_NAME")
if !ok {
projectName = consts.ProjectName
}
return projectName
func GetConfigRoot() string {
return os.Getenv(EnvVarConfigRoot)
}

func GetConfigPath() string {
return os.Getenv(EnvVarConfigPath)
}

func mergeInConfig(v *viper.Viper, configOpts *configOptions, env Env) error {
Expand Down Expand Up @@ -854,17 +861,22 @@ func getConfigData(namespace string, env Env, blockchain common.Blockchain, netw
networkName := strings.TrimPrefix(network.GetName(), blockchainName+"-")
sidechainName := strings.TrimPrefix(sidechain.GetName(), blockchainName+"-"+networkName+"-")

configRoot := GetConfigRoot()
if env == envSecrets {
// .secrets.yml is intentionally not embedded in config.Store.
// Read it from the file system instead.
_, filename, _, ok := runtime.Caller(0)
if !ok {
return nil, xerrors.Errorf("failed to recover the filename information")
// If configRoot is not set, use the default path.
if len(configRoot) == 0 {
_, filename, _, ok := runtime.Caller(0)
if !ok {
return nil, xerrors.Errorf("failed to recover the filename information")
}
rootDir := strings.TrimSuffix(filename, CurrentFileName)
configRoot = fmt.Sprintf("%v/config", rootDir)
}
rootDir := strings.TrimSuffix(filename, CurrentFileName)
configPath := fmt.Sprintf("%v/config/%v/%v/%v/.secrets.yml", rootDir, namespace, blockchainName, networkName)
configPath := fmt.Sprintf("%v/%v/%v/%v/.secrets.yml", configRoot, namespace, blockchainName, networkName)
if sidechain != api.SideChain_SIDECHAIN_NONE {
configPath = fmt.Sprintf("%v/config/%v/%v/%v/%v/.secrets.yml", rootDir, namespace, blockchainName, networkName, sidechainName)
configPath = fmt.Sprintf("%v/%v/%v/%v/%v/.secrets.yml", configRoot, namespace, blockchainName, networkName, sidechainName)
}
reader, err := os.Open(configPath)
if err != nil {
Expand All @@ -873,7 +885,26 @@ func getConfigData(namespace string, env Env, blockchain common.Blockchain, netw
return reader, nil
}

configPath := fmt.Sprintf("%v/%v/%v/%v.yml", namespace, blockchainName, networkName, env)
configPath := GetConfigPath()
// If configPath is not set, try to construct the file system path from configRoot.
if len(configPath) == 0 && len(configRoot) > 0 {
configPath = fmt.Sprintf("%v/%v/%v/%v/%v.yml", configRoot, namespace, blockchainName, networkName, env)
if sidechain != api.SideChain_SIDECHAIN_NONE {
configPath = fmt.Sprintf("%v/%v/%v/%v/%v/%v.yml", configRoot, namespace, blockchainName, networkName, sidechainName, env)
}
}

// If either configRoot or configPath is set, read the config from the file system.
if len(configPath) > 0 {
reader, err := os.Open(configPath)
if err != nil {
return nil, xerrors.Errorf("failed to read config file %v: %w", configPath, err)
}
return reader, nil
}

// Read the config from the embedded config.Store.
configPath = fmt.Sprintf("%v/%v/%v/%v.yml", namespace, blockchainName, networkName, env)
if sidechain != api.SideChain_SIDECHAIN_NONE {
configPath = fmt.Sprintf("%v/%v/%v/%v/%v.yml", namespace, blockchainName, networkName, sidechainName, env)
}
Expand Down
13 changes: 13 additions & 0 deletions internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,19 @@ func TestConfigOverridingByEnvSettings(t *testing.T) {
})
}

func TestConfigOverrideConfigPath(t *testing.T) {
require := testutil.Require(t)
err := os.Setenv(config.EnvVarConfigPath, "../../config/chainstorage/ethereum/mainnet/base.yml")
require.NoError(err)
defer os.Unsetenv(config.EnvVarConfigPath)

cfg, err := config.New()
require.NoError(err)

require.Equal(common.Blockchain_BLOCKCHAIN_ETHEREUM, cfg.Blockchain())
require.Equal(common.Network_NETWORK_ETHEREUM_MAINNET, cfg.Network())
}

func TestEndpointParsing(t *testing.T) {
require := testutil.Require(t)

Expand Down
6 changes: 4 additions & 2 deletions internal/dlq/sqs/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ var (

func New(params DLQParams) (DLQ, error) {
client := sqs.New(params.Session)
metrics := params.Metrics.SubScope("dlq")
metrics := params.Metrics.SubScope("dlq").Tagged(map[string]string{
"storage_type": "sqs",
})
impl := &dlqImpl{
config: params.Config,
logger: log.WithPackage(params.Logger),
logger: log.WithPackage(params.Logger).With(zap.String("storage_type", "sqs")),
client: client,
instrumentSendMessage: instrument.New(metrics, "send_message"),
instrumentResendMessage: instrument.New(metrics, "resend_message"),
Expand Down
5 changes: 3 additions & 2 deletions internal/gateway/chainstorage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ var (
)

func NewChainstorageClient(params Params) (Client, error) {
address := params.Config.SDK.ChainstorageAddress
authHeader := params.Config.SDK.AuthHeader
authToken := params.Config.SDK.AuthToken
restful := params.Config.SDK.Restful
Expand Down Expand Up @@ -128,7 +127,10 @@ func NewChainstorageClient(params Params) (Client, error) {
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(sendMsgSize), grpc.MaxCallRecvMsgSize(recvMsgSize)),
}

address := cfg.serverAddress
if strings.HasPrefix(address, "http://") || strings.Contains(address, "localhost") {
// Remove http:// prefix since grpc.DialContext does not support it.
address = strings.Replace(address, "http://", "", 1)
opts = append(opts, grpc.WithInsecure())
} else {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")))
Expand All @@ -155,7 +157,6 @@ func NewChainstorageClient(params Params) (Client, error) {
zap.String("env", string(params.Config.Env())),
zap.String("blockchain", params.Config.Chain.Blockchain.String()),
zap.String("network", params.Config.Chain.Network.String()),
zap.String("address", address),
zap.String("sidechain", params.Config.Chain.Sidechain.String()),
zap.String("address", cfg.serverAddress),
zap.String("client_id", cfg.clientID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/stretchr/testify/suite"

Check failure on line 7 in internal/storage/blobstorage/gcs/blob_storage_integration_test.go

View workflow job for this annotation

GitHub Actions / integration (1.21)

other declaration of suite

Check failure on line 7 in internal/storage/blobstorage/gcs/blob_storage_integration_test.go

View workflow job for this annotation

GitHub Actions / integration (1.20)

other declaration of suite
"go.uber.org/fx"
"google.golang.org/protobuf/proto"

Expand Down
2 changes: 2 additions & 0 deletions internal/storage/blobstorage/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blobstorage
import (
"go.uber.org/fx"

"github.com/coinbase/chainstorage/internal/storage/blobstorage/gcs"
"github.com/coinbase/chainstorage/internal/storage/blobstorage/internal"
"github.com/coinbase/chainstorage/internal/storage/blobstorage/s3"
)
Expand All @@ -16,4 +17,5 @@ type (
var Module = fx.Options(
fx.Provide(internal.WithBlobStorageFactory),
s3.Module,
gcs.Module,
)
5 changes: 4 additions & 1 deletion internal/storage/blobstorage/s3/blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (f *blobStorageFactory) Create() (internal.BlobStorage, error) {
}

func New(params BlobStorageParams) (internal.BlobStorage, error) {
metrics := params.Metrics.SubScope("blob_storage")
metrics := params.Metrics.SubScope("blob_storage").Tagged(map[string]string{
"storage_type": "s3",
})
return &blobStorageImpl{
logger: log.WithPackage(params.Logger),
config: params.Config,
Expand Down Expand Up @@ -235,6 +237,7 @@ func (s *blobStorageImpl) PreSign(ctx context.Context, objectKey string) (string
func (s *blobStorageImpl) logDuration(method string, start time.Time) {
s.logger.Debug(
"blob_storage",
zap.String("storage_type", "s3"),
zap.String("method", method),
zap.Duration("duration", time.Since(start)),
)
Expand Down
4 changes: 3 additions & 1 deletion internal/storage/metastorage/dynamodb/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func newBlockStorage(params Params) (internal.BlockStorage, error) {
return nil, xerrors.Errorf("failed to create metadata accessor: %w", err)
}

metrics := params.Metrics.SubScope("block_storage")
metrics := params.Metrics.SubScope("block_storage").Tagged(map[string]string{
"storage_type": "dynamodb",
})
accessor := blockStorageImpl{
blockTable: metadataTable,
blockStartHeight: params.Config.Chain.BlockStartHeight,
Expand Down
4 changes: 3 additions & 1 deletion internal/storage/metastorage/dynamodb/event_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func newEventStorage(params Params) (internal.EventStorage, error) {
return nil, xerrors.Errorf("failed to create versioned event table: %w", err)
}

metrics := params.Metrics.SubScope("event_storage")
metrics := params.Metrics.SubScope("event_storage").Tagged(map[string]string{
"storage_type": "dynamodb",
})
storage := eventStorageImpl{
eventTable: eventTable,
heightIndexName: heightIndexName,
Expand Down
4 changes: 3 additions & 1 deletion internal/storage/metastorage/dynamodb/transaction_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func newTransactionStorage(params Params) (internal.TransactionStorage, error) {
return nil, xerrors.Errorf("failed to create transaction table accessor: %w", err)
}

metrics := params.Metrics.SubScope("transaction_storage")
metrics := params.Metrics.SubScope("transaction_storage").Tagged(map[string]string{
"storage_type": "dynamodb",
})

return &transactionStorageImpl{
transactionTable: transactionTable,
Expand Down
4 changes: 3 additions & 1 deletion internal/storage/metastorage/firestore/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ type (
)

func newBlockStorage(params Params, client *firestore.Client) (internal.BlockStorage, error) {
metrics := params.Metrics.SubScope("block_storage_firestore")
metrics := params.Metrics.SubScope("block_storage").Tagged(map[string]string{
"storage_type": "firestore",
})
accessor := blockStorageImpl{
client: client,
projectId: params.Config.GCP.Project,
Expand Down
4 changes: 3 additions & 1 deletion internal/storage/metastorage/firestore/event_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ type (
)

func newEventStorage(params Params, client *firestore.Client) (internal.EventStorage, error) {
metrics := params.Metrics.SubScope("event_storage_firestore")
metrics := params.Metrics.SubScope("event_storage").Tagged(map[string]string{
"storage_type": "firestore",
})
storage := eventStorageImpl{
client: client,
env: params.Config.ConfigName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ type (
var _ internal.TransactionStorage = (*transactionStorageImpl)(nil)

func newTransactionStorage(params Params, client *firestore.Client) (internal.TransactionStorage, error) {
metrics := params.Metrics.SubScope("transaction_storage_firestore")
metrics := params.Metrics.SubScope("transaction_storage").Tagged(map[string]string{
"storage_type": "firestore",
})
return &transactionStorageImpl{
client: client,
instrumentAddOrUpdateTransaction: instrument.New(metrics, "add_transactions"),
Expand Down
1 change: 1 addition & 0 deletions internal/tally/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import (

var Module = fx.Options(
fx.Provide(NewRootScope),
fx.Provide(NewStatsReporter),
)
Loading

0 comments on commit a748241

Please sign in to comment.