diff --git a/README.md b/README.md
index e2e292264e..de714c8603 100755
--- a/README.md
+++ b/README.md
@@ -47,21 +47,23 @@ docker run -p 1935:1935 -p 1985:1985 -p 8080:8080 ossrs/srs:3
**From here,** strongly recommend to read bellow wikis:
* Usage: How to delivery RTMP?([CN][v1_CN_SampleRTMP], [EN][v1_EN_SampleRTMP])
-* Usage: How to delivery RTMP Edge Cluster?([CN][v3_CN_SampleRTMPCluster], [EN][v3_EN_SampleRTMPCluster])
-* Usage: How to create a RTMP Origin Cluster?([CN][v3_CN_SampleOriginCluster], [EN][v3_EN_SampleOriginCluster])
-* Usage: How to delivery HTTP FLV Live Streaming?([CN][v3_CN_SampleHttpFlv], [EN][v3_EN_SampleHttpFlv])
-* Usage: How to delivery HTTP FLV Live Streaming Cluster?([CN][v3_CN_SampleHttpFlvCluster], [EN][v3_EN_SampleHttpFlvCluster])
+* Usage: How to delivery RTMP-Edge Cluster?([CN][v3_CN_SampleRTMPCluster], [EN][v3_EN_SampleRTMPCluster])
+* Usage: How to create a RTMP-Origin Cluster?([CN][v3_CN_SampleOriginCluster], [EN][v3_EN_SampleOriginCluster])
+* Usage: How to delivery HTTP-FLV?([CN][v3_CN_SampleHttpFlv], [EN][v3_EN_SampleHttpFlv])
+* Usage: How to delivery HTTP-FLV Cluster?([CN][v3_CN_SampleHttpFlvCluster], [EN][v3_EN_SampleHttpFlvCluster])
* Usage: How to delivery HLS?([CN][v3_CN_SampleHLS], [EN][v3_EN_SampleHLS])
-* Usage: How to delivery HLS for other codec?([CN][v3_CN_SampleTranscode2HLS], [EN][v3_EN_SampleTranscode2HLS])
-* Usage: How to transode RTMP stream by FFMPEG?([CN][v2_CN_SampleFFMPEG], [EN][v2_EN_SampleFFMPEG])
+* Usage: How to transcode to h.264+aac for HLS?([CN][v3_CN_SampleTranscode2HLS], [EN][v3_EN_SampleTranscode2HLS])
+* Usage: How to transode stream by FFMPEG?([CN][v2_CN_SampleFFMPEG], [EN][v2_EN_SampleFFMPEG])
* Usage: How to forward stream to other servers?([CN][v3_CN_SampleForward], [EN][v3_EN_SampleForward])
-* Usage: How to deploy in low lantency mode?([CN][v3_CN_SampleRealtime], [EN][v3_EN_SampleRealtime])
-* Usage: How to ingest file/stream/device to RTMP?([CN][v1_CN_SampleIngest], [EN][v1_EN_SampleIngest])
+* Usage: How to enable low lantency live streaming?([CN][v3_CN_SampleRealtime], [EN][v3_EN_SampleRealtime])
+* Usage: How to ingest file/stream/device to SRS?([CN][v1_CN_SampleIngest], [EN][v1_EN_SampleIngest])
* Usage: How to delivery HLS by SRS HTTP server?([CN][v3_CN_SampleHTTP], [EN][v3_EN_SampleHTTP])
+* Usage: How to delivery DASH(Experimental)?([CN][v3_CN_SampleDASH], [EN][v3_EN_SampleDASH])
+* Usage: How to transmux SRT(Experimental) to live streaming?([CN][v4_CN_SampleSRT], [EN][v4_EN_SampleSRT])
* Usage: How to publish h.264 raw stream as RTMP? ([CN][v3_CN_SrsLibrtmp2], [EN][v3_EN_SrsLibrtmp2])
-* Usage: How to improve edge performance by multiple CPUs? ([CN][v3_CN_REUSEPORT], [EN][v3_EN_REUSEPORT])
-* Usage: Why choose SRS? About the milestone and product plan? ([CN][v1_CN_Product], [EN][v1_EN_Product])
-* Usage: How to file bug or chat with us? ([CN][v1_CN_Contact], [EN][v1_EN_Contact])
+* Usage: How to enable multiple processes? ([CN][v3_CN_REUSEPORT], [EN][v3_EN_REUSEPORT])
+* Usage: Why SRS? What's the milestones? ([CN][v1_CN_Product], [EN][v1_EN_Product])
+* Usage: Want to contact us? ([CN][v1_CN_Contact], [EN][v1_EN_Contact]) Or file an issue [here](https://github.com/ossrs/srs/issues/new)?
## Wiki
@@ -127,6 +129,7 @@ For previous versions, please read:
- [x] [Experimental] Support a simple [mgmt console][console], please read [srs-ngb][srs-ngb].
- [x] [Experimental] Support RTMP client library: srs-librtmp([CN][v3_CN_SrsLibrtmp], [EN][v3_EN_SrsLibrtmp])
- [x] [Experimental] Support HTTP RAW API, please read [#459][bug #459], [#470][bug #470], [#319][bug #319].
+- [x] [Experimental] Support SRT server, read [#1147][bug #1147].
- [x] [Deprecated] Support Adobe HDS(f4m), please read wiki([CN][v2_CN_DeliveryHDS], [EN][v2_EN_DeliveryHDS]) and [#1535][bug #1535].
- [x] [Deprecated] Support bandwidth testing([CN][v1_CN_BandwidthTestTool], [EN][v1_EN_BandwidthTestTool]), please read [#1535][bug #1535].
- [x] [Deprecated] Support Adobe FMS/AMS token traverse([CN][v3_CN_DRM2], [EN][v3_EN_DRM2]) authentication, please read [#1535][bug #1535].
@@ -1002,13 +1005,13 @@ SRS always use the simplest architecture to solve complex domain problems.
## Modularity Architecture
```
-+------------------------------------------------------+
-| SRS server | Programs in Main or Research |
-+------------------------------------------------------+
-| App(For SRS) | Modules(1) | research/librtmp |
-+------------------------------------------------------+
-| Service(C/S apps over ST) | Libs(Export librtmp) |
-+------------------------------------------------------+
++----------------+-------------------------------------+
+| SRS/SRT server | Programs in Main or Research |
++----------------+--+------------+---------------------+
+| App(For SRS) | Modules(1) | research/librtmp |
++-------------------+------------+---------------------+
+| Service(C/S apps over ST) | srs-librtmp |
++--------------------------------+---------------------+
| Protocol Stack(RTMP/HTTP/RTSP/JSON/AMF/Format) |
+------------------------------------------------------+
| Kernel(File, Codec, Stream, LB services) |
@@ -1027,31 +1030,33 @@ Remark:
+---------+ +----------+
| Publish | | Deliver |
+---|-----+ +----|-----+
-+----------------------+-------------------------+----------------+
-| Input | SRS(Simple RTMP Server) | Output |
-+----------------------+-------------------------+----------------+
-| | +-> DASH -------------+-> DASH player |
-| Encoder(1) | +-> RTMP/HDS --------+-> Flash player |
-| (FMLE,FFMPEG, -rtmp-+->-+-> HLS/HTTP ---------+-> M3U8 player |
-| Flash,XSPLIT, | +-> FLV/MP3/Aac/Ts ---+-> HTTP player |
-| ......) | +-> Fowarder ---------+-> RTMP server |
-| | +-> Transcoder -------+-> RTMP server |
-| | +-> EXEC(5) ----------+-> External app |
-| | +-> DVR --------------+-> FLV file |
-| | +-> BandwidthTest ----+-> Flash |
-+----------------------+ | |
-| MediaSource(2) | | |
-| (RTSP,FILE, | | |
-| HTTP,HLS, --pull-+->-- Ingester(3) -(rtmp)-+-> SRS |
-| Device, | | |
-| ......) | | |
-+----------------------+ | |
-| MediaSource(2) | | |
-| (RTSP,FILE, | | |
-| HTTP,HLS, --push-+->-- Streamer(4) -(rtmp)-+-> SRS |
-| Device, | | |
-| ......) | | |
-+----------------------+-------------------------+----------------+
++----------------------+----------------------------+----------------+
+| Input | SRS(Simple RTMP Server) | Output |
++----------------------+----------------------------+----------------+
+| | +-> DASH ----------------+-> DASH player |
+| Encoder(1) | +-> RTMP/HDS -----------+-> Flash player |
+| (FMLE,FFMPEG, -rtmp-+->-+-> HLS/HTTP ------------+-> M3U8 player |
+| Flash,XSPLIT, | +-> FLV/MP3/Aac/Ts ------+-> HTTP player |
+| ......) | +-> Fowarder ------------+-> RTMP server |
+| | +-> Transcoder ----------+-> RTMP server |
+| | +-> EXEC(5) -------------+-> External app |
+| | +-> DVR -----------------+-> FLV file |
+| | +-> BandwidthTest -------+-> Flash |
++----------------------+ | |
+| MediaSource(2) | | |
+| (RTSP,FILE, | | |
+| HTTP,HLS, --pull-+->-- Ingester(3) -(rtmp)----+-> SRS |
+| Device, | | |
+| ......) | | |
++----------------------+ | |
+| MediaSource(2) | | |
+| (RTSP,FILE, | | |
+| HTTP,HLS, --push-+->- StreamCaster(4) -(rtmp)-+-> SRS |
+| Device, | | |
+| ......) | | |
++----------------------+ | |
+| FFMPEG --push(srt)--+->- SRTModule(5) ---(rtmp)-+-> SRS |
++----------------------+----------------------------+----------------+
```
@@ -1062,6 +1067,7 @@ Remark:
1. Ingester: Forks a ffmpeg(or other tools) to ingest as rtmp to SRS, please read [Ingest][v1_CN_Ingest].
1. Streamer: Remuxs other protocols to RTMP, please read [Streamer][v2_CN_Streamer].
1. EXEC: Like NGINX-RTMP, EXEC forks external tools for events, please read [ng-exec][v3_CN_NgExec].
+1. SRTModule: A isolate module which run in [hybrid](https://github.com/ossrs/srs/issues/1147#issuecomment-577574883) model.
## AUTHORS
@@ -1611,10 +1617,9 @@ Winlin
[bug #1580]: https://github.com/ossrs/srs/issues/1580
[bug #1547]: https://github.com/ossrs/srs/issues/1547
[bug #1221]: https://github.com/ossrs/srs/issues/1221
-[bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx
-
[bug #1111]: https://github.com/ossrs/srs/issues/1111
[bug #463]: https://github.com/ossrs/srs/issues/463
+[bug #1147]: https://github.com/ossrs/srs/issues/1147
[bug #xxxxxxxxxxxxx]: https://github.com/ossrs/srs/issues/xxxxxxxxxxxxx
[exo #828]: https://github.com/google/ExoPlayer/pull/828
diff --git a/trunk/auto/auto_headers.sh b/trunk/auto/auto_headers.sh
index 87720b8401..b4c04545cd 100755
--- a/trunk/auto/auto_headers.sh
+++ b/trunk/auto/auto_headers.sh
@@ -67,6 +67,12 @@ else
srs_undefine_macro "SRS_AUTO_HDS" $SRS_AUTO_HEADERS_H
fi
+if [ $SRS_SRT = YES ]; then
+ srs_define_macro "SRS_AUTO_SRT" $SRS_AUTO_HEADERS_H
+else
+ srs_undefine_macro "SRS_AUTO_SRT" $SRS_AUTO_HEADERS_H
+fi
+
if [ $SRS_MEM_WATCH = YES ]; then
srs_define_macro "SRS_AUTO_MEM_WATCH" $SRS_AUTO_HEADERS_H
else
diff --git a/trunk/auto/depends.sh b/trunk/auto/depends.sh
index bf183c0a61..3105f7fdf8 100755
--- a/trunk/auto/depends.sh
+++ b/trunk/auto/depends.sh
@@ -385,6 +385,22 @@ if [ $SRS_FFMPEG_TOOL = YES ]; then
if [ ! -f ${SRS_OBJS}/ffmpeg/bin/ffmpeg ]; then echo "build ffmpeg-4.1 failed."; exit -1; fi
fi
+#####################################################################################
+# SRT module, https://github.com/ossrs/srs/issues/1147#issuecomment-577469119
+#####################################################################################
+if [[ $SRS_SRT == YES ]]; then
+ if [[ -f /usr/local/lib64/libsrt.a && ! -f ${SRS_OBJS}/srt/lib/libsrt.a ]]; then
+ mkdir -p ${SRS_OBJS}/srt/lib && ln -sf /usr/local/lib64/libsrt.a ${SRS_OBJS}/srt/lib/libsrt.a
+ mkdir -p ${SRS_OBJS}/srt/include && ln -sf /usr/local/include/srt ${SRS_OBJS}/srt/include/
+ fi
+ if [[ -f ${SRS_OBJS}/srt/lib/libsrt.a ]]; then
+ echo "libsrt-1.4.1 is ok.";
+ else
+ echo "no libsrt, please use srs-docker or build from source https://github.com/ossrs/srs/issues/1147#issuecomment-577469119";
+ exit -1;
+ fi
+fi
+
#####################################################################################
# build research code, librtmp
#####################################################################################
diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh
index 8e9fae3716..9c35149422 100755
--- a/trunk/auto/options.sh
+++ b/trunk/auto/options.sh
@@ -16,6 +16,7 @@ help=no
################################################################
# feature options
SRS_HDS=NO
+SRS_SRT=NO
SRS_NGINX=NO
SRS_FFMPEG_TOOL=NO
SRS_LIBRTMP=NO
@@ -125,6 +126,7 @@ Features:
--with-librtmp Enable srs-librtmp, library for client.
--with-research Build the research tools.
--with-utest Build the utest for SRS.
+ --with-srt Build the srt for SRS.
--without-ssl Disable rtmp complex handshake.
--without-hds Disable hds, the adobe http dynamic streaming.
@@ -133,6 +135,7 @@ Features:
--without-librtmp Disable srs-librtmp, library for client.
--without-research Do not build the research tools.
--without-utest Do not build the utest for SRS.
+ --without-srt Do not build the srt for SRS.
--prefix= The absolute installation path for srs. Default: $SRS_PREFIX
--static Whether add '-static' to link options.
@@ -211,6 +214,7 @@ function parse_user_option() {
--with-librtmp) SRS_LIBRTMP=YES ;;
--with-research) SRS_RESEARCH=YES ;;
--with-utest) SRS_UTEST=YES ;;
+ --with-srt) SRS_SRT=YES ;;
--with-gperf) SRS_GPERF=YES ;;
--with-gmc) SRS_GPERF_MC=YES ;;
--with-gmd) SRS_GPERF_MD=YES ;;
@@ -226,6 +230,7 @@ function parse_user_option() {
--without-librtmp) SRS_LIBRTMP=NO ;;
--without-research) SRS_RESEARCH=NO ;;
--without-utest) SRS_UTEST=NO ;;
+ --without-srt) SRS_SRT=NO ;;
--without-gperf) SRS_GPERF=NO ;;
--without-gmc) SRS_GPERF_MC=NO ;;
--without-gmd) SRS_GPERF_MD=NO ;;
@@ -522,6 +527,7 @@ SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}"
if [ $SRS_LIBRTMP = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-librtmp"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-librtmp"; fi
if [ $SRS_RESEARCH = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-research"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-research"; fi
if [ $SRS_UTEST = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-utest"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-utest"; fi
+ if [ $SRS_SRT = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-srt"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-srt"; fi
if [ $SRS_GPERF = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-gperf"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-gperf"; fi
if [ $SRS_GPERF_MC = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-gmc"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-gmc"; fi
if [ $SRS_GPERF_MD = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-gmd"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-gmd"; fi
diff --git a/trunk/auto/utest.sh b/trunk/auto/utest.sh
index 18b4978cd1..cb1a877c91 100755
--- a/trunk/auto/utest.sh
+++ b/trunk/auto/utest.sh
@@ -52,7 +52,7 @@ USER_DIR = .
CPPFLAGS += -I\$(GTEST_DIR)/include
# Flags passed to the C++ compiler.
-CXXFLAGS += -g -Wall -Wextra -O0 ${EXTRA_DEFINES}
+CXXFLAGS += ${CXXFLAGS} -Wextra ${EXTRA_DEFINES}
# All tests produced by this Makefile. Remember to add new tests you
# created to the list.
diff --git a/trunk/conf/full.conf b/trunk/conf/full.conf
index 1160877a43..b4f0f95c3d 100644
--- a/trunk/conf/full.conf
+++ b/trunk/conf/full.conf
@@ -247,6 +247,18 @@ stream_caster {
listen 8936;
}
+#############################################################################################
+# SRT server section
+#############################################################################################
+# @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577607026
+srt_server {
+ # whether SRT server is enabled.
+ # default: off
+ enabled on;
+ # The UDP listen port for SRT.
+ listen 10080;
+}
+
#############################################################################################
# Kafka sections
#############################################################################################
diff --git a/trunk/conf/srt.conf b/trunk/conf/srt.conf
new file mode 100644
index 0000000000..d648b16b32
--- /dev/null
+++ b/trunk/conf/srt.conf
@@ -0,0 +1,32 @@
+# SRT config.
+
+listen 1935;
+max_connections 1000;
+srs_log_tank console;
+daemon off;
+
+http_api {
+ enabled on;
+ listen 1985;
+}
+http_server {
+ enabled on;
+ listen 8080;
+ dir ./objs/nginx/html;
+}
+
+srt_server {
+ enabled on;
+ listen 10080;
+}
+
+# @doc https://github.com/ossrs/srs/issues/1147#issuecomment-577607026
+vhost __defaultVhost__ {
+}
+vhost srs.srt.com.cn {
+}
+
+stats {
+ network 0;
+ disk sda sdb xvda xvdb;
+}
diff --git a/trunk/configure b/trunk/configure
index d09ebc09eb..ad446a7610 100755
--- a/trunk/configure
+++ b/trunk/configure
@@ -98,6 +98,9 @@ GDBDebug=" -g -O0"
WarnLevel=" -Wall"
# the compile standard.
CppStd="-ansi"
+if [[ $SRS_SRT == YES ]]; then
+ CppStd="-std=c++11"
+fi
# for library compile
if [[ $SRS_EXPORT_LIBRTMP_PROJECT == YES ]]; then
LibraryCompile=" -fPIC"
@@ -157,8 +160,15 @@ fi
if [ $SRS_GPERF_MD = YES ]; then
LibGperfFile="${SRS_OBJS_DIR}/gperf/lib/libtcmalloc_debug.a";
fi
+# srt code path
+if [[ $SRS_SRT == YES ]]; then
+ LibSRTRoot="${SRS_WORKDIR}/src/srt"; LibSRTfile="${SRS_OBJS_DIR}/srt/lib/libsrt.a"
+fi
# the link options, always use static link
SrsLinkOptions="-ldl";
+if [[ $SRS_SRT == YES ]]; then
+ SrsLinkOptions="${SrsLinkOptions} -pthread";
+fi
if [[ $SRS_SSL == YES && $SRS_USE_SYS_SSL == YES ]]; then
SrsLinkOptions="${SrsLinkOptions} -lssl -lcrypto";
fi
@@ -205,6 +215,17 @@ MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_rtmp_stack"
"srs_protocol_format")
PROTOCOL_INCS="src/protocol"; MODULE_DIR=${PROTOCOL_INCS} . auto/modules.sh
PROTOCOL_OBJS="${MODULE_OBJS[@]}"
+#
+#srt protocol features.
+if [ $SRS_SRT = YES ]; then
+ MODULE_ID="SRT"
+ MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP")
+ ModuleLibIncs=(${SRS_OBJS_DIR})
+ MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp" "ts_demux" "srt_data")
+ SRT_INCS=${LibSRTRoot}; MODULE_DIR=${LibSRTRoot} . auto/modules.sh
+ SRT_OBJS="${MODULE_OBJS[@]}"
+fi
+
#
#Service Module, for both Server and Client Modules.
if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
@@ -234,7 +255,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
"srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call"
"srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec"
"srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr"
- "srs_app_coworkers")
+ "srs_app_coworkers" "srs_app_hybrid")
DEFINES=""
# add each modules for app
for SRS_MODULE in ${SRS_MODULES[*]}; do
@@ -258,7 +279,13 @@ LIBS_OBJS="${MODULE_OBJS[@]}"
if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
MODULE_ID="SERVER"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP")
+ if [[ $SRS_SRT == YES ]]; then
+ MODULE_DEPENDS+=("SRT")
+ fi
ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot})
+ if [[ $SRS_SRT == YES ]]; then
+ ModuleLibIncs+=("${LibSRTRoot[*]}")
+ fi
MODULE_FILES=("srs_main_server")
SERVER_INCS="src/main"; MODULE_DIR=${SERVER_INCS} . auto/modules.sh
SERVER_OBJS="${MODULE_OBJS[@]}"
@@ -296,8 +323,15 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
#
# all depends libraries
ModuleLibFiles=(${LibSTfile} ${LibSSLfile} ${LibGperfFile})
+ if [[ $SRS_SRT == YES ]]; then
+ ModuleLibFiles+=("${LibSRTfile[*]}")
+ fi
# all depends objects
MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${APP_OBJS[@]} ${SERVER_OBJS[@]}"
+ ModuleLibIncs=(${LibSTRoot} ${SRS_OBJS_DIR} ${LibGperfRoot} ${LibSSLRoot})
+ if [[ $SRS_SRT == YES ]]; then
+ MODULE_OBJS="${MODULE_OBJS} ${SRT_OBJS[@]}"
+ fi
LINK_OPTIONS="${SrsLinkOptions}${SrsGprofLink}${SrsGperfLink}"
#
# srs: srs(simple rtmp server) over st(state-threads)
@@ -305,6 +339,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
#
# For modules, without the app module.
MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${MAIN_OBJS[@]}"
+ ModuleLibFiles=(${LibSTfile} ${LibSSLfile} ${LibGperfFile})
#
for SRS_MODULE in ${SRS_MODULES[*]}; do
. $SRS_MODULE/config
@@ -325,9 +360,18 @@ if [ $SRS_UTEST = YES ]; then
"srs_utest_config" "srs_utest_rtmp" "srs_utest_http" "srs_utest_avc" "srs_utest_reload"
"srs_utest_mp4" "srs_utest_service" "srs_utest_app")
ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSTRoot} ${LibSSLRoot})
+ if [[ $SRS_SRT == YES ]]; then
+ ModuleLibIncs+=("${LibSRTRoot[*]}")
+ fi
ModuleLibFiles=(${LibSTfile} ${LibSSLfile})
+ if [[ $SRS_SRT == YES ]]; then
+ ModuleLibFiles+=("${LibSRTfile[*]}")
+ fi
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "SERVICE" "APP")
- MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${APP_OBJS[@]}"
+ if [[ $SRS_SRT == YES ]]; then
+ MODULE_DEPENDS+=("SRT")
+ fi
+ MODULE_OBJS="${CORE_OBJS[@]} ${KERNEL_OBJS[@]} ${PROTOCOL_OBJS[@]} ${SERVICE_OBJS[@]} ${APP_OBJS[@]} ${SRT_OBJS[@]}"
LINK_OPTIONS="-lpthread ${SrsLinkOptions}" MODULE_DIR="src/utest" APP_NAME="srs_utest" . auto/utest.sh
fi
@@ -584,6 +628,11 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then
else
echo -e "${GREEN}Warning: HDS is disabled.${BLACK}"
fi
+ if [ $SRS_SRT = YES ]; then
+ echo -e "${YELLOW}Experiment: SRT is enabled. https://github.com/ossrs/srs/issues/1147${BLACK}"
+ else
+ echo -e "${GREEN}Warning: SRT is disabled.${BLACK}"
+ fi
if [ $SRS_DVR = YES ]; then
echo -e "${GREEN}DVR is enabled.${BLACK}"
else
diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp
index 20594b917d..0fa544fab7 100644
--- a/trunk/src/app/srs_app_config.cpp
+++ b/trunk/src/app/srs_app_config.cpp
@@ -479,7 +479,7 @@ srs_error_t srs_config_transform_vhost(SrsConfDirective* root)
++it;
continue;
}
-
+
// SRS3.0, change the folowing like a shadow:
// mode, origin, token_traverse, vhost, debug_srs_upnode
// SRS1/2:
@@ -3470,7 +3470,7 @@ srs_error_t SrsConfig::check_normal_config()
&& n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
&& n != "max_connections" && n != "daemon" && n != "heartbeat"
&& n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
- && n != "http_server" && n != "stream_caster"
+ && n != "http_server" && n != "stream_caster" && n != "srt_server"
&& n != "utc_time" && n != "work_dir" && n != "asprocess"
) {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
@@ -3504,6 +3504,15 @@ srs_error_t SrsConfig::check_normal_config()
}
}
}
+ if (true) {
+ SrsConfDirective* conf = root->get("srt_server");
+ for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
+ string n = conf->at(i)->name;
+ if (n != "enabled" && n != "listen") {
+ return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal srt_stream.%s", n.c_str());
+ }
+ }
+ }
if (true) {
SrsConfDirective* conf = get_heartbeart();
for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
@@ -3631,6 +3640,7 @@ srs_error_t SrsConfig::check_normal_config()
get_vhosts(vhosts);
for (int n = 0; n < (int)vhosts.size(); n++) {
SrsConfDirective* vhost = vhosts[n];
+
for (int i = 0; vhost && i < (int)vhost->directives.size(); i++) {
SrsConfDirective* conf = vhost->at(i);
string n = conf->name;
@@ -6629,6 +6639,39 @@ bool SrsConfig::get_raw_api_allow_update()
return SRS_CONF_PERFER_FALSE(conf->arg0());
}
+
+bool SrsConfig::get_srt_enabled()
+{
+ static bool DEFAULT = false;
+
+ SrsConfDirective* conf = root->get("srt_server");
+ if (!conf) {
+ return DEFAULT;
+ }
+
+ conf = conf->get("enabled");
+ if (!conf || conf->arg0().empty()) {
+ return DEFAULT;
+ }
+
+ return SRS_CONF_PERFER_FALSE(conf->arg0());
+}
+
+unsigned short SrsConfig::get_srt_listen_port()
+{
+ static unsigned short DEFAULT = 10080;
+ SrsConfDirective* conf = root->get("srt_server");
+ if (!conf) {
+ return DEFAULT;
+ }
+
+ conf = conf->get("listen");
+ if (!conf || conf->arg0().empty()) {
+ return DEFAULT;
+ }
+ return (unsigned short)atoi(conf->arg0().c_str());
+}
+
bool SrsConfig::get_http_stream_enabled()
{
SrsConfDirective* conf = root->get("http_server");
diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp
index c9c03e24f5..cc936affcb 100644
--- a/trunk/src/app/srs_app_config.hpp
+++ b/trunk/src/app/srs_app_config.hpp
@@ -588,6 +588,13 @@ class SrsConfig
virtual bool get_forward_enabled(std::string vhost);
// Get the forward directive of vhost.
virtual SrsConfDirective* get_forwards(std::string vhost);
+
+public:
+ // Whether the srt sevice enabled
+ virtual bool get_srt_enabled();
+ // Get the srt service listen port
+ virtual unsigned short get_srt_listen_port();
+
// http_hooks section
private:
// Get the http_hooks directive of vhost.
diff --git a/trunk/src/app/srs_app_hybrid.cpp b/trunk/src/app/srs_app_hybrid.cpp
new file mode 100644
index 0000000000..cd8147327b
--- /dev/null
+++ b/trunk/src/app/srs_app_hybrid.cpp
@@ -0,0 +1,185 @@
+/**
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2013-2020 Winlin
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include
+
+#include
+#include
+#include
+#include
+
+using namespace std;
+
+ISrsHybridServer::ISrsHybridServer()
+{
+}
+
+ISrsHybridServer::~ISrsHybridServer()
+{
+}
+
+SrsServerAdapter::SrsServerAdapter()
+{
+ srs = new SrsServer();
+}
+
+SrsServerAdapter::~SrsServerAdapter()
+{
+ srs_freep(srs);
+}
+
+srs_error_t SrsServerAdapter::initialize()
+{
+ srs_error_t err = srs_success;
+ return err;
+}
+
+srs_error_t SrsServerAdapter::run()
+{
+ srs_error_t err = srs_success;
+
+ // Initialize the whole system, set hooks to handle server level events.
+ if ((err = srs->initialize(NULL)) != srs_success) {
+ return srs_error_wrap(err, "server initialize");
+ }
+
+ if ((err = srs->initialize_st()) != srs_success) {
+ return srs_error_wrap(err, "initialize st");
+ }
+
+ if ((err = srs->acquire_pid_file()) != srs_success) {
+ return srs_error_wrap(err, "acquire pid file");
+ }
+
+ if ((err = srs->initialize_signal()) != srs_success) {
+ return srs_error_wrap(err, "initialize signal");
+ }
+
+ if ((err = srs->listen()) != srs_success) {
+ return srs_error_wrap(err, "listen");
+ }
+
+ if ((err = srs->register_signal()) != srs_success) {
+ return srs_error_wrap(err, "register signal");
+ }
+
+ if ((err = srs->http_handle()) != srs_success) {
+ return srs_error_wrap(err, "http handle");
+ }
+
+ if ((err = srs->ingest()) != srs_success) {
+ return srs_error_wrap(err, "ingest");
+ }
+
+ if ((err = srs->cycle()) != srs_success) {
+ return srs_error_wrap(err, "main cycle");
+ }
+
+ return err;
+}
+
+void SrsServerAdapter::stop()
+{
+}
+
+SrsHybridServer::SrsHybridServer()
+{
+}
+
+SrsHybridServer::~SrsHybridServer()
+{
+ vector::iterator it;
+ for (it = servers.begin(); it != servers.end(); ++it) {
+ ISrsHybridServer* server = *it;
+ srs_freep(server);
+ }
+ servers.clear();
+}
+
+void SrsHybridServer::register_server(ISrsHybridServer* svr)
+{
+ servers.push_back(svr);
+}
+
+srs_error_t SrsHybridServer::initialize()
+{
+ srs_error_t err = srs_success;
+
+ // init st
+ if ((err = srs_st_init()) != srs_success) {
+ return srs_error_wrap(err, "initialize st failed");
+ }
+
+ vector::iterator it;
+ for (it = servers.begin(); it != servers.end(); ++it) {
+ ISrsHybridServer* server = *it;
+
+ if ((err = server->initialize()) != srs_success) {
+ return srs_error_wrap(err, "init server");
+ }
+ }
+
+ return err;
+}
+
+srs_error_t SrsHybridServer::run()
+{
+ srs_error_t err = srs_success;
+
+ // Run master server in this main thread.
+ SrsServerAdapter* master_server = NULL;
+
+ vector::iterator it;
+ for (it = servers.begin(); it != servers.end(); ++it) {
+ ISrsHybridServer* server = *it;
+
+ if (!master_server) {
+ master_server = dynamic_cast(server);
+ if (master_server) {
+ continue;
+ }
+ }
+
+ if ((err = server->run()) != srs_success) {
+ return srs_error_wrap(err, "run server");
+ }
+ }
+
+ if (master_server) {
+ return master_server->run();
+ }
+
+ return err;
+}
+
+void SrsHybridServer::stop()
+{
+ vector::iterator it;
+ for (it = servers.begin(); it != servers.end(); ++it) {
+ ISrsHybridServer* server = *it;
+ server->stop();
+ }
+}
+
+SrsHybridServer* _srs_hybrid = new SrsHybridServer();
+
diff --git a/trunk/src/app/srs_app_hybrid.hpp b/trunk/src/app/srs_app_hybrid.hpp
new file mode 100644
index 0000000000..3834566608
--- /dev/null
+++ b/trunk/src/app/srs_app_hybrid.hpp
@@ -0,0 +1,80 @@
+/**
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2013-2020 Winlin
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+ * the Software, and to permit persons to whom the Software is furnished to do so,
+ * subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#ifndef SRS_APP_HYBRID_HPP
+#define SRS_APP_HYBRID_HPP
+
+#include
+
+#include
+
+class SrsServer;
+
+// The hibrid server interfaces, we could register many servers.
+class ISrsHybridServer
+{
+public:
+ ISrsHybridServer();
+ virtual ~ISrsHybridServer();
+public:
+ // Only ST initialized before each server, we could fork processes as such.
+ virtual srs_error_t initialize() = 0;
+ // Run each server, should never block except the SRS master server.
+ virtual srs_error_t run() = 0;
+ // Stop each server, should do cleanup, for example, kill processes forked by server.
+ virtual void stop() = 0;
+};
+
+// The SRS server adapter, the master server.
+class SrsServerAdapter : public ISrsHybridServer
+{
+private:
+ SrsServer* srs;
+public:
+ SrsServerAdapter();
+ virtual ~SrsServerAdapter();
+public:
+ virtual srs_error_t initialize();
+ virtual srs_error_t run();
+ virtual void stop();
+};
+
+// The hybrid server manager.
+class SrsHybridServer
+{
+private:
+ std::vector servers;
+public:
+ SrsHybridServer();
+ virtual ~SrsHybridServer();
+public:
+ virtual void register_server(ISrsHybridServer* svr);
+public:
+ virtual srs_error_t initialize();
+ virtual srs_error_t run();
+ virtual void stop();
+};
+
+extern SrsHybridServer* _srs_hybrid;
+
+#endif
diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp
index 2a3039f1a6..9a51ded50e 100644
--- a/trunk/src/app/srs_app_server.cpp
+++ b/trunk/src/app/srs_app_server.cpp
@@ -565,11 +565,6 @@ srs_error_t SrsServer::initialize_st()
{
srs_error_t err = srs_success;
- // init st
- if ((err = srs_st_init()) != srs_success) {
- return srs_error_wrap(err, "initialize st failed");
- }
-
// @remark, st alloc segment use mmap, which only support 32757 threads,
// if need to support more, for instance, 100k threads, define the macro MALLOC_STACK.
// TODO: FIXME: maybe can use "sysctl vm.max_map_count" to refine.
@@ -688,7 +683,7 @@ srs_error_t SrsServer::listen()
if ((err = conn_manager->start()) != srs_success) {
return srs_error_wrap(err, "connection manager");
}
-
+
return err;
}
diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp
index 25cfbaf300..c787378027 100644
--- a/trunk/src/main/srs_main_server.cpp
+++ b/trunk/src/main/srs_main_server.cpp
@@ -48,10 +48,15 @@ using namespace std;
#include
#include
#include
+#include
+
+#ifdef SRS_AUTO_SRT
+#include
+#endif
// pre-declare
-srs_error_t run(SrsServer* svr);
-srs_error_t run_master(SrsServer* svr);
+srs_error_t run_directly_or_daemon();
+srs_error_t run_hybrid_server();
void show_macro_features();
string srs_getenv(const char* name);
@@ -177,10 +182,7 @@ srs_error_t do_main(int argc, char** argv)
// features
show_macro_features();
- SrsServer* svr = new SrsServer();
- SrsAutoFree(SrsServer, svr);
-
- if ((err = run(svr)) != srs_success) {
+ if ((err = run_directly_or_daemon()) != srs_success) {
return srs_error_wrap(err, "run");
}
@@ -214,6 +216,7 @@ void show_macro_features()
ss << ", dash:" << "on";
ss << ", hls:" << srs_bool2switch(true);
ss << ", hds:" << srs_bool2switch(SRS_AUTO_HDS_BOOL);
+ ss << ", srt:" << srs_bool2switch(SRS_AUTO_SRT_BOOL);
// hc(http callback)
ss << ", hc:" << srs_bool2switch(true);
// ha(http api)
@@ -350,18 +353,13 @@ string srs_getenv(const char* name)
return "";
}
-srs_error_t run(SrsServer* svr)
+srs_error_t run_directly_or_daemon()
{
srs_error_t err = srs_success;
-
- // Initialize the whole system, set hooks to handle server level events.
- if ((err = svr->initialize(NULL)) != srs_success) {
- return srs_error_wrap(err, "server initialize");
- }
// If not daemon, directly run master.
if (!_srs_config->get_daemon()) {
- if ((err = run_master(svr)) != srs_success) {
+ if ((err = run_hybrid_server()) != srs_success) {
return srs_error_wrap(err, "run master");
}
return srs_success;
@@ -398,49 +396,35 @@ srs_error_t run(SrsServer* svr)
// son
srs_trace("son(daemon) process running.");
- if ((err = run_master(svr)) != srs_success) {
+ if ((err = run_hybrid_server()) != srs_success) {
return srs_error_wrap(err, "daemon run master");
}
return err;
}
-srs_error_t run_master(SrsServer* svr)
+srs_error_t run_hybrid_server()
{
srs_error_t err = srs_success;
-
- if ((err = svr->initialize_st()) != srs_success) {
- return srs_error_wrap(err, "initialize st");
- }
-
- if ((err = svr->initialize_signal()) != srs_success) {
- return srs_error_wrap(err, "initialize signal");
- }
-
- if ((err = svr->acquire_pid_file()) != srs_success) {
- return srs_error_wrap(err, "acquire pid file");
- }
-
- if ((err = svr->listen()) != srs_success) {
- return srs_error_wrap(err, "listen");
- }
-
- if ((err = svr->register_signal()) != srs_success) {
- return srs_error_wrap(err, "register signal");
- }
-
- if ((err = svr->http_handle()) != srs_success) {
- return srs_error_wrap(err, "http handle");
- }
-
- if ((err = svr->ingest()) != srs_success) {
- return srs_error_wrap(err, "ingest");
+
+ _srs_hybrid->register_server(new SrsServerAdapter());
+#ifdef SRS_AUTO_SRT
+ _srs_hybrid->register_server(new SrtServerAdapter());
+#endif
+
+ // Do some system initialize.
+ if ((err = _srs_hybrid->initialize()) != srs_success) {
+ return srs_error_wrap(err, "hybrid initialize");
}
-
- if ((err = svr->cycle()) != srs_success) {
- return srs_error_wrap(err, "main cycle");
+
+ // Should run util hybrid servers all done.
+ if ((err = _srs_hybrid->run()) != srs_success) {
+ return srs_error_wrap(err, "hybrid run");
}
-
+
+ // After all done, stop and cleanup.
+ _srs_hybrid->stop();
+
return err;
}
diff --git a/trunk/src/srt/srt_conn.cpp b/trunk/src/srt/srt_conn.cpp
new file mode 100644
index 0000000000..2df80b011f
--- /dev/null
+++ b/trunk/src/srt/srt_conn.cpp
@@ -0,0 +1,169 @@
+#include "srt_conn.hpp"
+#include "time_help.h"
+#include "stringex.hpp"
+#include
+
+bool is_streamid_valid(const std::string& streamid) {
+ int mode = ERR_SRT_MODE;
+ std::string url_subpash;
+
+ bool ret = get_streamid_info(streamid, mode, url_subpash);
+ if (!ret) {
+ return ret;
+ }
+
+ if ((mode != PULL_SRT_MODE) && (mode != PUSH_SRT_MODE)) {
+ return false;
+ }
+
+ if (url_subpash.empty()) {
+ return false;
+ }
+
+ std::vector info_vec;
+ string_split(url_subpash, "/", info_vec);
+ if (info_vec.size() < 2) {
+ return false;
+ }
+
+ return true;
+}
+
+bool get_key_value(const std::string& info, std::string& key, std::string& value) {
+ size_t pos = info.find("=");
+
+ if (pos == info.npos) {
+ return false;
+ }
+
+ key = info.substr(0, pos);
+ value = info.substr(pos+1);
+
+ if (key.empty() || value.empty()) {
+ return false;
+ }
+ return true;
+}
+
+//eg. streamid=#!::h:live/livestream,m:publish
+bool get_streamid_info(const std::string& streamid, int& mode, std::string& url_subpash) {
+ std::vector info_vec;
+ std::string real_streamid;
+
+ size_t pos = streamid.find("#!::h");
+ if (pos != 0) {
+ return false;
+ }
+ real_streamid = streamid.substr(4);
+
+ string_split(real_streamid, ",", info_vec);
+ if (info_vec.size() < 2) {
+ return false;
+ }
+
+ for (int index = 0; index < info_vec.size(); index++) {
+ std::string key;
+ std::string value;
+
+ bool ret = get_key_value(info_vec[index], key, value);
+ if (!ret) {
+ continue;
+ }
+
+ if (key == "h") {
+ url_subpash = value;//eg. h=live/stream
+ } else if (key == "m") {
+ std::string mode_str = string_lower(value);//m=publish or m=request
+ if (mode_str == "publish") {
+ mode = PUSH_SRT_MODE;
+ } else if (mode_str == "request") {
+ mode = PULL_SRT_MODE;
+ } else {
+ mode = ERR_SRT_MODE;
+ return false;
+ }
+ } else {//not suport
+ continue;
+ }
+ }
+
+ return true;
+}
+
+srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn_fd),
+ _streamid(streamid) {
+ get_streamid_info(streamid, _mode, _url_subpath);
+ _update_timestamp = now_ms();
+
+ std::vector path_vec;
+
+ string_split(_url_subpath, "/", path_vec);
+ if (path_vec.size() >= 3) {
+ _vhost = path_vec[0];
+ } else {
+ _vhost = "__default_host__";
+ }
+ srs_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s",
+ streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str());
+}
+
+srt_conn::~srt_conn() {
+ close();
+}
+
+std::string srt_conn::get_vhost() {
+ return _vhost;
+}
+
+void srt_conn::update_timestamp(long long now_ts) {
+ _update_timestamp = now_ts;
+}
+
+long long srt_conn::get_last_ts() {
+ return _update_timestamp;
+}
+
+void srt_conn::close() {
+ if (_conn_fd == SRT_INVALID_SOCK) {
+ return;
+ }
+ srt_close(_conn_fd);
+ _conn_fd = SRT_INVALID_SOCK;
+}
+
+SRTSOCKET srt_conn::get_conn() {
+ return _conn_fd;
+}
+int srt_conn::get_mode() {
+ return _mode;
+}
+
+std::string srt_conn::get_streamid() {
+ return _streamid;
+}
+
+std::string srt_conn::get_subpath() {
+ return _url_subpath;
+}
+
+int srt_conn::read(unsigned char* data, int len) {
+ int ret = 0;
+
+ ret = srt_recv(_conn_fd, (char*)data, len);
+ if (ret <= 0) {
+ srs_error("srt read error:%d, socket fd:%d", ret, _conn_fd);
+ return ret;
+ }
+ return ret;
+}
+
+int srt_conn::write(unsigned char* data, int len) {
+ int ret = 0;
+
+ ret = srt_send(_conn_fd, (char*)data, len);
+ if (ret <= 0) {
+ srs_error("srt write error:%d, socket fd:%d", ret, _conn_fd);
+ return ret;
+ }
+ return ret;
+}
\ No newline at end of file
diff --git a/trunk/src/srt/srt_conn.hpp b/trunk/src/srt/srt_conn.hpp
new file mode 100644
index 0000000000..adc79eaf47
--- /dev/null
+++ b/trunk/src/srt/srt_conn.hpp
@@ -0,0 +1,50 @@
+#ifndef SRT_CONN_H
+#define SRT_CONN_H
+#include "stringex.hpp"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define ERR_SRT_MODE 0x00
+#define PULL_SRT_MODE 0x01
+#define PUSH_SRT_MODE 0x02
+
+bool is_streamid_valid(const std::string& streamid);
+bool get_key_value(const std::string& info, std::string& key, std::string& value);
+bool get_streamid_info(const std::string& streamid, int& mode, std::string& url_subpash);
+
+class srt_conn {
+public:
+ srt_conn(SRTSOCKET conn_fd, const std::string& streamid);
+ ~srt_conn();
+
+ void close();
+ SRTSOCKET get_conn();
+ int get_mode();
+ std::string get_streamid();
+ std::string get_subpath();
+ std::string get_vhost();
+ int read(unsigned char* data, int len);
+ int write(unsigned char* data, int len);
+
+ void update_timestamp(long long now_ts);
+ long long get_last_ts();
+
+private:
+ SRTSOCKET _conn_fd;
+ std::string _streamid;
+ std::string _url_subpath;
+ std::string _vhost;
+ int _mode;
+ long long _update_timestamp;
+};
+
+typedef std::shared_ptr SRT_CONN_PTR;
+
+#endif //SRT_CONN_H
\ No newline at end of file
diff --git a/trunk/src/srt/srt_data.cpp b/trunk/src/srt/srt_data.cpp
new file mode 100644
index 0000000000..075064b142
--- /dev/null
+++ b/trunk/src/srt/srt_data.cpp
@@ -0,0 +1,31 @@
+#include "srt_data.hpp"
+#include
+
+SRT_DATA_MSG::SRT_DATA_MSG(unsigned int len, const std::string& path):_len(len)
+ ,_key_path(path) {
+ _data_p = new unsigned char[len];
+ memset(_data_p, 0, len);
+}
+
+SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path):_len(len)
+ ,_key_path(path)
+{
+ _data_p = new unsigned char[len];
+ memcpy(_data_p, data_p, len);
+}
+
+SRT_DATA_MSG::~SRT_DATA_MSG() {
+ delete _data_p;
+}
+
+std::string SRT_DATA_MSG::get_path() {
+ return _key_path;
+}
+
+unsigned int SRT_DATA_MSG::data_len() {
+ return _len;
+}
+
+unsigned char* SRT_DATA_MSG::get_data() {
+ return _data_p;
+}
diff --git a/trunk/src/srt/srt_data.hpp b/trunk/src/srt/srt_data.hpp
new file mode 100644
index 0000000000..ab9cf81ea5
--- /dev/null
+++ b/trunk/src/srt/srt_data.hpp
@@ -0,0 +1,24 @@
+#ifndef SRT_DATA_H
+#define SRT_DATA_H
+#include
+#include
+
+class SRT_DATA_MSG {
+public:
+ SRT_DATA_MSG(unsigned int len, const std::string& path);
+ SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path);
+ ~SRT_DATA_MSG();
+
+ unsigned int data_len();
+ unsigned char* get_data();
+ std::string get_path();
+
+private:
+ unsigned int _len;
+ unsigned char* _data_p;
+ std::string _key_path;
+};
+
+typedef std::shared_ptr SRT_DATA_MSG_PTR;
+
+#endif
\ No newline at end of file
diff --git a/trunk/src/srt/srt_handle.cpp b/trunk/src/srt/srt_handle.cpp
new file mode 100644
index 0000000000..ea25b2c831
--- /dev/null
+++ b/trunk/src/srt/srt_handle.cpp
@@ -0,0 +1,473 @@
+
+#include "srt_handle.hpp"
+#include "time_help.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+static bool MONITOR_STATICS_ENABLE = false;
+static long long MONITOR_TIMEOUT = 5000;
+const unsigned int DEF_DATA_SIZE = 188*7;
+const long long CHECK_ALIVE_INTERVAL = 10*1000;
+const long long CHECK_ALIVE_TIMEOUT = 15*1000;
+
+long long srt_now_ms = 0;
+
+srt_handle::srt_handle():_run_flag(false)
+ ,_last_timestamp(0)
+ ,_last_check_alive_ts(0) {
+}
+
+srt_handle::~srt_handle() {
+
+}
+
+int srt_handle::start() {
+ _handle_pollid = srt_epoll_create();
+ if (_handle_pollid < -1) {
+ srs_error("srt handle srt_epoll_create error, _handle_pollid=%d", _handle_pollid);
+ return -1;
+ }
+
+ _run_flag = true;
+ srs_trace("srt handle is starting...");
+ _work_thread_ptr = std::make_shared(&srt_handle::onwork, this);
+
+ return 0;
+}
+
+void srt_handle::stop() {
+ _run_flag = false;
+ _work_thread_ptr->join();
+ return;
+}
+
+void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid) {
+ SRT_TRACEBSTATS mon;
+ srt_bstats(srtsocket, &mon, 1);
+ std::ostringstream output;
+ long long now_ul = now_ms();
+
+ if (!MONITOR_STATICS_ENABLE) {
+ return;
+ }
+ if (_last_timestamp == 0) {
+ _last_timestamp = now_ul;
+ return;
+ }
+
+ if ((now_ul - _last_timestamp) < MONITOR_TIMEOUT) {
+ return;
+ }
+ _last_timestamp = now_ul;
+ output << "======= SRT STATS: sid=" << streamid << std::endl;
+ output << "PACKETS SENT: " << std::setw(11) << mon.pktSent << " RECEIVED: " << std::setw(11) << mon.pktRecv << std::endl;
+ output << "LOST PKT SENT: " << std::setw(11) << mon.pktSndLoss << " RECEIVED: " << std::setw(11) << mon.pktRcvLoss << std::endl;
+ output << "REXMIT SENT: " << std::setw(11) << mon.pktRetrans << " RECEIVED: " << std::setw(11) << mon.pktRcvRetrans << std::endl;
+ output << "DROP PKT SENT: " << std::setw(11) << mon.pktSndDrop << " RECEIVED: " << std::setw(11) << mon.pktRcvDrop << std::endl;
+ output << "RATE SENDING: " << std::setw(11) << mon.mbpsSendRate << " RECEIVING: " << std::setw(11) << mon.mbpsRecvRate << std::endl;
+ output << "BELATED RECEIVED: " << std::setw(11) << mon.pktRcvBelated << " AVG TIME: " << std::setw(11) << mon.pktRcvAvgBelatedTime << std::endl;
+ output << "REORDER DISTANCE: " << std::setw(11) << mon.pktReorderDistance << std::endl;
+ output << "WINDOW FLOW: " << std::setw(11) << mon.pktFlowWindow << " CONGESTION: " << std::setw(11) << mon.pktCongestionWindow << " FLIGHT: " << std::setw(11) << mon.pktFlightSize << std::endl;
+ output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl;
+ output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl;
+
+ srs_trace("\r\n%s", output.str().c_str());
+ return;
+}
+
+void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) {
+ _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
+
+ auto iter = _streamid_map.find(stream_id);
+ if (iter == _streamid_map.end()) {
+ std::unordered_map srtsocket_map;
+ srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
+
+ _streamid_map.insert(std::make_pair(stream_id, srtsocket_map));
+ srs_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str());
+ } else {
+ iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
+ srs_trace("add new puller fd:%d, streamid:%s, size:%d",
+ conn_ptr->get_conn(), stream_id.c_str(), iter->second.size());
+ }
+
+ return;
+}
+
+void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) {
+ srs_warn("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str());
+ srt_epoll_remove_usock(_handle_pollid, srtsocket);
+
+ auto streamid_iter = _streamid_map.find(stream_id);
+ if (streamid_iter != _streamid_map.end()) {
+ auto srtsocket_map = streamid_iter->second;
+ if (srtsocket_map.size() == 0) {
+ _streamid_map.erase(stream_id);
+ } else if (srtsocket_map.size() == 1) {
+ srtsocket_map.erase(srtsocket);
+ _streamid_map.erase(stream_id);
+ } else {
+ srtsocket_map.erase(srtsocket);
+ }
+ } else {
+ assert(0);
+ }
+
+ auto conn_iter = _conn_map.find(srtsocket);
+ if (conn_iter != _conn_map.end()) {
+ _conn_map.erase(conn_iter);
+ return;
+ } else {
+ assert(0);
+ }
+
+ return;
+}
+
+SRT_CONN_PTR srt_handle::get_srt_conn(SRTSOCKET conn_srt_socket) {
+ SRT_CONN_PTR ret_conn;
+
+ auto iter = _conn_map.find(conn_srt_socket);
+ if (iter == _conn_map.end()) {
+ return ret_conn;
+ }
+
+ ret_conn = iter->second;
+
+ return ret_conn;
+}
+
+void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
+ int val_i;
+ int opt_len = sizeof(int);
+
+ val_i = 1000;
+ srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, opt_len);
+ val_i = 2048;
+ srt_setsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, opt_len);
+
+ srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len);
+ srs_trace("srto SRTO_LATENCY=%d", val_i);
+ srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len);
+ srs_trace("srto SRTO_SNDBUF=%d", val_i);
+ srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len);
+ srs_trace("srto SRTO_RCVBUF=%d", val_i);
+ srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i, &opt_len);
+ srs_trace("srto SRTO_MAXBW=%d", val_i);
+
+ if (conn_ptr->get_mode() == PULL_SRT_MODE) {
+ add_new_puller(conn_ptr, conn_ptr->get_subpath());
+ } else {
+ if(add_new_pusher(conn_ptr) == false) {
+ srs_trace("push connection is repeated and rejected, fd:%d, streamid:%s",
+ conn_ptr->get_conn(), conn_ptr->get_streamid().c_str());
+ conn_ptr->close();
+ return;
+ }
+ }
+ srs_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events);
+ int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);
+ if (ret < 0) {
+ srs_error("srt handle run add epoll error:%d", ret);
+ return;
+ }
+
+ return;
+}
+
+int srt_handle::get_srt_mode(SRTSOCKET conn_srt_socket) {
+ auto iter = _conn_map.find(conn_srt_socket);
+ if (iter == _conn_map.end()) {
+ return 0;
+ }
+ return iter->second->get_mode();
+}
+
+void srt_handle::insert_message_queue(request_message_t msg) {
+ std::unique_lock lock(_queue_mutex);
+ _message_queue.push(msg);
+}
+
+bool srt_handle::get_message_from_queue(request_message_t& msg) {
+ std::unique_lock lock(_queue_mutex);
+ bool ret = false;
+
+ if (_message_queue.empty()) {
+ return ret;
+ }
+ ret = true;
+ msg = _message_queue.front();
+ _message_queue.pop();
+
+ return ret;
+}
+
+void srt_handle::onwork()
+{
+ const unsigned int SRT_FD_MAX = 1024;
+ SRT_SOCKSTATUS status = SRTS_INIT;
+ std::string streamid;
+ int ret;
+ const int64_t DEF_TIMEOUT_INTERVAL = 30;
+
+ srs_trace("srt handle epoll work is starting...");
+ while(_run_flag)
+ {
+ SRTSOCKET read_fds[SRT_FD_MAX];
+ SRTSOCKET write_fds[SRT_FD_MAX];
+ int rfd_num = SRT_FD_MAX;
+ int wfd_num = SRT_FD_MAX;
+
+ srt_now_ms = now_ms();
+
+ request_message_t msg;
+
+ if (get_message_from_queue(msg)) {
+ add_newconn(msg.conn_ptr, msg.events);
+ }
+
+ check_alive();
+
+ ret = srt_epoll_wait(_handle_pollid, read_fds, &rfd_num, write_fds, &wfd_num,
+ DEF_TIMEOUT_INTERVAL, 0, 0, 0, 0);
+ if (ret < 0) {
+ srs_info("srt handle epoll is timeout, ret=%d, srt_now_ms=%ld",
+ ret, srt_now_ms);
+ std::this_thread::sleep_for(std::chrono::milliseconds(30));
+ continue;
+ }
+
+ for (int index = 0; index < rfd_num; index++)
+ {
+ status = srt_getsockstate(read_fds[index]);
+ srs_info("srt handle read(push) rfd num:%d, status:%d, streamid:%s, read_fd",
+ rfd_num, status, streamid.c_str(), read_fds[index]);
+ handle_srt_socket(status, read_fds[index]);
+ }
+
+ for (int index = 0; index < wfd_num; index++)
+ {
+ status = srt_getsockstate(write_fds[index]);
+ streamid = UDT::getstreamid(write_fds[index]);
+ srs_info("srt handle write(puller) wfd num:%d, status:%d, streamid:%s, write_fd",
+ wfd_num, status, streamid.c_str(), write_fds[index]);
+ handle_srt_socket(status, write_fds[index]);
+ }
+
+ }
+}
+
+void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
+ SRT_CONN_PTR srt_conn_ptr;
+ unsigned char data[DEF_DATA_SIZE];
+ int ret;
+ srt_conn_ptr = get_srt_conn(conn_fd);
+
+ if (!srt_conn_ptr) {
+ srs_error("handle_push_data fd:%d fail to find srt connection.", conn_fd);
+ return;
+ }
+
+ if (status != SRTS_CONNECTED) {
+ srs_error("handle_push_data error status:%d fd:%d", status, conn_fd);
+ close_push_conn(conn_fd);
+ return;
+ }
+
+ ret = srt_conn_ptr->read(data, DEF_DATA_SIZE);
+ if (ret <= 0) {
+ srs_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd);
+ close_push_conn(conn_fd);
+ return;
+ }
+ srt_conn_ptr->update_timestamp(srt_now_ms);
+
+ srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);
+
+ //send data to subscriber(players)
+ //streamid, play map
+ auto streamid_iter = _streamid_map.find(subpath);
+ if (streamid_iter == _streamid_map.end()) {//no puler
+ srs_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd);
+ return;
+ }
+ srs_info("receive data size(%d) from pusher(%d) to pullers, count:%d",
+ ret, conn_fd, streamid_iter->second.size());
+
+ for (auto puller_iter = streamid_iter->second.begin();
+ puller_iter != streamid_iter->second.end();
+ puller_iter++) {
+ auto player_conn = puller_iter->second;
+ if (!player_conn) {
+ srs_error("handle_push_data get srt connect error from fd:%d", puller_iter->first);
+ continue;
+ }
+ int write_ret = player_conn->write(data, ret);
+ srs_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first);
+ if (write_ret > 0) {
+ puller_iter->second->update_timestamp(srt_now_ms);
+ }
+ }
+
+ return;
+}
+
+void srt_handle::check_alive() {
+ long long diff_t;
+ std::list conn_list;
+
+ if (_last_check_alive_ts == 0) {
+ _last_check_alive_ts = srt_now_ms;
+ return;
+ }
+ diff_t = srt_now_ms - _last_check_alive_ts;
+ if (diff_t < CHECK_ALIVE_INTERVAL) {
+ return;
+ }
+
+ for (auto conn_iter = _conn_map.begin();
+ conn_iter != _conn_map.end();
+ conn_iter++)
+ {
+ long long timeout = srt_now_ms - conn_iter->second->get_last_ts();
+ if (timeout > CHECK_ALIVE_TIMEOUT) {
+ conn_list.push_back(conn_iter->second);
+ }
+ }
+
+ for (auto del_iter = conn_list.begin();
+ del_iter != conn_list.end();
+ del_iter++)
+ {
+ SRT_CONN_PTR conn_ptr = *del_iter;
+ if (conn_ptr->get_mode() == PUSH_SRT_MODE) {
+ srs_warn("check alive close pull connection fd:%d, streamid:%s",
+ conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
+ close_push_conn(conn_ptr->get_conn());
+ } else if (conn_ptr->get_mode() == PULL_SRT_MODE) {
+ srs_warn("check alive close pull connection fd:%d, streamid:%s",
+ conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
+ close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_subpath());
+ } else {
+ srs_error("check_alive get unkown srt mode:%d, fd:%d",
+ conn_ptr->get_mode(), conn_ptr->get_conn());
+ assert(0);
+ }
+ }
+}
+
+void srt_handle::close_push_conn(SRTSOCKET srtsocket) {
+ auto iter = _conn_map.find(srtsocket);
+
+ if (iter != _conn_map.end()) {
+ SRT_CONN_PTR conn_ptr = iter->second;
+ auto push_iter = _push_conn_map.find(conn_ptr->get_subpath());
+ if (push_iter != _push_conn_map.end()) {
+ _push_conn_map.erase(push_iter);
+ }
+ _conn_map.erase(iter);
+ conn_ptr->close();
+ }
+
+ srt_epoll_remove_usock(_handle_pollid, srtsocket);
+
+ return;
+}
+
+bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) {
+ auto push_iter = _push_conn_map.find(conn_ptr->get_subpath());
+ if (push_iter != _push_conn_map.end()) {
+ return false;
+ }
+ _push_conn_map.insert(std::make_pair(conn_ptr->get_subpath(), conn_ptr));
+ _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
+ srs_trace("srt_handle add new pusher streamid:%s, subpath:%s",
+ conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str());
+ return true;
+}
+
+void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
+ srs_info("handle_pull_data status:%d, subpath:%s, fd:%d",
+ status, subpath.c_str(), conn_fd);
+ auto conn_ptr = get_srt_conn(conn_fd);
+ if (!conn_ptr) {
+ srs_error("handle_pull_data fail to find fd(%d)", conn_fd);
+ assert(0);
+ return;
+ }
+ conn_ptr->update_timestamp(srt_now_ms);
+}
+
+void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
+{
+ std::string subpath;
+ int mode;
+ auto conn_ptr = get_srt_conn(conn_fd);
+
+ if (!conn_ptr) {
+ if (status != SRTS_CLOSED) {
+ srs_error("handle_srt_socket find srt connection error, fd:%d, status:%d",
+ conn_fd, status);
+ }
+ return;
+ }
+ bool ret = get_streamid_info(conn_ptr->get_streamid(), mode, subpath);
+ if (!ret) {
+ conn_ptr->close();
+ conn_ptr = nullptr;
+ return;
+ }
+
+ if (mode == PUSH_SRT_MODE) {
+ switch (status)
+ {
+ case SRTS_CONNECTED:
+ {
+ handle_push_data(status, subpath, conn_fd);
+ break;
+ }
+ case SRTS_BROKEN:
+ {
+ srs_warn("srt push disconnected event fd:%d, streamid:%s",
+ conn_fd, conn_ptr->get_streamid().c_str());
+ close_push_conn(conn_fd);
+ break;
+ }
+ default:
+ srs_error("push mode unkown status:%d, fd:%d", status, conn_fd);
+ break;
+ }
+ } else if (mode == PULL_SRT_MODE) {
+ switch (status)
+ {
+ case SRTS_CONNECTED:
+ {
+ handle_pull_data(status, subpath, conn_fd);
+ break;
+ }
+ case SRTS_BROKEN:
+ {
+ srs_warn("srt pull disconnected fd:%d, streamid:%s",
+ conn_fd, conn_ptr->get_streamid().c_str());
+ close_pull_conn(conn_fd, subpath);
+ break;
+ }
+ default:
+ srs_error("pull mode unkown status:%d, fd:%d", status, conn_fd);
+ break;
+ }
+ } else {
+ assert(0);
+ }
+ return;
+}
\ No newline at end of file
diff --git a/trunk/src/srt/srt_handle.hpp b/trunk/src/srt/srt_handle.hpp
new file mode 100644
index 0000000000..da555b7149
--- /dev/null
+++ b/trunk/src/srt/srt_handle.hpp
@@ -0,0 +1,80 @@
+#ifndef SRT_HANDLE_H
+#define SRT_HANDLE_H
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "srt_conn.hpp"
+#include "srt_to_rtmp.hpp"
+
+typedef struct {
+ SRT_CONN_PTR conn_ptr;
+ int events;
+} request_message_t;
+
+class srt_handle {
+public:
+ srt_handle();
+ ~srt_handle();
+
+ int start();//create srt epoll and create epoll thread
+ void stop();//close srt epoll and end epoll thread
+
+ void insert_message_queue(request_message_t msg);
+ bool get_message_from_queue(request_message_t& msg);
+
+private:
+ //add new srt connection into epoll event
+ void add_newconn(SRT_CONN_PTR conn_ptr, int events);
+ //get srt conn object by srt socket
+ SRT_CONN_PTR get_srt_conn(SRTSOCKET conn_srt_socket);
+ //get srt connect mode: push or pull
+ int get_srt_mode(SRTSOCKET conn_srt_socket);
+
+ void onwork();//epoll thread loop
+ //handle recv/send srt socket
+ void handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd);
+ void handle_push_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
+ void handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd);
+
+ //add new puller into puller list and conn_map
+ void add_new_puller(SRT_CONN_PTR, std::string stream_id);
+ //remove pull srt from play list
+ void close_pull_conn(SRTSOCKET srtsocket, std::string stream_id);
+
+ //add new pusher into pusher map:
+ bool add_new_pusher(SRT_CONN_PTR conn_ptr);
+ //remove push connection and remove epoll
+ void close_push_conn(SRTSOCKET srtsocket);
+
+ //check srt connection whether it's still alive.
+ void check_alive();
+
+ //debug statics
+ void debug_statics(SRTSOCKET srtsocket, const std::string& streamid);
+
+private:
+ bool _run_flag;
+ int _handle_pollid;
+
+ std::unordered_map _conn_map;//save all srt connection: pull or push
+ std::shared_ptr _work_thread_ptr;
+
+ //save push srt connection for prevent from repeat push connection
+ std::unordered_map _push_conn_map;//key:streamid, value:SRT_CONN_PTR
+ //streamid, play map
+ std::unordered_map> _streamid_map;
+
+ std::mutex _queue_mutex;
+ std::queue _message_queue;
+
+ long long _last_timestamp;
+ long long _last_check_alive_ts;
+};
+
+#endif //SRT_HANDLE_H
\ No newline at end of file
diff --git a/trunk/src/srt/srt_server.cpp b/trunk/src/srt/srt_server.cpp
new file mode 100644
index 0000000000..021f714ec8
--- /dev/null
+++ b/trunk/src/srt/srt_server.cpp
@@ -0,0 +1,253 @@
+#include "srt_server.hpp"
+#include "srt_handle.hpp"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+srt_server::srt_server(unsigned short port):listen_port(port)
+ ,server_socket(-1)
+{
+ handle_ptr = std::make_shared();
+}
+
+srt_server::~srt_server()
+{
+
+}
+
+int srt_server::init_srt() {
+ if (server_socket != -1) {
+ return -1;
+ }
+
+ server_socket = srt_create_socket();
+ sockaddr_in sa;
+ memset(&sa, 0, sizeof sa);
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(listen_port);
+
+ sockaddr* psa = (sockaddr*)&sa;
+
+ int ret = srt_bind(server_socket, psa, sizeof(sa));
+ if ( ret == SRT_ERROR )
+ {
+ srt_close(server_socket);
+ srs_error("srt bind error: %d", ret);
+ return -1;
+ }
+
+ ret = srt_listen(server_socket, 5);
+ if (ret == SRT_ERROR)
+ {
+ srt_close(server_socket);
+ srs_error("srt listen error: %d", ret);
+ return -2;
+ }
+
+ _pollid = srt_epoll_create();
+ if (_pollid < -1) {
+ srs_error("srt server srt_epoll_create error, port=%d", listen_port);
+ return -1;
+ }
+
+ int events = SRT_EPOLL_IN | SRT_EPOLL_ERR;
+ ret = srt_epoll_add_usock(_pollid, server_socket, &events);
+ if (ret < 0) {
+ srs_error("srt server run add epoll error:%d", ret);
+ return ret;
+ }
+
+ srs_trace("srt server listen port=%d, server_fd=%d", listen_port, server_socket);
+
+ return 0;
+}
+
+int srt_server::start()
+{
+ int ret;
+
+ if ((ret = init_srt()) < 0) {
+ return ret;
+ }
+ ret = handle_ptr->start();
+ if (ret < 0) {
+ return ret;
+ }
+
+ run_flag = true;
+ srs_trace("srt server is starting... port(%d)", listen_port);
+ thread_run_ptr = std::make_shared(&srt_server::on_work, this);
+ return 0;
+}
+
+void srt_server::stop()
+{
+ run_flag = false;
+ if (!thread_run_ptr) {
+ return;
+ }
+ thread_run_ptr->join();
+
+ handle_ptr->stop();
+ return;
+}
+
+void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr) {
+ SRTSOCKET conn_fd = -1;
+ sockaddr_in scl;
+ int sclen = sizeof(scl);
+ int conn_event;// = SRT_EPOLL_IN |SRT_EPOLL_OUT| SRT_EPOLL_ERR;
+
+ switch(status) {
+ case SRTS_LISTENING:
+ {
+ conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen);
+ if (conn_fd == -1) {
+ return;
+ }
+ //add new srt connect into srt handle
+ std::string streamid = UDT::getstreamid(conn_fd);
+ if (!is_streamid_valid(streamid)) {
+ srs_trace("srt streamid(%s) error, fd:%d", streamid.c_str(), conn_fd);
+ srt_close(conn_fd);
+ return;
+ }
+ SRT_CONN_PTR srt_conn_ptr = std::make_shared(conn_fd, streamid);
+
+ std::string vhost_str = srt_conn_ptr->get_vhost();
+ srs_trace("new srt connection streamid:%s, fd:%d, vhost:%s",
+ streamid.c_str(), conn_fd, vhost_str.c_str());
+ SrsConfDirective* vhost_p = _srs_config->get_vhost(vhost_str, true);
+ if (!vhost_p) {
+ srs_trace("srt streamid(%s): no vhost %s, fd:%d",
+ streamid.c_str(), vhost_str.c_str(), conn_fd);
+ srt_conn_ptr->close();
+ return;
+ }
+ if (srt_conn_ptr->get_mode() == PULL_SRT_MODE) {
+ //add SRT_EPOLL_IN for information notify
+ conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;//not inlucde SRT_EPOLL_OUT for save cpu
+ } else if (srt_conn_ptr->get_mode() == PUSH_SRT_MODE) {
+ conn_event = SRT_EPOLL_IN | SRT_EPOLL_ERR;
+ } else {
+ srs_trace("stream mode error, it shoulde be m=push or m=pull, streamid:%s",
+ srt_conn_ptr->get_streamid().c_str());
+ srt_conn_ptr->close();
+ return;
+ }
+ request_message_t msg = {srt_conn_ptr, conn_event};
+ handle_ptr->insert_message_queue(msg);
+ break;
+ }
+ case SRTS_CONNECTED:
+ {
+ srs_trace("srt connected: socket=%d, mode:%s", input_fd, dscr.c_str());
+ break;
+ }
+ case SRTS_BROKEN:
+ {
+ srt_epoll_remove_usock(_pollid, input_fd);
+ srt_close(input_fd);
+ srs_warn("srt close: socket=%d", input_fd);
+ break;
+ }
+ default:
+ {
+ srs_error("srt server unkown status:%d", status);
+ }
+ }
+}
+
+void srt_server::on_work()
+{
+ const unsigned int SRT_FD_MAX = 100;
+ srs_trace("srt server is working port(%d)", listen_port);
+ while (run_flag)
+ {
+ SRTSOCKET read_fds[SRT_FD_MAX];
+ SRTSOCKET write_fds[SRT_FD_MAX];
+ int rfd_num = SRT_FD_MAX;
+ int wfd_num = SRT_FD_MAX;
+
+ int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds, &wfd_num, -1,
+ nullptr, nullptr, nullptr, nullptr);
+ if (ret < 0) {
+ continue;
+ }
+ srs_trace("srt server epoll get: ret=%d, rfd_num=%d, wfd_num=%d",
+ ret, rfd_num, wfd_num);
+
+ for (int index = 0; index < rfd_num; index++) {
+ SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);
+ srt_handle_connection(status, read_fds[index], "read fd");
+ }
+
+ for (int index = 0; index < wfd_num; index++) {
+ SRT_SOCKSTATUS status = srt_getsockstate(write_fds[index]);
+ srt_handle_connection(status, read_fds[index], "write fd");
+ }
+ }
+}
+
+SrtServerAdapter::SrtServerAdapter()
+{
+}
+
+SrtServerAdapter::~SrtServerAdapter()
+{
+}
+
+srs_error_t SrtServerAdapter::initialize()
+{
+ srs_error_t err = srs_success;
+
+ // TODO: FIXME: We could fork processes here, because here only ST is initialized.
+
+ return err;
+}
+
+srs_error_t SrtServerAdapter::run()
+{
+ srs_error_t err = srs_success;
+
+ // TODO: FIXME: We could start a coroutine to dispatch SRT task to processes.
+
+ if(_srs_config->get_srt_enabled()) {
+ srs_trace("srt server is enabled...");
+ unsigned short srt_port = _srs_config->get_srt_listen_port();
+ srs_trace("srt server listen port:%d", srt_port);
+ err = srt2rtmp::get_instance()->init();
+ if (err != srs_success) {
+ return srs_error_wrap(err, "srt start srt2rtmp error");
+ }
+
+ srt_ptr = std::make_shared(srt_port);
+ if (!srt_ptr) {
+ return srs_error_wrap(err, "srt listen %d", srt_port);
+ }
+ } else {
+ srs_trace("srt server is disabled...");
+ }
+
+ if(_srs_config->get_srt_enabled()) {
+ srt_ptr->start();
+ }
+
+ return err;
+}
+
+void SrtServerAdapter::stop()
+{
+ // TODO: FIXME: If forked processes, we should do cleanup.
+}
diff --git a/trunk/src/srt/srt_server.hpp b/trunk/src/srt/srt_server.hpp
new file mode 100644
index 0000000000..27bb3bdfbd
--- /dev/null
+++ b/trunk/src/srt/srt_server.hpp
@@ -0,0 +1,53 @@
+#ifndef SRT_SERVER_H
+#define SRT_SERVER_H
+
+#include
+
+#include
+#include
+
+#include
+
+class srt_handle;
+
+class srt_server {
+public:
+ srt_server(unsigned short port);
+ ~srt_server();
+
+ int start();//init srt handl and create srt main thread loop
+ void stop();//stop srt main thread loop
+
+private:
+ //init srt socket and srt epoll
+ int init_srt();
+ //srt main epoll loop
+ void on_work();
+ //accept new srt connection
+ void srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd, const std::string& dscr);
+
+private:
+ unsigned short listen_port;
+ SRTSOCKET server_socket;
+ int _pollid;
+ bool run_flag;
+ std::shared_ptr thread_run_ptr;
+ std::shared_ptr handle_ptr;
+};
+
+typedef std::shared_ptr SRT_SERVER_PTR;
+
+class SrtServerAdapter : public ISrsHybridServer
+{
+private:
+ SRT_SERVER_PTR srt_ptr;
+public:
+ SrtServerAdapter();
+ virtual ~SrtServerAdapter();
+public:
+ virtual srs_error_t initialize();
+ virtual srs_error_t run();
+ virtual void stop();
+};
+
+#endif//SRT_SERVER_H
\ No newline at end of file
diff --git a/trunk/src/srt/srt_to_rtmp.cpp b/trunk/src/srt/srt_to_rtmp.cpp
new file mode 100644
index 0000000000..ecb348b201
--- /dev/null
+++ b/trunk/src/srt/srt_to_rtmp.cpp
@@ -0,0 +1,446 @@
+#include "srt_to_rtmp.hpp"
+#include
+#include
+#include
+#include
+#include
+#include
+#include "stringex.hpp"
+
+std::shared_ptr srt2rtmp::s_srt2rtmp_ptr;
+
+std::shared_ptr srt2rtmp::get_instance() {
+ if (!s_srt2rtmp_ptr) {
+ s_srt2rtmp_ptr = std::make_shared();
+ }
+ return s_srt2rtmp_ptr;
+}
+
+srt2rtmp::srt2rtmp() {
+
+}
+
+srt2rtmp::~srt2rtmp() {
+ release();
+}
+
+srs_error_t srt2rtmp::init() {
+ srs_error_t err = srs_success;
+
+ if (_trd_ptr.get() != nullptr) {
+ return srs_error_wrap(err, "don't start thread again");
+ }
+
+ _trd_ptr = std::make_shared("srt2rtmp", this);
+
+ if ((err = _trd_ptr->start()) != srs_success) {
+ return srs_error_wrap(err, "start thread");
+ }
+ srs_trace("srt2rtmp start coroutine...");
+
+ return err;
+}
+
+void srt2rtmp::release() {
+ if (!_trd_ptr) {
+ return;
+ }
+ _trd_ptr->stop();
+ _trd_ptr = nullptr;
+}
+
+void srt2rtmp::insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path) {
+ std::unique_lock locker(_mutex);
+
+ SRT_DATA_MSG_PTR msg_ptr = std::make_shared(data_p, len, key_path);
+ _msg_queue.push(msg_ptr);
+ //_notify_cond.notify_one();
+ return;
+}
+
+SRT_DATA_MSG_PTR srt2rtmp::get_data_message() {
+ std::unique_lock locker(_mutex);
+ SRT_DATA_MSG_PTR msg_ptr;
+
+ if (_msg_queue.empty())
+ {
+ return msg_ptr;
+ }
+ //while (_msg_queue.empty()) {
+ // _notify_cond.wait(locker);
+ //}
+
+ msg_ptr = _msg_queue.front();
+ _msg_queue.pop();
+ return msg_ptr;
+}
+
+//the cycle is running in srs coroutine
+srs_error_t srt2rtmp::cycle() {
+ srs_error_t err = srs_success;
+
+ while(true) {
+ SRT_DATA_MSG_PTR msg_ptr = get_data_message();
+
+ if (!msg_ptr) {
+ srs_usleep((30 * SRS_UTIME_MILLISECONDS));
+ } else {
+ handle_ts_data(msg_ptr);
+ }
+
+ if ((err = _trd_ptr->pull()) != srs_success) {
+ return srs_error_wrap(err, "forwarder");
+ }
+ }
+}
+
+void srt2rtmp::handle_ts_data(SRT_DATA_MSG_PTR data_ptr) {
+ RTMP_CLIENT_PTR rtmp_ptr;
+ auto iter = _rtmp_client_map.find(data_ptr->get_path());
+ if (iter == _rtmp_client_map.end()) {
+ srs_trace("new rtmp client for srt upstream, key_path:%s", data_ptr->get_path().c_str());
+ rtmp_ptr = std::make_shared(data_ptr->get_path());
+ _rtmp_client_map.insert(std::make_pair(data_ptr->get_path(), rtmp_ptr));
+ } else {
+ rtmp_ptr = iter->second;
+ }
+
+ rtmp_ptr->receive_ts_data(data_ptr);
+
+ return;
+}
+
+rtmp_client::rtmp_client(std::string key_path):_key_path(key_path)
+ , _connect_flag(false) {
+ _ts_demux_ptr = std::make_shared();
+ _avc_ptr = std::make_shared();
+ _aac_ptr = std::make_shared();
+ std::vector ret_vec;
+
+ string_split(key_path, "/", ret_vec);
+
+ if (ret_vec.size() >= 3) {
+ _vhost = ret_vec[0];
+ _appname = ret_vec[1];
+ _streamname = ret_vec[2];
+ } else {
+ _vhost = "DEFAULT_VHOST";
+ _appname = ret_vec[0];
+ _streamname = ret_vec[1];
+ }
+ char url_sz[128];
+ sprintf(url_sz, "rtmp://127.0.0.1/%s?vhost=%s/%s",
+ _appname.c_str(), _vhost.c_str(), _streamname.c_str());
+ _url = url_sz;
+
+ _h264_sps_changed = false;
+ _h264_pps_changed = false;
+ _h264_sps_pps_sent = false;
+ srs_trace("rtmp client construct url:%s", url_sz);
+}
+
+rtmp_client::~rtmp_client() {
+
+}
+
+void rtmp_client::close() {
+ _connect_flag = false;
+ if (!_rtmp_conn_ptr) {
+ return;
+ }
+ _rtmp_conn_ptr->close();
+ _rtmp_conn_ptr = nullptr;
+
+}
+
+srs_error_t rtmp_client::connect() {
+ srs_error_t err = srs_success;
+ srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT;
+ srs_utime_t sto = SRS_CONSTS_RTMP_PULSE;
+
+ if (_connect_flag) {
+ return srs_success;
+ }
+
+ if (_rtmp_conn_ptr.get() != nullptr) {
+ return srs_error_wrap(err, "repeated connect %s failed, cto=%dms, sto=%dms.",
+ _url.c_str(), srsu2msi(cto), srsu2msi(sto));
+ }
+
+ _rtmp_conn_ptr = std::make_shared(_url, cto, sto);
+
+ if ((err = _rtmp_conn_ptr->connect()) != srs_success) {
+ close();
+ return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.",
+ _url.c_str(), srsu2msi(cto), srsu2msi(sto));
+ }
+
+ if ((err = _rtmp_conn_ptr->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) {
+ close();
+ return srs_error_wrap(err, "publish error, url:%s", _url.c_str());
+ }
+ _connect_flag = true;
+ return err;
+}
+
+void rtmp_client::receive_ts_data(SRT_DATA_MSG_PTR data_ptr) {
+ _ts_demux_ptr->decode(data_ptr, shared_from_this());//on_data_callback is the decode callback
+ return;
+}
+
+srs_error_t rtmp_client::write_h264_sps_pps(uint32_t dts, uint32_t pts) {
+ srs_error_t err = srs_success;
+
+ // TODO: FIMXE: there exists bug, see following comments.
+ // when sps or pps changed, update the sequence header,
+ // for the pps maybe not changed while sps changed.
+ // so, we must check when each video ts message frame parsed.
+ if (!_h264_sps_changed || !_h264_pps_changed) {
+ return err;
+ }
+
+ // h264 raw to h264 packet.
+ std::string sh;
+ if ((err = _avc_ptr->mux_sequence_header(_h264_sps, _h264_pps, dts, pts, sh)) != srs_success) {
+ return srs_error_wrap(err, "mux sequence header");
+ }
+
+ // h264 packet to flv packet.
+ int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame;
+ int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader;
+ char* flv = NULL;
+ int nb_flv = 0;
+ if ((err = _avc_ptr->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) {
+ return srs_error_wrap(err, "avc to flv");
+ }
+
+ // the timestamp in rtmp message header is dts.
+ uint32_t timestamp = dts;
+ if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) {
+ return srs_error_wrap(err, "write packet");
+ }
+
+ // reset sps and pps.
+ _h264_sps_changed = false;
+ _h264_pps_changed = false;
+ _h264_sps_pps_sent = true;
+
+ return err;
+}
+
+srs_error_t rtmp_client::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) {
+ srs_error_t err = srs_success;
+
+ // when sps or pps not sent, ignore the packet.
+ // @see https://github.com/ossrs/srs/issues/203
+ if (!_h264_sps_pps_sent) {
+ return srs_error_new(ERROR_H264_DROP_BEFORE_SPS_PPS, "drop sps/pps");
+ }
+
+ // 5bits, 7.3.1 NAL unit syntax,
+ // ISO_IEC_14496-10-AVC-2003.pdf, page 44.
+ // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
+ SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
+
+ // for IDR frame, the frame is keyframe.
+ SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame;
+ if (nal_unit_type == SrsAvcNaluTypeIDR) {
+ frame_type = SrsVideoAvcFrameTypeKeyFrame;
+ }
+
+ std::string ibp;
+ if ((err = _avc_ptr->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) {
+ return srs_error_wrap(err, "mux frame");
+ }
+
+ int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU;
+ char* flv = NULL;
+ int nb_flv = 0;
+ if ((err = _avc_ptr->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) {
+ return srs_error_wrap(err, "mux avc to flv");
+ }
+
+ // the timestamp in rtmp message header is dts.
+ uint32_t timestamp = dts;
+ return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv);
+}
+
+srs_error_t rtmp_client::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) {
+ srs_error_t err = srs_success;
+
+ char* data = NULL;
+ int size = 0;
+ if ((err = _aac_ptr->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) {
+ return srs_error_wrap(err, "mux aac to flv");
+ }
+
+ return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size);
+}
+
+srs_error_t rtmp_client::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) {
+ srs_error_t err = srs_success;
+ SrsSharedPtrMessage* msg = NULL;
+
+ if ((err = srs_rtmp_create_msg(type, timestamp, data, size, _rtmp_conn_ptr->sid(), &msg)) != srs_success) {
+ return srs_error_wrap(err, "create message");
+ }
+ srs_assert(msg);
+
+ // send out encoded msg.
+ if ((err = _rtmp_conn_ptr->send_and_free_message(msg)) != srs_success) {
+ close();
+ return srs_error_wrap(err, "send messages");
+ }
+
+ return err;
+}
+
+srs_error_t rtmp_client::on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) {
+ srs_error_t err = srs_success;
+
+ // ensure rtmp connected.
+ if ((err = connect()) != srs_success) {
+ return srs_error_wrap(err, "connect");
+ }
+
+ // send each frame.
+ while (!avs_ptr->empty()) {
+ char* frame = NULL;
+ int frame_size = 0;
+ if ((err = _avc_ptr->annexb_demux(avs_ptr.get(), &frame, &frame_size)) != srs_success) {
+ return srs_error_wrap(err, "demux annexb");
+ }
+
+ //srs_trace_data(frame, frame_size, "video annexb demux:");
+ // 5bits, 7.3.1 NAL unit syntax,
+ // ISO_IEC_14496-10-AVC-2003.pdf, page 44.
+ // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame
+ SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f);
+
+ // ignore the nalu type sps(7), pps(8), aud(9)
+ if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter) {
+ continue;
+ }
+
+ // for sps
+ if (_avc_ptr->is_sps(frame, frame_size)) {
+ std::string sps;
+ if ((err = _avc_ptr->sps_demux(frame, frame_size, sps)) != srs_success) {
+ return srs_error_wrap(err, "demux sps");
+ }
+
+ if (_h264_sps == sps) {
+ continue;
+ }
+ _h264_sps_changed = true;
+ _h264_sps = sps;
+
+ if ((err = write_h264_sps_pps(dts, pts)) != srs_success) {
+ return srs_error_wrap(err, "write sps/pps");
+ }
+ continue;
+ }
+
+ // for pps
+ if (_avc_ptr->is_pps(frame, frame_size)) {
+ std::string pps;
+ if ((err = _avc_ptr->pps_demux(frame, frame_size, pps)) != srs_success) {
+ return srs_error_wrap(err, "demux pps");
+ }
+
+ if (_h264_pps == pps) {
+ continue;
+ }
+ _h264_pps_changed = true;
+ _h264_pps = pps;
+
+ if ((err = write_h264_sps_pps(dts, pts)) != srs_success) {
+ return srs_error_wrap(err, "write sps/pps");
+ }
+ continue;
+ }
+
+ // ibp frame.
+ // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message.
+ srs_info("mpegts: demux avc ibp frame size=%d, dts=%d", frame_size, dts);
+ if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) {
+ return srs_error_wrap(err, "write frame");
+ }
+ }
+
+ return err;
+}
+
+srs_error_t rtmp_client::on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts) {
+ srs_error_t err = srs_success;
+
+ // ensure rtmp connected.
+ if ((err = connect()) != srs_success) {
+ return srs_error_wrap(err, "connect");
+ }
+
+ // send each frame.
+ while (!avs_ptr->empty()) {
+ char* frame = NULL;
+ int frame_size = 0;
+ SrsRawAacStreamCodec codec;
+ if ((err = _aac_ptr->adts_demux(avs_ptr.get(), &frame, &frame_size, codec)) != srs_success) {
+ return srs_error_wrap(err, "demux adts");
+ }
+ //srs_trace("audio annexb demux sampling_frequency_index:%d, aac_packet_type:%d, sound_rate:%d, sound_size:%d",
+ // codec.sampling_frequency_index, codec.aac_packet_type, codec.sound_rate,
+ // codec.sound_size);
+ //srs_trace_data(frame, frame_size, "audio annexb demux:");
+ // ignore invalid frame,
+ // * atleast 1bytes for aac to decode the data.
+ if (frame_size <= 0) {
+ continue;
+ }
+
+ // generate sh.
+ if (_aac_specific_config.empty()) {
+ std::string sh;
+ if ((err = _aac_ptr->mux_sequence_header(&codec, sh)) != srs_success) {
+ return srs_error_wrap(err, "mux sequence header");
+ }
+ _aac_specific_config = sh;
+
+ codec.aac_packet_type = 0;
+
+ if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != srs_success) {
+ return srs_error_wrap(err, "write raw audio frame");
+ }
+ }
+
+ // audio raw data.
+ codec.aac_packet_type = 1;
+ if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) {
+ return srs_error_wrap(err, "write audio raw frame");
+ }
+ }
+
+ return err;
+}
+
+void rtmp_client::on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type,
+ uint64_t dts, uint64_t pts)
+{
+ if (!data_ptr || (data_ptr->get_data() == nullptr) || (data_ptr->data_len() == 0)) {
+ assert(0);
+ return;
+ }
+
+ auto avs_ptr = std::make_shared((char*)data_ptr->get_data(), data_ptr->data_len());
+ dts = dts / 90;
+ pts = pts / 90;
+
+ if (media_type == STREAM_TYPE_VIDEO_H264) {
+ on_ts_video(avs_ptr, dts, pts);
+ } else if (media_type == STREAM_TYPE_AUDIO_AAC) {
+ on_ts_audio(avs_ptr, dts, pts);
+ } else {
+ srs_error("mpegts demux unkown stream type:0x%02x", media_type);
+ assert(0);
+ }
+ return;
+}
diff --git a/trunk/src/srt/srt_to_rtmp.hpp b/trunk/src/srt/srt_to_rtmp.hpp
new file mode 100644
index 0000000000..040b832b74
--- /dev/null
+++ b/trunk/src/srt/srt_to_rtmp.hpp
@@ -0,0 +1,101 @@
+#ifndef SRT_TO_RTMP_H
+#define SRT_TO_RTMP_H
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "srt_data.hpp"
+#include "ts_demux.hpp"
+
+#define SRT_VIDEO_MSG_TYPE 0x01
+#define SRT_AUDIO_MSG_TYPE 0x02
+
+typedef std::shared_ptr RTMP_CONN_PTR;
+typedef std::shared_ptr AVC_PTR;
+typedef std::shared_ptr AAC_PTR;
+
+#define DEFAULT_VHOST "__default_host__"
+
+class rtmp_client : public ts_media_data_callback_I, public std::enable_shared_from_this {
+public:
+ rtmp_client(std::string key_path);
+ ~rtmp_client();
+
+ void receive_ts_data(SRT_DATA_MSG_PTR data_ptr);
+
+private:
+ virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts);
+
+ srs_error_t connect();
+ void close();
+
+private:
+ srs_error_t on_ts_video(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts);
+ srs_error_t on_ts_audio(std::shared_ptr avs_ptr, uint64_t dts, uint64_t pts);
+ virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts);
+ virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts);
+ virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts);
+
+private:
+ virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size);
+
+private:
+ std::string _key_path;
+ std::string _url;
+ std::string _vhost;
+ std::string _appname;
+ std::string _streamname;
+ TS_DEMUX_PTR _ts_demux_ptr;
+
+private:
+ AVC_PTR _avc_ptr;
+ std::string _h264_sps;
+ bool _h264_sps_changed;
+ std::string _h264_pps;
+ bool _h264_pps_changed;
+ bool _h264_sps_pps_sent;
+private:
+ std::string _aac_specific_config;
+ AAC_PTR _aac_ptr;
+private:
+ RTMP_CONN_PTR _rtmp_conn_ptr;
+ bool _connect_flag;
+};
+
+typedef std::shared_ptr RTMP_CLIENT_PTR;
+
+class srt2rtmp : public ISrsCoroutineHandler {
+public:
+ static std::shared_ptr get_instance();
+ srt2rtmp();
+ virtual ~srt2rtmp();
+
+ srs_error_t init();
+ void release();
+
+ void insert_data_message(unsigned char* data_p, unsigned int len, const std::string& key_path);
+
+private:
+ SRT_DATA_MSG_PTR get_data_message();
+ virtual srs_error_t cycle();
+ void handle_ts_data(SRT_DATA_MSG_PTR data_ptr);
+
+private:
+ static std::shared_ptr s_srt2rtmp_ptr;
+ std::shared_ptr _trd_ptr;
+ std::mutex _mutex;
+ //std::condition_variable_any _notify_cond;
+ std::queue _msg_queue;
+
+ std::unordered_map _rtmp_client_map;
+};
+
+#endif
\ No newline at end of file
diff --git a/trunk/src/srt/stringex.hpp b/trunk/src/srt/stringex.hpp
new file mode 100644
index 0000000000..36e8eb8b46
--- /dev/null
+++ b/trunk/src/srt/stringex.hpp
@@ -0,0 +1,39 @@
+#ifndef STRING_EX_H
+#define STRING_EX_H
+#include
+#include
+#include
+#include
+#include
+#include
+
+inline int string_split(const std::string& input_str, const std::string& split_str, std::vector& output_vec) {
+ if (input_str.length() == 0) {
+ return 0;
+ }
+
+ std::string tempString(input_str);
+ do {
+
+ size_t pos = tempString.find(split_str);
+ if (pos == tempString.npos) {
+ output_vec.push_back(tempString);
+ break;
+ }
+ std::string seg_str = tempString.substr(0, pos);
+ tempString = tempString.substr(pos+split_str.size());
+ output_vec.push_back(seg_str);
+ } while(tempString.size() > 0);
+
+ return output_vec.size();
+}
+
+inline std::string string_lower(const std::string input_str) {
+ std::string output_str(input_str);
+
+ std::transform(input_str.begin(), input_str.end(), output_str.begin(), ::tolower);
+
+ return output_str;
+}
+
+#endif//STRING_EX_H
\ No newline at end of file
diff --git a/trunk/src/srt/time_help.h b/trunk/src/srt/time_help.h
new file mode 100644
index 0000000000..301139df7a
--- /dev/null
+++ b/trunk/src/srt/time_help.h
@@ -0,0 +1,10 @@
+#ifndef TIME_HELP_H
+#define TIME_HELP_H
+#include
+
+inline long long now_ms() {
+ return std::chrono::duration_cast(
+ std::chrono::system_clock::now().time_since_epoch()).count();
+}
+
+#endif //TIME_HELP_H
\ No newline at end of file
diff --git a/trunk/src/srt/ts_demux.cpp b/trunk/src/srt/ts_demux.cpp
new file mode 100644
index 0000000000..4c10f871b2
--- /dev/null
+++ b/trunk/src/srt/ts_demux.cpp
@@ -0,0 +1,577 @@
+#include "ts_demux.hpp"
+#include
+#include
+
+ts_demux::ts_demux():_data_total(0)
+ ,_last_pid(0)
+ ,_last_dts(0)
+ ,_last_pts(0)
+{
+
+}
+
+ts_demux::~ts_demux() {
+
+}
+
+int ts_demux::decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback)
+{
+ int pos = 0;
+ int npos = 0;
+ ts_header ts_header_info;
+
+ ts_header_info._sync_byte = data_p[pos];
+ pos++;
+
+ ts_header_info._transport_error_indicator = (data_p[pos]&0x80)>>7;
+ ts_header_info._payload_unit_start_indicator = (data_p[pos]&0x40)>>6;
+ ts_header_info._transport_priority = (data_p[pos]&0x20)>>5;
+ ts_header_info._PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF;
+ pos += 2;
+
+ ts_header_info._transport_scrambling_control = (data_p[pos]&0xC0)>>6;
+ ts_header_info._adaptation_field_control = (data_p[pos]&0x30)>>4;
+ ts_header_info._continuity_counter = (data_p[pos]&0x0F);
+ pos++;
+ npos = pos;
+
+ //printf("ts header(0x%02x) payload_unit_start_indicator:%d, pid:%d, adaptation_field_control:%d, pos:%d\r\n",
+ // ts_header_info._sync_byte,
+ // ts_header_info._payload_unit_start_indicator, ts_header_info._PID,
+ // ts_header_info._adaptation_field_control, pos);
+
+ adaptation_field* field_p = &(ts_header_info._adaptation_field_info);
+ // adaptation field
+ // 0x01 No adaptation_field, payload only
+ // 0x02 Adaptation_field only, no payload
+ // 0x03 Adaptation_field followed by payload
+ if( ts_header_info._adaptation_field_control == 2
+ || ts_header_info._adaptation_field_control == 3 ){
+ // adaptation_field()
+ field_p->_adaptation_field_length = data_p[pos];
+ pos++;
+
+ if( field_p->_adaptation_field_length > 0 ){
+ field_p->_discontinuity_indicator = (data_p[pos]&0x80)>>7;
+ field_p->_random_access_indicator = (data_p[pos]&0x40)>>6;
+ field_p->_elementary_stream_priority_indicator = (data_p[pos]&0x20)>>5;
+ field_p->_PCR_flag = (data_p[pos]&0x10)>>4;
+ field_p->_OPCR_flag = (data_p[pos]&0x08)>>3;
+ field_p->_splicing_point_flag = (data_p[pos]&0x04)>>2;
+ field_p->_transport_private_data_flag = (data_p[pos]&0x02)>>1;
+ field_p->_adaptation_field_extension_flag = (data_p[pos]&0x01);
+ pos++;
+
+ if( field_p->_PCR_flag == 1 ) { // PCR info
+ //program_clock_reference_base 33 uimsbf
+ //reserved 6 bslbf
+ //program_clock_reference_extension 9 uimsbf
+ pos += 6;
+ }
+ if( field_p->_OPCR_flag == 1 ) {
+ //original_program_clock_reference_base 33 uimsbf
+ //reserved 6 bslbf
+ //original_program_clock_reference_extension 9 uimsbf
+ pos += 6;
+ }
+ if( field_p->_splicing_point_flag == 1 ) {
+ //splice_countdown 8 tcimsbf
+ pos++;
+ }
+ if( field_p->_transport_private_data_flag == 1 ) {
+ //transport_private_data_length 8 uimsbf
+ field_p->_transport_private_data_length = data_p[pos];
+ pos++;
+ memcpy(field_p->_private_data_byte, data_p + pos, field_p->_transport_private_data_length);
+ }
+ if( field_p->_adaptation_field_extension_flag == 1 ) {
+ //adaptation_field_extension_length 8 uimsbf
+ field_p->_adaptation_field_extension_length = data_p[pos];
+ pos++;
+ //ltw_flag 1 bslbf
+ field_p->_ltw_flag = (data_p[pos]&0x80)>>7;
+ //piecewise_rate_flag 1 bslbf
+ field_p->_piecewise_rate_flag = (data_p[pos]&0x40)>>6;
+ //seamless_splice_flag 1 bslbf
+ field_p->_seamless_splice_flag = (data_p[pos]&0x20)>>5;
+ //reserved 5 bslbf
+ pos++;
+ if (field_p->_ltw_flag == 1) {
+ //ltw_valid_flag 1 bslbf
+ //ltw_offset 15 uimsbf
+ pos += 2;
+ }
+ if (field_p->_piecewise_rate_flag == 1) {
+ //reserved 2 bslbf
+ //piecewise_rate 22 uimsbf
+ pos += 3;
+ }
+ if (field_p->_seamless_splice_flag == 1) {
+ //splice_type 4 bslbf
+ //DTS_next_AU[32..30] 3 bslbf
+ //marker_bit 1 bslbf
+ //DTS_next_AU[29..15] 15 bslbf
+ //marker_bit 1 bslbf
+ //DTS_next_AU[14..0] 15 bslbf
+ //marker_bit 1 bslbf
+ pos += 5;
+ }
+ }
+ }
+ npos += sizeof(field_p->_adaptation_field_length) + field_p->_adaptation_field_length;
+ }
+
+ if(ts_header_info._adaptation_field_control == 1
+ || ts_header_info._adaptation_field_control == 3 ) {
+ // data_byte with placeholder
+ // payload parser
+ if(ts_header_info._PID == 0x00){
+ // PAT // program association table
+ if(ts_header_info._payload_unit_start_indicator) {
+ pos++;
+ }
+ _pat._table_id = data_p[pos];
+ pos++;
+ _pat._section_syntax_indicator = (data_p[pos]>>7)&0x01;
+ // skip 3 bits of 1 zero and 2 reserved
+ _pat._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;
+ pos += 2;
+ _pat._transport_stream_id = (data_p[pos]<<8)|data_p[pos+1];
+ pos += 2;
+ // reserved 2 bits
+ _pat._version_number = (data_p[pos]&0x3E)>>1;
+ _pat._current_next_indicator = data_p[pos]&0x01;
+ pos++;
+ _pat._section_number = data_p[pos];
+ pos++;
+ _pat._last_section_number = data_p[pos];
+ assert(_pat._table_id == 0x00);
+ assert((188 - npos) > (_pat._section_length+3)); // PAT = section_length + 3
+ pos++;
+ _pat._pid_vec.clear();
+ for (;pos+4 <= _pat._section_length-5-4+9 + npos;) { // 4:CRC, 5:follow section_length item rpos + 4(following unit length) section_length + 9(above field and unit_start_first_byte )
+ PID_INFO pid_info;
+ //program_number 16 uimsbf
+ pid_info._program_number = data_p[pos]<<8|data_p[pos+1];
+ pos += 2;
+// reserved 3 bslbf
+
+ if (pid_info._program_number == 0) {
+// // network_PID 13 uimsbf
+ pid_info._network_id = (data_p[pos]<<8|data_p[pos+1])&0x1FFF;
+ //printf("#### network id:%d.\r\n", pid_info._network_id);
+ pos += 2;
+ }
+ else {
+// // program_map_PID 13 uimsbf
+ pid_info._pid = (data_p[pos]<<8|data_p[pos+1])&0x1FFF;
+ //printf("#### pmt id:%d.\r\n", pid_info._pid);
+ pos += 2;
+ }
+ _pat._pid_vec.push_back(pid_info);
+ // network_PID and program_map_PID save to list
+ }
+// CRC_32 use pat to calc crc32, eq
+ pos += 4;
+ }else if(ts_header_info._PID == 0x01){
+ // CAT // conditional access table
+ }else if(ts_header_info._PID == 0x02){
+ //TSDT // transport stream description table
+ }else if(ts_header_info._PID == 0x03){
+ //IPMP // IPMP control information table
+ // 0x0004-0x000F Reserved
+ // 0x0010-0x1FFE May be assigned as network_PID, Program_map_PID, elementary_PID, or for other purposes
+ }else if(ts_header_info._PID == 0x11){
+ // SDT // https://en.wikipedia.org/wiki/Service_Description_Table / https://en.wikipedia.org/wiki/MPEG_transport_stream
+ }else if(is_pmt(ts_header_info._PID)) {
+ if(ts_header_info._payload_unit_start_indicator)
+ pos++;
+ _pmt._table_id = data_p[pos];
+ pos++;
+ _pmt._section_syntax_indicator = (data_p[pos]>>7)&0x01;
+ // skip 3 bits of 1 zero and 2 reserved
+ _pmt._section_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;
+ pos += 2;
+ _pmt._program_number = (data_p[pos]<<8)|data_p[pos+1];
+ pos += 2;
+ // reserved 2 bits
+ _pmt._version_number = (data_p[pos]&0x3E)>>1;
+ _pmt._current_next_indicator = data_p[pos]&0x01;
+ pos++;
+ _pmt._section_number = data_p[pos];
+ pos++;
+ _pmt._last_section_number = data_p[pos];
+ pos++;
+ // skip 3 bits for reserved 3 bslbf
+ _pmt._PCR_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //PCR_PID 13 uimsbf
+ pos += 2;
+
+ //reserved 4 bslbf
+ _pmt._program_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF;//program_info_length 12 uimsbf
+ pos += 2;
+ assert(_pmt._table_id==0x02); // 0x02, // TS_program_map_section
+ memcpy(_pmt._dscr, data_p+pos, _pmt._program_info_length);
+// for (i = 0; i < N; i++) {
+// descriptor()
+// }
+ pos += _pmt._program_info_length;
+ _pmt._stream_pid_vec.clear();
+ _pmt._pid2steamtype.clear();
+
+ for (; pos + 5 <= _pmt._section_length + 4 - 4 + npos; ) { // pos(above field length) i+5(following unit length) section_length +3(PMT begin three bytes)+1(payload_unit_start_indicator) -4(crc32)
+ STREAM_PID_INFO pid_info;
+ pid_info._stream_type = data_p[pos];//stream_type 8 uimsbf 0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video
+ pos++;
+ //reserved 3 bslbf
+ pid_info._elementary_PID = ((data_p[pos]<<8)|data_p[pos+1])&0x1FFF; //elementary_PID 13 uimsbf
+ pos += 2;
+ //reserved 4 bslbf
+ pid_info._ES_info_length = ((data_p[pos]<<8)|data_p[pos+1])&0x0FFF; //ES_info_length 12 uimsbf
+ pos += 2;
+ if( pos + pid_info._ES_info_length > _pmt._section_length + 4 - 4 + npos )
+ break;
+ int absES_info_length = pos + pid_info._ES_info_length;
+ for (; pos< absES_info_length; ) {
+ //descriptor()
+ int descriptor_tag = data_p[pos];
+ (void)descriptor_tag;
+ pos++;
+ int descriptor_length = data_p[pos];
+ pos++;
+ memcpy(pid_info._dscr, data_p + pos, descriptor_length);
+ pos += descriptor_length;
+ }
+ // save program_number(stream num) elementary_PID(PES PID) stream_type(stream codec)
+ //printf("pmt pid:%d, streamtype:%d, pos:%d\r\n", pid_info._elementary_PID, pid_info._stream_type, pos);
+ _pmt._stream_pid_vec.push_back(pid_info);
+ _pmt._pid2steamtype.insert(std::make_pair((unsigned short)pid_info._elementary_PID, pid_info._stream_type));
+ }
+ pos += 4;//CRC_32
+ }else if(ts_header_info._PID == 0x0042){
+ // USER
+ }else if(ts_header_info._PID == 0x1FFF){
+ // Null packet
+ }else{//pes packet or pure data packet
+ //bool isFound = false;
+ for (size_t i = 0; i < _pmt._stream_pid_vec.size(); i++) {
+ if(ts_header_info._PID == _pmt._stream_pid_vec[i]._elementary_PID){
+ //isFound = true;
+ if(ts_header_info._payload_unit_start_indicator){
+ unsigned char* ret_data_p = nullptr;
+ size_t ret_size = 0;
+
+ //callback last media data in data buffer
+ on_callback(callback, _last_pid, key_path, _last_dts, _last_pts);
+
+ pes_parse(data_p+npos, npos, &ret_data_p, ret_size, _last_dts, _last_pts);
+ if ((ret_data_p != nullptr) && (ret_size > 0)) {
+ insert_into_databuf(ret_data_p, ret_size, key_path, ts_header_info._PID);
+ }
+ }else{
+ //fwrite(p, 1, 188-(npos+pos), pes_info[i].fd);
+ insert_into_databuf(data_p + npos, 188-npos, key_path, ts_header_info._PID);
+ }
+ }
+ }
+ //if(!isFound){
+ // printf("unknown PID = %X \n", ts_header_info._PID);
+ //}
+ }
+ }
+
+ return 0;
+}
+int ts_demux::decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback)
+{
+ int ret = -1;
+ std::string path;
+
+ if (!data_ptr || (data_ptr->data_len() < 188) || (data_ptr->data_len()%188 != 0))
+ {
+ return -1;
+ }
+
+
+ unsigned int count = data_ptr->data_len()/188;
+ path = data_ptr->get_path();
+ for (unsigned int index = 0; index < count; index++)
+ {
+ ret = decode_unit(data_ptr->get_data() + 188*index, path, callback);
+ if (ret < 0)
+ {
+ break;
+ }
+ }
+ return ret;
+}
+
+void ts_demux::insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid) {
+ _last_pid = pid;
+ _data_total += data_size;
+ _data_buffer_vec.push_back(std::make_shared(data_p, data_size, key_path));
+ return;
+}
+
+void ts_demux::on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid, std::string key_path,
+ uint64_t dts, uint64_t pts) {
+ if ((_data_total <=0 ) || (_data_buffer_vec.empty())) {
+ return;
+ }
+
+ auto iter = _pmt._pid2steamtype.find(pid);
+ if (iter == _pmt._pid2steamtype.end()) {
+ return;
+ }
+ unsigned char stream_type = iter->second;
+ auto total_data_ptr = std::make_shared(_data_total, key_path);
+ size_t pos = 0;
+
+ for (size_t index = 0; index < _data_buffer_vec.size(); index++) {
+ memcpy(total_data_ptr->get_data() + pos,
+ _data_buffer_vec[index]->get_data(),
+ _data_buffer_vec[index]->data_len());
+ pos += _data_buffer_vec[index]->data_len();
+ }
+ _data_buffer_vec.clear();
+ _data_total = 0;
+
+ callback->on_data_callback(total_data_ptr, stream_type, dts, pts);
+ return;
+}
+
+bool ts_demux::is_pmt(unsigned short pid) {
+ for (size_t index = 0; index < _pat._pid_vec.size(); index++) {
+ if (_pat._pid_vec[index]._program_number != 0) {
+ if (_pat._pid_vec[index]._pid == pid) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+
+int ts_demux::pes_parse(unsigned char* p, size_t npos,
+ unsigned char** ret_pp, size_t& ret_size,
+ uint64_t& dts, uint64_t& pts) {
+ int pos = 0;
+ int packet_start_code_prefix = (p[pos]<<16)|(p[pos+1]<<8)|p[pos+2]; //packet_start_code_prefix 24 bslbf
+ pos += 3;
+ int stream_id = p[pos]; //stream_id 8 uimsbf
+ pos++;
+ //printf("pes parse %02x %02x.\r\n", p[pos], p[pos+1]);
+ int PES_packet_length = ((unsigned int)p[pos]<<8)|p[pos+1]; //PES_packet_length 16 uimsbf
+ (void)PES_packet_length;
+ pos += 2;
+ //printf("pes parse packet_start_code_prefix:%d, npos:%lu, PES_packet_length:%d, stream_id:%d.\r\n",
+ // packet_start_code_prefix, npos, PES_packet_length, stream_id);
+ assert(0x00000001 == packet_start_code_prefix);
+ if (stream_id != 188//program_stream_map 1011 1100
+ && stream_id != 190//padding_stream 1011 1110
+ && stream_id != 191//private_stream_2 1011 1111
+ && stream_id != 240//ECM 1111 0000
+ && stream_id != 241//EMM 1111 0001
+ && stream_id != 255//program_stream_directory 1111 1111
+ && stream_id != 242//DSMCC_stream 1111 0010
+ && stream_id != 248//ITU-T Rec. H.222.1 type E stream 1111 1000
+ )
+ {
+ assert(0x80 == p[pos]);
+ //skip 2bits//'10' 2 bslbf
+ int PES_scrambling_control = (p[pos]&30)>>4; //PES_scrambling_control 2 bslbf
+ (void)PES_scrambling_control;
+ int PES_priority = (p[pos]&0x08)>>3; //PES_priority 1 bslbf
+ (void)PES_priority;
+ int data_alignment_indicator = (p[pos]&0x04)>>2;//data_alignment_indicator 1 bslbf
+ (void)data_alignment_indicator;
+ int copyright = (p[pos]&0x02)>>1; //copyright 1 bslbf
+ (void)copyright;
+ int original_or_copy = (p[pos]&0x01);//original_or_copy 1 bslbf
+ (void)original_or_copy;
+ pos++;
+ int PTS_DTS_flags = (p[pos]&0xC0)>>6; //PTS_DTS_flags 2 bslbf
+ int ESCR_flag = (p[pos]&0x20)>>5; // ESCR_flag 1 bslbf
+ int ES_rate_flag = (p[pos]&0x10)>>4;//ES_rate_flag 1 bslbf
+ int DSM_trick_mode_flag = (p[pos]&0x08)>>3;//DSM_trick_mode_flag 1 bslbf
+ int additional_copy_info_flag = (p[pos]&0x04)>>2; //additional_copy_info_flag 1 bslbf
+ int PES_CRC_flag = (p[pos]&0x02)>>1; //PES_CRC_flag 1 bslbf
+ int PES_extension_flag = (p[pos]&0x01);//PES_extension_flag 1 bslbf
+ pos++;
+ int PES_header_data_length = p[pos]; //PES_header_data_length 8 uimsbf
+ (void)PES_header_data_length;
+ pos++;
+
+ if (PTS_DTS_flags == 2) {
+ // skip 4 bits '0010' 4 bslbf
+ // PTS [32..30] 3 bslbf
+ // marker_bit 1 bslbf
+ // PTS [29..15] 15 bslbf
+ // marker_bit 1 bslbf
+ // PTS [14..0] 15 bslbf
+ // marker_bit 1 bslbf
+ pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F);
+ pos += 5;
+ }
+ if (PTS_DTS_flags == 3) {
+ // '0011' 4 bslbf
+ // PTS [32..30] 3 bslbf
+ // marker_bit 1 bslbf
+ //PTS [29..15] 15 bslbf
+ //marker_bit 1 bslbf
+ // PTS [14..0] 15 bslbf
+ // marker_bit 1 bslbf
+ pts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F);
+ pos += 5;
+ // '0001' 4 bslbf
+ // DTS [32..30] 3 bslbf
+ // marker_bit 1 bslbf
+ // DTS [29..15] 15 bslbf
+ // marker_bit 1 bslbf
+ // DTS [14..0] 15 bslbf
+ // marker_bit 1 bslbf
+ dts = (((p[pos]>>1)&0x07) << 30) | (p[pos+1]<<22) | (((p[pos+2]>>1)&0x7F)<<15) | (p[pos+3]<<7) | ((p[pos+4]>>1)&0x7F);
+ pos += 5;
+ }
+ if (ESCR_flag == 1) {
+ // reserved 2 bslbf
+ // ESCR_base[32..30] 3 bslbf
+ // marker_bit 1 bslbf
+ // ESCR_base[29..15] 15 bslbf
+ // marker_bit 1 bslbf
+ // ESCR_base[14..0] 15 bslbf
+ // marker_bit 1 bslbf
+ // ESCR_extension 9 uimsbf
+ // marker_bit 1 bslbf
+ uint64_t ESCR_base = ((((uint64_t)p[pos] >> 3) & 0x07) << 30) | (((uint64_t)p[pos] & 0x03) << 28) | ((uint64_t)p[pos + 1] << 20) | ((((uint64_t)p[pos + 2] >> 3) & 0x1F) << 15) | (((uint64_t)p[pos + 2] & 0x3) << 13) | ((uint64_t)p[pos + 3] << 5) | ((p[pos + 4] >> 3) & 0x1F);
+ int ESCR_extension = ((p[pos + 4] & 0x03) << 7) | ((p[pos + 5] >> 1) & 0x7F);
+ (void)ESCR_base;
+ (void)ESCR_extension;
+ pos += 6;
+ }
+ if (ES_rate_flag == 1) {
+ // marker_bit 1 bslbf
+ // ES_rate 22 uimsbf
+ // marker_bit 1 bslbf
+ int ES_rate = (p[pos]&0x7F)<<15 | (p[pos+1])<<7 | (p[pos+2]&0x7F)>>1;
+ (void)ES_rate;
+ pos += 3;
+ }
+ if (DSM_trick_mode_flag == 1) { // ignore
+ int trick_mode_control = (p[pos]&0xE0)>>5;//trick_mode_control 3 uimsbf
+ if ( trick_mode_control == 0/*fast_forward*/ ) {
+ // field_id 2 bslbf
+ // intra_slice_refresh 1 bslbf
+ // frequency_truncation 2 bslbf
+ }
+ else if ( trick_mode_control == 1/*slow_motion*/ ) {
+ //rep_cntrl 5 uimsbf
+ }
+ else if ( trick_mode_control == 2/*freeze_frame*/ ) {
+ // field_id 2 uimsbf
+ // reserved 3 bslbf
+ }
+ else if ( trick_mode_control == 3/*fast_reverse*/ ) {
+ // field_id 2 bslbf
+ // intra_slice_refresh 1 bslbf
+ // frequency_truncation 2 bslbf
+ }else if ( trick_mode_control == 4/*slow_reverse*/ ) {
+ // rep_cntrl 5 uimsbf
+ }
+ else{
+ //reserved 5 bslbf
+ }
+ pos++;
+ }
+ if ( additional_copy_info_flag == 1) { // ignore
+ // marker_bit 1 bslbf
+ // additional_copy_info 7 bslbf
+ pos++;
+ }
+ if ( PES_CRC_flag == 1) { // ignore
+ // previous_PES_packet_CRC 16 bslbf
+ pos += 2;
+ }
+ if ( PES_extension_flag == 1) { // ignore
+ int PES_private_data_flag = (p[pos]&0x80)>>7;// PES_private_data_flag 1 bslbf
+ int pack_header_field_flag = (p[pos]&0x40)>>6;// pack_header_field_flag 1 bslbf
+ int program_packet_sequence_counter_flag = (p[pos]&0x20)>>5;// program_packet_sequence_counter_flag 1 bslbf
+ int P_STD_buffer_flag = (p[pos]&0x10)>>4; // P-STD_buffer_flag 1 bslbf
+ // reserved 3 bslbf
+ int PES_extension_flag_2 = (p[pos]&0x01);// PES_extension_flag_2 1 bslbf
+ pos++;
+
+ if ( PES_private_data_flag == 1) {
+ // PES_private_data 128 bslbf
+ pos += 16;
+ }
+ if (pack_header_field_flag == 1) {
+ // pack_field_length 8 uimsbf
+ // pack_header()
+ }
+ if (program_packet_sequence_counter_flag == 1) {
+ // marker_bit 1 bslbf
+ // program_packet_sequence_counter 7 uimsbf
+ // marker_bit 1 bslbf
+ // MPEG1_MPEG2_identifier 1 bslbf
+ // original_stuff_length 6 uimsbf
+ pos += 2;
+ }
+ if ( P_STD_buffer_flag == 1) {
+ // '01' 2 bslbf
+ // P-STD_buffer_scale 1 bslbf
+ // P-STD_buffer_size 13 uimsbf
+ pos += 2;
+ }
+ if ( PES_extension_flag_2 == 1) {
+ // marker_bit 1 bslbf
+ int PES_extension_field_length = (p[pos]&0x7F);// PES_extension_field_length 7 uimsbf
+ pos++;
+ for (int i = 0; i < PES_extension_field_length; i++) {
+ // reserved 8 bslbf
+ pos++;
+ }
+ }
+ }
+
+// for (int i = 0; i < N1; i++) {
+ //stuffing_byte 8 bslbf
+// rpos++;
+// }
+// for (int i = 0; i < N2; i++) {
+ //PES_packet_data_byte 8 bslbf
+// rpos++;
+// }
+ *ret_pp = p+pos;
+ ret_size = 188-(npos+pos);
+ //printf("pes parse body size:%lu, data:0x%02x 0x%02x 0x%02x 0x%02x 0x%02x 0x%02x, dts:%lu(%lu), pts:%lu(%lu)\r\n",
+ // ret_size, p[pos], p[pos+1], p[pos+2], p[pos+3], p[pos+4], p[pos+5],
+ // dts, dts/90, pts, pts/90);
+ }
+ else if ( stream_id == 188//program_stream_map 1011 1100 BC
+ || stream_id == 191//private_stream_2 1011 1111 BF
+ || stream_id == 240//ECM 1111 0000 F0
+ || stream_id == 241//EMM 1111 0001 F1
+ || stream_id == 255//program_stream_directory 1111 1111 FF
+ || stream_id == 242//DSMCC_stream 1111 0010 F2
+ || stream_id == 248//ITU-T Rec. H.222.1 type E stream 1111 1000 F8
+ ) {
+// for (i = 0; i < PES_packet_length; i++) {
+ //PES_packet_data_byte 8 bslbf
+// rpos++;
+// }
+ *ret_pp = p+pos;
+ ret_size = 188-(npos+pos);
+ //fwrite(p, 1, 188-(npos+rpos), fd);
+ }
+ else if ( stream_id == 190//padding_stream 1011 1110
+ ) {
+// for (i = 0; i < PES_packet_length; i++) {
+ // padding_byte 8 bslbf
+// rpos++;
+ *ret_pp = p+pos;
+ ret_size = 188-(npos+pos);
+// }
+ }
+
+ return pos;
+}
\ No newline at end of file
diff --git a/trunk/src/srt/ts_demux.hpp b/trunk/src/srt/ts_demux.hpp
new file mode 100644
index 0000000000..4d7c9fd5e6
--- /dev/null
+++ b/trunk/src/srt/ts_demux.hpp
@@ -0,0 +1,237 @@
+#ifndef TS_DEMUX_H
+#define TS_DEMUX_H
+#include "srt_data.hpp"
+#include
+#include
+#include
+#include
+
+/* mpegts stream type in ts pmt
+Value Description
+0x00 ITU-T | ISO/IEC Reserved
+0x01 ISO/IEC 11172-2 Video (mpeg video v1)
+0x02 ITU-T Rec. H.262 | ISO/IEC 13818-2 Video(mpeg video v2)or ISO/IEC 11172-2 constrained parameter video stream
+0x03 ISO/IEC 11172-3 Audio (MPEG 1 Audio codec Layer I, Layer II and Layer III audio specifications)
+0x04 ISO/IEC 13818-3 Audio (BC Audio Codec)
+0x05 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 private_sections
+0x06 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 PES packets containing private data
+0x07 ISO/IEC 13522 MHEG
+0x08 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Annex A DSM-CC
+0x09 ITU-T Rec. H.222.1
+0x0A ISO/IEC 13818-6 type A
+0x0B ISO/IEC 13818-6 type B
+0x0C ISO/IEC 13818-6 type C
+0x0D ISO/IEC 13818-6 type D
+0x0E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 auxiliary
+0x0F ISO/IEC 13818-7 Audio with ADTS transport syntax
+0x10 ISO/IEC 14496-2 Visual
+0x11 ISO/IEC 14496-3 Audio with the LATM transport syntax as defined in ISO/IEC 14496-3/Amd.1
+0x12 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in PES packets
+0x13 ISO/IEC 14496-1 SL-packetized stream or FlexMux stream carried in ISO/IEC 14496_sections
+0x14 ISO/IEC 13818-6 Synchronized Download Protocol
+0x15 Metadata carried in PES packets
+0x16 Metadata carried in metadata_sections
+0x17 Metadata carried in ISO/IEC 13818-6 Data Carousel
+0x18 Metadata carried in ISO/IEC 13818-6 Object Carousel
+0x19 Metadata carried in ISO/IEC 13818-6 Synchronized Download Protocol
+0x1A IPMP stream (defined in ISO/IEC 13818-11, MPEG-2 IPMP)
+0x1B AVC video stream as defined in ITU-T Rec. H.264 | ISO/IEC 14496-10 Video (h.264)
+0x1C ISO/IEC 14496-3 Audio, without using any additional transport syntax, such as DST, ALS and SLS
+0x1D ISO/IEC 14496-17 Text
+0x1E Auxiliary video stream as defined in ISO/IEC 23002-3 (AVS)
+0x1F-0x7E ITU-T Rec. H.222.0 | ISO/IEC 13818-1 Reserved
+0x7F IPMP stream 0x80-0xFF User Private
+*/
+#define STREAM_TYPE_VIDEO_MPEG1 0x01
+#define STREAM_TYPE_VIDEO_MPEG2 0x02
+#define STREAM_TYPE_AUDIO_MPEG1 0x03
+#define STREAM_TYPE_AUDIO_MPEG2 0x04
+#define STREAM_TYPE_PRIVATE_SECTION 0x05
+#define STREAM_TYPE_PRIVATE_DATA 0x06
+#define STREAM_TYPE_AUDIO_AAC 0x0f
+#define STREAM_TYPE_AUDIO_AAC_LATM 0x11
+#define STREAM_TYPE_VIDEO_MPEG4 0x10
+#define STREAM_TYPE_METADATA 0x15
+#define STREAM_TYPE_VIDEO_H264 0x1b
+#define STREAM_TYPE_VIDEO_HEVC 0x24
+#define STREAM_TYPE_VIDEO_CAVS 0x42
+#define STREAM_TYPE_VIDEO_VC1 0xea
+#define STREAM_TYPE_VIDEO_DIRAC 0xd1
+
+#define STREAM_TYPE_AUDIO_AC3 0x81
+#define STREAM_TYPE_AUDIO_DTS 0x82
+#define STREAM_TYPE_AUDIO_TRUEHD 0x83
+#define STREAM_TYPE_AUDIO_EAC3 0x87
+
+class ts_media_data_callback_I {
+public:
+ virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type, uint64_t dts, uint64_t pts) = 0;
+};
+
+typedef std::shared_ptr TS_DATA_CALLBACK_PTR;
+
+class adaptation_field {
+public:
+ adaptation_field(){};
+ ~adaptation_field(){};
+
+public:
+ unsigned char _adaptation_field_length;
+
+ unsigned char _discontinuity_indicator:1;
+ unsigned char _random_access_indicator:1;
+ unsigned char _elementary_stream_priority_indicator:1;
+ unsigned char _PCR_flag:1;
+ unsigned char _OPCR_flag:1;
+ unsigned char _splicing_point_flag:1;
+ unsigned char _transport_private_data_flag:1;
+ unsigned char _adaptation_field_extension_flag:1;
+
+ //if(PCR_flag == '1')
+ unsigned long _program_clock_reference_base;//33 bits
+ unsigned short _program_clock_reference_extension;//9bits
+ //if (OPCR_flag == '1')
+ unsigned long _original_program_clock_reference_base;//33 bits
+ unsigned short _original_program_clock_reference_extension;//9bits
+ //if (splicing_point_flag == '1')
+ unsigned char _splice_countdown;
+ //if (transport_private_data_flag == '1')
+ unsigned char _transport_private_data_length;
+ unsigned char _private_data_byte[256];
+ //if (adaptation_field_extension_flag == '1')
+ unsigned char _adaptation_field_extension_length;
+ unsigned char _ltw_flag;
+ unsigned char _piecewise_rate_flag;
+ unsigned char _seamless_splice_flag;
+ unsigned char _reserved0;
+ //if (ltw_flag == '1')
+ unsigned short _ltw_valid_flag:1;
+ unsigned short _ltw_offset:15;
+ //if (piecewise_rate_flag == '1')
+ unsigned int _piecewise_rate;//22bits
+ //if (seamless_splice_flag == '1')
+ unsigned char _splice_type;//4bits
+ unsigned char _DTS_next_AU1;//3bits
+ unsigned char _marker_bit1;//1bit
+ unsigned short _DTS_next_AU2;//15bit
+ unsigned char _marker_bit2;//1bit
+ unsigned short _DTS_next_AU3;//15bit
+};
+
+class ts_header {
+public:
+ ts_header(){}
+ ~ts_header(){}
+
+public:
+ unsigned char _sync_byte;
+
+ unsigned short _transport_error_indicator:1;
+ unsigned short _payload_unit_start_indicator:1;
+ unsigned short _transport_priority:1;
+ unsigned short _PID:13;
+
+ unsigned char _transport_scrambling_control:2;
+ unsigned char _adaptation_field_control:2;
+ unsigned char _continuity_counter:4;
+
+ adaptation_field _adaptation_field_info;
+};
+
+typedef struct {
+ unsigned short _program_number;
+ unsigned short _pid;
+ unsigned short _network_id;
+} PID_INFO;
+
+class pat_info {
+public:
+ pat_info(){};
+ ~pat_info(){};
+
+public:
+ unsigned char _table_id;
+
+ unsigned short _section_syntax_indicator:1;
+ unsigned short _reserved0:1;
+ unsigned short _reserved1:2;
+ unsigned short _section_length:12;
+
+ unsigned short _transport_stream_id;
+
+ unsigned char _reserved3:2;
+ unsigned char _version_number:5;
+ unsigned char _current_next_indicator:1;
+
+ unsigned char _section_number;
+ unsigned char _last_section_number;
+ std::vector _pid_vec;
+};
+
+typedef struct {
+ unsigned char _stream_type;
+ unsigned short _reserved1:3;
+ unsigned short _elementary_PID:13;
+ unsigned short _reserved:4;
+ unsigned short _ES_info_length;
+ unsigned char _dscr[4096];
+ unsigned int _crc_32;
+} STREAM_PID_INFO;
+
+class pmt_info {
+public:
+ pmt_info(){};
+ ~pmt_info(){};
+public:
+ unsigned char _table_id;
+ unsigned short _section_syntax_indicator:1;
+ unsigned short _reserved1:1;
+ unsigned short _reserved2:2;
+ unsigned short _section_length:12;
+ unsigned short _program_number:16;
+ unsigned char _reserved:2;
+ unsigned char _version_number:5;
+ unsigned char _current_next_indicator:5;
+ unsigned char _section_number;
+ unsigned char _last_section_number;
+ unsigned short _reserved3:3;
+ unsigned short _PCR_PID:13;
+ unsigned short _reserved4:4;
+ unsigned short _program_info_length:12;
+ unsigned char _dscr[4096];
+
+ std::unordered_map _pid2steamtype;
+ std::vector _stream_pid_vec;
+};
+
+class ts_demux {
+public:
+ ts_demux();
+ ~ts_demux();
+
+ int decode(SRT_DATA_MSG_PTR data_ptr, TS_DATA_CALLBACK_PTR callback);
+
+private:
+ int decode_unit(unsigned char* data_p, std::string key_path, TS_DATA_CALLBACK_PTR callback);
+ bool is_pmt(unsigned short pmt_id);
+ int pes_parse(unsigned char* p, size_t npos, unsigned char** ret_pp, size_t& ret_size,
+ uint64_t& dts, uint64_t& pts);
+ void insert_into_databuf(unsigned char* data_p, size_t data_size, std::string key_path, unsigned short pid);
+ void on_callback(TS_DATA_CALLBACK_PTR callback, unsigned short pid,
+ std::string key_path, uint64_t dts, uint64_t pts);
+
+private:
+ std::string _key_path;//only for srt
+
+ pat_info _pat;
+ pmt_info _pmt;
+ std::vector _data_buffer_vec;
+ size_t _data_total;
+ unsigned short _last_pid;
+ uint64_t _last_dts;
+ uint64_t _last_pts;
+};
+
+typedef std::shared_ptr TS_DEMUX_PTR;
+
+#endif
\ No newline at end of file
diff --git a/trunk/src/srt/ts_demux_test.cpp b/trunk/src/srt/ts_demux_test.cpp
new file mode 100644
index 0000000000..e68d06218b
--- /dev/null
+++ b/trunk/src/srt/ts_demux_test.cpp
@@ -0,0 +1,55 @@
+#include "ts_demux.hpp"
+#include
+#include
+
+#define TS_MAX 188
+
+class media_data_get : public ts_media_data_callback_I {
+public:
+ media_data_get() {};
+ virtual ~media_data_get() {};
+
+public:
+ virtual void on_data_callback(SRT_DATA_MSG_PTR data_ptr, unsigned int media_type
+ , uint64_t dts, uint64_t pts) {
+ printf("media type:%d, data len:%d, key_path:%s, dts:%lu(%lu), pts:%lu(%lu)\r\n",
+ media_type, data_ptr->data_len(), data_ptr->get_path().c_str(), dts, dts/90, pts, pts/90);
+ FILE* file_p;
+ char filename[80];
+
+ sprintf(filename, "%u.media", media_type);
+ file_p = fopen(filename, "ab+");
+ if (file_p) {
+ fwrite(data_ptr->get_data(), data_ptr->data_len(), 1, file_p);
+ fclose(file_p);
+ }
+ return;
+ }
+};
+
+int main(int argn, char** argv) {
+ unsigned char data[TS_MAX];
+ ts_demux demux_obj;
+ auto callback_ptr = std::make_shared();
+ FILE* file_p;
+ if (argn < 2) {
+ printf("please input ts name.\r\n");
+ return 0;
+ }
+
+ const char* file_name = argv[1];
+ printf("input ts name:%s.\r\n", file_name);
+
+ file_p = fopen(file_name, "r");
+ fseek(file_p, 0L, SEEK_END); /* 定位到文件末尾 */
+ size_t flen = ftell(file_p); /* 得到文件大小 */
+ fseek(file_p, 0L, SEEK_SET); /* 定位到文件开头 */
+
+ do {
+ fread(data, TS_MAX, 1, file_p);
+ auto input_ptr = std::make_shared((unsigned char*)data, (unsigned int)TS_MAX, std::string("live/shiwei"));
+ demux_obj.decode(input_ptr, callback_ptr);
+ flen -= TS_MAX;
+ } while(flen > 0);
+ return 1;
+}
\ No newline at end of file