Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MSG_ZEROCOPY for streaming server. #13

Open
winlinvip opened this issue Apr 18, 2020 · 9 comments
Open

Support MSG_ZEROCOPY for streaming server. #13

winlinvip opened this issue Apr 18, 2020 · 9 comments

Comments

@winlinvip
Copy link
Member

winlinvip commented Apr 18, 2020

参考:ossrs/srs#307 (comment)

目前内核最热的函数是copy_user_enhanced_fast_string,它主要是将用户空间的数据,拷贝到内核,可以想到是因为要将发送的UDP的payload拷贝到内核发送。

同样的,TCP也是这个是瓶颈,实际上Linux内核支持了很多种零拷贝方式,比如sendfile、splice、tee还有MSG_ZEROCOPY

它提到是有代价的,如果要发送大量的数据,那么比较值得:

Copy avoidance is not a free lunch. As implemented, with page pinning, it replaces 
per byte copy cost with page accounting and completion notification overhead. As a 
result, MSG_ZEROCOPY is generally only effective at writes over around 10 KB.

若使用sendmmsg,600Kbps码率的流,1个连接观看时一次发送50KB数据,1000个连接观看一次发送8.5MB的数据,2000个连接观看一次发送14.4MB数据,3000个连接观看一次发送20MB数据。

这可能需要修改ST做支持,参考:#13

@winlinvip
Copy link
Member Author

winlinvip commented Apr 18, 2020

看起来是Kernel 4.18新增的功能:https://www.douban.com/note/686726381/

这篇文章说如果不重用内存的话可以怎样,但不重用的话那还是会多一次拷贝了:https://zhuanlan.zhihu.com/p/28575308

从Linux文章介绍说可能不一定会拷贝成功:https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#loopback

For this reason all packets generated with MSG_ZEROCOPY that are looped to a local socket will incur a deferred copy.

其中提到的deferred copy,是指:https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#deferred-copies

Copy avoidance is not always feasible. Devices that do not support scatter-gather I/O cannot send packets made up of kernel generated protocol headers plus zerocopy user data. A packet may need to be converted to a private copy of data deep in the stack, say to compute a checksum.

In all these cases, the kernel returns a completion notification when it releases its hold on the shared pages. That notification may arrive before the (copied) data is fully transmitted. A zerocopy completion notification is not a transmit completion notification, therefore.

Deferred copies can be more expensive than a copy immediately in the system call, if the data is no longer warm in the cache. The process also incurs notification processing cost for no benefit. For this reason, the kernel signals if data was completed with a copy, by setting flag SO_EE_CODE_ZEROCOPY_COPIED in field ee_code on return. A process may use this signal to stop passing flag MSG_ZEROCOPY on subsequent requests on the same socket.

@winlinvip
Copy link
Member Author

winlinvip commented Apr 19, 2020

ST Scheduler

Zero Copy可能涉及到ST对于EPOLLERR事件的处理,需要花时间看下ST的调度机制。

IDLE Thread

以一个测试程序为例,分析ST的调度过程:

#include <st.h>
int main(int argc, char** argv) {
    st_set_eventsys(ST_EVENTSYS_ALT); // Use epoll.
    st_init();
    while (true) {
        st_sleep(10);
    }
    return 0;
}

Note: 我们在main协程中不断循环,不断sleep等待。

st_init时,就会启动一个idle协程,这个idle协程就是执行epoll_wait的那个协程了:

#0  st_thread_create (start=0x5eeac4 <_st_idle_thread_start>, arg=0x0, joinable=0, stk_size=0) at sched.c:550
#1  0x00000000005ee966 in st_init () at sched.c:172

int st_init(void)
{
    // Create idle thread
    _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
    _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; // 标记为IDLE协程

    // Initialize primordial thread
    thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *)));
    thread->flags = _ST_FL_PRIMORDIAL; // 标记为main协程
}
  • _st_this_vp: _st_vp_t,vp维护的就是协程的队列和时钟信息,它有几个重要的成员:
    • idle_thread: _st_thread_t*,默认的IDLE协程,会自动创建这个协程。
    • run_q: _st_clist_t,可运行的协程的队列,不断的添加和删除协程到这个队列,调度时从队列取出线程后切换到协程执行。
    • io_q: _st_clist_t,等待IO的协程的队列,当fd队列满了不可读写时就会EAGAIN,这是就会调用poll等待fd可读写,会放入IO队列。
    • zombie_q: _st_clist_t,退出了的协程的队列,清理协程。
    • sleep_q: _st_thread_t*,睡眠的队列,主要是有超时操作的线程,比如st_usleep,或者poll时有timeout,或者cond带timeout等。
  • IDLE和main协程,两个都是ST的主动对象,st_init先创建IDLE,然后创建main协程。
    • IDLE协程在后台运行,主要是ST的工作协程,会在所有协程(包括main)退出后自动退出。
    • main协程也叫primordial thread,就是main函数的协程,它的栈空间就是main的栈空间。
  • _st_this_thread: _st_thread_t*,当前运行的协程。
  • _st_eventsys: _st_eventsys_t*,当前的事件机制,比如epoll、kqueue、poll或select,比如要使用epoll时需要在st_init前就设置好。

下面是IDLE协程做的事情的分析:

void *_st_idle_thread_start(void *arg)
{
    _st_thread_t *me = _ST_CURRENT_THREAD();
    while (_st_active_count > 0) {
        /* Idle vp till I/O is ready or the smallest timeout expired */
        _ST_VP_IDLE();
        
        /* Check sleep queue for expired threads */
        _st_vp_check_clock();
        
        me->state = _ST_ST_RUNNABLE;
        _ST_SWITCH_CONTEXT(me);
    }
}
  • _ST_VP_IDLE就是_st_epoll_dispatch,也就是epoll_wait事件分发。其中epoll_wait有个参数就是timeout,这个timeout会根据当前的_ST_SLEEPQ也就是_st_this_vp.sleep_q来计算最小的超时时间。如果有大量的线程有timeout操作,那么就会导致_st_epoll_dispatch频繁调用。
    • 上面的例子中,main协程总是等待10秒,那么epoll_wait时的timeout也是10秒。
    • 如果上面的例子,将st_sleep(10),改成st_sleep(-1)也就是无超时,那么整个系统没有timeout操作,epoll_wait的timeout也就是-1,驱动系统的就是IO事件了。
    • 当然实际程序中当然是需要超时的,比如UDP必须使用超时检查。系统有时候也需要定时器,比如定时清理,或者定时更新时钟(可以用ST的时钟)。从上下游读取数据时,必须有超时,否则会导致连接数过多。
  • _st_vp_check_clock,会先获取当前时钟,只在初始化时会获取时钟,在这个函数时会更新时钟;相当于epoll_wait结束后更新下时钟。然后根据时钟,和上次时钟对比,看下哪些协程超时时间到了,就把这些协程添加到运行队列,并从超时队列删除。
  • _ST_SWITCH_CONTEXT,从当前IDLE线程切走。

总结起来说,IDLE协程,就是IO事件+超时事件驱动协程,本质上就是epoll_wait(io, timeout),等待IO事件到来,或者超时事件。

Scheduler

我们看到了_st_this_vp这个核心数据结构,有运行队列run_q,有IO队列io_q,还有等待队列sleep_q,这些队列如何调度,下面做详细分析。

首先,调度器的工作函数是_st_vp_schedule,它是放在宏定义_ST_SWITCH_CONTEXT中的。也就是在切换协程时,就会调用这个函数了,常见的调度入口包括:

  • st_poll,当FD不可读写(EAGAIN)时,就会调用这个函数等待IO事件,它会把协程放入io_q中,如果读写是有超时的还会放入sleep_q中,然后调用_ST_SWITCH_CONTEXT(me)进入调度函数。
  • _st_idle_thread_start,上面分析过,IDLE协程每次工作完,都会调用_ST_SWITCH_CONTEXT(me)进入调度函数。
  • st_thread_exit,如果协程是joinable,则会设置了term,结束时会先放入zombie_q,等待用户join给协程收尸,然后清理协程资源。等待用户join,清理协程之后,都调用了_ST_SWITCH_CONTEXT(thread)进入调度函数。
  • st_usleep,若有超时则加入sleep_q并设置为SLEEPING,若无超时则设置为SUSPEND,最后调用_ST_SWITCH_CONTEXT(me)进入调度函数。
  • st_cond_timedwait,等待条件变量时,若有超时则加入sleep_q,最后调用_ST_SWITCH_CONTEXT(me)进度调度函数。
  • st_mutex_lock,等待锁时,拿到锁后,调用_ST_SWITCH_CONTEXT(me)进度调度函数。

上面大致这些函数就是用户代码中,调用了st_read, st_write, st_sleep, st_cond_wait, st_mutext_lockst_thread_join等函数,最终就会进入调度器,因为这意味着可能需要切换到其他协程了。下面我们分析下调度宏的实现:

#define _ST_SWITCH_CONTEXT(_thread)       \
    if (!MD_SETJMP((_thread)->context)) { \
        _st_vp_schedule();                  \
    }                                     \

这个调度宏定义,就是将当前协程的信息保存,然后进入调度函数_st_vp_schedule,所以这个调度函数是ST的关键,它实现其实也简单:

void _st_vp_schedule(void)
{
    if (_ST_RUNQ.next != &_ST_RUNQ) {
        /* Pull thread off of the run queue */
        thread = _ST_THREAD_PTR(_ST_RUNQ.next);
        _ST_DEL_RUNQ(thread);
    } else {
        /* If there are no threads to run, switch to the idle thread */
        thread = _st_this_vp.idle_thread;
    }
    
    /* Resume the thread */
    thread->state = _ST_ST_RUNNING;
    _ST_RESTORE_CONTEXT(thread);
}
  • 调度函数就是优先从run_q中取出线程,如果没有可运行的线程就跑IDLE线程,然后切换到这个线程。
  • IDLE线程会驱动IO和timeout事件,把io的线程和等待到期的线程,放入run_q队列,然后让这个调度函数下次执行时就会从run_q中取出协程了。

可见run_q实际上是要马上执行的活动协程,由于IDLE叫IDLE空闲协程,所以在创建完它后,会把它从run_q中删除(默认创建线程后就会加入到run_q执行):

    // Create idle thread
    _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
    _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; // 标记为IDLE协程
    _st_active_count--; // IDLE协程并不算系统的总协程数目,IDLE会在总数目为0时退出。
    _ST_DEL_RUNQ(_st_this_vp.idle_thread); // IDLE也不在run_q中

我们分析下下面例子程序的调度过程:

#include <st.h>
int main(int argc, char** argv) {
    st_set_eventsys(ST_EVENTSYS_ALT);
    st_init(); // #1
    while (true) {
        st_sleep(3); // #2
        printf("OK\n"); // #3
    }
    return 0;
}
  1. st_init启动IDLE和main协程,继续返回main协程运行。
  2. main协程调用st_sleep,将main放入sleep_q,然后进入调度函数_st_vp_schedule切换。
    2.1 _st_vp_schedule发现run_q没有可运行的协程(main协程在sleep_q),选择IDLE协程执行_st_epoll_dispatch
    2.2 IDLE协程运行_st_epoll_dispatch,timeout为3000(main协程是最先超时的3s),调用epoll_wait(fds, 3s),如果没有IO事件,则超时返回,nfd为0,继续在IDLE协程执行。
    2.3 IDLE协程运行_st_vp_check_clock,检查sleep_q,发现main到期,将main协程加入到运行队列。IDLE协程进入调度函数_st_vp_schedule切换。
    2.4. _st_vp_schedule发现run_q有活跃的main协程,进入main协程。
  3. main协程打印OK,并重复第2步。

Cond & Mux

条件变量cond和锁mux,它们的调度实际上是由用户行为调度的,比如下面的程序:

void* foo(void* arg) {
    st_cond_t cond = (st_cond_t)arg;
    st_sleep(3); // #4
    st_cond_signal(cond); // #5
    return NULL;
}
int main(int argc, char** argv) {
    st_set_eventsys(ST_EVENTSYS_ALT);
    st_init(); // #1

    st_cond_t cond = st_cond_new();
    st_thread_create(foo, cond, 0, 0); // #2
    st_cond_wait(cond); // #3
    printf("OK\n"); // #6
    return 0; 
}

这个程序的运行序列是:

  1. st_init启动IDLE和main协程,继续返回main协程运行。
  2. st_thread_create创建foo协程,只是加入到run_q队列,返回main协程运行。
  3. st_cond_wait将当前main协程设置为_ST_ST_COND_WAIT状态,加入到cond的wait_q,然后进入调度函数_st_vp_schedule切换。
    3.1 _st_vp_schedule调度函数的run_q有个foo协程待运行,切换到这个foo协程运行。
  4. foo协程调用st_sleep(3),将foo协程加入sleep_q,最后进入调度函数_st_vp_schedule切换。
    4.1 _st_vp_schedule发现run_q没有可运行的协程(main协程在等待cond,foo协程在sleep_q),选择IDLE协程执行_st_epoll_dispatch
    4.2 IDLE协程运行_st_epoll_dispatch,timeout为3000(foo协程是最先超时的3s),调用epoll_wait(fds, 3s),如果没有IO事件,则超时返回,nfd为0,继续在IDLE协程执行。
    4.3 IDLE协程运行_st_vp_check_clock,检查sleep_q,发现foo到期,将foo协程加入到运行队列。IDLE协程进入调度函数_st_vp_schedule切换。
    4.4. _st_vp_schedule发现run_q有活跃的foo协程,进入foo协程。
  5. foo协程调用st_cond_signal,发现有main协程在等待,将main加入到run_q队列。
    5.1 foo协程退出,进入调度函数_st_vp_schedule切换。
    5.2 _st_vp_schedule发现main协程在run_q,切换到main协程执行。
  6. main协程打印OK,并退出。

Note:如果cond wait时没有设置超时,协程只会被cond调度到,不会在_st_this_vp的调度队列。

Note: 只适合系统有个总的定时器,由定时器驱动cond去通知其他的线程,比如检查session是否超时,可以由总的定时器发cond通知就好了。

Note: 全部使用cond+io,这是最高效的。cond意味着要唤起处理,处理时也可以进入io等待,或者处理定时器事件,这样系统就是全部IO驱动,定时器的timeout如果只有一个就会形成脉冲,本质上就变成:epoll_wait(fds, 300ms)类似这样,而不会是epoll_wait(fds, 10ms/ 20ms/ 100ms)这种。

@winlinvip
Copy link
Member Author

image

这是目前SRS的UDP的调度结构图。基本上还是很高效的,所以st没有出现在瓶颈中。可以改进的点包括:

  1. 系统只有一个sleep,在main协程中,转换成cond通知其他协程。
  2. HTTP Stream使用了sleep,所以会触发ST切换的性能,HTTP-FLV的性能比RTMP低,应该和RTMP一样使用cond。
  3. MR(Merged Write)使用了sleep,所以导致读优化效率不高,应该使用readmsg或readmmsg,结合cond处理。不过读的优化和写不太一样,写可以确定多少个消息后再写,而读则比较麻烦些,特别是TCP要看如何实现端口复用,这样可以有很多消息可以读(比如RTMP回源使用私有协议)。
  4. Consumer每个都有个cond,导致Source需要每个Consumer调用cond siginal,其实可以把cond放在source上,让每个consumer等待source的事件,实现broadcast,批量唤起这些协程。

@winlinvip
Copy link
Member Author

winlinvip commented Apr 19, 2020

ST IO

关于ST的IO机制,主要看下ST基于Linux的epoll如何实现,其他事件框架类似。

Open

ST提供几个open的函数:

  • st_open,打开指定路径的文件,设置为非阻塞模式。
  • st_netfd_open,将系统fd转成ST的fd,使用fnctl方式设置为非阻塞模式。
  • st_netfd_open_socket,将已经存在的socket fd转成ST的fd,使用ioctl设置为非阻塞模式。

以上三个函数,最终都是调用_st_netfd_new创建ST的fd,全部使用非阻塞模式。

Accept

作为TCP服务器,ST提供了st_accept接收新的连接。还有个st_netfd_serialize_accept设置为多进程时串行接收,现在一般用不着了,可以忽略。

st_accept的实现,和很多ST的IO函数一样,都是下面的逻辑:

_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout)
{
    for ( ; ; ) {
        osfd = accept(fd->osfd, addr, (socklen_t *)addrlen);
        if (osfd >= 0)
            break;
        if (errno == EINTR)
            continue;
        if (!_IO_NOT_READY_ERROR)
            return NULL;
        /* Wait until the socket becomes readable */
        if (st_netfd_poll(fd, POLLIN, timeout) < 0)
            return NULL;
    }
    newfd = _st_netfd_new(osfd, 1, 1);
    return newfd;
}

Note: 这里总是忽略了errno=EINTR,这是系统的错误码。实际上当调用st_thread_interrupt时,会设置thread->flags并将线程放入run_q队列,这样继续运行时就会发现是协程中断了。所以几乎每个给用户调用的st函数,都会检查_ST_FL_INTERRUPT这个标记,来判断当前协程是否已经中止运行。

  • 调用系统函数accept,尝试读取新的客户端,如果顺利完成,就直接返回了。
  • 如果fd不可读,也就是没有新的客户端,errno是EAGAIN,那么就用poll(POLLIN)函数等待客户端连接过来。
  • 如果有新的连接过来,按照之前的调度逻辑,会再次切回这个协程,继续accept。

Note: _IO_NOT_READY_ERROR宏,就是判断是否是EAGAIN,也就是暂时fd还未准备好。

#if EAGAIN != EWOULDBLOCK
    #define _IO_NOT_READY_ERROR  ((errno == EAGAIN) || (errno == EWOULDBLOCK))
#else
    #define _IO_NOT_READY_ERROR  (errno == EAGAIN)
#endif

Polling

当FD没有准备好时,也就是EAGAIN时,异步回调的框架,就会逐步返回函数,下次cycle大循环再继续处理。而ST则更好解决这个问题,就会调用st_netfd_poll等待fd准备好了之后,再把协程切回来,用户感觉不到EAGAIN。

int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
    if ((n = st_poll(&pd, 1, timeout)) < 0)
}

// in sched.c
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
    if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
        return -1;
    
    pq.pds = pds;
    pq.npds = npds;
    pq.thread = me;
    pq.on_ioq = 1;
    _ST_ADD_IOQ(pq);
    if (timeout != ST_UTIME_NO_TIMEOUT)
        _ST_ADD_SLEEPQ(me, timeout);
    me->state = _ST_ST_IO_WAIT;
    
    _ST_SWITCH_CONTEXT(me);
}
  • 调用pollset_add添加fd,实际上是_st_epoll_pollset_add把pds加入到epoll中,然后加入到io_q中,如果有timeout加入到sleep_q中。
  • 设置线程状态为_ST_ST_IO_WAIT,表示这个协程是在IO等待中,虽然它也可能在sleep_q中。如果只是在sleep_q中,比如用st_sleep导致线程挂起,那状态是_ST_ST_SLEEPING(有超时时间)或者_ST_ST_SUSPENDED(-1永不超时,只能靠中断协程才能返回了)。
  • 使用_ST_SWITCH_CONTEXT进入调度器,切换到其他协程,前面分析过详细过程。

我们看下_st_epoll_pollset_add详细的处理:

ST_HIDDEN int _st_epoll_pollset_add(struct pollfd *pds, int npds)
{
    for (i = 0; i < npds; i++) {
        fd = pds[i].fd;
        old_events = _ST_EPOLL_EVENTS(fd); 

        if (pds[i].events & POLLIN)
            _ST_EPOLL_READ_CNT(fd)++;
        if (pds[i].events & POLLOUT)
            _ST_EPOLL_WRITE_CNT(fd)++;
        if (pds[i].events & POLLPRI)
            _ST_EPOLL_EXCEP_CNT(fd)++;

        events = _ST_EPOLL_EVENTS(fd);
        if (events != old_events) {
            op = old_events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
            ev.events = events;
            ev.data.fd = fd;
            if (epoll_ctl(_st_epoll_data->epfd, op, fd, &ev)

            // 全局缓存的epoll对象,有每个fd的缓存。
            // (gdb) p *_st_epoll_data
            // $43 = {fd_data = 0xd511f0, evtlist = 0xd61200, evtlist_size = 4096, evtlist_cnt = 0, epfd = 7}
            // (gdb) p _st_epoll_data->fd_data[11]
            // $45 = {rd_ref_cnt = 0, wr_ref_cnt = 0, ex_ref_cnt = 0, revents = 0}
  • _st_epoll_data是epoll全局对象,epfd就是epoll_create返回的epoll的fd。也就是每个fd有16字节的cache,100万个fd需要大约16MB。
  • 为了支持对一个fd多次poll,ST还保存了每个fd的event计数。存储在_st_epoll_data的fd_data链表中。这样在poll add时,可以获取fd的old_events,也就是目前的状态。并将对应的计数器增加,比如_ST_EPOLL_READ_CNT。
  • 合并fd的状态成events,调用epoll_ctl新增或者修改fd的状态。比如对一个fd,有两个poll,一个是poll(IN),一个是poll(IN | OUT),那么最终合并成IN | OUT事件。

加入到epoll后,就开始等待IO事件了。

IO Event

如果来了一个TCP客户端,比如RTMP推流,那么epoll_wait返回就是1了。处理流程如下:

ST_HIDDEN void _st_epoll_dispatch(void)
{
    nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout);

    if (nfd > 0) {
        for (i = 0; i < nfd; i++) {
            osfd = _st_epoll_data->evtlist[i].data.fd;
            _ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
            if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
                /* Also set I/O bits on error */
                _ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
            }
        }

        for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
  • 先遍历一遍所有活跃的fd,把events存放在_ST_EPOLL_REVENTS中,也就是前面将的每个fd的计数器的revents中。revents就是目前这个fd的比较完整的events集合,因为不仅有用户侦听的IN或OUT,对于ERR或HUP等总是读取的事件也会记录在这里,见下面一条描述。
  • 对于EPOLLERR | EPOLLHUP(这两个事件用户不设置也会读取到),还会把用户原始的events也合并进来。比如HUP就是Hangup事件,意味着对方关闭了fd,这时候可能原始的events不在返回的events中,意思就是收到关闭事件时,默认认为我们收到了读写等事件,所有等待读写的协程,都会被激活了。
  • 后面就是循环一遍io_q,激活对应的线程了。由于只缓存了fd的事件,而没有缓存对应等待的协程,所以这里循环了所有的io_q,所以之前有个PR是改进这块的,参考 #4#5,这块逻辑比较复杂,下面单独分析。

首先,添加到io_q就一个地方,在st_poll

int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
    _st_pollq_t pq;
    pq.pds = pds;
    pq.npds = npds;
    pq.thread = me;
    pq.on_ioq = 1;
    _ST_ADD_IOQ(pq);
    me->state = _ST_ST_IO_WAIT;

    // pq就是代表当前等待的事件和线程,内容如下:
    // (gdb) p &pq
    // $96 = (_st_pollq_t *) 0x7ffff7fc4bb0
    // (gdb) p pq.links
    // $93 = {next = 0x0, prev = 0x0}

    // 调用前,io_q是这样的,io_q指向自己,也就是空的:
    // (gdb) p &_st_this_vp.io_q 
    // $90 = (_st_clist_t *) 0xc8e640 <_st_this_vp+32>
    // (gdb) p _st_this_vp.io_q 
    // $91 = {next = 0xc8e640 <_st_this_vp+32>, prev = 0xc8e640 <_st_this_vp+32>}

    // 调用后,io_q和qp.links链接起来了:
    // (gdb) p _st_this_vp.io_q 
    // $97 = {next = 0x7ffff7fc4bb0, prev = 0x7ffff7fc4bb0}
    // (gdb) p pq.links
    // $107 = {next = 0xc8e640 <_st_this_vp+32>, prev = 0xc8e640 <_st_this_vp+32>}

    // 注意:_st_pollq_t被直接转成了_st_clist_t,它们两个的内存布局开头是相同的。
    // (gdb) p _st_this_vp.io_q.next
    // $103 = (struct _st_clist *) 0x7ffff7fc4bb0
    // 所以我们要将io_q的next,直接转成_st_pollq_t,才是它真正指向的对象。
    // (gdb) p *(_st_pollq_t*)_st_this_vp.io_q.next
    // $105 = {links = {next = 0xc8e640 <_st_this_vp+32>, prev = 0xc8e640 <_st_this_vp+32>}, thread = 0x7ffff7fc4e30, pds = 0x7ffff7fc4c30, npds = 1, on_ioq = 1}

注意:所有的IO都会放在io_q中,比如SRS开启的signal(将信号转pipe),侦听配置文件的变更inotify(fd),至少会有3个节点和协程在io_q中。

现在协程已经在io_q中,我们看epoll_wait读取到新的TCP连接时的处理:

ST_HIDDEN void _st_epoll_dispatch(void)
{
    nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout);

    if (nfd > 0) {
        for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
            pq = _ST_POLLQUEUE_PTR(q);
            // 和上面一样,`q: _st_clist_t*`是链表,我们需要将它转成实际的类型`pq: _st_pollq_t*`。
            // _ST_POLLQUEUE_PTR这个宏,就是计算两个结构转换需要的偏移量。
            // (gdb) p q
            // $109 = (_st_clist_t *) 0x7ffff7fc4bb0
            // (gdb) p pq
            // $110 = (_st_pollq_t *) 0x7ffff7fc4bb0

            notify = 0;
            epds = pq->pds + pq->npds;
            for (pds = pq->pds; pds < epds; pds++) {
            // 这段就是遍历所有的pds,也就是所有的侦听。
            // (gdb) p pq->pds
            // $118 = (struct pollfd *) 0x7ffff7fc4c30
            // (gdb) p pq->npds
            // $119 = 1
            // (gdb) p *pds
            // $123 = {fd = 11, events = 1, revents = 0}

                // 这段就是看fd是否有事件,如果没有就忽略,继续找下一个。
                // revents就是这次循环会设置的标记,代表fd的活跃事件。
                if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
                    pds->revents = 0;
                    continue;
                }

                // 下面这段就是fd有活跃事件,设置pds的revents和notify,激活这个协程。
                // 全局缓存中的revents,和pq的revents,都代表了这次poll的事件。
                osfd = pds->fd;
                events = pds->events;
                pds->revents = revents;
                revents = 0;
                if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
                    revents |= POLLIN;
                ......
                if (revents) {
                    notify = 1;
                }
                // 这里将fd对应的pds设置事件。
                // (gdb) p *pds
                // $127 = {fd = 11, events = 1, revents = 1}
            }

            if (notify) {
                // 如果协程有侦听的fd被激活,那么就把协程从io_q移除。
                ST_REMOVE_LINK(&pq->links);
                pq->on_ioq = 0;

                // 减少fd的全局计数器,代表这个协程已经不再侦听这个fd了。
                // 调用结束后,可以看这个fd的cache信息,rd_ref_cnt已经从1变成0了:
                // (gdb) p _st_epoll_data->fd_data[11]
                // $146 = {rd_ref_cnt = 0, wr_ref_cnt = 0, ex_ref_cnt = 0, revents = 1}
                // 但是注意,如果有数据,并不会调用epoll_ctl删除,因为是属于fired的fd,revents非0。
                // 比如线程侦听了两个fd,一个活跃一个不活跃,只会把不活跃的给删除侦听了。
                _st_epoll_pollset_del(pq->pds, pq->npds);

                // 设置协程为活跃,调度器将跳到这个协程执行。
                pq->thread->state = _ST_ST_RUNNABLE;
                _ST_ADD_RUNQ(pq->thread);
        }

        // 对于fired也就是活跃的fd,重置revents为0,并修改它的侦听。
        // 比如如果有两个协程在侦听,一个侦听的是IN,一个侦听的是OUT,
        // 这次收到的是IN事件,那么就会把IN去掉不侦听,只侦听OUT。
        // 如果fd只有一个协程在侦听,那么就直接DEL掉侦听了。
        for (i = 0; i < nfd; i++) {
            /* Delete/modify descriptors that fired */
            osfd = _st_epoll_data->evtlist[i].data.fd;
            _ST_EPOLL_REVENTS(osfd) = 0;
            events = _ST_EPOLL_EVENTS(osfd);
            op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
            ev.events = events;
            ev.data.fd = osfd;
            if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 && op == EPOLL_CTL_DEL) {
                _st_epoll_data->evtlist_cnt--;
            }
        }

@winlinvip
Copy link
Member Author

winlinvip commented Apr 19, 2020

看了下ST的IO机制,是可以支持Zero Copy的,在收到EPOLLERR时,对应poll这个fd的协程会被唤起和激活,我们就可以接收反馈消息了。

可以写一个测试程序,序列如下:

sendmsg(fd, buf0, sizeof(buf0), MSG_ZEROCOPY);
sendmsg(fd, buf1, sizeof(buf1), MSG_ZEROCOPY);
st_netfd_poll(fd, POLLERR);
recvmsg(fd, &msg, MSG_ERRQUEUE);

需要确认的是:

  • 由于POLLERR是默认事件,实际上不用poll,那么可以直接recvmsg读取,如果读取不到会等下一次。
  • 但是st_recvmsg在EAGAIN时,poll的是IN事件,那么有数据来的时候也可能会激活这个FD了。这和接收线程就会被同时激活,相当于接收数据的协程和这个协程同时都在收fd的数据,不过他们flag是不同的。

实例代码:https://github.com/ossrs/srs/tree/develop/trunk/research/msg_zerocopy

Linux Kernel

需要Linux 5.0内核,才能支持UDP的Zero Copy,参考4.19: udpgso_bench_tx: setsockopt zerocopy: Unknown error 524

Note: MSG_ZEROCOPY for UDP was added in commit b5947e5d1e71 ("udp: msg_zerocopy") in Linux 5.0.

升级CentOS8内核到5.0:

rpm --import https://www.elrepo.org/RPM-GPG-KEY-elrepo.org
dnf -y install https://www.elrepo.org/elrepo-release-8.0-2.el8.elrepo.noarch.rpm
dnf repolist
dnf --disablerepo="*" --enablerepo="elrepo-kernel" list available | grep kernel-ml
dnf -y --enablerepo=elrepo-kernel install kernel-ml
reboot

重启服务器后,查看内核版本:

[root@iZ8vbff2yyg58069iioe9dZ msg_zerocopy]# uname -r
5.6.5-1.el8.elrepo.x86_64

Reception

使用Zero Copy需要设置socket:

int one = 1;
int r0 = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one));
assert(!r0);

发送时,设置flags:

int r0 = st_sendmsg(stfd, &msg, MSG_ZEROCOPY, ST_UTIME_NO_TIMEOUT);
assert(r0 > 0);

然后,我们接收内核的反馈消息,参考Reception

char control[100];
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
r0 = st_recvmsg(stfd, &msg, MSG_ERRQUEUE, ST_UTIME_NO_TIMEOUT);

Remark: st_recvmsg的返回值,-1代表错误,0代表有可读的消息但没读出来可以判断flags,>0代表有数据。

发送后,ST在_st_epoll_dispatch函数中,调用epoll_wait就可以收到EPOLLERR=0x08事件:

1236	    nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout);
(gdb) p nfd
$6 = 1
(gdb) p/x _st_epoll_data->evtlist[i]
$8 = {events = 0x8, data = {ptr = 0x04, fd = 0x4, u32 = 0x4, u64 = 0x04}}

ST发现是EPOLLERR(0x08),则会把侦听的事件EPOLLIN(0x01)也加到revents了:

            _ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
            if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
                /* Also set I/O bits on error */
                _ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
            }

// events=0x01, events=0x08, revents=0x09(0x8|0x01)
(gdb) p/x _st_epoll_data->fd_data[4]
$11 = {rd_ref_cnt = 0x1, wr_ref_cnt = 0x0, ex_ref_cnt = 0x0, revents = 0x9}

这时候如果有st_recvmsg,则会唤醒所有的协程,不管是否是侦听了EPOLLERR。比如如果有另外协程,是在st_recvfrom接收消息,也一样会被激活:

int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
    _ST_SWITCH_CONTEXT(me);
    
    n = 0;
    if (pq.on_ioq) {
        /* If we timed out, the pollq might still be on the ioq. Remove it */
        _ST_DEL_IOQ(pq);
        (*_st_eventsys->pollset_del)(pds, npds);
    } else {
        /* Count the number of ready descriptors */
        for (pd = pds; pd < epd; pd++) {
            if (pd->revents)
                n++;
        }
    }

// 这时侦听的是EPOLLIN(events=0x01),而收到的是EPOLLERR(0x08),
// 所以n++后会是1,会继续下一次的recvmsg。
(gdb) p/x pd[0]
$25 = {fd = 0x4, events = 0x1, revents = 0x9}

也就是所有侦听这个fd的协程都会唤醒,然后再次尝试读取消息,当然如果一般接收数据的协程就会进入EAGAIN继续下一次等待。

也就是说,在实际的服务器上,一般上会有两个协程,一个收一个发:

// coroutine #1
st_recvmsg(stfd, buf, sizeof(buf), 0, timeout);

// coroutine #2
st_sendmsg(stfd, buf, sizeof(buf), MSG_ZEROCOPY, timeout);
st_recvmsg(stfd, buf, sizeof(buf), MSG_ERRQUEUE, timeout);

当协程2使用ZeroCopy发送数据后,协程1和2会被依次唤醒继续尝试recvmsg,协程1会发现EAGAIN实际上没有数据,协程2会收到ZeroMessage的消息。

Note: 这实际上并不会有问题,效率上有点浪费,如果有性能瓶颈,则可以使用reuseport将收发的fd分离,就可以解决这个问题。

Notification Parsing

收到control消息后,解析内核确认的序列,参考Notification Parsing

    cmsghdr* cm = CMSG_FIRSTHDR(&msg);
    assert(cm->cmsg_level == SOL_IP || cm->cmsg_type == IP_RECVERR);

    sock_extended_err* serr = (sock_extended_err*)(void*)CMSG_DATA(cm);
    assert(serr->ee_errno == 0 && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY);

    uint32_t hi = serr->ee_data;
    uint32_t lo = serr->ee_info;
    uint32_t range = hi - lo + 1;

比如发送第一个消息后,range=1,lo=0,hi=0,表示内核确认了这个消息。

@winlinvip
Copy link
Member Author

winlinvip commented Apr 20, 2020

Sendmmsg

内核是以msghdr为单位确认的。对于sendmmsg,内核是每个msghdr分配一个id,比如sendmmsg一次发送2个消息,内核确认是:

Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), 
range 2 [0, 1]

range=2, lo=0, hi=1,确认了这两个消息。

Multiple Sendmmsg

若发送了4个消息,分两次发送,每次用sendmmsg发送2个,那么kernel反馈的消息如下:

Ping 172.26.129.126:8001 5 bytes, copies=1, r0=2, Hello
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 2 [0, 1]

Ping 172.26.129.126:8001 5 bytes, copies=1, r0=2, Hello
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 2 [2, 3]

Merged Reception

如果发送了多次消息,但只在最后收一次,那么内核会一次确认这4个消息:

Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), 
range 4 [0, 3]

range=4, lo=0, hi=3,表示这4个消息都被确认了。

@winlinvip
Copy link
Member Author

winlinvip commented Apr 20, 2020

Mix

支持ZeroCopy的消息和非ZeroCopy的消息混合发送,参考Mixing CopyAvoidance and Copying

如果混合了其他消息,kernel确认的ID只对zerocopy的msghdr编号,比如我们发送的序列如下:

sendmsg(msg, MSG_ZEROCOPY) #0
sendmsg(msg, MSG_ZEROCOPY) #1
sendmsg(msg, 0)
sendmsg(msg, MSG_ZEROCOPY) #2
sendmsg(msg, MSG_ZEROCOPY) #3

确认的序列:

Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), 
range 2 [0, 1]
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), 
range 2 [2, 3]

或者一次确认:

Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), 
range 4 [0, 3]

也就是range=4,lo=0, hi=3,并不包含非zerocopy的消息。

@winlinvip
Copy link
Member Author

winlinvip commented Apr 21, 2020

发现ZeroCopy时是有些限制的:

  • 单个UDP包Payload长度不能超过1472字节。
  • 单个msghdr的iovs总长度不能超过65507字节。
  • 单个msghdr被GSO的包个数不能超过36个。
  • 单个msghdr的iovs个数不能超过133个。
  • GSO切iovs为36个时,单个iov的长度不能超过6字节。

下表是测出来实际可用的数据:

GSO IOVS IOV_LEN Bytes
0 1 1472 1472
1472 1 65507 65507
1472 2 31456 62912
1472 3 19168 57504
1472 4 15072 60288
1472 5 10976 54880
1472 6 6880 41280
1472 17 1472 25024
0 133 1 133

MTU

实测发现如果包超过1472字节,就会出现defered copies:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=1 --size=1473
Ping 172.26.129.126:8001 1473 bytes, control 0, copies=0, r0=1473, Hello
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0x1), range 1 [0, 0]
Warning: Defered copies, should stop zerocopy

Note: 这是因为每个UDP包不要超过MTU。

UDP MaxSize

如果包超过65508字节,就会出现errno=90(Message too long)错误:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=1 --size=65508
Server listen 172.26.129.126:8001, pong 0, zerocopy 1, copies 0, loop 1, batch 0, mix 0, size 65508, gso 0, iovs 1, sndbuf 0
epoll events EPOLLERR=0x8, EPOLLHUP=0x10
socket SO_SNDBUF default=2097152, user=0, now=2097152, r0=0, r1=0, r2=0
Client fd=4
Ping 65508 bytes, error r0=-1, errno=90

Note: 这是超过了一个UDP包的总长度了。

GSO

可以使用GSO将包分成多个包,当然就不是在一个Payload中发送,而是在多个UDP包了。

比如1500字节的内容,我们GSO设为1400,那么会分成两个UDP包:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=1 --size=1500 --gso=1400
Ping 172.26.129.126:8001 1500 bytes, control 24, copies=0, r0=1500, Hello
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 1 [0, 0]

From 172.26.129.127:50407 1400 bytes, flags 0, Hello
From 172.26.129.127:50407 100 bytes, flags 0, 

但是注意,GSO也不能超过65508字节内容,比如:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=1 --size=65508 --gso=1400
Ping 65508 bytes, error r0=-1, errno=90

还有,GSO的包个数不能超过36个,比如:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=1 --size=37 --gso=1

Ping 37 bytes, error r0=-1, errno=22

IOVEC

单个msghdr的iovs不能超过134个:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=134 --size=1

Ping 134 bytes, error r0=-1, errno=90

另外,iov的长度变长时,iovs个数会更少,比如iov每个是6字节时,不能超过24:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=25 --size=16

Ping 400 bytes, error r0=-1, errno=90

如果有GSO,iovs最多不能超过36个:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=37 --size=6 --gso=1400

Ping 222 bytes, error r0=-1, errno=90
./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=36 --size=7 --gso=1400

Ping 252 bytes, error r0=-1, errno=90

SENSMMSG

对于sendmmsg没有限制:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=36 --size=6 --gso=6 --copy=10

Ping 172.26.129.126:8001 36 bytes, control 24, copies=10, r0=11, H
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 6 [0, 5]
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 1 [6, 6]
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 1 [7, 7]
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 1 [8, 8]
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 1 [9, 9]
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 1 [10, 10]

或者无GSO:

./client --host=172.26.129.126 --port=8001 --zerocopy=true --iovs=1 --size=1400 --copy=1024

Ping 172.26.129.126:8001 1400 bytes, control 0, copies=1024, r0=1025, Hello
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 1021 [0, 1020]
Reception 48 bytes, flags 0x2000, cmsg(level 0, type 0xb), serr(errno 0, origin 0x5, code 0), range 4 [1021, 1024]

@winlinvip
Copy link
Member Author

winlinvip commented Apr 21, 2020

测试了下,不需要ST做修改,在调用层接收内核的消息就可以。

另外,从数据看,ZeroCopy降低了copy_user_enhanced_fast_string,但是也导致一个新的函数成为热点,比如get_user_pages_fast和_raw_spin_unlock_irqrestore。总体看起来和没有COPY差不多。

代码:https://github.com/winlinvip/srs/tree/feature/msg_zerocopy

@ossrs ossrs deleted a comment from Blacklion55 Jul 1, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant