perf: px driver enhancements #216

Open

wants to merge 16 commits into base: ln/iouring
205 changes: 204 additions & 1 deletion dev.c

@@ -27,6 +27,10 @@
#include <linux/vmalloc.h>
#include "pxd_compat.h"
#include "pxd_core.h"
#include <linux/mmu_context.h>
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
#include <linux/sched/mm.h>
#endif

#if LINUX_VERSION_CODE >= KERNEL_VERSION(4,6,0)
#define PAGE_CACHE_GET(page) get_page(page)
@@ -254,7 +258,13 @@ void fuse_request_send_nowait(struct fuse_conn *fc, struct fuse_req *req)
static bool request_pending(struct fuse_conn *fc)
{
struct fuse_queue_cb *cb = &fc->queue->requests_cb;
return cb->r.read != cb->r.write;
return cb->r.read != smp_load_acquire(&cb->r.write);
}

static bool user_request_pending(struct fuse_conn *fc)
{
struct fuse_queue_cb *cb = &fc->queue->user_requests_cb;
return cb->r.read != smp_load_acquire(&cb->r.write);
}
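
The acquire load pairs with a release store on the producer side of the ring: the producer fills a slot and only then publishes the new write index, so a consumer that observes the updated index is guaranteed to observe the slot contents as well. A minimal single-producer/single-consumer sketch of that contract (hypothetical ring, not the driver's actual structures):

#include <linux/compiler.h>
#include <linux/types.h>

#define RING_SIZE 64 /* power of two, like FUSE_REQUEST_QUEUE_SIZE */

struct ring {
	unsigned int read;
	unsigned int write;
	int slot[RING_SIZE];
};

/* producer: fill the slot first, then publish the index */
static void produce(struct ring *r, int v)
{
	r->slot[r->write & (RING_SIZE - 1)] = v;
	smp_store_release(&r->write, r->write + 1);
}

/* consumer: the acquire on write makes the filled slot visible */
static bool consume(struct ring *r, int *v)
{
	unsigned int w = smp_load_acquire(&r->write);

	if (r->read == w)
		return false;
	*v = r->slot[r->read & (RING_SIZE - 1)];
	smp_store_release(&r->read, r->read + 1); /* frees the slot for reuse */
	return true;
}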

/* Wait until a request is available on the pending list */
@@ -1077,6 +1087,9 @@ static unsigned fuse_dev_poll(struct file *file, poll_table *wait)
if (!fc)
return POLLERR;

if (request_pending(fc))
return (mask | POLLIN | POLLRDNORM);

poll_wait(file, &fc->waitq, wait);

if (request_pending(fc))
@@ -1105,6 +1118,15 @@ void fuse_end_queued_requests(struct fuse_conn *fc)

static void fuse_conn_free_allocs(struct fuse_conn *fc)
{
int i;

fc->user_mm = NULL;
for (i=0; i<NWORKERS; i++) {
if (fc->io_worker_thread[i]) {
kthread_stop(fc->io_worker_thread[i]);
fc->io_worker_thread[i] = NULL;
}
}
if (fc->per_cpu_ids)
free_percpu(fc->per_cpu_ids);
if (fc->free_ids)
@@ -1124,6 +1146,7 @@ void fuse_queue_init_cb(struct fuse_queue_cb *cb)

cb->r.write = 0;
cb->r.read = 0;
atomic_set(&cb->r.need_wake_up, 0);
}

static void fuse_conn_queues_init(struct fuse_conn_queues *queue)
@@ -1135,6 +1158,172 @@ static void fuse_conn_queues_init(struct fuse_conn_queues *queue)
memset(queue->user_requests, 0, sizeof(queue->user_requests));
}

void fuse_run_user_queue(struct fuse_conn *fc, bool mm_fault)
{
struct fuse_queue_cb *cb = &fc->queue->user_requests_cb;
struct fuse_user_request *req;
uint32_t read, write;
uint32_t span, i;
#define NREQINLINE (16u)
struct fuse_user_request ureq[NREQINLINE];

spin_lock(&fc->io_lock);
write = smp_load_acquire(&cb->r.write);
read = cb->r.read;
span = min((write - read), NREQINLINE);
for (i=0; i<span; i++, read++) {
req = &fc->queue->user_requests[
read & (FUSE_REQUEST_QUEUE_SIZE - 1)];
memcpy(&ureq[i], req, sizeof(*req));
}

if (span != 0) smp_store_release(&cb->r.read, read);

// give up lock
spin_unlock(&fc->io_lock);

if (user_request_pending(fc)) wake_up(&fc->io_wait);

for (i=0; i<span; i++) {
if (unlikely(mm_fault)) {
fuse_user_complete(fc, ureq[i].unique, ureq[i].user_data, -EFAULT);
} else {
fuse_process_user_request(fc, &ureq[i]);
}
}

}
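
Draining at most NREQINLINE entries per pass bounds how long io_lock is held, and copying the requests into the on-stack array lets fuse_process_user_request() run, and potentially fault on user memory, without holding the lock. The wake_up() after the unlock hands any remaining entries to another worker.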

#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
Comment from the Contributor Author: wrappers to control userspace locking.

mm_segment_t fuse_setup_user_access(struct mm_struct *mm, bool *mm_fault)
{
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5,10,0)
mm_segment_t old_fs = force_uaccess_begin();
#else
mm_segment_t old_fs = get_fs();
#endif

*mm_fault = false;
if (mm) {
if (!mmget_not_zero(mm)) {
*mm_fault = true;
return old_fs;
}
#if LINUX_VERSION_CODE < KERNEL_VERSION(5,8,0)
use_mm(mm);
set_fs(USER_DS);
#else
kthread_use_mm(mm);
#endif
}

return old_fs;
}

void fuse_remove_user_access(struct mm_struct *mm, mm_segment_t old_fs)
{
if (mm) {
#if LINUX_VERSION_CODE < KERNEL_VERSION(5,8,0)
unuse_mm(mm);
set_fs(old_fs);
#else
kthread_unuse_mm(mm);
#endif
mmput(mm);
}
}
#else
mm_segment_t fuse_setup_user_access(struct mm_struct *mm, bool *mm_fault) { *mm_fault = false; return get_fs(); }
void fuse_remove_user_access(struct mm_struct *mm, mm_segment_t old_fs) {}
#endif
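
A sketch of the call pattern these wrappers are meant for (hypothetical caller with a saved struct mm_struct *mm; handle_requests() is a stand-in, the real user is the worker loop below):

bool fault;
mm_segment_t old_fs = fuse_setup_user_access(mm, &fault);

if (!fault) {
	/* the daemon's user addresses are now valid in this kthread,
	 * so copy_from_user()/copy_to_user() against them work here */
	handle_requests();
	fuse_remove_user_access(mm, old_fs);
} else {
	/* mmget_not_zero() failed: the daemon's mm is already gone and
	 * nothing was switched, so pass NULL to skip the unuse/mmput */
	fuse_remove_user_access(NULL, old_fs);
}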

static int fuse_process_user_queue(void *c)
{
struct fuse_conn *fc = (struct fuse_conn*) c;
struct fuse_queue_cb *cb = &fc->queue->user_requests_cb;
bool mm_fault;
struct mm_struct *cur_mm = NULL;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(5,10,0)
mm_segment_t old_fs = force_uaccess_begin();
#else
mm_segment_t old_fs = get_fs();
#endif
unsigned long spin_wait = jiffies;

while (!kthread_should_stop()) {
do {
if (user_request_pending(fc)) break;
cpu_relax();
} while (jiffies < spin_wait);

// prepare to sleep
if (!user_request_pending(fc)) {
if (cur_mm != NULL) {
fuse_remove_user_access(cur_mm, old_fs);
cur_mm = NULL;
}

atomic_inc(&cb->r.need_wake_up);
if (signal_pending(current))
flush_signals(current);

wait_event_interruptible(fc->io_wait,
(user_request_pending(fc) || kthread_should_stop() ||
kthread_should_park()));
atomic_dec(&cb->r.need_wake_up);
}

if (kthread_should_stop()) {
break;
}

if (kthread_should_park()) {
fuse_remove_user_access(cur_mm, old_fs);
cur_mm = NULL;
kthread_parkme();
continue;
}

mm_fault = false;
if (!cur_mm) {
fuse_setup_user_access(fc->user_mm, &mm_fault);
/* adopt the mm only if fuse_setup_user_access() got a reference;
* otherwise the teardown path would unuse/mmput an mm we never held */
if (!mm_fault)
cur_mm = fc->user_mm;
}
fuse_run_user_queue(fc, mm_fault);
spin_wait = jiffies + msecs_to_jiffies(3);
}

if (cur_mm != NULL) {
fuse_remove_user_access(cur_mm, old_fs);
}
return 0;
}
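
The need_wake_up counter is the sleep/wake handshake: a worker advertises that it is about to sleep before it actually does, so the enqueuing side only needs to issue a wake-up when some worker may be blocked on io_wait. The real producer is the userspace daemon writing into the shared queue, but a hypothetical in-kernel enqueue sketch (not part of this diff) shows the intended pairing:

static void enqueue_user_request(struct fuse_conn *fc,
				 const struct fuse_user_request *req)
{
	struct fuse_queue_cb *cb = &fc->queue->user_requests_cb;
	uint32_t write = cb->r.write;

	fc->queue->user_requests[write & (FUSE_REQUEST_QUEUE_SIZE - 1)] = *req;
	smp_store_release(&cb->r.write, write + 1); /* publish the entry */

	if (atomic_read(&cb->r.need_wake_up)) /* some worker may be asleep */
		wake_up(&fc->io_wait);
}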

void fuse_pause_user_queue(struct fuse_conn *fc)
{
struct fuse_queue_cb *cb = &fc->queue->user_requests_cb;
int i;

pr_info("parking worker threads");
for (i=0; i<NWORKERS; i++) kthread_park(fc->io_worker_thread[i]);
BUG_ON(atomic_read(&cb->r.need_wake_up) != 0);

if (fc->user_mm) {
mmdrop(fc->user_mm);
fc->user_mm = NULL;
}
}
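
kthread_park() returns only after the target thread has entered kthread_parkme(), and each worker decrements need_wake_up before it reaches its park check, so once the loop completes no worker can still be asleep on io_wait; that is what the BUG_ON asserts. The mmdrop() releases the reference taken by mmgrab() in fuse_restart_user_queue().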

void fuse_restart_user_queue(struct fuse_conn *fc)
{
int i;

mmgrab(current->mm);
fc->user_mm = current->mm;
pr_info("unparking worker threads");
for (i=0; i<NWORKERS; i++) kthread_unpark(fc->io_worker_thread[i]);
}

int fuse_conn_init(struct fuse_conn *fc)
{
int i, rc;
@@ -1185,6 +1374,18 @@ int fuse_conn_init(struct fuse_conn *fc)

fuse_conn_queues_init(fc->queue);

fc->user_mm = NULL;
init_waitqueue_head(&fc->io_wait);
spin_lock_init(&fc->io_lock);
for (i=0; i<NWORKERS; i++) {
fc->io_worker_thread[i] = kthread_create(fuse_process_user_queue, fc, "userq-worker-%d", i);
if (IS_ERR(fc->io_worker_thread[i])) {
rc = (int) PTR_ERR(fc->io_worker_thread[i]);
goto err_out;
}
wake_up_process(fc->io_worker_thread[i]);
}
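
kthread_create() followed by wake_up_process() is the kthread_run() pattern; the workers start immediately, but until a daemon attaches the queue is empty, so user_request_pending() stays false and they go to sleep on io_wait after the initial spin window.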

return 0;
err_out:
fuse_conn_free_allocs(fc);
@@ -1370,6 +1571,8 @@ int fuse_restart_requests(struct fuse_conn *fc)

vfree(resend_reqs);

fuse_restart_user_queue(fc);

return 0;
}

14 changes: 14 additions & 0 deletions fuse_i.h

@@ -266,6 +266,13 @@ struct fuse_conn {

/** Called on final put */
void (*release)(struct fuse_conn *);

/** user request processing */
wait_queue_head_t io_wait;
#define NWORKERS (8)
struct task_struct* io_worker_thread[NWORKERS];
struct mm_struct *user_mm;
spinlock_t io_lock;
};

/** Device operations */
@@ -305,6 +312,13 @@ void fuse_request_send_nowait(struct fuse_conn *fc, struct fuse_req *req);
/* Abort all requests */
void fuse_abort_conn(struct fuse_conn *fc);

/**
 * Process pending IOs queued from userspace; the pause/restart helpers
 * park and unpark the worker threads around daemon restarts.
 */
void fuse_run_user_queue(struct fuse_conn *fc, bool mm_fault);
void fuse_restart_user_queue(struct fuse_conn *fc);
void fuse_pause_user_queue(struct fuse_conn *fc);
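
For orientation, the expected call ordering over the connection's lifetime, assembled from the hunks in this PR:

fuse_conn_init()           - spawn the userq-worker kthreads
fuse_restart_user_queue()  - daemon (re)attaches: grab its mm, unpark the workers
fuse_run_user_queue()      - drain one batch (worker threads or the ioctl path)
fuse_pause_user_queue()    - daemon detaches: park the workers, drop the mm
fuse_conn_free_allocs()    - teardown: stop the worker threads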

/**
* Initialize fuse_conn
*/
20 changes: 2 additions & 18 deletions pxd.c

@@ -230,25 +230,8 @@ static long pxd_ioctl_run_user_queue(struct file *file)
{
struct pxd_context *ctx = container_of(file->f_op, struct pxd_context, fops);
struct fuse_conn *fc = &ctx->fc;
struct fuse_queue_cb *cb = &fc->queue->user_requests_cb;

struct fuse_user_request *req;

uint32_t read = cb->r.read;
uint32_t write = smp_load_acquire(&cb->r.write);

while (read != write) {
for (; read != write; ++read) {
req = &fc->queue->user_requests[
read & (FUSE_REQUEST_QUEUE_SIZE - 1)];
fuse_process_user_request(fc, req);
}

smp_store_release(&cb->r.read, read);

write = smp_load_acquire(&cb->r.write);
}

fuse_run_user_queue(fc, false);
return 0;
}
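
With the drain loop factored into fuse_run_user_queue(), the ioctl path and the kernel worker threads now share one implementation. The ioctl caller runs in the daemon's own process context, where its user mappings are already valid, so it passes mm_fault = false unconditionally.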

@@ -2157,6 +2140,7 @@ static int pxd_control_release(struct inode *inode, struct file *file)
pxd_printk("%s: not opened\n", __func__);
} else {
WRITE_ONCE(ctx->fc.connected, 0);
fuse_pause_user_queue(&ctx->fc);
}

schedule_delayed_work(&ctx->abort_work, pxd_timeout_secs * HZ);
4 changes: 4 additions & 0 deletions pxd.h

@@ -213,6 +213,7 @@ struct pxd_device* find_pxd_device(struct pxd_context *ctx, uint64_t dev_id);
*/
// No arguments necessary other than opcode
#define PXD_FEATURE_FASTPATH (0x1)
#define PXD_FEATURE_BGIO (0x2)

static inline
int pxd_supported_features(void)
@@ -221,6 +222,9 @@ int pxd_supported_features(void)
#ifdef __PX_FASTPATH__
features |= PXD_FEATURE_FASTPATH;
#endif
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 11, 0)
features |= PXD_FEATURE_BGIO;
#endif

return features;
}
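
A hypothetical userspace-side check (the constant is from this header; the helper names are stand-ins): a daemon that reads the feature word can fall back to pumping the queue itself via the run-user-queue ioctl on kernels where the background workers are unavailable.

int features = query_driver_features(); /* stand-in for the feature query */

if (features & PXD_FEATURE_BGIO) {
	/* kernel worker threads drain the user queue in the background */
	rely_on_kernel_workers();
} else {
	/* pre-4.11 kernel: drive the queue from userspace */
	run_user_queue_ioctl_loop();
}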