From 3712406e9c61b51c3ff11f614bece61dabb53d17 Mon Sep 17 00:00:00 2001 From: Daren Liang Date: Sat, 16 Jul 2022 14:26:25 -0400 Subject: [PATCH] Add retry mechanism for unready files --- common.go | 2 ++ fs.go | 51 +++++++++++++++++++++++++++++++++++++++------------ utils.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 12 deletions(-) diff --git a/common.go b/common.go index 58d05ab..7cc1ba2 100644 --- a/common.go +++ b/common.go @@ -5,6 +5,8 @@ const ( DataChannelName = "data" MaxDiscordFileSize = 8388119 MaxDiscordMessageRequest = 100 + PollInterval = 250 + MaxRetries = 20 ) const ( diff --git a/fs.go b/fs.go index 65244aa..d2c4971 100644 --- a/fs.go +++ b/fs.go @@ -27,6 +27,7 @@ type FileData struct { ctim time.Time syncing *atomic.Bool data []byte + load *Load lock sync.RWMutex dirty bool } @@ -63,6 +64,7 @@ func (fs *Dsfs) Mknod(path string, mode uint32, dev uint64) int { fs.open[path] = &FileData{ data: make([]byte, 0), + load: newLoad(), syncing: atomic.NewBool(false), mtim: time.Now(), ctim: time.Now(), @@ -133,6 +135,7 @@ func (fs *Dsfs) Open(path string, flags int) (int, uint64) { fs.open[path] = &FileData{ data: make([]byte, tx.Size), + load: newLoad(), syncing: atomic.NewBool(false), mtim: tx.Mtim, ctim: tx.Ctim, @@ -163,6 +166,7 @@ func (fs *Dsfs) Open(path string, flags int) (int, uint64) { } file.lock.Lock() copy(file.data[ofst:ofst+n], buffer[:n]) + file.load.addRange(int64(ofst), int64(ofst+n)) file.lock.Unlock() return nil } @@ -409,18 +413,19 @@ func (fs *Dsfs) Truncate(path string, size int64, fh uint64) int { fs.lock.Unlock() return -fuse.ENOENT } - - filesize := int64(len(file.data)) fs.lock.Unlock() file.lock.Lock() + filesize := int64(len(file.data)) if size == filesize { file.lock.Unlock() return 0 } else if size < filesize { file.data = file.data[:size] + file.load.truncate(size) } else { file.data = append(file.data, make([]byte, size-filesize)...) + file.load.addRange(filesize, filesize+size) } file.mtim = time.Now() @@ -444,21 +449,36 @@ func (fs *Dsfs) Read(path string, buff []byte, ofst int64, fh uint64) int { fs.lock.Unlock() return -fuse.ENOENT } + fs.lock.Unlock() - filesize := int64(len(file.data)) endofst := ofst + int64(len(buff)) - fs.lock.Unlock() + newEndofst := endofst + retries := 0 file.lock.RLock() - if endofst > filesize { - endofst = filesize - } - if endofst < ofst { + for { + filesize := int64(len(file.data)) + if endofst > filesize { + newEndofst = filesize + } + if newEndofst < ofst { + file.lock.RUnlock() + return 0 + } + if file.load.isReady(ofst, newEndofst) { + break + } + if retries >= MaxRetries { + file.lock.RUnlock() + return 0 + } + retries++ file.lock.RUnlock() - return 0 + time.Sleep(PollInterval * time.Millisecond) + file.lock.RLock() } - bytesRead := copy(buff, file.data[ofst:endofst]) + bytesRead := copy(buff, file.data[ofst:newEndofst]) file.lock.RUnlock() return bytesRead @@ -479,17 +499,20 @@ func (fs *Dsfs) Write(path string, buff []byte, ofst int64, fh uint64) int { fs.lock.Unlock() return -fuse.ENOENT } + fs.lock.Unlock() - filesize := int64(len(file.data)) endofst := ofst + int64(len(buff)) - fs.lock.Unlock() file.lock.Lock() + filesize := int64(len(file.data)) if endofst > filesize { file.data = append(file.data, make([]byte, endofst-filesize)...) } bytesWrite := copy(file.data[ofst:endofst], buff) + if !file.load.isReady(ofst, endofst) { + file.load.addRange(ofst, endofst) + } file.mtim = time.Now() file.dirty = true file.lock.Unlock() @@ -719,6 +742,7 @@ func (fs *Dsfs) ApplyLiveTx(pathBytes []byte, tx Tx) error { filesize := int64(len(file.data)) if tx.Size < filesize { file.data = file.data[:tx.Size] + file.load.truncate(tx.Size) } else if tx.Size > filesize { file.data = append(file.data, make([]byte, tx.Size-filesize)...) } @@ -754,6 +778,9 @@ func (fs *Dsfs) ApplyLiveTx(pathBytes []byte, tx Tx) error { file.lock.Lock() copy(file.data[ofst:ofst+n], buffer) + if !file.load.isReady(int64(ofst), int64(ofst+n)) { + file.load.addRange(int64(ofst), int64(ofst+n)) + } file.lock.Unlock() } diff --git a/utils.go b/utils.go index e69d929..ac04410 100644 --- a/utils.go +++ b/utils.go @@ -9,3 +9,51 @@ func getDir(path string) string { // Handle weirdness with Windows return strings.ReplaceAll(filepath.Dir(path), "\\", "/") } + +type Range struct { + start, end int64 +} + +type Load struct { + ranges []Range +} + +func (load *Load) addRange(start, end int64) { + load.ranges = append(load.ranges, Range{start, end}) +} + +func (load *Load) truncate(end int64) { + var newRanges []Range + for _, v := range load.ranges { + if v.start >= end { + continue + } + if v.end > end { + newRanges = append(newRanges, Range{v.start, end}) + continue + } + newRanges = append(newRanges, v) + } + load.ranges = newRanges +} + +func (load *Load) isReady(start, end int64) bool { + for i := start; i < end; i++ { + seen := false + for _, v := range load.ranges { + if v.start <= i && i < v.end { + seen = true + i = v.end - 1 + break + } + } + if !seen { + return false + } + } + return true +} + +func newLoad() *Load { + return &Load{ranges: make([]Range, 0)} +}