
In-memory pipe implementation #12238

Open
straight-shoota opened this issue Jul 9, 2022 · 15 comments · May be fixed by #12245

@straight-shoota
Member

This discussion came up in #5430 (comment)

IO.pipe is currently implemented based on file descriptors, which has a couple of downsides.

A user-mode implementation would avoid these problems and be more performant because there's no switch to the kernel.
It can't be used for inter-process communication, though. So we should retain a pipe implementation based on file-descriptors for this use case.
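
For reference, this is the shape of the existing file-descriptor-based API that an in-memory variant would presumably mirror (a minimal usage example against the current stdlib):

reader, writer = IO.pipe # currently backed by two file descriptors

writer.puts "hello"
writer.close

puts reader.gets_to_end # => "hello\n"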

@jgaskins
Contributor

So we should retain a pipe implementation based on file-descriptors for this use case.

Indeed, this is needed for Process and Signal.

In #5430 I mentioned I had some code for this. I posted it in #12245 to show where I was going with it. I also have another implementation that isn't based on Deque (it just uses Slice directly), but I realized while messing with it this evening that Deque simplified the implementation a lot.

One thing that might be nice is to allow the buffer to be elastic, shrinking it back down when the size:capacity ratio drops below some threshold. I have no idea if that's feasible or what the real-world performance looks like, but a user-space queue won't yield the CPU on I/O because we're not doing any kernel-level I/O, so a whoooole lot of writes can happen before a single read does, potentially ballooning memory until OOM.
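
A rough sketch of that shrink heuristic, assuming a plain Bytes backing buffer with explicit size bookkeeping (the class name, the 1/4 threshold and the minimum capacity are made up for illustration, not code from #12245; the read/write methods that update @size are omitted):

# Hypothetical elastic buffer: halve the capacity once a read leaves it
# less than a quarter full. Ring-buffer wrap-around is ignored to keep
# the sketch short.
class ElasticBuffer
  MIN_CAPACITY = 8192

  @buffer : Bytes = Bytes.new(MIN_CAPACITY)
  @size = 0

  private def maybe_shrink
    capacity = @buffer.size
    return if capacity <= MIN_CAPACITY # never shrink below the floor
    return if @size * 4 > capacity     # still more than 1/4 full

    new_buffer = Bytes.new(Math.max(MIN_CAPACITY, capacity // 2))
    @buffer[0, @size].copy_to(new_buffer) # keep the unread bytes
    @buffer = new_buffer
  end
end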

Another mitigation strategy for that is to replace the Deque with a Channel. My assumption is that, since Deque provides the underlying implementation for Channel, Channel is slower, but I haven't done any benchmarking yet. Using a Channel would definitely avoid unwanted memory bloat since it provides a hard limit on memory usage and would yield the CPU when the buffer fills, so it might be closer to live functionality.
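
For illustration, a Channel-backed variant could look roughly like this (a hypothetical sketch, not proposed code; the bounded Channel is what makes writers block once 8192 bytes are in flight):

# Illustrative sketch of a Channel-backed in-memory pipe.
# Channel#send blocks when the buffer is full, so memory stays bounded.
class ChannelPipe < IO
  @channel = Channel(UInt8).new(8192)

  def write(slice : Bytes) : Nil
    slice.each { |byte| @channel.send byte }
  end

  def read(slice : Bytes) : Int32
    return 0 if slice.empty?

    # Block for the first byte; nil means the channel was closed (EOF).
    first = @channel.receive?
    return 0 unless first
    slice[0] = first

    count = 1
    while count < slice.size
      select
      when byte = @channel.receive?
        break unless byte
        slice[count] = byte
        count += 1
      else
        break # nothing more buffered right now
      end
    end
    count
  end

  def close_write
    @channel.close
  end
end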

@straight-shoota
Member Author

We'll need to handle concurrent access correctly, so using a channel would be a good idea.
The Go implementation can serve as inspiration: https://go.dev/src/io/pipe.go

@jgaskins
Contributor

If I stick with the Deque, the plan was to wrap a mutex around the code in read and write specifically to account for that.
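
A rough sketch of that plan (class and method names are illustrative only, not the actual #12245 code), assuming a Deque(UInt8) backing store:

# Illustrative: one mutex guarding both ends of a Deque-backed pipe.
# Blocking the reader while the queue is empty is omitted for brevity.
class DequePipe < IO
  @queue = Deque(UInt8).new
  @mutex = Mutex.new

  def write(slice : Bytes) : Nil
    @mutex.synchronize do
      slice.each { |byte| @queue.push byte }
    end
  end

  def read(slice : Bytes) : Int32
    @mutex.synchronize do
      count = Math.min(slice.size, @queue.size)
      count.times { |i| slice[i] = @queue.shift }
      count
    end
  end
end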

@straight-shoota
Member Author

Yeah, Channel does that for you already

@jgaskins
Contributor

Yes, but that's not the only difference between the two. The way you're talking about it implies you are favoring one implementation based on a single factor.

@straight-shoota
Member Author

Managing concurrent access is hard. So I would start with channels, just because it's easier. And there's no point in reimplementing that just for the sake of it.
Do you think there is a considerable downside to using channels?

@jgaskins
Contributor

there's no point in reimplementing that just for the sake of it.

There is more to what I said than “for the sake of it”.

Do you think there is a considerable downside to using channels?

I pointed out tradeoffs in this comment.

@straight-shoota
Member Author

Forgive me, I read that comment as a perfect argument for using channels.
The only downside you mentioned is a hunch that performance might be impacted. I don't think that's justified. We probably need pretty much everything that Channel provides to make the pipe implementation concurrency-safe. Re-implementing that is unlikely to yield any better results.
Anyway, we should do the simple thing first, which also gives more confidence in correctness. We can always optimize once there is a solid base.

@jgaskins
Contributor

I wasn't making a case one way or the other, really. Just offering discussion. I don't think there was enough information at the time to draw any conclusions about a preferred implementation. There may still not be even after this post.

The only downside you mentioned is a hunch that performance might be impacted. I don't think that's justified.

Basing a decision on my assumption would be a mistake, certainly. I would never suggest making decisions about performance without benchmarks. But I'd argue that your dismissal of my note about performance is a mistake, as well.

And while I agree that correctness is more important than performance, channels provide little to no opportunity for optimization here. The API design explicitly prohibits that:

  • There's no way to send or receive a batch of elements, so every byte passed through it would lock and unlock a mutex on both the sending and receiving side
  • There's no introspection into channel state because that's not thread-safe, so to do any nonblocking I/O whatsoever, each byte would need to be read inside a select/else
  • There's no way to implement peek

These were all intentional design choices for channels. So your comment about optimizing a channel-based implementation later seems a lot more optimistic than reality allows. I wrote up a quick benchmark passing 8KB at a time through a channel vs a deque, plus the current IO::FileDescriptor-based pipe as the "time to beat":

channel 819.25  (  1.22ms) (± 1.00%)  2.13MB/op  9768.59× slower
  deque   8.00M (124.95ns) (± 0.91%)    0.0B/op          fastest
FD pipe   1.10M (907.29ns) (± 1.51%)    0.0B/op     7.26× slower
Benchmark code
require "benchmark"

size = 8192

channel = Channel(UInt8).new(size)
queue = Deque(UInt8).new(size)
p_read, p_write = IO.pipe(false, false)
slice = Bytes.new(size, '.'.ord.to_u8)

Benchmark.ips do |x|
  x.report "channel" do
    size.times { |i| channel.send slice[i] }
    size.times do |i|
      select
      when byte = channel.receive?
        if byte
          slice[i] = byte
        else
          break
        end
      else
      end
    end
  end

  mutex = Mutex.new
  x.report "deque" do
    mutex.synchronize do
      queue.copy_from slice
      queue.copy_to slice
      queue.pop(size)
    end
  end

  x.report "FD pipe" do
    p_write.write slice
    p_read.read slice
  end
end

class Deque(T)
  def copy_from(slice : Slice(T))
    if @buffer.null?
      initialize slice.size
    end

    if @start == 0
      slice.copy_to @buffer, @capacity
    else
      slice.copy_to @buffer + @start, @capacity - @start
      slice.copy_to @buffer, @start - @capacity
    end
  end

  def copy_to(slice : Slice(T))
    if @start == 0
      slice.copy_from @buffer, @capacity
    else
      slice.copy_from @buffer + @start, @capacity - @start
      slice.copy_from @buffer, @start - @capacity
    end
  end
end

Since each operation here is 8KB, that's:

Implementation   Throughput
Channel          6.4 MB/sec
Deque            62.5 GB/sec
IO.pipe          8.6 GB/sec
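
(These throughput figures are just the iteration rates above multiplied by the 8 KiB moved per iteration, e.g. 819.25 ops/sec × 8192 bytes ≈ 6.4 MB/sec for the channel.)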

This means passing bytes over a Channel isn't just slower than over a Deque or even the IO::FileDescriptor-based implementation, it's slower than my internet connection.

Getting the Deque implementation this fast did require an optimization because copying byte-by-byte was slow. I couldn't come up with a way to do the same for Channel because it's a lot more complicated internally and its API design doesn't really seem to allow for it — Deque has plenty of introspection available. Also, the Deque optimization is still incomplete — it copies the entire buffer every time, even if it's not full. It was fine for my purposes because I guaranteed the buffer was full, but in real-world scenarios it can probably short-circuit to save cycles, and Deque#pop(n : Int) could possibly also be optimized to avoid having to iterate.
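
A hedged sketch of that short-circuit, copying only the populated region instead of the full capacity (it reopens Deque and relies on the same internal fields as the benchmark patch above, so treat it as illustrative rather than tested code):

class Deque(T)
  # Illustrative: copy only the @size stored elements, handling the
  # ring-buffer wrap-around. Assumes slice.size >= @size.
  def copy_to(slice : Slice(T))
    return if @size == 0

    if @start + @size <= @capacity
      # Stored elements are contiguous: a single copy suffices.
      slice.copy_from @buffer + @start, @size
    else
      # Stored elements wrap past the end of the buffer: copy two runs.
      first_part = @capacity - @start
      slice.copy_from @buffer + @start, first_part
      (slice + first_part).copy_from @buffer, @size - first_part
    end
  end
end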

Maybe a similar optimization can be done for channels (since they're implemented on top of deques), and I'd be curious if it's feasible without bypassing a lot of Channel's safety mechanisms. I couldn't come up with a way to do it, though. Feel free to give it a shot.

TL;DR: I'd caution against zeroing in on a preferred implementation with too little information considered.

@yxhuvud
Contributor

yxhuvud commented Jul 14, 2022

Regarding the blocking issues in #8152, isn't that using a pipe creation mechanism that is different from IO.pipe? Anyhow, I'm fairly certain nothing would stop us from building a pipe variant that is closer to a socket than to a file, as epoll does work on pipes, and thus we can use wait_readable.

Another example of where pipes are currently used is multithreaded crystal, to enqueue fibers between threads.

@yxhuvud
Contributor

yxhuvud commented Jul 14, 2022

Regarding the benchmark:
1: Don't involve select in benchmarking channels - that introduces overhead separate from what you are measuring. Either that, or use select in all of the benchmarks; select needs to support reading from pipes, after all.
2: Any benchmark that tries to measure this without involving the scheduler at any point, where nothing ever has to wait (preferably on both the reader and writer side), is pretty much nonsense. We already know raw Deques and Arrays are faster than inter-process/fiber communication.

FWIW, IO.pipe held up well enough that I don't see a performance problem. Perhaps it needs an implementation that plays nicer with poll and blocking, but that seems like a very different issue. But there is definitely a question about how

@jgaskins
Contributor

Regarding the blocking issues in #8152, isn't that using a pipe creation mechanism that is different from IO.pipe? Anyhow, I'm fairly certain nothing would stop us from building a pipe variant that is closer to a socket than to a file, as epoll does work on pipes, and thus we can use wait_readable.

The purpose of this discussion is to have an implementation that doesn't use file descriptors at all, for the same reason you might use IO::Memory over a tempfile.

1: Don't involve select in benchmarking channels - that introduces overhead separate from what you are measuring. Either that, or use select in all of the benchmarks; select needs to support reading from pipes, after all.

Please re-read the part of that post where I mentioned why select was needed for the Channel implementation — which does not apply to Deque. The overhead of select is the reason I brought it up. It allocated about 272 bytes of heap memory for every single byte passed through.

2: Any benchmark that tries to measure this without involving the scheduler at any point, where nothing ever has to wait (preferably on both the reader and writer side), is pretty much nonsense. We already know raw Deques and Arrays are faster than inter-process/fiber communication.

If you think it's not up to scratch, feel free to show me how it's done.

FWIW, IO.pipe held up well enough that I don't see a performance problem. Perhaps it needs an implementation that plays nicer with poll and blocking, but that seems like a very different issue.

Please see this comment. You're more than welcome to keep using the implementation backed by file descriptors. It isn't going to be removed.

@HertzDevil
Contributor

This issue stemmed from the fact that the use of non-blocking IO.pipes blocks certain standard library specs on Windows (see what I did there?), and those specs obviously do not involve any inter-process communication. IIRC this affects the OAuth2 client specs. Any working implementation is fine by me even if it is restricted to spec/support/io.cr.

@HertzDevil
Contributor

Two years later, IO.pipe is supposedly non-blocking and those specs now pass on Windows without MT, but it looks like its use in Crystal::FiberChannel now manifests as #14222

@crysbot

crysbot commented Oct 31, 2024

This issue has been mentioned on Crystal Forum. There might be relevant details there:

https://forum.crystal-lang.org/t/block-capture-in-an-ecr-like-template-engine-results-in-out-of-order-output/6979/6
