Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(yamux): change closedRemotely from Future into AsyncEvent #1133

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ type
recvQueue: seq[byte]
isReset: bool
remoteReset: bool
closedRemotely: Future[void].Raising([])
closedRemotely: AsyncEvent
closedLocally: bool
receivedData: AsyncEvent

proc `$`(channel: YamuxChannel): string =
result = if channel.conn.dir == Out: "=> " else: "<= "
result &= $channel.id
var s: seq[string] = @[]
if channel.closedRemotely.completed():
if channel.closedRemotely.isSet():
s.add("ClosedRemotely")
if channel.closedLocally:
s.add("ClosedLocally")
Expand Down Expand Up @@ -198,12 +198,12 @@ proc lengthSendQueueWithLimit(channel: YamuxChannel): int =

proc actuallyClose(channel: YamuxChannel) {.async: (raises: []).} =
if channel.closedLocally and channel.sendQueue.len == 0 and
channel.closedRemotely.completed():
channel.closedRemotely.isSet():
await procCall Connection(channel).closeImpl()

proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedRemotely.completed():
channel.closedRemotely.complete()
if not channel.closedRemotely.isSet():
channel.closedRemotely.fire()
await channel.actuallyClose()

method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
Expand Down Expand Up @@ -239,7 +239,7 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).}
except CancelledError, LPStreamError:
discard
await channel.close()
if not channel.closedRemotely.completed():
if not channel.closedRemotely.isSet():
await channel.remoteClosed()
channel.receivedData.fire()
if not isLocal:
Expand Down Expand Up @@ -280,10 +280,10 @@ method readOnce*(
if channel.recvQueue.len == 0:
channel.receivedData.clear()
try: # https://github.com/status-im/nim-chronos/issues/516
discard await race(channel.closedRemotely, channel.receivedData.wait())
discard await race(channel.closedRemotely.wait(), channel.receivedData.wait())
except ValueError:
raiseAssert("Futures list is not empty")
if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
if channel.closedRemotely.isSet() and channel.recvQueue.len == 0:
channel.isEof = true
return
0 # we return 0 to indicate that the channel is closed for reading from now on
Expand Down Expand Up @@ -460,9 +460,6 @@ proc createStream(
# that the initial recvWindow is 256k.
# To solve this contradiction, no updateWindow will be sent until
# recvWindow is less than maxRecvWindow
proc newClosedRemotelyFut(): Future[void] {.async: (raises: [], raw: true).} =
newFuture[void]()

var stream = YamuxChannel(
id: id,
maxRecvWindow: recvWindow,
Expand All @@ -473,7 +470,7 @@ proc createStream(
isSrc: isSrc,
conn: m.connection,
receivedData: newAsyncEvent(),
closedRemotely: newClosedRemotelyFut(),
closedRemotely: newAsyncEvent(),
)
stream.objName = "YamuxStream"
if isSrc:
Expand Down
Loading