Skip to content

Commit

Permalink
pipe: implement writev
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Shkenev <[email protected]>
  • Loading branch information
petershh authored and heatd committed Jan 5, 2024
1 parent 29924fa commit 1599d09
Showing 1 changed file with 179 additions and 0 deletions.
179 changes: 179 additions & 0 deletions kernel/kernel/fs/pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class pipe : public refcountable
}

ssize_t append(const void *ubuf, size_t len, bool atomic);
ssize_t append_iter(iovec_iter *iter, bool atomic);

public:
size_t reader_count{1};
Expand All @@ -136,6 +137,7 @@ class pipe : public refcountable
void close_write_end();
short poll(struct file *filp, void *poll_file, short events);
ssize_t read_iter(iovec_iter *iter, unsigned int flags);
ssize_t write_iter(iovec_iter *iter, int flags);

void wake_all(wait_queue *wq)
{
Expand Down Expand Up @@ -399,6 +401,101 @@ ssize_t pipe::append(const void *ubuf, size_t len, bool atomic)
return ret;
}

ssize_t pipe::append_iter(iovec_iter *iter, bool atomic)
{
// Tricky logic, as in pipe::append()

pipe_buffer *to_restore = nullptr;
size_t old_restore_len = 0;
ssize_t ret = 0;

if (!list_is_empty(&pipe_buffers))
{
auto last_buf = container_of(list_last_element(&pipe_buffers), pipe_buffer, list_node);

// See if we have space in this pipe buffer
// TODO: Idea to test: memmove data back if we have offset != 0
// May compact things a bit.
if (PAGE_SIZE - last_buf->len_ <= iter->bytes)
{
// We have space, copy up
if (atomic)
to_restore = last_buf;

old_restore_len = last_buf->len_;
u8 *page_buf = (u8 *) PAGE_TO_VIRT(last_buf->page_);
ssize_t copied =
copy_from_iter(iter, page_buf + last_buf->offset_, PAGE_SIZE - last_buf->len_);
if (copied < 0)
return -EFAULT;

// Adjust the length
last_buf->len_ += copied;
assert(last_buf->len_ <= PAGE_SIZE);
ret += copied;
curr_len += copied;
}
}

const auto avail = available_space();

// If we still have more to append and enough space, lets do so
if (avail && !iter->empty())
{
if (!cached_page)
{
cached_page = alloc_page(PAGE_ALLOC_NO_ZERO);
if (!cached_page)
{
ret = -ENOMEM;
goto out;
}
}

page *p = cached_page;

auto blen = min(min(avail, iter->bytes), PAGE_SIZE);
// Note: the page and its lifetime are now tied to the pipe buffer, but we steal
// the page on error.
auto buf = make_unique<pipe_buffer>(p, blen);
if (!buf)
{
ret = -ENOMEM;
goto out;
}

u8 *page_buf = (u8 *) PAGE_TO_VIRT(p);
ssize_t copied = copy_from_iter(iter, page_buf, buf->len_);
if (copied < 0)
{
if (atomic || !ret)
ret = -EFAULT;
buf->steal_page();
goto out;
}

// Append the page_buf to the end of list
list_add_tail(&buf->list_node, &pipe_buffers);
ret += copied;
curr_len += copied;
to_restore = nullptr;

// Release the cached page, definitely no longer ours.
cached_page = nullptr;

buf.release();
}

out:
if (atomic && to_restore)
{
curr_len -= (to_restore->len_ - old_restore_len);
to_restore->len_ = old_restore_len;
}

return ret;
}

// Pretty basic mocking thing
// I don't know if this is any useful honestly.
// The "easiest" way would be to hot-patch the functions we want to mock with a jmp
Expand Down Expand Up @@ -750,6 +847,79 @@ ssize_t pipe::read_iter(iovec_iter *iter, unsigned int flags)
return ret;
}

ssize_t pipe::write_iter(iovec_iter *iter, int flags)
{
bool is_atomic_write = iter->bytes <= PIPE_BUF;
ssize_t ret = 0;

scoped_mutex g{pipe_lock};

bool wasempty = !can_read();

while (!iter->empty())
{
if (reader_count == 0)
{
CALL_KUNIT_MOCKABLE(kernel_raise_signal, SIGPIPE, get_current_process(), 0, nullptr);

if (!ret)
ret = -EPIPE;
break;
}

const auto avail = available_space();

bool may_write = avail > 0 && !(avail < iter->bytes && is_atomic_write);

if (!may_write)
{
if (ret != 0)
{
/* now that we're blocking, might as well signal readers */
wake_all(&read_queue);
}

if (flags & O_NONBLOCK)
{
if (!ret)
ret = -EAGAIN;
break;
}

if (wait_for_event_mutex_interruptible(
&write_queue,
(is_atomic_write && available_space() >= iter->bytes) || !is_full() ||
reader_count == 0,
&pipe_lock) == -EINTR)
{
if (!ret)
ret = -EINTR;
break;
}

wasempty = !can_read();
continue;
}

// Now we have space
ssize_t st = append_iter(iter, is_atomic_write);

if (st < 0)
{
if (!ret)
ret = st;
break;
}

ret += st;
}

if (wasempty && ret > 0)
wake_all(&read_queue);

return ret;
}

ssize_t pipe_read_iter(struct file *filp, size_t off, iovec_iter *iter, unsigned int flags)
{
(void) off;
Expand All @@ -758,6 +928,14 @@ ssize_t pipe_read_iter(struct file *filp, size_t off, iovec_iter *iter, unsigned
return p->read_iter(iter, filp->f_flags);
}

ssize_t pipe_write_iter(struct file *filp, size_t off, iovec_iter *iter, unsigned int flags)
{
(void) off;
(void) flags;
pipe *p = get_pipe(filp->f_ino->i_pipe);
return p->write_iter(iter, filp->f_flags);
}

const struct file_ops pipe_ops = {
.read = pipe_read,
.write = pipe_write,
Expand All @@ -767,6 +945,7 @@ const struct file_ops pipe_ops = {
.fcntl = pipe_fcntl,
.release = pipe_release,
.read_iter = pipe_read_iter,
.write_iter = pipe_write_iter,
};

static int pipe_create(struct file **pipe_readable, struct file **pipe_writeable)
Expand Down

0 comments on commit 1599d09

Please sign in to comment.