Skip to content

Commit

Permalink
Add retry mechanism for unready files
Browse files Browse the repository at this point in the history
  • Loading branch information
darenliang committed Jul 16, 2022
1 parent 86d0423 commit 3712406
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 12 deletions.
2 changes: 2 additions & 0 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const (
DataChannelName = "data"
MaxDiscordFileSize = 8388119
MaxDiscordMessageRequest = 100
PollInterval = 250
MaxRetries = 20
)

const (
Expand Down
51 changes: 39 additions & 12 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type FileData struct {
ctim time.Time
syncing *atomic.Bool
data []byte
load *Load
lock sync.RWMutex
dirty bool
}
Expand Down Expand Up @@ -63,6 +64,7 @@ func (fs *Dsfs) Mknod(path string, mode uint32, dev uint64) int {

fs.open[path] = &FileData{
data: make([]byte, 0),
load: newLoad(),
syncing: atomic.NewBool(false),
mtim: time.Now(),
ctim: time.Now(),
Expand Down Expand Up @@ -133,6 +135,7 @@ func (fs *Dsfs) Open(path string, flags int) (int, uint64) {

fs.open[path] = &FileData{
data: make([]byte, tx.Size),
load: newLoad(),
syncing: atomic.NewBool(false),
mtim: tx.Mtim,
ctim: tx.Ctim,
Expand Down Expand Up @@ -163,6 +166,7 @@ func (fs *Dsfs) Open(path string, flags int) (int, uint64) {
}
file.lock.Lock()
copy(file.data[ofst:ofst+n], buffer[:n])
file.load.addRange(int64(ofst), int64(ofst+n))
file.lock.Unlock()
return nil
}
Expand Down Expand Up @@ -409,18 +413,19 @@ func (fs *Dsfs) Truncate(path string, size int64, fh uint64) int {
fs.lock.Unlock()
return -fuse.ENOENT
}

filesize := int64(len(file.data))
fs.lock.Unlock()

file.lock.Lock()
filesize := int64(len(file.data))
if size == filesize {
file.lock.Unlock()
return 0
} else if size < filesize {
file.data = file.data[:size]
file.load.truncate(size)
} else {
file.data = append(file.data, make([]byte, size-filesize)...)
file.load.addRange(filesize, filesize+size)
}

file.mtim = time.Now()
Expand All @@ -444,21 +449,36 @@ func (fs *Dsfs) Read(path string, buff []byte, ofst int64, fh uint64) int {
fs.lock.Unlock()
return -fuse.ENOENT
}
fs.lock.Unlock()

filesize := int64(len(file.data))
endofst := ofst + int64(len(buff))
fs.lock.Unlock()
newEndofst := endofst
retries := 0

file.lock.RLock()
if endofst > filesize {
endofst = filesize
}
if endofst < ofst {
for {
filesize := int64(len(file.data))
if endofst > filesize {
newEndofst = filesize
}
if newEndofst < ofst {
file.lock.RUnlock()
return 0
}
if file.load.isReady(ofst, newEndofst) {
break
}
if retries >= MaxRetries {
file.lock.RUnlock()
return 0
}
retries++
file.lock.RUnlock()
return 0
time.Sleep(PollInterval * time.Millisecond)
file.lock.RLock()
}

bytesRead := copy(buff, file.data[ofst:endofst])
bytesRead := copy(buff, file.data[ofst:newEndofst])
file.lock.RUnlock()

return bytesRead
Expand All @@ -479,17 +499,20 @@ func (fs *Dsfs) Write(path string, buff []byte, ofst int64, fh uint64) int {
fs.lock.Unlock()
return -fuse.ENOENT
}
fs.lock.Unlock()

filesize := int64(len(file.data))
endofst := ofst + int64(len(buff))
fs.lock.Unlock()

file.lock.Lock()
filesize := int64(len(file.data))
if endofst > filesize {
file.data = append(file.data, make([]byte, endofst-filesize)...)
}

bytesWrite := copy(file.data[ofst:endofst], buff)
if !file.load.isReady(ofst, endofst) {
file.load.addRange(ofst, endofst)
}
file.mtim = time.Now()
file.dirty = true
file.lock.Unlock()
Expand Down Expand Up @@ -719,6 +742,7 @@ func (fs *Dsfs) ApplyLiveTx(pathBytes []byte, tx Tx) error {
filesize := int64(len(file.data))
if tx.Size < filesize {
file.data = file.data[:tx.Size]
file.load.truncate(tx.Size)
} else if tx.Size > filesize {
file.data = append(file.data, make([]byte, tx.Size-filesize)...)
}
Expand Down Expand Up @@ -754,6 +778,9 @@ func (fs *Dsfs) ApplyLiveTx(pathBytes []byte, tx Tx) error {

file.lock.Lock()
copy(file.data[ofst:ofst+n], buffer)
if !file.load.isReady(int64(ofst), int64(ofst+n)) {
file.load.addRange(int64(ofst), int64(ofst+n))
}
file.lock.Unlock()
}

Expand Down
48 changes: 48 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,51 @@ func getDir(path string) string {
// Handle weirdness with Windows
return strings.ReplaceAll(filepath.Dir(path), "\\", "/")
}

type Range struct {
start, end int64
}

type Load struct {
ranges []Range
}

func (load *Load) addRange(start, end int64) {
load.ranges = append(load.ranges, Range{start, end})
}

func (load *Load) truncate(end int64) {
var newRanges []Range
for _, v := range load.ranges {
if v.start >= end {
continue
}
if v.end > end {
newRanges = append(newRanges, Range{v.start, end})
continue
}
newRanges = append(newRanges, v)
}
load.ranges = newRanges
}

func (load *Load) isReady(start, end int64) bool {
for i := start; i < end; i++ {
seen := false
for _, v := range load.ranges {
if v.start <= i && i < v.end {
seen = true
i = v.end - 1
break
}
}
if !seen {
return false
}
}
return true
}

func newLoad() *Load {
return &Load{ranges: make([]Range, 0)}
}

0 comments on commit 3712406

Please sign in to comment.