Skip to content

Commit

Permalink
Added hedging to AWS
Browse files Browse the repository at this point in the history
Signed-off-by: Joao Marcal <[email protected]>
  • Loading branch information
JoaoBraveCoding committed Nov 14, 2023
1 parent d5c5760 commit 4bc8466
Showing 1 changed file with 25 additions and 6 deletions.
31 changes: 25 additions & 6 deletions pkg/storage/chunk/client/aws/s3_thanos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,45 @@ import (
)

type S3ThanosObjectClient struct {
client objstore.Bucket
client objstore.Bucket
hedgedClient objstore.Bucket
}

// NewS3ObjectClient makes a new S3-backed ObjectClient.
func NewS3ThanosObjectClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedgingCfg hedging.Config) (*S3ThanosObjectClient, error) {
bucket, err := bucket.NewClient(ctx, cfg, component, logger, prometheus.DefaultRegisterer)
client, err := newS3ThanosObjClient(ctx, cfg, component, logger, false, hedgingCfg)
if err != nil {
return nil, err
}
hedgedClient, err := newS3ThanosObjClient(ctx, cfg, component, logger, false, hedgingCfg)
if err != nil {
return nil, err
}
return &S3ThanosObjectClient{
client: bucket,
client: client,
hedgedClient: hedgedClient,
}, nil
}

func newS3ThanosObjClient(ctx context.Context, cfg bucket.Config, component string, logger log.Logger, hedging bool, hedgingCfg hedging.Config) (objstore.Bucket, error) {
if hedging {
hedgedTrasport, err := hedgingCfg.RoundTripperWithRegisterer(nil, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer))
if err != nil {
return nil, err
}

cfg.S3.HTTP.Transport = hedgedTrasport
}

return bucket.NewClient(ctx, cfg, component, logger, prometheus.DefaultRegisterer)
}

// Stop fulfills the chunk.ObjectClient interface
func (s *S3ThanosObjectClient) Stop() {}

// ObjectExists checks if a given objectKey exists in the AWS bucket
func (s *S3ThanosObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
return s.client.Exists(ctx, objectKey)
return s.hedgedClient.Exists(ctx, objectKey)
}

// PutObject into the store
Expand All @@ -50,12 +69,12 @@ func (s *S3ThanosObjectClient) DeleteObject(ctx context.Context, objectKey strin

// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
func (s *S3ThanosObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
reader, err := s.client.Get(ctx, objectKey)
reader, err := s.hedgedClient.Get(ctx, objectKey)
if err != nil {
return nil, 0, err
}

attr, err := s.client.Attributes(ctx, objectKey)
attr, err := s.hedgedClient.Attributes(ctx, objectKey)
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to get attributes for %s", objectKey)
}
Expand Down

0 comments on commit 4bc8466

Please sign in to comment.