Skip to content

Commit 27de7e4

Browse files
author
Gijs Peskens
authored
Optimize error handling & improve non-blocking performance (#37)
* Optimize error handling & provide fast path for non-blocking send runtime.LockOSThread() has a cost associated with it, as well as each cgo call. Also provide a fastpath for non-blocking mode: try to send first and only start the epoller when send fails, this reduces a cgo call in the happy case (most likely the default case).
1 parent 056991b commit 27de7e4

File tree

3 files changed

+134
-56
lines changed

3 files changed

+134
-56
lines changed

read.go

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package srtgo
2+
3+
/*
4+
#cgo LDFLAGS: -lsrt
5+
#include <srt/srt.h>
6+
7+
int srt_recvmsg2_wrapped(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL *mctrl, int *srterror, int *syserror)
8+
{
9+
int ret = srt_recvmsg2(u, buf, len, mctrl);
10+
if (ret < 0) {
11+
*srterror = srt_getlasterror(syserror);
12+
}
13+
return ret;
14+
}
15+
16+
*/
17+
import "C"
18+
import (
19+
"errors"
20+
"fmt"
21+
"runtime"
22+
"syscall"
23+
"unsafe"
24+
)
25+
26+
func srtRecvMsg2Impl(u C.SRTSOCKET, buf []byte, msgctrl *C.SRT_MSGCTRL) (n int, err error) {
27+
srterr := C.int(0)
28+
syserr := C.int(0)
29+
n = int(C.srt_recvmsg2_wrapped(u, (*C.char)(unsafe.Pointer(&buf[0])), C.int(len(buf)), msgctrl, &srterr, &syserr))
30+
if n < 0 {
31+
srterror := SRTErrno(srterr)
32+
if syserr < 0 {
33+
srterror.wrapSysErr(syscall.Errno(syserr))
34+
}
35+
err = srterror
36+
}
37+
return
38+
}
39+
40+
// Read data from the SRT socket
41+
func (s SrtSocket) Read(b []byte) (n int, err error) {
42+
//Fastpath
43+
n, err = srtRecvMsg2Impl(s.socket, b, nil)
44+
45+
if err != nil {
46+
if errors.Is(err, error(EAsyncRCV)) {
47+
runtime.LockOSThread()
48+
defer runtime.UnlockOSThread()
49+
timeoutMs := C.int64_t(s.pollTimeout)
50+
fds := [1]C.SRT_EPOLL_EVENT{}
51+
len := C.int(1)
52+
res := C.srt_epoll_uwait(s.epollIn, &fds[0], len, timeoutMs)
53+
if res == 0 {
54+
return 0, &SrtEpollTimeout{}
55+
}
56+
if res == SRT_ERROR {
57+
return 0, fmt.Errorf("error in read:epoll %w", srtGetAndClearError())
58+
}
59+
if fds[0].events&C.SRT_EPOLL_ERR > 0 {
60+
return 0, srtGetAndClearError()
61+
}
62+
//Read again, now that we are ready
63+
n, err = srtRecvMsg2Impl(s.socket, b, nil)
64+
}
65+
}
66+
return
67+
}

srtgo.go

-56
Original file line numberDiff line numberDiff line change
@@ -292,62 +292,6 @@ func (s SrtSocket) Connect() error {
292292
return nil
293293
}
294294

295-
// Read data from the SRT socket
296-
func (s SrtSocket) Read(b []byte) (n int, err error) {
297-
runtime.LockOSThread()
298-
defer runtime.UnlockOSThread()
299-
if !s.blocking {
300-
timeoutMs := C.int64_t(s.pollTimeout)
301-
fds := [1]C.SRT_EPOLL_EVENT{}
302-
len := C.int(1)
303-
res := C.srt_epoll_uwait(s.epollIn, &fds[0], len, timeoutMs)
304-
if res == 0 {
305-
return 0, &SrtEpollTimeout{}
306-
}
307-
if res == SRT_ERROR {
308-
return 0, fmt.Errorf("error in read:epoll %w", srtGetAndClearError())
309-
}
310-
if fds[0].events&C.SRT_EPOLL_ERR > 0 {
311-
return 0, srtGetAndClearError()
312-
}
313-
}
314-
315-
res := C.srt_recvmsg2(s.socket, (*C.char)(unsafe.Pointer(&b[0])), C.int(len(b)), nil)
316-
if res == SRT_ERROR {
317-
return 0, fmt.Errorf("error in read:srt_recvmsg2 %w", srtGetAndClearError())
318-
}
319-
320-
return int(res), nil
321-
}
322-
323-
// Write data to the SRT socket
324-
func (s SrtSocket) Write(b []byte) (n int, err error) {
325-
runtime.LockOSThread()
326-
defer runtime.UnlockOSThread()
327-
if !s.blocking {
328-
timeoutMs := C.int64_t(s.pollTimeout)
329-
fds := [1]C.SRT_EPOLL_EVENT{}
330-
len := C.int(1)
331-
res := C.srt_epoll_uwait(s.epollOut, &fds[0], len, timeoutMs)
332-
if res == 0 {
333-
return 0, &SrtEpollTimeout{}
334-
}
335-
if res == SRT_ERROR {
336-
return 0, fmt.Errorf("error in write:epoll %w", srtGetAndClearError())
337-
}
338-
if fds[0].events&C.SRT_EPOLL_ERR > 0 {
339-
return 0, &SrtSocketClosed{}
340-
}
341-
}
342-
343-
res := C.srt_sendmsg2(s.socket, (*C.char)(unsafe.Pointer(&b[0])), C.int(len(b)), nil)
344-
if res == SRT_ERROR {
345-
return 0, fmt.Errorf("error in write:srt_sendmsg2 %w", srtGetAndClearError())
346-
}
347-
348-
return int(res), nil
349-
}
350-
351295
// Stats - Retrieve stats from the SRT socket
352296
func (s SrtSocket) Stats() (*SrtStats, error) {
353297
runtime.LockOSThread()

write.go

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package srtgo
2+
3+
/*
4+
#cgo LDFLAGS: -lsrt
5+
#include <srt/srt.h>
6+
7+
int srt_sendmsg2_wrapped(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL *mctrl, int *srterror, int *syserror)
8+
{
9+
int ret = srt_sendmsg2(u, buf, len, mctrl);
10+
if (ret < 0) {
11+
*srterror = srt_getlasterror(syserror);
12+
}
13+
return ret;
14+
}
15+
16+
*/
17+
import "C"
18+
import (
19+
"errors"
20+
"fmt"
21+
"runtime"
22+
"syscall"
23+
"unsafe"
24+
)
25+
26+
func srtSendMsg2Impl(u C.SRTSOCKET, buf []byte, msgctrl *C.SRT_MSGCTRL) (n int, err error) {
27+
srterr := C.int(0)
28+
syserr := C.int(0)
29+
n = int(C.srt_sendmsg2_wrapped(u, (*C.char)(unsafe.Pointer(&buf[0])), C.int(len(buf)), msgctrl, &srterr, &syserr))
30+
if n < 0 {
31+
srterror := SRTErrno(srterr)
32+
if syserr < 0 {
33+
srterror.wrapSysErr(syscall.Errno(syserr))
34+
}
35+
err = srterror
36+
}
37+
return
38+
}
39+
40+
// Write data to the SRT socket
41+
func (s SrtSocket) Write(b []byte) (n int, err error) {
42+
43+
//Fastpath:
44+
n, err = srtSendMsg2Impl(s.socket, b, nil)
45+
46+
if err != nil {
47+
if errors.Is(err, error(EAsyncSND)) && !s.blocking {
48+
runtime.LockOSThread()
49+
defer runtime.UnlockOSThread()
50+
timeoutMs := C.int64_t(s.pollTimeout)
51+
fds := [1]C.SRT_EPOLL_EVENT{}
52+
len := C.int(1)
53+
res := C.srt_epoll_uwait(s.epollOut, &fds[0], len, timeoutMs)
54+
if res == 0 {
55+
return 0, &SrtEpollTimeout{}
56+
}
57+
if res == SRT_ERROR {
58+
return 0, fmt.Errorf("error in write:epoll %w", srtGetAndClearError())
59+
}
60+
if fds[0].events&C.SRT_EPOLL_ERR > 0 {
61+
return 0, &SrtSocketClosed{}
62+
}
63+
n, err = srtSendMsg2Impl(s.socket, b, nil)
64+
}
65+
}
66+
return
67+
}

0 commit comments

Comments
 (0)