pipe: Clean up old ->read and ->write
Remove ->read and ->write (replace them with emulation over read_iter
and write_iter) and remove the old append().

Signed-off-by: Pedro Falcato <[email protected]>
heatd committed Jan 5, 2024
1 parent 1599d09 commit 9674312
Showing 1 changed file with 10 additions and 274 deletions.
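The emulation mentioned in the commit message is a thin shim: each legacy entry point describes the caller's single linear buffer as a one-element iovec, builds an iovec_iter over it, and forwards to the corresponding iter-based method. A minimal sketch of that pattern for the read side, annotated with comments and using the iovec_iter, cul::slice, and IOVEC_USER names that appear in the added lines below (the read_iter signature is assumed from its use there):

// Sketch: emulating the legacy pipe::read() on top of read_iter().
ssize_t pipe::read(int flags, size_t len, void *buf)
{
    // Describe the caller's single user buffer as a one-element iovec.
    struct iovec iov;
    iov.iov_base = buf;
    iov.iov_len = len;

    // Wrap it in an iovec_iter over user memory and let the iter-based
    // path do the actual blocking, copying and PIPE_BUF handling.
    iovec_iter iter{cul::slice<iovec>{&iov, 1}, len, IOVEC_USER};
    return read_iter(&iter, flags);
}

The write side is symmetric, casting away const on the user buffer before handing it to write_iter().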
284 changes: 10 additions & 274 deletions kernel/kernel/fs/pipe.cpp
@@ -121,7 +121,6 @@ class pipe : public refcountable
return container_of(list_first_element(&pipe_buffers), pipe_buffer, list_node);
}

ssize_t append(const void *ubuf, size_t len, bool atomic);
ssize_t append_iter(iovec_iter *iter, bool atomic);

public:
@@ -200,205 +199,11 @@ size_t pipe::available_space() const

ssize_t pipe::read(int flags, size_t len, void *buf)
{
ssize_t ret = 0;

if (len == 0)
return 0;

scoped_mutex g{pipe_lock};

// Let's keep track of whether the pipe was full the last time we grabbed the lock
// By doing so, we can know when to wake writers instead of wasting time trying to do so
// for no reason.
// Since PIPE_BUF atomic writes are complicated with this scheme, we add PIPE_BUF slack.
bool wasfull = available_space() < PIPE_BUF;

while (true)
{
if (!can_read())
{
// If we can't read more, return the short read if we have read some
if (ret || writer_count == 0)
{
break;
}

// NONBLOCK = return short read (already handled) or EAGAIN
if (flags & O_NONBLOCK)
{
if (!ret)
ret = -EAGAIN;
break;
}

// Wait for writers
if (wait_for_event_mutex_interruptible(&read_queue, can_read_or_eof(), &pipe_lock) ==
-EINTR)
{
ret = ret ?: -EINTR;
break;
}

wasfull = available_space() < PIPE_BUF;

continue;
}

/* We have data, let's read some */
assert(!list_is_empty(&pipe_buffers));

// Consume the first buffer in the queue

auto pbf = first_buf();

size_t to_read = min((size_t) pbf->len_, len);

u8 *page_buf = (u8 *) PAGE_TO_VIRT(pbf->page_) + pbf->offset_;

if (copy_to_user((u8 *) buf + ret, page_buf, to_read) < 0)
{
if (!ret)
ret = -EFAULT;
break;
}

pbf->offset_ += to_read;
pbf->len_ -= to_read;

if (pbf->len_ == 0)
{
// If it's now empty, free the pipe buffer
list_remove(&pbf->list_node);

// Check if we have a cached page. If not, cache this one, else let it go.
if (!cached_page)
{
cached_page = pbf->steal_page();
}

delete pbf;
}

// Decrement the length of the pipe (curr_len)
curr_len -= to_read;
ret += to_read;
len -= to_read;

if (!len || !can_read())
{
// No more to read, break
break;
}
}

// Unlock to prevent contention with writers
g.unlock();

if (wasfull && ret > 0)
{
// If it was previously full and we read some, wake the writers
wake_all(&write_queue);
}

return ret;
}

ssize_t pipe::append(const void *ubuf, size_t len, bool atomic)
{
// Logic here is a bit tricky. Try to append to the last pipe buf
// If we can do so, then do it. Then if we still have more data, allocate a new pipe buffer
// and append it. If we ever need to roll back (since it may be a PIPE_BUF write), do so using
// "to_restore" and "old_restore_len".

pipe_buffer *to_restore = nullptr;
size_t old_restore_len = 0;
ssize_t ret = 0;

if (!list_is_empty(&pipe_buffers))
{
auto last_buf = container_of(list_last_element(&pipe_buffers), pipe_buffer, list_node);

// See if we have space in this pipe buffer
// TODO: Idea to test: memmove data back if we have offset != 0
// May compact things a bit.
if (PAGE_SIZE - last_buf->len_ <= len)
{
// We have space, copy up
if (atomic)
to_restore = last_buf;

old_restore_len = last_buf->len_;
u8 *page_buf = (u8 *) PAGE_TO_VIRT(last_buf->page_);
size_t to_copy = min(PAGE_SIZE - last_buf->len_, len);
if (copy_from_user(page_buf + last_buf->offset_, ubuf, to_copy) < 0)
return -EFAULT;

// Adjust the length
last_buf->len_ += to_copy;
assert(last_buf->len_ <= PAGE_SIZE);
len -= to_copy;
ret += to_copy;
curr_len += to_copy;
}
}

const auto avail = available_space();

// If we still have more to append and enough space, let's do so
if (avail && len)
{
page *p = cached_page;
if (!cached_page)
{
p = alloc_page(PAGE_ALLOC_NO_ZERO);
if (!p)
{
ret = -ENOMEM;
goto out;
}

cached_page = p;
}

auto blen = min(min(avail, len), PAGE_SIZE);
// Note: the page and its lifetime are now tied to the pipe buffer, but we steal
// the page on error.
auto buf = make_unique<pipe_buffer>(p, blen);
if (!buf)
{
ret = -ENOMEM;
goto out;
}

u8 *page_buf = (u8 *) PAGE_TO_VIRT(p);
if (copy_from_user(page_buf, (u8 *) ubuf + ret, buf->len_) < 0)
{
if (atomic || !ret)
ret = -EFAULT;
buf->steal_page();
goto out;
}

// Append the page_buf to the end of list
list_add_tail(&buf->list_node, &pipe_buffers);
ret += buf->len_;
curr_len += buf->len_;
to_restore = nullptr;

// Release the cached page, definitely no longer ours.
cached_page = nullptr;

buf.release();
}

out:
if (atomic && to_restore)
{
curr_len -= (to_restore->len_ - old_restore_len);
to_restore->len_ = old_restore_len;
}

return ret;
struct iovec iov;
iov.iov_base = buf;
iov.iov_len = len;
iovec_iter iter{cul::slice<iovec>{&iov, 1}, len, IOVEC_USER};
return read_iter(&iter, flags);
}

ssize_t pipe::append_iter(iovec_iter *iter, bool atomic)
@@ -512,80 +317,11 @@ KUNIT_MOCKABLE(kernel_raise_signal, int, int, process *, unsigned int, siginfo_t

ssize_t pipe::write(int flags, size_t len, const void *ubuf)
{
bool is_atomic_write = len <= PIPE_BUF;
ssize_t ret = 0;

scoped_mutex g{pipe_lock};

bool wasempty = !can_read();

while (len)
{
if (reader_count == 0)
{
CALL_KUNIT_MOCKABLE(kernel_raise_signal, SIGPIPE, get_current_process(), 0, nullptr);

if (!ret)
ret = -EPIPE;
break;
}

const auto avail = available_space();

bool may_write = avail > 0;

if (avail < len && is_atomic_write)
may_write = false;

if (!may_write)
{
if (ret != 0)
{
/* now that we're blocking, might as well signal readers */
wake_all(&read_queue);
}

if (flags & O_NONBLOCK)
{
if (!ret)
ret = -EAGAIN;
break;
}

if (wait_for_event_mutex_interruptible(
&write_queue,
(is_atomic_write && available_space() >= (len - ret)) || !is_full() ||
reader_count == 0,
&pipe_lock) == -EINTR)
{
if (!ret)
ret = -EINTR;
break;
}

wasempty = !can_read();
continue;
}

// OK, we have space, let's write
ssize_t st = append((const u8 *) ubuf + ret, min(avail, len), is_atomic_write);

if (st < 0)
{
if (!ret)
ret = st;
break;
}

ret += st;
len -= st;
}

/* After finishing the write, signal any possible readers */
if (wasempty && ret > 0)
wake_all(&read_queue);

return ret;
struct iovec iov;
iov.iov_base = (void *) ubuf;
iov.iov_len = len;
iovec_iter iter{cul::slice<iovec>{&iov, 1}, len, IOVEC_USER};
return write_iter(&iter, flags);
}

pipe *get_pipe(void *helper)
