Skip to content

Commit

Permalink
add fd-overload retries inline (#75)
Browse files Browse the repository at this point in the history
add fd-overload retries inline
  • Loading branch information
willscott authored Apr 10, 2020
1 parent 01ce5da commit 53d4c9b
Showing 1 changed file with 80 additions and 14 deletions.
94 changes: 80 additions & 14 deletions flatfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -48,6 +49,9 @@ var (
// If this period did not suffice to read the size of the datastore,
// the remaining sizes will be estimated.
DiskUsageCalcTimeout = 5 * time.Minute
// RetryDelay is the base delay of the linear backoff used when retrying
// operations that fail due to transient errors like too many file
// descriptors open; the n-th retry waits n * RetryDelay.
RetryDelay = time.Millisecond * 200
)

const (
Expand Down Expand Up @@ -432,6 +436,15 @@ func (fs *Datastore) doOp(oper *op) error {
}
}

// isTooManyFDError reports whether err is an *os.PathError whose underlying
// error is syscall.EMFILE. Any other error, including nil, yields false.
func isTooManyFDError(err error) bool {
	if pathErr, isPathErr := err.(*os.PathError); isPathErr {
		return pathErr.Err == syscall.EMFILE
	}
	return false
}

// doWriteOp optimizes out write operations (put/delete) to the same
// key by queueing them and succeeding all queued
// operations if one of them does. In such case,
Expand All @@ -447,7 +460,15 @@ func (fs *Datastore) doWriteOp(oper *op) error {
}

// Do the operation
err := fs.doOp(oper)
var err error
for i := 0; i < 6; i++ {
err = fs.doOp(oper)

if err == nil || !isTooManyFDError(err) {
break
}
time.Sleep(time.Duration(i+1) * RetryDelay)
}

// Finish it. If no error, it will signal other operations
// waiting on this result to succeed. Otherwise, they will
Expand Down Expand Up @@ -532,6 +553,28 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
}
}()

closer := func() error {
for fi := range files {
if ops[fi] != 0 {
continue
}

if fs.sync {
if err := syncFile(fi); err != nil {
return err
}
}

if err := fi.Close(); err != nil {
return err
}

// signify closed
ops[fi] = 1
}
return nil
}

for key, value := range data {
dir, path := fs.encode(key)
if err := fs.makeDirNoSync(dir); err != nil {
Expand All @@ -540,6 +583,21 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
dirsToSync = append(dirsToSync, dir)

tmp, err := fs.tempFile()

// Fallback retry for temporary error.
if err != nil && isTooManyFDError(err) {
if err = closer(); err != nil {
return err
}
for i := 0; i < 6; i++ {
time.Sleep(time.Duration(i+1) * RetryDelay)

tmp, err = fs.tempFile()
if err == nil || !isTooManyFDError(err) {
break
}
}
}
if err != nil {
return err
}
Expand All @@ -558,19 +616,9 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {

// Now we sync everything
// sync and close files
for fi := range files {
if fs.sync {
if err := syncFile(fi); err != nil {
return err
}
}

if err := fi.Close(); err != nil {
return err
}

// signify closed
ops[fi] = 1
err := closer()
if err != nil {
return err
}

// move files to their proper places
Expand Down Expand Up @@ -601,6 +649,24 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
}

// Get looks up key through fs.get and returns its result.
//
// When the lookup fails with an error that isTooManyFDError recognises, the
// lookup is repeated up to six more times. Before the n-th retry the call
// sleeps for n * RetryDelay. A success or any other kind of error is returned
// immediately; if every retry still fails, the last result is returned.
func (fs *Datastore) Get(key datastore.Key) (value []byte, err error) {
	const maxRetries = 6

	for attempt := 0; ; attempt++ {
		value, err = fs.get(key)

		// Stop on success, on a non-transient error, or once the retry
		// budget is spent (attempt 0 is the initial, non-retry lookup).
		if err == nil || !isTooManyFDError(err) || attempt == maxRetries {
			return value, err
		}

		// Linear backoff before the next retry.
		time.Sleep(time.Duration(attempt+1) * RetryDelay)
	}
}

func (fs *Datastore) get(key datastore.Key) (value []byte, err error) {
// Can't exist in datastore.
if !keyIsValid(key) {
return nil, datastore.ErrNotFound
Expand Down

0 comments on commit 53d4c9b

Please sign in to comment.