Skip to content

Commit

Permalink
Merge branch 'coinbase:master' into feat/presign-url-in-blob-storage
Browse files Browse the repository at this point in the history
  • Loading branch information
bestmike007 authored Jan 25, 2024
2 parents 821aa21 + 2032bcc commit ec9d91c
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 13 deletions.
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ var (
DLQType_value = map[string]int32{
"UNSPECIFIED": 0,
"SQS": 1,
"FIRESTORE": 2,
}
)

Expand Down Expand Up @@ -491,6 +492,7 @@ const (

DLQType_UNSPECIFIED DLQType = 0
DLQType_SQS DLQType = 1
DLQType_FIRESTORE DLQType = 2

AWSAccountDevelopment AWSAccount = "development"
AWSAccountProduction AWSAccount = "production"
Expand Down
123 changes: 123 additions & 0 deletions internal/dlq/firestore/dlq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package firestore

import (
"context"
"encoding/json"

"cloud.google.com/go/firestore"
"go.uber.org/fx"
"go.uber.org/zap"
"golang.org/x/xerrors"

"github.com/coinbase/chainstorage/internal/config"
"github.com/coinbase/chainstorage/internal/dlq/internal"
"github.com/coinbase/chainstorage/internal/utils/fxparams"
"github.com/coinbase/chainstorage/internal/utils/instrument"
"github.com/coinbase/chainstorage/internal/utils/log"
)

// A firestore-based store only DLQ for storing unexpected blocks

type (
DLQ = internal.DLQ
Message = internal.Message

DLQParams struct {
fx.In
fxparams.Params
}

dlqImpl struct {
config *config.Config
logger *zap.Logger
client *firestore.Client
instrumentSendMessage instrument.Instrument
instrumentResendMessage instrument.Instrument
instrumentReceiveMessage instrument.Instrument
instrumentDeleteMessage instrument.Instrument
}

dlqFactory struct {
params DLQParams
}
)

var _ DLQ = (*dlqImpl)(nil)

func (f *dlqFactory) Create() (internal.DLQ, error) {
return New(f.params)
}

func NewFactory(params DLQParams) internal.DLQFactory {
return &dlqFactory{params}
}

func New(params DLQParams) (DLQ, error) {
ctx := context.Background()
config := params.Config.GCP
if config == nil {
return nil, xerrors.Errorf("failed to create firestore meta storage: missing GCP config")
}

client, err := firestore.NewClient(ctx, config.Project)
if err != nil {
return nil, xerrors.Errorf("failed to create firestore client: %w", err)
}
metrics := params.Metrics.SubScope("dlq").Tagged(map[string]string{
"storage_type": "firestore",
})
return &dlqImpl{
config: params.Config,
logger: log.WithPackage(params.Logger).With(zap.String("storage_type", "firestore")),
client: client,
instrumentSendMessage: instrument.New(metrics, "send_message"),
instrumentResendMessage: instrument.New(metrics, "resend_message"),
instrumentReceiveMessage: instrument.New(metrics, "receive_message", instrument.WithFilter(internal.FilterError)),
instrumentDeleteMessage: instrument.New(metrics, "delete_message"),
}, nil
}

// DeleteMessage implements internal.DLQ.
func (*dlqImpl) DeleteMessage(ctx context.Context, message *internal.Message) error {
return internal.ErrNotFound
}

// ReceiveMessage implements internal.DLQ.
func (*dlqImpl) ReceiveMessage(ctx context.Context) (*internal.Message, error) {
return nil, internal.ErrNotFound
}

// ResendMessage implements internal.DLQ.
func (*dlqImpl) ResendMessage(ctx context.Context, message *internal.Message) error {
return internal.ErrNotFound
}

// SendMessage implements internal.DLQ.
func (q *dlqImpl) SendMessage(ctx context.Context, message *internal.Message) error {
return q.instrumentSendMessage.Instrument(ctx, func(ctx context.Context) error {
body, err := json.Marshal(message.Data)
if err != nil {
return xerrors.Errorf("failed to marshal body: %w", err)
}
messageBody := string(body)

_, _, err = q.client.Collection("dlq").Add(ctx, map[string]any{
"Topic": message.Topic,
"Retries": message.Retries,
"SentTimestamp": message.SentTimestamp,
"ReceiptHandle": message.ReceiptHandle,
"Data": messageBody,
})

if err != nil {
return xerrors.Errorf("failed to send message: %w", err)
}

q.logger.Info(
"sent message to dlq",
zap.String("topic", message.Topic),
zap.Reflect("data", message.Data),
)
return nil
})
}
4 changes: 4 additions & 0 deletions internal/dlq/internal/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ func WithDLQFactory(params DLQFactoryParams) (DLQ, error) {
}
return dlq, nil
}

func FilterError(err error) bool {
return xerrors.Is(err, ErrNotFound)
}
12 changes: 2 additions & 10 deletions internal/dlq/sqs/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ const (
retriesAttributeDataType = "Number"
)

var (
ErrNotFound = xerrors.New("not found")
)

func New(params DLQParams) (DLQ, error) {
client := sqs.New(params.Session)
metrics := params.Metrics.SubScope("dlq").Tagged(map[string]string{
Expand All @@ -78,7 +74,7 @@ func New(params DLQParams) (DLQ, error) {
client: client,
instrumentSendMessage: instrument.New(metrics, "send_message"),
instrumentResendMessage: instrument.New(metrics, "resend_message"),
instrumentReceiveMessage: instrument.New(metrics, "receive_message", instrument.WithFilter(filterError)),
instrumentReceiveMessage: instrument.New(metrics, "receive_message", instrument.WithFilter(internal.FilterError)),
instrumentDeleteMessage: instrument.New(metrics, "delete_message"),
}
if params.Config.AWS.IsLocalStack {
Expand Down Expand Up @@ -196,7 +192,7 @@ func (q *dlqImpl) ReceiveMessage(ctx context.Context) (*Message, error) {

numMessages := len(output.Messages)
if numMessages == 0 {
return ErrNotFound
return internal.ErrNotFound
}

if numMessages != 1 {
Expand Down Expand Up @@ -311,7 +307,3 @@ func (q *dlqImpl) initQueueURL() error {
)
return nil
}

func filterError(err error) bool {
return xerrors.Is(err, ErrNotFound)
}
6 changes: 3 additions & 3 deletions internal/dlq/sqs/dlq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestIntegrationDLQ(t *testing.T) {

_, err = q.ReceiveMessage(context.Background())
require.Error(err)
require.True(xerrors.Is(err, ErrNotFound))
require.True(xerrors.Is(err, internal.ErrNotFound))
}

func TestIntegrationDLQ_Resend(t *testing.T) {
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestIntegrationDLQ_Resend(t *testing.T) {

_, err = q.ReceiveMessage(context.Background())
require.Error(err)
require.True(xerrors.Is(err, ErrNotFound))
require.True(xerrors.Is(err, internal.ErrNotFound))
}

func TestIntegrationDLQ_UnknownTopic(t *testing.T) {
Expand Down Expand Up @@ -149,5 +149,5 @@ func TestIntegrationDLQ_UnknownTopic(t *testing.T) {

_, err = q.ReceiveMessage(context.Background())
require.Error(err)
require.True(xerrors.Is(err, ErrNotFound))
require.True(xerrors.Is(err, internal.ErrNotFound))
}

0 comments on commit ec9d91c

Please sign in to comment.