Skip to content

Commit

Permalink
Merge pull request #98 from pracucci/fix-s3-upload-regression
Browse files Browse the repository at this point in the history
Fix s3 upload performance regression
  • Loading branch information
yeya24 authored Jan 28, 2024
2 parents 6ecabdd + c83effb commit bdadaef
Show file tree
Hide file tree
Showing 2 changed files with 344 additions and 106 deletions.
135 changes: 74 additions & 61 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,19 +232,6 @@ func NopCloserWithSize(r io.Reader) io.ReadCloser {
return nopCloserWithObjectSize{r}
}

type nopSeekerCloserWithObjectSize struct{ io.Reader }

func (nopSeekerCloserWithObjectSize) Close() error { return nil }
func (n nopSeekerCloserWithObjectSize) ObjectSize() (int64, error) { return TryToGetSize(n.Reader) }

func (n nopSeekerCloserWithObjectSize) Seek(offset int64, whence int) (int64, error) {
return n.Reader.(io.Seeker).Seek(offset, whence)
}

func nopSeekerCloserWithSize(r io.Reader) io.ReadSeekCloser {
return nopSeekerCloserWithObjectSize{r}
}

// UploadDir uploads all files in srcdir to the bucket with into a top-level directory
// named dstdir. It is a caller responsibility to clean partial upload in case of failure.
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string, options ...UploadOption) error {
Expand Down Expand Up @@ -555,8 +542,9 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
}
return nil, err
}
return newTimingReadCloser(
return newTimingReader(
rc,
true,
op,
b.opsDuration,
b.opsFailures,
Expand All @@ -577,8 +565,9 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
}
return nil, err
}
return newTimingReadCloser(
return newTimingReader(
rc,
true,
op,
b.opsDuration,
b.opsFailures,
Expand Down Expand Up @@ -608,16 +597,9 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
const op = OpUpload
b.ops.WithLabelValues(op).Inc()

_, ok := r.(io.Seeker)
var nopR io.ReadCloser
if ok {
nopR = nopSeekerCloserWithSize(r)
} else {
nopR = NopCloserWithSize(r)
}

trc := newTimingReadCloser(
nopR,
trc := newTimingReader(
r,
false,
op,
b.opsDuration,
b.opsFailures,
Expand Down Expand Up @@ -670,12 +652,13 @@ func (b *metricBucket) Name() string {
return b.bkt.Name()
}

type timingReadSeekCloser struct {
timingReadCloser
}
type timingReader struct {
io.Reader

// closeReader holds whether the wrapper io.Reader should be closed when
// Close() is called on the timingReader.
closeReader bool

type timingReadCloser struct {
io.ReadCloser
objSize int64
objSizeErr error

Expand All @@ -691,14 +674,15 @@ type timingReadCloser struct {
transferredBytes *prometheus.HistogramVec
}

func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
// Initialize the metrics with 0.
dur.WithLabelValues(op)
failed.WithLabelValues(op)
objSize, objSizeErr := TryToGetSize(rc)
objSize, objSizeErr := TryToGetSize(r)

trc := timingReadCloser{
ReadCloser: rc,
trc := timingReader{
Reader: r,
closeReader: closeReader,
objSize: objSize,
objSizeErr: objSizeErr,
start: time.Now(),
Expand All @@ -711,50 +695,79 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
readBytes: 0,
}

_, ok := rc.(io.Seeker)
if ok {
return &timingReadSeekCloser{
timingReadCloser: trc,
}
_, isSeeker := r.(io.Seeker)
_, isReaderAt := r.(io.ReaderAt)

if isSeeker && isReaderAt {
// The assumption is that in most cases when io.ReaderAt() is implemented then
// io.Seeker is implemented too (e.g. os.File).
return &timingReaderSeekerReaderAt{timingReaderSeeker: timingReaderSeeker{timingReader: trc}}
}
if isSeeker {
return &timingReaderSeeker{timingReader: trc}
}

return &trc
}

func (t *timingReadCloser) ObjectSize() (int64, error) {
return t.objSize, t.objSizeErr
func (r *timingReader) ObjectSize() (int64, error) {
return r.objSize, r.objSizeErr
}

func (rc *timingReadCloser) Close() error {
err := rc.ReadCloser.Close()
if !rc.alreadyGotErr && err != nil {
rc.failed.WithLabelValues(rc.op).Inc()
func (r *timingReader) Close() error {
var closeErr error

// Call the wrapped reader if it implements Close(), only if we've been asked to close it.
if closer, ok := r.Reader.(io.Closer); r.closeReader && ok {
closeErr = closer.Close()

if !r.alreadyGotErr && closeErr != nil {
r.failed.WithLabelValues(r.op).Inc()
r.alreadyGotErr = true
}
}
if !rc.alreadyGotErr && err == nil {
rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
rc.transferredBytes.WithLabelValues(rc.op).Observe(float64(rc.readBytes))
rc.alreadyGotErr = true

// Track duration and transferred bytes only if no error occurred.
if !r.alreadyGotErr {
r.duration.WithLabelValues(r.op).Observe(time.Since(r.start).Seconds())
r.transferredBytes.WithLabelValues(r.op).Observe(float64(r.readBytes))

// Trick to tracking metrics multiple times in case Close() gets called again.
r.alreadyGotErr = true
}
return err

return closeErr
}

func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(b)
if rc.fetchedBytes != nil {
rc.fetchedBytes.WithLabelValues(rc.op).Add(float64(n))
func (r *timingReader) Read(b []byte) (n int, err error) {
n, err = r.Reader.Read(b)
if r.fetchedBytes != nil {
r.fetchedBytes.WithLabelValues(r.op).Add(float64(n))
}

rc.readBytes += int64(n)
r.readBytes += int64(n)
// Report metric just once.
if !rc.alreadyGotErr && err != nil && err != io.EOF {
if !rc.isFailureExpected(err) {
rc.failed.WithLabelValues(rc.op).Inc()
if !r.alreadyGotErr && err != nil && err != io.EOF {
if !r.isFailureExpected(err) {
r.failed.WithLabelValues(r.op).Inc()
}
rc.alreadyGotErr = true
r.alreadyGotErr = true
}
return n, err
}

func (rsc *timingReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
return (rsc.ReadCloser).(io.Seeker).Seek(offset, whence)
type timingReaderSeeker struct {
timingReader
}

func (rsc *timingReaderSeeker) Seek(offset int64, whence int) (int64, error) {
return (rsc.Reader).(io.Seeker).Seek(offset, whence)
}

type timingReaderSeekerReaderAt struct {
timingReaderSeeker
}

func (rsc *timingReaderSeekerReaderAt) ReadAt(p []byte, off int64) (int, error) {
return (rsc.Reader).(io.ReaderAt).ReadAt(p, off)
}
Loading

0 comments on commit bdadaef

Please sign in to comment.