Skip to content

Commit

Permalink
futures: sinkify (#475)
Browse files Browse the repository at this point in the history
This avoids copies here and there throughout the pipeline - ie
`copyString` and friends can often be avoided when moving things into
and out of futures

Annoyingly, one has to sprinkle the codebase liberally with `sink` and
`move` for the pipeline to work well - sink stuff _generally_ works
better in orc/arc

Looking at nim 1.6/refc, sink + local variable + move generates the best
code:

msg directly:
```nim
	T1_ = (*colonenv_).msg1; (*colonenv_).msg1 = copyStringRC1(msg);
```

local copy without move:
```nim
	T60_ = (*colonenv_).localCopy1; (*colonenv_).localCopy1 =
copyStringRC1(msg);
```

local copy with move:
```nim
	asgnRef((void**) (&(*colonenv_).localCopy1), msg);
```

Annoyingly, sink is also broken for refc+literals as it tries to
changes the refcount of the literal as part of the move (which shouldn't
be happening, but here we are), so we have to use a hack to find
literals and avoid moving them.
  • Loading branch information
arnetheduck authored Nov 19, 2023
1 parent 0b136b3 commit f03cdfc
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 40 deletions.
37 changes: 37 additions & 0 deletions chronos/config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,40 @@ when defined(debug) or defined(chronosConfig):
printOption("chronosEventEngine", chronosEventEngine)
printOption("chronosEventsCount", chronosEventsCount)
printOption("chronosInitialSize", chronosInitialSize)


# In nim 1.6, `sink` + local variable + `move` generates the best code for
# moving a proc parameter into a closure - this only works for closure
# procedures however - in closure iterators, the parameter is always copied
# into the closure (!) meaning that non-raw `{.async.}` functions always carry
# this overhead, sink or no. See usages of chronosMoveSink for examples.
# In addition, we need to work around https://github.com/nim-lang/Nim/issues/22175
# which has not been backported to 1.6.
# Long story short, the workaround is not needed in non-raw {.async.} because
# a copy of the literal is always made.
# TODO review the above for 2.0 / 2.0+refc
type
SeqHeader = object
length, reserved: int

proc isLiteral(s: string): bool {.inline.} =
when defined(gcOrc) or defined(gcArc):
false
else:
s.len > 0 and (cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0

proc isLiteral[T](s: seq[T]): bool {.inline.} =
when defined(gcOrc) or defined(gcArc):
false
else:
s.len > 0 and (cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0

template chronosMoveSink*(val: auto): untyped =
bind isLiteral
when not (defined(gcOrc) or defined(gcArc)) and val is seq|string:
if isLiteral(val):
val
else:
move(val)
else:
move(val)
6 changes: 3 additions & 3 deletions chronos/internal/asyncfutures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,14 @@ proc finish(fut: FutureBase, state: FutureState) =
when chronosFutureTracking:
scheduleDestructor(fut)

proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) =
proc complete[T](future: Future[T], val: sink T, loc: ptr SrcLoc) =
if not(future.cancelled()):
checkFinished(future, loc)
doAssert(isNil(future.internalError))
future.internalValue = val
future.internalValue = chronosMoveSink(val)
future.finish(FutureState.Completed)

template complete*[T](future: Future[T], val: T) =
template complete*[T](future: Future[T], val: sink T) =
## Completes ``future`` with value ``val``.
complete(future, val, getSrcLocation())

Expand Down
2 changes: 1 addition & 1 deletion chronos/internal/asyncmacro.nim
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ proc wrapInTryFinally(
newCall(ident "complete", fut)
),
nnkElseExpr.newTree(
newCall(ident "complete", fut, ident "result")
newCall(ident "complete", fut, newCall(ident "move", ident "result"))
)
)
)
Expand Down
8 changes: 4 additions & 4 deletions chronos/streams/tlsstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import
bearssl/[brssl, ec, errors, pem, rsa, ssl, x509],
bearssl/certs/cacert
import ../asyncloop, ../timer, ../asyncsync
import ".."/[asyncloop, asyncsync, config, timer]
import asyncstream, ../transports/stream, ../transports/common
export asyncloop, asyncsync, timer, asyncstream

Expand Down Expand Up @@ -62,7 +62,7 @@ type

PEMContext = ref object
data: seq[byte]

TrustAnchorStore* = ref object
anchors: seq[X509TrustAnchor]

Expand Down Expand Up @@ -158,7 +158,7 @@ proc tlsWriteRec(engine: ptr SslEngineContext,
var length = 0'u
var buf = sslEngineSendrecBuf(engine[], length)
doAssert(length != 0 and not isNil(buf))
await writer.wsource.write(buf, int(length))
await writer.wsource.write(chronosMoveSink(buf), int(length))
sslEngineSendrecAck(engine[], length)
TLSResult.Success
except AsyncStreamError as exc:
Expand Down Expand Up @@ -481,7 +481,7 @@ proc newTLSClientAsyncStream*(
## ``minVersion`` of bigger then ``maxVersion`` you will get an error.
##
## ``flags`` - custom TLS connection flags.
##
##
## ``trustAnchors`` - use this if you want to use certificate trust
## anchors other than the default Mozilla trust anchors. If you pass
## a ``TrustAnchorStore`` you should reuse the same instance for
Expand Down
16 changes: 0 additions & 16 deletions chronos/transports/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -596,22 +596,6 @@ proc raiseTransportOsError*(err: OSErrorCode) {.
## Raises transport specific OS error.
raise getTransportOsError(err)

type
SeqHeader = object
length, reserved: int

proc isLiteral*(s: string): bool {.inline.} =
when defined(gcOrc) or defined(gcArc):
false
else:
(cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0

proc isLiteral*[T](s: seq[T]): bool {.inline.} =
when defined(gcOrc) or defined(gcArc):
false
else:
(cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0

template getTransportTooManyError*(
code = OSErrorCode(0)
): ref TransportTooManyError =
Expand Down
10 changes: 5 additions & 5 deletions chronos/transports/datagram.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import std/deques
when not(defined(windows)): import ".."/selectors2
import ".."/[asyncloop, osdefs, oserrno, osutils, handles]
import ".."/[asyncloop, config, osdefs, oserrno, osutils, handles]
import "."/common

type
Expand Down Expand Up @@ -894,7 +894,7 @@ proc send*(transp: DatagramTransport, msg: sink string,
transp.checkClosed(retFuture)

let length = if msglen <= 0: len(msg) else: msglen
var localCopy = msg
var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy))

let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0],
Expand All @@ -917,7 +917,7 @@ proc send*[T](transp: DatagramTransport, msg: sink seq[T],
transp.checkClosed(retFuture)

let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var localCopy = msg
var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy))

let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0],
Expand Down Expand Up @@ -955,7 +955,7 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
transp.checkClosed(retFuture)

let length = if msglen <= 0: len(msg) else: msglen
var localCopy = msg
var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy))

let vector = GramVector(kind: WithAddress, buf: addr localCopy[0],
Expand All @@ -977,7 +977,7 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
var retFuture = newFuture[void]("datagram.transport.sendTo(seq)")
transp.checkClosed(retFuture)
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
var localCopy = msg
var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy))

let vector = GramVector(kind: WithAddress, buf: addr localCopy[0],
Expand Down
22 changes: 11 additions & 11 deletions chronos/transports/stream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
{.push raises: [].}

import std/deques
import ".."/[asyncloop, handles, osdefs, osutils, oserrno]
import common
import stew/ptrops
import ".."/[asyncloop, config, handles, osdefs, osutils, oserrno]
import ./common

type
VectorKind = enum
Expand Down Expand Up @@ -770,7 +771,7 @@ when defined(windows):
# Continue only if `retFuture` is not cancelled.
if not(retFuture.finished()):
let
pipeSuffix = $cast[cstring](unsafeAddr address.address_un[0])
pipeSuffix = $cast[cstring](baseAddr address.address_un)
pipeAsciiName = PipeHeaderName & pipeSuffix[1 .. ^1]
pipeName = toWideString(pipeAsciiName).valueOr:
retFuture.fail(getTransportOsError(error))
Expand Down Expand Up @@ -806,7 +807,7 @@ when defined(windows):

proc createAcceptPipe(server: StreamServer): Result[AsyncFD, OSErrorCode] =
let
pipeSuffix = $cast[cstring](addr server.local.address_un)
pipeSuffix = $cast[cstring](baseAddr server.local.address_un)
pipeName = ? toWideString(PipeHeaderName & pipeSuffix)
openMode =
if FirstPipe notin server.flags:
Expand Down Expand Up @@ -878,7 +879,7 @@ when defined(windows):
if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}:
server.apending = true
let
pipeSuffix = $cast[cstring](addr server.local.address_un)
pipeSuffix = $cast[cstring](baseAddr server.local.address_un)
pipeAsciiName = PipeHeaderName & pipeSuffix
pipeName = toWideString(pipeAsciiName).valueOr:
raiseOsDefect(error, "acceptPipeLoop(): Unable to create name " &
Expand Down Expand Up @@ -2011,7 +2012,7 @@ proc createStreamServer*(host: TransportAddress,
elif host.family in {AddressFamily.Unix}:
# We do not care about result here, because if file cannot be removed,
# `bindSocket` will return EADDRINUSE.
discard osdefs.unlink(cast[cstring](unsafeAddr host.address_un[0]))
discard osdefs.unlink(cast[cstring](baseAddr host.address_un))

host.toSAddr(saddr, slen)
if osdefs.bindSocket(SocketHandle(serverSocket),
Expand Down Expand Up @@ -2240,20 +2241,19 @@ proc write*(transp: StreamTransport, msg: sink string,
var retFuture = newFuture[int]("stream.transport.write(string)")
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)

let
nbytes = if msglen <= 0: len(msg) else: msglen

var
pbytes = cast[ptr byte](unsafeAddr msg[0])
pbytes = cast[ptr byte](baseAddr msg)
rbytes = nbytes

fastWrite(transp, pbytes, rbytes, nbytes)

let
written = nbytes - rbytes # In case fastWrite wrote some

var localCopy = msg
var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy))

pbytes = cast[ptr byte](addr localCopy[written])
Expand All @@ -2278,15 +2278,15 @@ proc write*[T](transp: StreamTransport, msg: sink seq[T],
nbytes = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))

var
pbytes = cast[ptr byte](unsafeAddr msg[0])
pbytes = cast[ptr byte](baseAddr msg)
rbytes = nbytes

fastWrite(transp, pbytes, rbytes, nbytes)

let
written = nbytes - rbytes # In case fastWrite wrote some

var localCopy = msg
var localCopy = chronosMoveSink(msg)
retFuture.addCallback(proc(_: pointer) = reset(localCopy))

pbytes = cast[ptr byte](addr localCopy[written])
Expand Down
6 changes: 6 additions & 0 deletions tests/testfut.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1997,3 +1997,9 @@ suite "Future[T] behavior test suite":
check:
future1.cancelled() == true
future2.cancelled() == true
test "Sink with literals":
# https://github.com/nim-lang/Nim/issues/22175
let fut = newFuture[string]()
fut.complete("test")
check:
fut.value() == "test"

0 comments on commit f03cdfc

Please sign in to comment.