Skip to content

Commit

Permalink
chore(yamux): change closedRemotely from Future into AsyncEvent (#1133)
Browse files Browse the repository at this point in the history
lchenut authored Jun 21, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 0f27f89 commit d1d53ff
Showing 1 changed file with 9 additions and 12 deletions.
21 changes: 9 additions & 12 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
@@ -155,15 +155,15 @@ type
recvQueue: seq[byte]
isReset: bool
remoteReset: bool
closedRemotely: Future[void].Raising([])
closedRemotely: AsyncEvent
closedLocally: bool
receivedData: AsyncEvent

proc `$`(channel: YamuxChannel): string =
result = if channel.conn.dir == Out: "=> " else: "<= "
result &= $channel.id
var s: seq[string] = @[]
if channel.closedRemotely.completed():
if channel.closedRemotely.isSet():
s.add("ClosedRemotely")
if channel.closedLocally:
s.add("ClosedLocally")
@@ -198,12 +198,12 @@ proc lengthSendQueueWithLimit(channel: YamuxChannel): int =

proc actuallyClose(channel: YamuxChannel) {.async: (raises: []).} =
if channel.closedLocally and channel.sendQueue.len == 0 and
channel.closedRemotely.completed():
channel.closedRemotely.isSet():
await procCall Connection(channel).closeImpl()

proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedRemotely.completed():
channel.closedRemotely.complete()
if not channel.closedRemotely.isSet():
channel.closedRemotely.fire()
await channel.actuallyClose()

method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
@@ -239,7 +239,7 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).}
except CancelledError, LPStreamError:
discard
await channel.close()
if not channel.closedRemotely.completed():
if not channel.closedRemotely.isSet():
await channel.remoteClosed()
channel.receivedData.fire()
if not isLocal:
@@ -280,10 +280,10 @@ method readOnce*(
if channel.recvQueue.len == 0:
channel.receivedData.clear()
try: # https://github.com/status-im/nim-chronos/issues/516
discard await race(channel.closedRemotely, channel.receivedData.wait())
discard await race(channel.closedRemotely.wait(), channel.receivedData.wait())
except ValueError:
raiseAssert("Futures list is not empty")
if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
if channel.closedRemotely.isSet() and channel.recvQueue.len == 0:
channel.isEof = true
return
0 # we return 0 to indicate that the channel is closed for reading from now on
@@ -460,9 +460,6 @@ proc createStream(
# that the initial recvWindow is 256k.
# To solve this contradiction, no updateWindow will be sent until
# recvWindow is less than maxRecvWindow
proc newClosedRemotelyFut(): Future[void] {.async: (raises: [], raw: true).} =
newFuture[void]()

var stream = YamuxChannel(
id: id,
maxRecvWindow: recvWindow,
@@ -473,7 +470,7 @@ proc createStream(
isSrc: isSrc,
conn: m.connection,
receivedData: newAsyncEvent(),
closedRemotely: newClosedRemotelyFut(),
closedRemotely: newAsyncEvent(),
)
stream.objName = "YamuxStream"
if isSrc:

0 comments on commit d1d53ff

Please sign in to comment.