diff --git a/peer.go b/peer.go index 6e9ec49d7c..9e26a9f0cd 100644 --- a/peer.go +++ b/peer.go @@ -760,13 +760,17 @@ func runSafeExtraneous(f func()) { } // Returns true if it was valid to reject the request. -func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { +func (c *Peer) remoteRejectedRequest(r RequestIndex, lock bool, lockTorrent bool) bool { if !func() bool { - c.t.mu.Lock() - defer c.t.mu.Unlock() + if lockTorrent { + c.t.mu.Lock() + defer c.t.mu.Unlock() + } - c.mu.Lock() - defer c.mu.Unlock() + if lock { + c.mu.Lock() + defer c.mu.Unlock() + } if c.deleteRequest(r, false, false) { c.decPeakRequests(false) diff --git a/peerconn.go b/peerconn.go index cc5506b57d..0104fab44a 100644 --- a/peerconn.go +++ b/peerconn.go @@ -1029,7 +1029,7 @@ func (c *PeerConn) mainReadLoop() (err error) { err = c.peerSentHaveNone(true) case pp.Reject: req := newRequestFromMessage(&msg) - if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false)) { + if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req, false), true, true) { err = fmt.Errorf("received invalid reject for request %v", req) c.logger.Levelf(log.Debug, "%v", err) } diff --git a/torrent.go b/torrent.go index efe6d99708..58658a3d8f 100644 --- a/torrent.go +++ b/torrent.go @@ -3382,7 +3382,7 @@ func (t *Torrent) addWebSeed(url string, lock bool, opts ...AddWebSeedsOpt) { Url: url, }, maxRequesters: maxRequests, - activeRequests: make(map[Request]webseed.Request, maxRequests), + activeRequests: make(map[Request]*webseed.Request, maxRequests), // Limit requests rather than responses - becuase otherwise // the go http layer buffers causing memory growth requestRateLimiter: t.cl.config.DownloadRateLimiter, diff --git a/webseed-peer.go b/webseed-peer.go index f35b4abd88..81f5eba0ea 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -31,7 +31,7 @@ type webseedPeer struct { // First field for stats alignment. peer Peer client webseed.Client - activeRequests map[Request]webseed.Request + activeRequests map[Request]*webseed.Request maxActiveRequests int // the max number of active requests for this peer processedRequests int // the total number of requests this peer has processed maxRequesters int // the number of requester to run for this peer @@ -81,7 +81,7 @@ func (ws *webseedPeer) writeInterested(interested bool, lock bool) bool { } func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool { - if active, ok := func() (active webseed.Request, ok bool) { + if active, ok := func() (active *webseed.Request, ok bool) { req := ws.peer.t.requestIndexToRequest(r, lockTorrent) if lock { @@ -98,14 +98,7 @@ func (ws *webseedPeer) _cancel(r RequestIndex, lock bool, lockTorrent bool) bool defer ws.peer.mu.RUnlock() } - if active.Context().Err() == nil { - LOGDBG(fmt.Sprintf("Ctx alive cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel") - active.Cancel() - } else { - LOGDBG(fmt.Sprintf("start-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel") - ws.peer.RemoveRequestCancelled(r, "Ctx dead cancel") - LOGDBG(fmt.Sprintf("end-Ctx dead cancelling request:%v-%s\n", r, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer._cancel") - } + active.Cancel() // The requester is running and will handle the result. return true } @@ -169,7 +162,8 @@ func (cn *webseedPeer) nominalMaxRequests(lock bool, lockTorrent bool) maxReques var limitedBuffPool = storage.NewLimitedBufferPool(bufPool, 5_000_000_000) func (ws *webseedPeer) doRequest(r Request) error { - webseedRequest := ws.client.NewRequest(ws.intoSpec(r), limitedBuffPool, ws.requestRateLimiter, &ws.receiving) + webseedRequest := ws.client.NewRequest(ws.intoSpec(r), limitedBuffPool, ws.requestRateLimiter, &ws.receiving, + func() { ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r, false), false, false) } ) ws.peer.mu.Lock() ws.activeRequests[r] = webseedRequest @@ -180,17 +174,15 @@ func (ws *webseedPeer) doRequest(r Request) error { } ws.peer.mu.Unlock() - err := func() error { - ws.requesterCond.L.Unlock() - defer ws.requesterCond.L.Lock() - return ws.requestResultHandler(r, webseedRequest) + ws.requesterCond.L.Unlock() + defer func() { + ws.requesterCond.L.Lock() + ws.peer.mu.Lock() + delete(ws.activeRequests, r) + ws.peer.mu.Unlock() }() - ws.peer.mu.Lock() - delete(ws.activeRequests, r) - ws.peer.mu.Unlock() - - return err + return ws.requestResultHandler(r, webseedRequest) } func (ws *webseedPeer) requester(i int) { @@ -550,7 +542,7 @@ func (ws *webseedPeer) onClose(lockTorrent bool) { ws.requesterCond.Broadcast() } -func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) error { +func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest *webseed.Request) error { result := <-webseedRequest.Result close(webseedRequest.Result) // one-shot @@ -560,6 +552,10 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re } }() + webseedRequest.Lock() + webseedRequest.Result = nil + webseedRequest.Unlock() + ws.persisting.Add(1) defer ws.persisting.Add(-1) @@ -581,8 +577,8 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re if len(piece) != 0 || result.Err == nil { // Increment ChunksRead and friends ws.peer.doChunkReadStats(int64(len(piece))) + ws.peer.readBytes(int64(len(piece))) } - ws.peer.readBytes(int64(len(piece))) if ws.peer.t.closed.IsSet() { //log @@ -617,7 +613,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re } LOGDBG(fmt.Sprintf("strat-remoteRejectedRequest request:%v-%s\n", reqIdx, ws.peer.RemoteAddr.String() + "/" + ws.peer.t.info.Name), "webseed-peer.go->webseedPeer.requestResultHandler") - if !ws.peer.remoteRejectedRequest(reqIdx) { + if !ws.peer.remoteRejectedRequest(reqIdx, true, true) { err = fmt.Errorf(`received invalid reject "%w", for request %v`, err, r) ws.peer.logger.Levelf(log.Debug, "%v", err) } diff --git a/webseed/client.go b/webseed/client.go index 46ac49a397..e28c050eef 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -8,6 +8,7 @@ import ( "log" "net/http" "strings" + "sync" "sync/atomic" "github.com/RoaringBitmap/roaring" @@ -30,18 +31,22 @@ type requestPart struct { } type Request struct { - ctx context.Context - cancel func() - Result chan RequestResult - readers []io.Reader + sync.Mutex + cancel func() + onCancelled func() + Result chan RequestResult } -func (r Request) Cancel() { +func (r *Request) Cancel() { r.cancel() -} -func (r Request) Context() context.Context { - return r.ctx + r.Lock() + hasResult := r.Result == nil + r.Unlock() + + if hasResult { + r.onCancelled() + } } type Client struct { @@ -77,7 +82,7 @@ type RequestResult struct { Err error } -func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter *rate.Limiter, receivingCounter *atomic.Int64) Request { +func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter *rate.Limiter, receivingCounter *atomic.Int64, onCancelled func()) *Request { ctx, cancel := context.WithCancel(context.Background()) var requestParts []requestPart if !ws.fileIndex.Locate(r, func(i int, e segments.Extent) bool { @@ -120,10 +125,10 @@ func (ws *Client) NewRequest(r RequestSpec, buffers storage.BufferPool, limiter }) { panic("request out of file bounds") } - req := Request{ - ctx: ctx, - cancel: cancel, - Result: make(chan RequestResult, 1), + req := &Request{ + cancel: cancel, + onCancelled: onCancelled, + Result: make(chan RequestResult, 1), } go func() { readers, err := readRequestPartResponses(ctx, requestParts, receivingCounter)