Skip to content

Commit

Permalink
save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
dvovk committed Dec 5, 2024
2 parents d1c0a59 + a4d8f94 commit 3bee769
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 43 deletions.
14 changes: 9 additions & 5 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,13 +760,17 @@ func runSafeExtraneous(f func()) {
}

// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
func (c *Peer) remoteRejectedRequest(r RequestIndex, lock bool, lockTorrent bool) bool {
if !func() bool {
c.t.mu.Lock()
defer c.t.mu.Unlock()
if lockTorrent {
c.t.mu.Lock()
defer c.t.mu.Unlock()
}

c.mu.Lock()
defer c.mu.Unlock()
if lock {
c.mu.Lock()
defer c.mu.Unlock()
}

if c.deleteRequest(r, false, false) {
c.decPeakRequests(false)
Expand Down
2 changes: 1 addition & 1 deletion peerconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
err = c.peerSentHaveNone(true)
case pp.Reject:
req := newRequestFromMessage(&msg)
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false)) {
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false), true, true) {
err = fmt.Errorf("received invalid reject for request %v", req)
c.logger.Levelf(log.Debug, "%v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3382,7 +3382,7 @@ func (t *Torrent) addWebSeed(url string, lock bool, opts ...AddWebSeedsOpt) {
Url: url,
},
maxRequesters: maxRequests,
activeRequests: make(map[Request]webseed.Request, maxRequests),
activeRequests: make(map[Request]*webseed.Request, maxRequests),
// Limit requests rather than responses - becuase otherwise
// the go http layer buffers causing memory growth
requestRateLimiter: t.cl.config.DownloadRateLimiter,
Expand Down
42 changes: 19 additions & 23 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type webseedPeer struct {
// First field for stats alignment.
peer Peer
client webseed.Client
activeRequests map[Request]webseed.Request
activeRequests map[Request]*webseed.Request
maxActiveRequests int // the max number of active requests for this peer
processedRequests int // the total number of requests this peer has processed
maxRequesters int // the number of requester to run for this peer
Expand Down Expand Up @@ -81,7 +81,7 @@ func (ws *webseedPeer) writeInterested(interested bool, lock bool) bool {
}

func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool {
if active, ok := func() (active webseed.Request, ok bool) {
if active, ok := func() (active *webseed.Request, ok bool) {
req := ws.peer.t.requestIndexToRequest(r, lockTorrent)

if lock {
Expand All @@ -98,14 +98,7 @@ func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool
defer ws.peer.mu.RUnlock()
}

if active.Context().Err() == nil {
LOGDBG(fmt.Sprintf("Ctx alive cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel")
active.Cancel()
} else {
LOGDBG(fmt.Sprintf("start-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel")
ws.peer.RemoveRequestCancelled(r, "Ctx dead cancel")
LOGDBG(fmt.Sprintf("end-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel")
}
active.Cancel()
// The requester is running and will handle the result.
return true
}
Expand Down Expand Up @@ -169,7 +162,8 @@ func (cn *webseedPeer) nominalMaxRequests(lock bool, lockTorrent bool) maxReques
var limitedBuffPool = storage.NewLimitedBufferPool(bufPool, 5_000_000_000)

func (ws *webseedPeer) doRequest(r Request) error {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r), limitedBuffPool, ws.requestRateLimiter, &ws.receiving)
webseedRequest := ws.client.NewRequest(ws.intoSpec(r), limitedBuffPool, ws.requestRateLimiter, &ws.receiving,
func() { ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r, false), false, false) } )

ws.peer.mu.Lock()
ws.activeRequests[r] = webseedRequest
Expand All @@ -180,17 +174,15 @@ func (ws *webseedPeer) doRequest(r Request) error {
}
ws.peer.mu.Unlock()

err := func() error {
ws.requesterCond.L.Unlock()
defer ws.requesterCond.L.Lock()
return ws.requestResultHandler(r, webseedRequest)
ws.requesterCond.L.Unlock()
defer func() {
ws.requesterCond.L.Lock()
ws.peer.mu.Lock()
delete(ws.activeRequests, r)
ws.peer.mu.Unlock()
}()

ws.peer.mu.Lock()
delete(ws.activeRequests, r)
ws.peer.mu.Unlock()

return err
return ws.requestResultHandler(r, webseedRequest)
}

func (ws *webseedPeer) requester(i int) {
Expand Down Expand Up @@ -550,7 +542,7 @@ func (ws *webseedPeer) onClose(lockTorrent bool) {
ws.requesterCond.Broadcast()
}

func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error {
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest *webseed.Request) error {
result := <-webseedRequest.Result
close(webseedRequest.Result) // one-shot

Expand All @@ -560,6 +552,10 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}
}()

webseedRequest.Lock()
webseedRequest.Result = nil
webseedRequest.Unlock()

ws.persisting.Add(1)
defer ws.persisting.Add(-1)

Expand All @@ -581,8 +577,8 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
if len(piece) != 0 || result.Err == nil {
// Increment ChunksRead and friends
ws.peer.doChunkReadStats(int64(len(piece)))
ws.peer.readBytes(int64(len(piece)))
}
ws.peer.readBytes(int64(len(piece)))

if ws.peer.t.closed.IsSet() {
//log
Expand Down Expand Up @@ -617,7 +613,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}

LOGDBG(fmt.Sprintf("strat-remoteRejectedRequest request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler")
if !ws.peer.remoteRejectedRequest(reqIdx) {
if !ws.peer.remoteRejectedRequest(reqIdx, true, true) {
err = fmt.Errorf(`received invalid reject "%w", for request %v`, err, r)
ws.peer.logger.Levelf(log.Debug, "%v", err)
}
Expand Down
31 changes: 18 additions & 13 deletions webseed/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"net/http"
"strings"
"sync"
"sync/atomic"

"github.com/RoaringBitmap/roaring"
Expand All @@ -30,18 +31,22 @@ type requestPart struct {
}

type Request struct {
ctx context.Context
cancel func()
Result chan RequestResult
readers []io.Reader
sync.Mutex
cancel func()
onCancelled func()
Result chan RequestResult
}

func (r Request) Cancel() {
func (r *Request) Cancel() {
r.cancel()
}

func (r Request) Context() context.Context {
return r.ctx
r.Lock()
hasResult := r.Result == nil
r.Unlock()

if hasResult {
r.onCancelled()
}
}

type Client struct {
Expand Down Expand Up @@ -77,7 +82,7 @@ type RequestResult struct {
Err error
}

func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter *rate.Limiter, receivingCounter *atomic.Int64) Request {
func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter *rate.Limiter, receivingCounter *atomic.Int64, onCancelled func()) *Request {
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool {
Expand Down Expand Up @@ -120,10 +125,10 @@ func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter
}) {
panic("request out of file bounds")
}
req := Request{
ctx: ctx,
cancel: cancel,
Result: make(chan RequestResult, 1),
req := &Request{
cancel: cancel,
onCancelled: onCancelled,
Result: make(chan RequestResult, 1),
}
go func() {
readers, err := readRequestPartResponses(ctx, requestParts, receivingCounter)
Expand Down

0 comments on commit 3bee769

Please sign in to comment.