diff --git a/kernel/kernel/fs/pipe.cpp b/kernel/kernel/fs/pipe.cpp
index 557b80a0c..164afe9d4 100644
--- a/kernel/kernel/fs/pipe.cpp
+++ b/kernel/kernel/fs/pipe.cpp
@@ -122,6 +122,7 @@ class pipe : public refcountable
     }
 
     ssize_t append(const void *ubuf, size_t len, bool atomic);
+    ssize_t append_iter(iovec_iter *iter, bool atomic);
 
 public:
     size_t reader_count{1};
@@ -136,6 +137,7 @@ class pipe : public refcountable
     void close_write_end();
     short poll(struct file *filp, void *poll_file, short events);
     ssize_t read_iter(iovec_iter *iter, unsigned int flags);
+    ssize_t write_iter(iovec_iter *iter, int flags);
 
     void wake_all(wait_queue *wq)
     {
@@ -399,6 +401,101 @@ ssize_t pipe::append(const void *ubuf, size_t len, bool atomic)
     return ret;
 }
 
+ssize_t pipe::append_iter(iovec_iter *iter, bool atomic)
+{
+    // Tricky logic, as in pipe::append()
+
+    pipe_buffer *to_restore = nullptr;
+    size_t old_restore_len = 0;
+    ssize_t ret = 0;
+
+    if (!list_is_empty(&pipe_buffers))
+    {
+        auto last_buf = container_of(list_last_element(&pipe_buffers), pipe_buffer, list_node);
+
+        // See if we have space in this pipe buffer (unread data is [offset_, offset_ + len_))
+        // TODO: Idea to test: memmove data back if we have offset != 0
+        // May compact things a bit.
+        if (last_buf->offset_ + last_buf->len_ < PAGE_SIZE)
+        {
+            // We have space, copy up
+            if (atomic)
+                to_restore = last_buf;
+
+            old_restore_len = last_buf->len_;
+            u8 *page_buf = (u8 *) PAGE_TO_VIRT(last_buf->page_);
+            ssize_t copied = copy_from_iter(iter, page_buf + last_buf->offset_ + last_buf->len_,
+                                            PAGE_SIZE - last_buf->offset_ - last_buf->len_);
+            if (copied < 0)
+                return -EFAULT;
+
+            // Adjust the length
+            last_buf->len_ += copied;
+            assert(last_buf->offset_ + last_buf->len_ <= PAGE_SIZE);
+            ret += copied;
+            curr_len += copied;
+        }
+    }
+
+    const auto avail = available_space();
+
+    // If we still have more to append and enough space, let's do so
+    if (avail && !iter->empty())
+    {
+        if (!cached_page)
+        {
+            cached_page = alloc_page(PAGE_ALLOC_NO_ZERO);
+            if (!cached_page)
+            {
+                ret = -ENOMEM;
+                goto out;
+            }
+        }
+
+        page *p = cached_page;
+
+        auto blen = min(min(avail, iter->bytes), PAGE_SIZE);
+        // Note: the page and its lifetime are now tied to the pipe buffer, but we steal
+        // the page on error.
+        auto buf = make_unique<pipe_buffer>(p, blen);
+        if (!buf)
+        {
+            ret = -ENOMEM;
+            goto out;
+        }
+
+        u8 *page_buf = (u8 *) PAGE_TO_VIRT(p);
+        ssize_t copied = copy_from_iter(iter, page_buf, buf->len_);
+        if (copied < 0)
+        {
+            if (atomic || !ret)
+                ret = -EFAULT;
+            buf->steal_page();
+            goto out;
+        }
+
+        // Append the buffer to the end of the list
+        list_add_tail(&buf->list_node, &pipe_buffers);
+        ret += copied;
+        curr_len += copied;
+        to_restore = nullptr;
+
+        // Release the cached page, definitely no longer ours.
+        cached_page = nullptr;
+
+        buf.release();
+    }
+
+out:
+    if (atomic && to_restore)
+    {
+        curr_len -= (to_restore->len_ - old_restore_len);
+        to_restore->len_ = old_restore_len;
+    }
+
+    return ret;
+}
+
 // Pretty basic mocking thing
 // I don't know if this is any useful honestly.
 // The "easiest" way would be to hot-patch the functions we want to mock with a jmp
@@ -750,6 +847,79 @@ ssize_t pipe::read_iter(iovec_iter *iter, unsigned int flags)
     return ret;
 }
 
+ssize_t pipe::write_iter(iovec_iter *iter, int flags)
+{
+    bool is_atomic_write = iter->bytes <= PIPE_BUF;
+    ssize_t ret = 0;
+
+    scoped_mutex g{pipe_lock};
+
+    bool wasempty = !can_read();
+
+    while (!iter->empty())
+    {
+        if (reader_count == 0)
+        {
+            CALL_KUNIT_MOCKABLE(kernel_raise_signal, SIGPIPE, get_current_process(), 0, nullptr);
+
+            if (!ret)
+                ret = -EPIPE;
+            break;
+        }
+
+        const auto avail = available_space();
+
+        bool may_write = avail > 0 && !(avail < iter->bytes && is_atomic_write);
+
+        if (!may_write)
+        {
+            if (ret != 0)
+            {
+                /* now that we're blocking, might as well signal readers */
+                wake_all(&read_queue);
+            }
+
+            if (flags & O_NONBLOCK)
+            {
+                if (!ret)
+                    ret = -EAGAIN;
+                break;
+            }
+
+            if (wait_for_event_mutex_interruptible(
+                    &write_queue,
+                    (is_atomic_write && available_space() >= iter->bytes) || !is_full() ||
+                        reader_count == 0,
+                    &pipe_lock) == -EINTR)
+            {
+                if (!ret)
+                    ret = -EINTR;
+                break;
+            }
+
+            wasempty = !can_read();
+            continue;
+        }
+
+        // Now we have space
+        ssize_t st = append_iter(iter, is_atomic_write);
+
+        if (st < 0)
+        {
+            if (!ret)
+                ret = st;
+            break;
+        }
+
+        ret += st;
+    }
+
+    if (wasempty && ret > 0)
+        wake_all(&read_queue);
+
+    return ret;
+}
+
 ssize_t pipe_read_iter(struct file *filp, size_t off, iovec_iter *iter, unsigned int flags)
 {
     (void) off;
@@ -758,6 +928,14 @@ ssize_t pipe_read_iter(struct file *filp, size_t off, iovec_iter *iter, unsigned
     return p->read_iter(iter, filp->f_flags);
 }
 
+ssize_t pipe_write_iter(struct file *filp, size_t off, iovec_iter *iter, unsigned int flags)
+{
+    (void) off;
+    (void) flags;
+    pipe *p = get_pipe(filp->f_ino->i_pipe);
+    return p->write_iter(iter, filp->f_flags);
+}
+
 const struct file_ops pipe_ops = {
     .read = pipe_read,
     .write = pipe_write,
@@ -767,6 +945,7 @@ const struct file_ops pipe_ops = {
     .fcntl = pipe_fcntl,
     .release = pipe_release,
     .read_iter = pipe_read_iter,
+    .write_iter = pipe_write_iter,
 };
 
 static int pipe_create(struct file **pipe_readable, struct file **pipe_writeable)
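
Reviewer note on the atomicity rule: write_iter treats any write of at most PIPE_BUF bytes as atomic, matching POSIX pipe semantics (small writes are all-or-nothing; larger writes may be split and interleaved with other writers). Below is a minimal userspace sketch exercising the new write_iter path through writev(2); the buffer sizes, messages, and error handling are illustrative only and not part of this patch.

    #include <sys/uio.h>
    #include <unistd.h>
    #include <limits.h>
    #include <cstdio>

    int main()
    {
        int fds[2];
        if (pipe(fds) < 0)
        {
            perror("pipe");
            return 1;
        }

        // Two iovecs totalling well under PIPE_BUF: POSIX requires the whole
        // writev to land in the pipe atomically, which is the is_atomic_write
        // path in pipe::write_iter.
        char hdr[16] = "header";
        char payload[64] = "payload";
        struct iovec iov[2] = {{hdr, sizeof(hdr)}, {payload, sizeof(payload)}};

        ssize_t written = writev(fds[1], iov, 2);
        if (written < 0)
            perror("writev");
        else
            printf("wrote %zd bytes atomically (PIPE_BUF = %d)\n", written, (int) PIPE_BUF);

        close(fds[0]);
        close(fds[1]);
        return 0;
    }

Writes larger than PIPE_BUF take the non-atomic branch: write_iter keeps looping over append_iter and may legitimately return a short count once at least one byte has been written.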
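The reader_count == 0 and O_NONBLOCK branches are also observable from userspace. A second hedged sketch, again only illustrating the semantics the patch implements (EAGAIN once the pipe is full in non-blocking mode, SIGPIPE/EPIPE once the last reader is gone):

    #include <sys/uio.h>
    #include <cerrno>
    #include <csignal>
    #include <cstdio>
    #include <cstring>
    #include <fcntl.h>
    #include <unistd.h>

    int main()
    {
        int fds[2];
        if (pipe(fds) < 0)
            return 1;

        // Non-blocking write end: once the pipe fills up, write_iter returns
        // -EAGAIN instead of sleeping on write_queue.
        fcntl(fds[1], F_SETFL, O_NONBLOCK);

        char buf[4096];
        memset(buf, 'x', sizeof(buf));
        struct iovec iov = {buf, sizeof(buf)};
        while (writev(fds[1], &iov, 1) > 0)
            ;
        printf("pipe full: errno = %d (expect EAGAIN = %d)\n", errno, EAGAIN);

        // Dropping the last reader makes reader_count == 0: the next write
        // raises SIGPIPE (ignored here so we can observe it) and fails with
        // EPIPE.
        signal(SIGPIPE, SIG_IGN);
        close(fds[0]);
        ssize_t st = writev(fds[1], &iov, 1);
        printf("write after reader close: %zd, errno = %d (expect EPIPE = %d)\n", st, errno, EPIPE);

        close(fds[1]);
        return 0;
    }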