Skip to content

Commit

Permalink
Limit number of concurrent file writes
Browse files Browse the repository at this point in the history
Lower initial part size from 96 --> 16
Max part size 700MiB
Increase http client pool to 25
  • Loading branch information
kpjensen committed Sep 23, 2021
1 parent 2ff722f commit bd37300
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 17 deletions.
31 changes: 18 additions & 13 deletions dxfuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"log"
"math"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -808,7 +807,7 @@ func (fsys *Filesys) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) e
Tgid: tgid,
lastPartId: 0,
nextWriteOffset: 0,
writeBuffer: make([]byte, 0, 96*1024*1024),
writeBuffer: nil,
writeBufferOffset: 0,
mutex: &sync.Mutex{},
}
Expand Down Expand Up @@ -1269,8 +1268,8 @@ func (fsys *Filesys) openRegularFile(
Tgid: tgid,
lastPartId: 0,
nextWriteOffset: 0,
// 96MB slice capacity
writeBuffer: make([]byte, 0, 96*1024*1024),
// Write buffer is allocated lazily on the first write (see WriteFile)
writeBuffer: nil,
writeBufferOffset: 0,
mutex: &sync.Mutex{},
}
Expand Down Expand Up @@ -1520,10 +1519,13 @@ func (fsys *Filesys) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) err

if op.Offset != fh.nextWriteOffset {
fsys.log("ERROR: Only sequential writes are supported")
fsys.log("%v", fh.writeBufferOffset)
fsys.log("op.Offest: %d, fh.nextWriteOffest: %d", op.Offset, fh.nextWriteOffset)
return syscall.ENOTSUP
}
if fh.writeBuffer == nil {
// Allocate write buffer
fh.writeBuffer = fsys.uploader.AllocateWriteBuffer(fh.lastPartId, true)
}

bytesToWrite := op.Data

Expand Down Expand Up @@ -1562,11 +1564,7 @@ func (fsys *Filesys) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) err
}
fh.wg.Add(1)
fsys.uploader.uploadQueue <- uploadReq
fh.writeBuffer = fh.writeBuffer[:0]
// increasing buffer size for next part
nextCap := 96 * 1024 * 1024 * math.Pow(1.0003, float64(partId))
nextBufferCapacity := math.Round(nextCap)
fh.writeBuffer = make([]byte, 0, int64(nextBufferCapacity))
fh.writeBuffer = nil
fh.writeBufferOffset = 0
// Update the file attributes in the database (size, mtime)
fsys.mutex.Lock()
Expand All @@ -1579,6 +1577,9 @@ func (fsys *Filesys) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) err
}
fsys.opClose(oph)
fsys.mutex.Unlock()
fh.writeBuffer = fsys.uploader.AllocateWriteBuffer(partId, false)
fsys.log("allocated wb for %s", fh.Id)

}
// all data copied into buffer slice, break
if bytesCopied == len(bytesToWrite) {
Expand All @@ -1595,12 +1596,15 @@ func (fsys *Filesys) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) err
if fsys.options.Verbose {
fsys.log("Flush inode %d", op.Inode)
}
fsys.mutex.Lock()
fh, ok := fsys.fhTable[op.Handle]

if !ok {
// File handle doesn't exist
// invalid file handle. It doesn't exist in the table
fsys.mutex.Unlock()
return fuse.EINVAL
}
fsys.mutex.Unlock()

if fh == nil {
return nil
}
Expand Down Expand Up @@ -1644,7 +1648,8 @@ func (fsys *Filesys) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) err
}
fh.wg.Add(1)
fsys.uploader.uploadQueue <- uploadReq
fh.writeBuffer = fh.writeBuffer[:0]
fh.writeBuffer = nil
<-fsys.uploader.writeBufferChan

fh.wg.Wait()
// Check if there was an error uploading the last part
Expand Down
31 changes: 28 additions & 3 deletions upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package dxfuse

import (
"context"
"math"
"runtime"
"sync"

"github.com/dnanexus/dxda"
Expand All @@ -12,6 +14,21 @@ const (
maxUploadRoutines = 4
)

// AllocateWriteBuffer returns an empty byte slice whose capacity grows
// geometrically with the part number: InitialPartSize * 1.1^partId, capped at
// MaxPartSize. Part ids below 1 are treated as 1.
//
// When block is true, a token is first pushed onto writeBufferChan, which
// blocks while the channel is full; when block is false the channel is not
// touched.
//
// TODO replace this with a more reasonable buffer pool for managing memory use
func (uploader *FileUploader) AllocateWriteBuffer(partId int, block bool) []byte {
	if block {
		// Wait for an available buffer slot.
		uploader.writeBufferChan <- struct{}{}
	}

	exponent := float64(partId)
	if partId < 1 {
		exponent = 1
	}

	capacity := InitialPartSize * math.Pow(1.1, exponent)
	if capacity > MaxPartSize {
		capacity = MaxPartSize
	}
	return make([]byte, 0, int64(math.Round(capacity)))
}

type UploadRequest struct {
fh *FileHandle
writeBuffer []byte
Expand All @@ -24,6 +41,8 @@ type FileUploader struct {
uploadQueue chan UploadRequest
wg sync.WaitGroup
numUploadRoutines int
// Max write buffers being written to reduce memory consumption
writeBufferChan chan struct{}
// API to dx
ops *DxOps
}
Expand All @@ -34,10 +53,15 @@ func (uploader *FileUploader) log(a string, args ...interface{}) {
}

func NewFileUploader(verboseLevel int, options Options, dxEnv dxda.DXEnvironment) *FileUploader {

concurrentWriteBufferLimit := 15
if runtime.NumCPU()*3 > concurrentWriteBufferLimit {
concurrentWriteBufferLimit = runtime.NumCPU() * 3
}
uploader := &FileUploader{
verbose: verboseLevel >= 1,
uploadQueue: make(chan UploadRequest),
verbose: verboseLevel >= 1,
uploadQueue: make(chan UploadRequest),
// Limit concurrent file handle write buffers to max(15, 3 * NumCPU)
writeBufferChan: make(chan struct{}, concurrentWriteBufferLimit),
numUploadRoutines: maxUploadRoutines,
ops: NewDxOps(dxEnv, options),
}
Expand All @@ -52,6 +76,7 @@ func NewFileUploader(verboseLevel int, options Options, dxEnv dxda.DXEnvironment
// Shutdown closes the upload queue and waits for the uploader's goroutines
// to complete.
func (uploader *FileUploader) Shutdown() {
	// Close the request queue so no further uploads can be submitted.
	// NOTE(review): presumably the upload workers exit once the queue is
	// closed and drained; confirm against the worker loop.
	close(uploader.uploadQueue)

	// writeBufferChan is intentionally NOT closed. It is used as a counting
	// semaphore that AllocateWriteBuffer sends on; a send on a closed channel
	// panics, so closing it here could crash a writer that is still waiting
	// for a buffer slot during shutdown. The channel holds no resources and
	// is reclaimed by the garbage collector along with the uploader.
	uploader.wg.Wait()
}

Expand Down
4 changes: 3 additions & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ const (
LogFile = "dxfuse.log"
)
const (
HttpClientPoolSize = 8
HttpClientPoolSize = 25
FileWriteInactivityThresh = 5 * time.Minute
MaxDirSize = 255 * 1000
MaxNumFileHandles = 1000 * 1000
NumRetriesDefault = 10
InitialPartSize = 16 * MiB
MaxPartSize = 700 * MiB
Version = "v1.0.0-rc.2"
)
const (
Expand Down

0 comments on commit bd37300

Please sign in to comment.