diff --git a/.github/workflows/abi.yml b/.github/workflows/abi.yml new file mode 100644 index 000000000..2c05cc06b --- /dev/null +++ b/.github/workflows/abi.yml @@ -0,0 +1,61 @@ +name: ABI checks + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +env: + SRT_BASE: v1.5.0 + +jobs: + build: + name: ABI checks + runs-on: ubuntu-20.04 + + steps: + - uses: actions/checkout@v3 + with: + path: pull_request + - name: configure + run: | + cd pull_request + mkdir _build && cd _build + cmake -DCMAKE_BUILD_TYPE=Debug -DENABLE_UNITTESTS=ON ../ + - name: build + run: | + sudo apt install -y abi-dumper + cd pull_request/_build && cmake --build ./ + make install DESTDIR=./installdir + SRT_TAG_VERSION=$(cat version.h |grep SRT_VERSION_MINOR |head -n1 |awk {'print $3'}) + abi-dumper libsrt.so -o libsrt-pr.dump -public-headers installdir/usr/local/include/srt/ -lver 0 + SRT_BASE="v1.$SRT_TAG_VERSION.0" + echo "SRT_BASE=$SRT_BASE" >> "$GITHUB_ENV" + - uses: actions/checkout@v3 + with: + path: tag + ref: ${{ env.SRT_BASE }} + - name: configure_tag + run: | + echo $SRT_TAG_VERSION + cd tag + mkdir _build && cd _build + cmake -DCMAKE_BUILD_TYPE=Debug -DENABLE_UNITTESTS=ON ../ + - name: build_tag + run: | + cd tag + cd _build && cmake --build ./ + make install DESTDIR=./installdir + abi-dumper libsrt.so -o libsrt-tag.dump -public-headers installdir/usr/local/include/srt/ -lver 1 + - name: abi-check + run: | + git clone https://github.com/lvc/abi-compliance-checker.git + cd abi-compliance-checker && sudo make install && cd ../ + abi-compliance-checker -l libsrt -old tag/_build/libsrt-tag.dump -new pull_request/_build/libsrt-pr.dump + RES=$? + if (( $RES != 0 )) + then + echo "ABI/API Compatibility check failed with value $?" + exit $RES + fi diff --git a/CMakeLists.txt b/CMakeLists.txt index c5994d3b7..47d9b88df 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,8 @@ string(TOLOWER ${CMAKE_SYSTEM_NAME} SYSNAME_LC) set_if(DARWIN (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") OR (${CMAKE_SYSTEM_NAME} MATCHES "iOS") OR (${CMAKE_SYSTEM_NAME} MATCHES "tvOS") - OR (${CMAKE_SYSTEM_NAME} MATCHES "watchOS")) + OR (${CMAKE_SYSTEM_NAME} MATCHES "watchOS") + OR (${CMAKE_SYSTEM_NAME} MATCHES "visionOS")) set_if(LINUX ${CMAKE_SYSTEM_NAME} MATCHES "Linux") set_if(BSD ${SYSNAME_LC} MATCHES "bsd$") set_if(MICROSOFT WIN32 AND (NOT MINGW AND NOT CYGWIN)) @@ -237,12 +238,12 @@ if (NOT USE_ENCLIB) message("NOTE: USE_GNUTLS is deprecated. Use -DUSE_ENCLIB=gnutls instead.") set (USE_ENCLIB gnutls) else() - set (USE_ENCLIB openssl) + set (USE_ENCLIB openssl-evp) endif() endif() set(USE_ENCLIB "${USE_ENCLIB}" CACHE STRING "The crypto library that SRT uses") -set_property(CACHE USE_ENCLIB PROPERTY STRINGS "openssl" "gnutls" "mbedtls" "botan") +set_property(CACHE USE_ENCLIB PROPERTY STRINGS "openssl" "openssl-evp" "gnutls" "mbedtls" "botan") # Make sure DLLs and executabes go to the same path regardles of subdirectory set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/docs/API/API-socket-options.md b/docs/API/API-socket-options.md index 84e361f6d..a06f8556d 100644 --- a/docs/API/API-socket-options.md +++ b/docs/API/API-socket-options.md @@ -58,7 +58,7 @@ Exchange for the initial key is done in the handshake. - `SRT_KM_S_SECURED` (`2`): KM exchange was successful and the data will be sent encrypted and will be decrypted by the receiver. This state is only possible on -both sides in both directions simultaneously. +both sides in both directions simultaneously. Any unencrypted packet will be dropped by the receiver. - `SRT_KM_S_NOSECRET` (`3`): If this state is in the sending direction (`SRTO_SNDKMSTATE`), then it means that the sending party has set a passphrase, but the peer did not. diff --git a/docs/API/statistics.md b/docs/API/statistics.md index 60bba59ee..bc34ecca8 100644 --- a/docs/API/statistics.md +++ b/docs/API/statistics.md @@ -245,6 +245,8 @@ Packets may be dropped conditionally when both `SRTO_TSBPDMODE` and `SRTO_TLPKTD #### pktRcvUndecryptTotal The total number of packets that failed to be decrypted at the receiver side. Available for receiver. +The statistic also counts unencrypted packets that were expected to be uncrypted on a secured connection (see [SRTO_KM_S_SECURED](API-socket-options.md#srt_km_state)) +and hence dropped as not encrypted (undecrypted). #### pktSndFilterExtraTotal @@ -822,4 +824,4 @@ The ratio of unrecovered by the socket group packets `Dropped Packets Ratio` can ``` Dropped Packets Ratio = pktRcvDropTotal / pktSentUniqueTotal; in case both sender and receiver statistics is available Dropped Packets Ratio = pktRcvDropTotal / (pktRecvUniqueTotal + pktRcvDropTotal); in case receiver only statistics is available -``` \ No newline at end of file +``` diff --git a/docs/build/build-options.md b/docs/build/build-options.md index 88fcb85bb..529bd5cd7 100644 --- a/docs/build/build-options.md +++ b/docs/build/build-options.md @@ -597,8 +597,8 @@ remember that: Encryption library to be used. Possible options for ``: -* openssl (default) -* openssl-evp (OpenSSL EVP API, since 1.5.1) +* openssl-evp (default) +* openssl * gnutls (with nettle) * mbedtls * botan diff --git a/haicrypt/hcrypt.c b/haicrypt/hcrypt.c index 2568654b1..dc3f06801 100644 --- a/haicrypt/hcrypt.c +++ b/haicrypt/hcrypt.c @@ -320,6 +320,7 @@ int HaiCrypt_Clone(HaiCrypt_Handle hhcSrc, HaiCrypt_CryptoDir tx, HaiCrypt_Handl cryptoClone->ctx_pair[1].flags &= ~HCRYPT_CTX_F_ENCRYPT; memset(cryptoClone->ctx_pair[0].salt, 0, sizeof(cryptoClone->ctx_pair[0].salt)); cryptoClone->ctx_pair[0].salt_len = 0; + cryptoClone->ctx = &cryptoClone->ctx_pair[0]; } *phhc = (void *)cryptoClone; diff --git a/scripts/visionOS.cmake b/scripts/visionOS.cmake new file mode 100644 index 000000000..0f0f60b69 --- /dev/null +++ b/scripts/visionOS.cmake @@ -0,0 +1,171 @@ +# This file is based off of the Platform/Darwin.cmake and Platform/UnixPaths.cmake +# files which are included with CMake 2.8.4 +# It has been altered for VISIONOS development + +# Options: +# +# VISION_PLATFORM = OS (default) or SIMULATOR or SIMULATOR64 +# This decides if SDKS will be selected from the XROS.platform or XRSimulator.platform folders +# OS - the default, used to build for Vision Pro physical device, which have an arm arch. +# SIMULATOR - used to build for the Simulator platforms, which have an x86 arch. +# +# VISIONOS_ARCH = arm64 (default for OS), x86_64 (addiitonal support for SIMULATOR64) +# +# CMAKE_VISIONOS_DEVELOPER_ROOT = automatic(default) or /path/to/platform/Developer folder +# By default this location is automatcially chosen based on the VISIONOS_PLATFORM value above. +# If set manually, it will override the default location and force the user of a particular Developer Platform +# +# CMAKE_VISIONOS_SDK_ROOT = automatic(default) or /path/to/platform/Developer/SDKs/SDK folder +# By default this location is automatcially chosen based on the CMAKE_VISIONOS_DEVELOPER_ROOT value. +# In this case it will always be the most up-to-date SDK found in the CMAKE_VISIONOS_DEVELOPER_ROOT path. +# If set manually, this will force the use of a specific SDK version +# + +# Standard settings +set (CMAKE_SYSTEM_NAME Darwin) +set (CMAKE_SYSTEM_VERSION 1) +set (UNIX True) +set (APPLE True) +set (VISIONOS True) + +# Required as of cmake 2.8.10 +set (CMAKE_OSX_DEPLOYMENT_TARGET "" CACHE STRING "Force unset of the deployment target for visionOs" FORCE) + +# Determine the cmake host system version so we know where to find the visionOS SDKs +find_program (CMAKE_UNAME uname /bin /usr/bin /usr/local/bin) +if (CMAKE_UNAME) + execute_process(COMMAND uname -r + OUTPUT_VARIABLE CMAKE_HOST_SYSTEM_VERSION) + string (REGEX REPLACE "^([0-9]+)\\.([0-9]+).*$" "\\1" DARWIN_MAJOR_VERSION "${CMAKE_HOST_SYSTEM_VERSION}") +endif (CMAKE_UNAME) + + +set(CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY) +set(CMAKE_AR ar CACHE FILEPATH "" FORCE) + +set (CMAKE_C_OSX_COMPATIBILITY_VERSION_FLAG "-compatibility_version ") +set (CMAKE_C_OSX_CURRENT_VERSION_FLAG "-current_version ") +set (CMAKE_CXX_OSX_COMPATIBILITY_VERSION_FLAG "${CMAKE_C_OSX_COMPATIBILITY_VERSION_FLAG}") +set (CMAKE_CXX_OSX_CURRENT_VERSION_FLAG "${CMAKE_C_OSX_CURRENT_VERSION_FLAG}") + +if (CMAKE_BUILD_TYPE STREQUAL "Debug" OR ENABLE_DEBUG) + set(VISIONOS_DEBUG_OPTIONS "-glldb -gmodules") +else() + set(VISIONOS_DEBUG_OPTIONS "-fvisibility=hidden -fvisibility-inlines-hidden") +endif() + +set (CMAKE_C_FLAGS_INIT "${VISIONOS_DEBUG_OPTIONS} ${EMBED_OPTIONS}") +set (CMAKE_CXX_FLAGS_INIT "${VISIONOS_DEBUG_OPTIONS} ${EMBED_OPTIONS}") + +set (CMAKE_C_LINK_FLAGS "-Wl,-search_paths_first ${EMBED_OPTIONS} ${CMAKE_C_LINK_FLAGS}") +set (CMAKE_CXX_LINK_FLAGS "-Wl,-search_paths_first ${EMBED_OPTIONS} ${CMAKE_CXX_LINK_FLAGS}") + + +set (CMAKE_PLATFORM_HAS_INSTALLNAME 1) +set (CMAKE_SHARED_LIBRARY_CREATE_C_FLAGS "-dynamiclib") +set (CMAKE_SHARED_MODULE_CREATE_C_FLAGS "-bundle") +set (CMAKE_SHARED_MODULE_LOADER_C_FLAG "-Wl,-bundle_loader,") +set (CMAKE_SHARED_MODULE_LOADER_CXX_FLAG "-Wl,-bundle_loader,") +set (CMAKE_FIND_LIBRARY_SUFFIXES ".dylib" ".so" ".a") + +# Specify install_name_tool and pkg-config since it outside of SDK path and therefore can't be found by CMake +if (NOT DEFINED CMAKE_INSTALL_NAME_TOOL) + find_program(CMAKE_INSTALL_NAME_TOOL install_name_tool) +endif (NOT DEFINED CMAKE_INSTALL_NAME_TOOL) + +if (NOT DEFINED PKG_CONFIG_EXECUTABLE) + find_program(PKG_CONFIG_EXECUTABLE NAMES pkg-config) + if (DEFINED PKG_CONFIG_EXECUTABLE) + execute_process(COMMAND pkg-config --version OUTPUT_VARIABLE PKG_CONFIG_VERSION_STRING) + endif(DEFINED PKG_CONFIG_EXECUTABLE) +endif(NOT DEFINED PKG_CONFIG_EXECUTABLE) + + +# fffio Specify path to install shared library on device +set (CMAKE_INSTALL_NAME_DIR "@executable_path/Frameworks") +set (CMAKE_BUILD_WITH_INSTALL_NAME_DIR TRUE) + +# Setup visionOS platform unless specified manually with VISIONOS_PLATFORM +if (NOT DEFINED VISIONOS_PLATFORM) + set (VISIONOS_PLATFORM "OS") +endif (NOT DEFINED VISIONOS_PLATFORM) +set (VISIONOS_PLATFORM ${VISIONOS_PLATFORM} CACHE STRING "Type of visionOS Platform") + +# Check the platform selection and setup for developer root +if (${VISIONOS_PLATFORM} STREQUAL OS) + set (VISIONOS_PLATFORM_LOCATION "XROS.platform") + + # This causes the installers to properly locate the output libraries + set (CMAKE_XCODE_EFFECTIVE_PLATFORMS "-xros") +elseif (${VISIONOS_PLATFORM} STREQUAL SIMULATOR) + set (SIMULATOR true) + set (VISIONOS_PLATFORM_LOCATION "XRSimulator.platform") + + # This causes the installers to properly locate the output libraries + set (CMAKE_XCODE_EFFECTIVE_PLATFORMS "-xrsimulator") +elseif (${VISIONOS_PLATFORM} STREQUAL SIMULATOR64) + set (SIMULATOR true) + set (VISIONOS_PLATFORM_LOCATION "XRSimulator.platform") + + # This causes the installers to properly locate the output libraries + set (CMAKE_XCODE_EFFECTIVE_PLATFORMS "-xrsimulator") +else (${VISIONOS_PLATFORM} STREQUAL OS) + message (FATAL_ERROR "Unsupported VISIONOS_PLATFORM value selected. Please choose OS or SIMULATOR") +endif (${VISIONOS_PLATFORM} STREQUAL OS) + +# Setup visionOS developer location unless specified manually with CMAKE_VISIONOS_DEVELOPER_ROOT +if (NOT DEFINED CMAKE_VISIONOS_DEVELOPER_ROOT) + execute_process(COMMAND /usr/bin/xcode-select -print-path + OUTPUT_VARIABLE CMAKE_XCODE_DEVELOPER_DIR) + string(STRIP "${CMAKE_XCODE_DEVELOPER_DIR}" CMAKE_XCODE_DEVELOPER_DIR) # FIXED: remove new line character, otherwise it complain no visionOS SDK's found in default search path + set (CMAKE_VISIONOS_DEVELOPER_ROOT "${CMAKE_XCODE_DEVELOPER_DIR}/Platforms/${VISIONOS_PLATFORM_LOCATION}/Developer") +endif (NOT DEFINED CMAKE_VISIONOS_DEVELOPER_ROOT) +set (CMAKE_VISIONOS_DEVELOPER_ROOT ${CMAKE_VISIONOS_DEVELOPER_ROOT} CACHE PATH "Location of visionOS Platform") + +# Find and use the most recent visionOS sdk unless specified manually with CMAKE_VISIONOS_SDK_ROOT +if (NOT DEFINED CMAKE_VISIONOS_SDK_ROOT) + file (GLOB _CMAKE_VISIONOS_SDKS "${CMAKE_VISIONOS_DEVELOPER_ROOT}/SDKs/*") + if (_CMAKE_VISIONOS_SDKS) + list (SORT _CMAKE_VISIONOS_SDKS) + list (REVERSE _CMAKE_VISIONOS_SDKS) + list (GET _CMAKE_VISIONOS_SDKS 0 CMAKE_VISIONOS_SDK_ROOT) + else (_CMAKE_VISIONOS_SDKS) + message (FATAL_ERROR "No visionOS SDK's found in default search path ${CMAKE_VISIONOS_DEVELOPER_ROOT}. Manually set CMAKE_VISIONOS_SDK_ROOT or install the visionOS SDK.") + endif (_CMAKE_VISIONOS_SDKS) + message (STATUS "Toolchain using default visionOS SDK: ${CMAKE_VISIONOS_SDK_ROOT}") +endif (NOT DEFINED CMAKE_VISIONOS_SDK_ROOT) +set (CMAKE_VISIONOS_SDK_ROOT ${CMAKE_VISIONOS_SDK_ROOT} CACHE PATH "Location of the selected visionOS SDK") + +# Set the sysroot default to the most recent SDK +set (CMAKE_OSX_SYSROOT ${CMAKE_VISIONOS_SDK_ROOT} CACHE PATH "Sysroot used for visionOS support") + +# set the architecture for visionOS +if (NOT DEFINED VISIONOS_ARCH) + if (${VISIONOS_PLATFORM} STREQUAL OS) + set (VISIONOS_ARCH arm64) + elseif (${VISIONOS_PLATFORM} STREQUAL SIMULATOR) + set (VISIONOS_ARCH arm64) + elseif (${VISIONOS_PLATFORM} STREQUAL SIMULATOR64) + set (VISIONOS_ARCH x86_64) + endif (${VISIONOS_PLATFORM} STREQUAL OS) +endif(NOT DEFINED VISIONOS_ARCH) +set (CMAKE_OSX_ARCHITECTURES ${VISIONOS_ARCH} CACHE STRING "Build architecture for visionOS") + +# Set the find root to the visionOS developer roots and to user defined paths +set (CMAKE_FIND_ROOT_PATH ${CMAKE_VISIONOS_DEVELOPER_ROOT} ${CMAKE_VISIONOS_SDK_ROOT} ${CMAKE_PREFIX_PATH} CACHE STRING "visionOS find search path root") + +# default to searching for frameworks first +set (CMAKE_FIND_FRAMEWORK FIRST) + +# set up the default search directories for frameworks +set (CMAKE_SYSTEM_FRAMEWORK_PATH + ${CMAKE_VISIONOS_SDK_ROOT}/System/Library/Frameworks + ${CMAKE_VISIONOS_SDK_ROOT}/System/Library/PrivateFrameworks + ${CMAKE_VISIONOS_SDK_ROOT}/Developer/Library/Frameworks +) + +# only search the visionOS sdks, not the remainder of the host filesystem (except for programs, so that we can still find Python if needed) +set (CMAKE_FIND_ROOT_PATH_MODE_PROGRAM BOTH) +set (CMAKE_FIND_ROOT_PATH_MODE_LIBRARY ONLY) +set (CMAKE_FIND_ROOT_PATH_MODE_INCLUDE ONLY) + diff --git a/srtcore/api.cpp b/srtcore/api.cpp index 7d9274d3f..31cbee673 100644 --- a/srtcore/api.cpp +++ b/srtcore/api.cpp @@ -656,6 +656,9 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen, HLOGC(cnlog.Debug, log << "newConnection: mapping peer " << ns->m_PeerID << " to that socket (" << ns->m_SocketID << ")"); m_PeerRec[ns->getPeerSpec()].insert(ns->m_SocketID); + + LOGC(cnlog.Note, log << "@" << ns->m_SocketID << " connection on listener @" << listen + << " (" << ns->m_SelfAddr.str() << ") from peer @" << ns->m_PeerID << " (" << peer.str() << ")"); } catch (...) { diff --git a/srtcore/api.h b/srtcore/api.h index 963452313..c194b7333 100644 --- a/srtcore/api.h +++ b/srtcore/api.h @@ -467,11 +467,11 @@ class CUDTUnited SocketKeeper(): socket(NULL) {} - // This is intended for API functions to lock the group's existence + // This is intended for API functions to lock the socket's existence // for the lifetime of their call. SocketKeeper(CUDTUnited& glob, SRTSOCKET id, ErrorHandling erh = ERH_RETURN) { socket = glob.locateAcquireSocket(id, erh); } - // This is intended for TSBPD thread that should lock the group's + // This is intended for TSBPD thread that should lock the socket's // existence until it exits. SocketKeeper(CUDTUnited& glob, CUDTSocket* s) { @@ -493,8 +493,6 @@ class CUDTUnited { SRT_ASSERT(socket->isStillBusy() > 0); socket->apiRelease(); - // Only now that the group lock is lifted, can the - // group be now deleted and this pointer potentially dangling } } }; diff --git a/srtcore/buffer_rcv.cpp b/srtcore/buffer_rcv.cpp index fb389e4be..2ec42487d 100644 --- a/srtcore/buffer_rcv.cpp +++ b/srtcore/buffer_rcv.cpp @@ -206,7 +206,7 @@ int CRcvBuffer::insert(CUnit* unit) return 0; } -int CRcvBuffer::dropUpTo(int32_t seqno) +std::pair CRcvBuffer::dropUpTo(int32_t seqno) { IF_RCVBUF_DEBUG(ScopedLog scoped_log); IF_RCVBUF_DEBUG(scoped_log.ss << "CRcvBuffer::dropUpTo: seqno " << seqno << " m_iStartSeqNo " << m_iStartSeqNo); @@ -215,16 +215,23 @@ int CRcvBuffer::dropUpTo(int32_t seqno) if (len <= 0) { IF_RCVBUF_DEBUG(scoped_log.ss << ". Nothing to drop."); - return 0; + return std::make_pair(0, 0); } m_iMaxPosOff -= len; if (m_iMaxPosOff < 0) m_iMaxPosOff = 0; - const int iDropCnt = len; + int iNumDropped = 0; // Number of dropped packets that were missing. + int iNumDiscarded = 0; // The number of dropped packets that existed in the buffer. while (len > 0) { + // Note! Dropping a EntryState_Read must not be counted as a drop because it was read. + // Note! Dropping a EntryState_Drop must not be counted as a drop because it was already dropped and counted earlier. + if (m_entries[m_iStartPos].status == EntryState_Avail) + ++iNumDiscarded; + else if (m_entries[m_iStartPos].status == EntryState_Empty) + ++iNumDropped; dropUnitInPos(m_iStartPos); m_entries[m_iStartPos].status = EntryState_Empty; SRT_ASSERT(m_entries[m_iStartPos].pUnit == NULL && m_entries[m_iStartPos].status == EntryState_Empty); @@ -246,7 +253,7 @@ int CRcvBuffer::dropUpTo(int32_t seqno) } if (!m_tsbpd.isEnabled() && m_bMessageAPI) updateFirstReadableOutOfOrder(); - return iDropCnt; + return std::make_pair(iNumDropped, iNumDiscarded); } int CRcvBuffer::dropAll() @@ -255,7 +262,8 @@ int CRcvBuffer::dropAll() return 0; const int end_seqno = CSeqNo::incseq(m_iStartSeqNo, m_iMaxPosOff); - return dropUpTo(end_seqno); + const std::pair numDropped = dropUpTo(end_seqno); + return numDropped.first + numDropped.second; } int CRcvBuffer::dropMessage(int32_t seqnolo, int32_t seqnohi, int32_t msgno, DropActionIfExists actionOnExisting) diff --git a/srtcore/buffer_rcv.h b/srtcore/buffer_rcv.h index d664373f5..f783ac2a2 100644 --- a/srtcore/buffer_rcv.h +++ b/srtcore/buffer_rcv.h @@ -66,8 +66,8 @@ class CRcvBuffer /// Drop packets in the receiver buffer from the current position up to the seqno (excluding seqno). /// @param [in] seqno drop units up to this sequence number - /// @return number of dropped packets. - int dropUpTo(int32_t seqno); + /// @return number of dropped (missing) and discarded (available) packets as a pair(dropped, discarded). + std::pair dropUpTo(int32_t seqno); /// @brief Drop all the packets in the receiver buffer. /// The starting position and seqno are shifted right after the last packet in the buffer. @@ -200,6 +200,20 @@ class CRcvBuffer return (m_iMaxPosOff == 0); } + /// Returns the currently used number of cells, including + /// gaps with empty cells, or in other words, the distance + /// between the initial position and the youngest received packet. + size_t size() const + { + return m_iMaxPosOff; + } + + // Returns true if the buffer is full. Requires locking. + bool full() const + { + return size() == capacity(); + } + /// Return buffer capacity. /// One slot had to be empty in order to tell the difference between "empty buffer" and "full buffer". /// E.g. m_iFirstNonreadPos would again point to m_iStartPos if m_szSize entries are added continiously. @@ -333,9 +347,8 @@ class CRcvBuffer EntryStatus status; }; - //static Entry emptyEntry() { return Entry { NULL, EntryState_Empty }; } - - FixedArray m_entries; + typedef FixedArray entries_t; + entries_t m_entries; const size_t m_szSize; // size of the array of units (buffer) CUnitQueue* m_pUnitQueue; // the shared unit queue diff --git a/srtcore/channel.cpp b/srtcore/channel.cpp index 557be8fd7..0a4e1e318 100644 --- a/srtcore/channel.cpp +++ b/srtcore/channel.cpp @@ -142,14 +142,6 @@ srt::CChannel::CChannel() , m_bBindMasked(true) #endif { -#ifdef _WIN32 - SecureZeroMemory((PVOID)&m_SendOverlapped, sizeof(WSAOVERLAPPED)); - m_SendOverlapped.hEvent = WSACreateEvent(); - if (m_SendOverlapped.hEvent == NULL) { - LOGC(kmlog.Error, log << CONID() << "IPE: WSACreateEvent failed with error: " << NET_ERROR); - throw CUDTException(MJ_SETUP, MN_NORES, NET_ERROR); - } -#endif #ifdef SRT_ENABLE_PKTINFO // Do the check for ancillary data buffer size, kinda assertion static const size_t CMSG_MAX_SPACE = sizeof (CMSGNodeIPv4) + sizeof (CMSGNodeIPv6); @@ -165,12 +157,7 @@ srt::CChannel::CChannel() #endif } -srt::CChannel::~CChannel() -{ -#ifdef _WIN32 - WSACloseEvent(m_SendOverlapped.hEvent); -#endif -} +srt::CChannel::~CChannel() {} void srt::CChannel::createSocket(int family) { @@ -748,7 +735,7 @@ int srt::CChannel::sendto(const sockaddr_any& addr, CPacket& packet, const socka #endif // convert control information into network order - packet.toNL(); + packet.toNetworkByteOrder(); #ifndef _WIN32 msghdr mh; @@ -787,38 +774,65 @@ int srt::CChannel::sendto(const sockaddr_any& addr, CPacket& packet, const socka const int res = (int)::sendmsg(m_iSocket, &mh, 0); #else - DWORD size = (DWORD)(CPacket::HDR_SIZE + packet.getLength()); - int addrsize = addr.size(); + class WSAEventRef + { + public: + WSAEventRef() + : e(::WSACreateEvent()) + { + } + ~WSAEventRef() + { + ::WSACloseEvent(e); + e = NULL; + } + void reset() + { + ::WSAResetEvent(e); + } + WSAEVENT Handle() + { + return e; + } - int res = ::WSASendTo(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, 0, addr.get(), addrsize, &m_SendOverlapped, NULL); + private: + WSAEVENT e; + }; +#if !defined(__MINGW32__) && defined(ENABLE_CXX11) + thread_local WSAEventRef lEvent; +#else + WSAEventRef lEvent; +#endif + WSAOVERLAPPED overlapped; + ::SecureZeroMemory(&overlapped, sizeof(overlapped)); + overlapped.hEvent = lEvent.Handle(); + + DWORD size = (DWORD)(packet.m_PacketVector[0].size() + packet.m_PacketVector[1].size()); + int addrsize = addr.size(); + int res = ::WSASendTo(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, 0, addr.get(), addrsize, &overlapped, NULL); if (res == SOCKET_ERROR) { if (NET_ERROR == WSA_IO_PENDING) { - DWORD res_wait = WSAWaitForMultipleEvents(1, &m_SendOverlapped.hEvent, TRUE, 100 /*ms*/, FALSE); - if (res_wait == WAIT_FAILED) - { - LOGC(kslog.Warn, log << "CChannel::WSAWaitForMultipleEvents: failed with " << NET_ERROR); - res = -1; - } + DWORD dwFlags = 0; + const bool bCompleted = WSAGetOverlappedResult(m_iSocket, &overlapped, &size, TRUE, &dwFlags); + if (bCompleted) + res = 0; else - { - DWORD dwFlags = 0; - const bool bCompleted = WSAGetOverlappedResult(m_iSocket, &m_SendOverlapped, &size, false, &dwFlags); - res = bCompleted ? 0 : -1; - } + LOGC(kslog.Warn, log << "CChannel::sendto call on ::WSAGetOverlappedResult failed with error: " << NET_ERROR); + lEvent.reset(); } else { LOGC(kmlog.Error, log << CONID() << "WSASendTo failed with error: " << NET_ERROR); } } - WSAResetEvent(m_SendOverlapped.hEvent); + res = (0 == res) ? size : -1; #endif - packet.toHL(); + packet.toHostByteOrder(); return res; } @@ -1067,25 +1081,7 @@ srt::EReadStatus srt::CChannel::recvfrom(sockaddr_any& w_addr, CPacket& w_packet } w_packet.setLength(recv_size - CPacket::HDR_SIZE); - - // convert back into local host order - // XXX use NtoHLA(). - // for (int i = 0; i < 4; ++ i) - // w_packet.m_nHeader[i] = ntohl(w_packet.m_nHeader[i]); - { - uint32_t* p = w_packet.m_nHeader; - for (size_t i = 0; i < SRT_PH_E_SIZE; ++i) - { - *p = ntohl(*p); - ++p; - } - } - - if (w_packet.isControl()) - { - for (size_t j = 0, n = w_packet.getLength() / sizeof(uint32_t); j < n; ++j) - *((uint32_t*)w_packet.m_pcData + j) = ntohl(*((uint32_t*)w_packet.m_pcData + j)); - } + w_packet.toHostByteOrder(); return RST_OK; diff --git a/srtcore/channel.h b/srtcore/channel.h index e09b13fd9..e12310001 100644 --- a/srtcore/channel.h +++ b/srtcore/channel.h @@ -169,9 +169,6 @@ class CChannel private: UDPSOCKET m_iSocket; // socket descriptor -#ifdef _WIN32 - mutable WSAOVERLAPPED m_SendOverlapped; -#endif // Mutable because when querying original settings // this comprises the cache for extracted values, diff --git a/srtcore/core.cpp b/srtcore/core.cpp index 255fcb5c6..28b3c91b0 100644 --- a/srtcore/core.cpp +++ b/srtcore/core.cpp @@ -298,7 +298,7 @@ void srt::CUDT::construct() m_iPeerTsbPdDelay_ms = 0; m_bPeerTsbPd = false; m_bTsbPd = false; - m_bTsbPdAckWakeup = false; + m_bTsbPdNeedsWakeup = false; m_bGroupTsbPd = false; m_bPeerTLPktDrop = false; m_bBufferWasFull = false; @@ -2101,9 +2101,9 @@ int srt::CUDT::processSrtMsg_HSREQ(const uint32_t *srtdata, size_t bytelen, uint return SRT_CMD_NONE; } - LOGC(cnlog.Note, log << "HSREQ/rcv: cmd=" << SRT_CMD_HSREQ << "(HSREQ) len=" << bytelen - << hex << " vers=0x" << srtdata[SRT_HS_VERSION] << " opts=0x" << srtdata[SRT_HS_FLAGS] - << dec << " delay=" << SRT_HS_LATENCY_RCV::unwrap(srtdata[SRT_HS_LATENCY])); + LOGC(cnlog.Debug, log << "HSREQ/rcv: cmd=" << SRT_CMD_HSREQ << "(HSREQ) len=" << bytelen + << hex << " vers=0x" << srtdata[SRT_HS_VERSION] << " opts=0x" << srtdata[SRT_HS_FLAGS] + << dec << " delay=" << SRT_HS_LATENCY_RCV::unwrap(srtdata[SRT_HS_LATENCY])); m_uPeerSrtVersion = srtdata[SRT_HS_VERSION]; m_uPeerSrtFlags = srtdata[SRT_HS_FLAGS]; @@ -4968,8 +4968,9 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous, } */ - - LOGC(cnlog.Note, log << CONID() << "Connection established to: " << m_PeerAddr.str()); + + LOGC(cnlog.Note, log << CONID() << "Connection established from (" + << m_SourceAddr.str() << ") to peer @" << m_PeerID << " (" << m_PeerAddr.str() << ")"); return CONN_ACCEPT; } @@ -5404,7 +5405,7 @@ void * srt::CUDT::tsbpd(void* param) CUniqueSync recvdata_lcc (self->m_RecvLock, self->m_RecvDataCond); CSync tsbpd_cc(self->m_RcvTsbPdCond, recvdata_lcc.locker()); - self->m_bTsbPdAckWakeup = true; + self->m_bTsbPdNeedsWakeup = true; while (!self->m_bClosing) { steady_clock::time_point tsNextDelivery; // Next packet delivery time @@ -5424,6 +5425,21 @@ void * srt::CUDT::tsbpd(void* param) const bool is_time_to_deliver = !is_zero(info.tsbpd_time) && (tnow >= info.tsbpd_time); tsNextDelivery = info.tsbpd_time; +#if ENABLE_HEAVY_LOGGING + if (info.seqno == SRT_SEQNO_NONE) + { + HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: NO PACKETS"); + } + else + { + HLOGC(tslog.Debug, log << self->CONID() << "sok/tsbpd: packet check: %" + << info.seqno << " T=" << FormatTime(tsNextDelivery) + << " diff-now-playtime=" << FormatDuration(tnow - tsNextDelivery) + << " ready=" << is_time_to_deliver + << " ondrop=" << info.seq_gap); + } +#endif + if (!self->m_bTLPktDrop) { rxready = !info.seq_gap && is_time_to_deliver; @@ -5469,8 +5485,8 @@ void * srt::CUDT::tsbpd(void* param) if (rxready) { HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated " - << (count_milliseconds(steady_clock::now() - info.tsbpd_time)) << "ms)"); + log << self->CONID() << "tsbpd: PLAYING PACKET seq=" << info.seqno << " (belated " + << FormatDuration(steady_clock::now() - info.tsbpd_time) << ")"); /* * There are packets ready to be delivered * signal a waiting "recv" call if there is any data available @@ -5533,6 +5549,8 @@ void * srt::CUDT::tsbpd(void* param) if (self->m_bClosing) break; + SRT_ATR_UNUSED bool bWokeUpOnSignal = true; + if (!is_zero(tsNextDelivery)) { IF_HEAVY_LOGGING(const steady_clock::duration timediff = tsNextDelivery - tnow); @@ -5540,12 +5558,12 @@ void * srt::CUDT::tsbpd(void* param) * Buffer at head of queue is not ready to play. * Schedule wakeup when it will be. */ - self->m_bTsbPdAckWakeup = false; + self->m_bTsbPdNeedsWakeup = false; HLOGC(tslog.Debug, - log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno - << " T=" << FormatTime(tsNextDelivery) << " - waiting " << count_milliseconds(timediff) << "ms"); + log << self->CONID() << "tsbpd: FUTURE PACKET seq=" << info.seqno + << " T=" << FormatTime(tsNextDelivery) << " - waiting " << FormatDuration(timediff)); THREAD_PAUSED(); - tsbpd_cc.wait_until(tsNextDelivery); + bWokeUpOnSignal = tsbpd_cc.wait_until(tsNextDelivery); THREAD_RESUMED(); } else @@ -5562,20 +5580,22 @@ void * srt::CUDT::tsbpd(void* param) * - Closing the connection */ HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: no data, scheduling wakeup at ack"); - self->m_bTsbPdAckWakeup = true; + self->m_bTsbPdNeedsWakeup = true; THREAD_PAUSED(); tsbpd_cc.wait(); THREAD_RESUMED(); } - HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: WAKE UP!!!"); + HLOGC(tslog.Debug, + log << self->CONID() << "tsbpd: WAKE UP [" << (bWokeUpOnSignal ? "signal" : "timeout") << "]!!! - " + << "NOW=" << FormatTime(steady_clock::now())); } THREAD_EXIT(); HLOGC(tslog.Debug, log << self->CONID() << "tsbpd: EXITING"); return NULL; } -int srt::CUDT::rcvDropTooLateUpTo(int seqno) +int srt::CUDT::rcvDropTooLateUpTo(int seqno, DropReason reason) { // Make sure that it would not drop over m_iRcvCurrSeqNo, which may break senders. if (CSeqNo::seqcmp(seqno, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) @@ -5583,16 +5603,22 @@ int srt::CUDT::rcvDropTooLateUpTo(int seqno) dropFromLossLists(SRT_SEQNO_NONE, CSeqNo::decseq(seqno)); - const int iDropCnt = m_pRcvBuffer->dropUpTo(seqno); - if (iDropCnt > 0) + const std::pair iDropDiscardedPkts = m_pRcvBuffer->dropUpTo(seqno); + const int iDropCnt = iDropDiscardedPkts.first; + const int iDiscardedCnt = iDropDiscardedPkts.second; + const int iDropCntTotal = iDropCnt + iDiscardedCnt; + + // In case of DROP_TOO_LATE discarded packets should also be counted because they are not read from another member socket. + const int iDropStatCnt = (reason == DROP_DISCARD) ? iDropCnt : iDropCntTotal; + if (iDropStatCnt > 0) { enterCS(m_StatsLock); // Estimate dropped bytes from average payload size. const uint64_t avgpayloadsz = m_pRcvBuffer->getRcvAvgPayloadSize(); - m_stats.rcvr.dropped.count(stats::BytesPackets(iDropCnt * avgpayloadsz, (uint32_t) iDropCnt)); + m_stats.rcvr.dropped.count(stats::BytesPackets(iDropStatCnt * avgpayloadsz, (uint32_t)iDropStatCnt)); leaveCS(m_StatsLock); } - return iDropCnt; + return iDropCntTotal; } void srt::CUDT::setInitialRcvSeq(int32_t isn) @@ -5729,14 +5755,6 @@ void srt::CUDT::rewriteHandshakeData(const sockaddr_any& peer, CHandShake& w_hs) void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& peer, const CPacket& hspkt, CHandShake& w_hs) { HLOGC(cnlog.Debug, log << CONID() << "acceptAndRespond: setting up data according to handshake"); -#if ENABLE_BONDING - // Keep the group alive for the lifetime of this function, - // and do it BEFORE acquiring m_ConnectionLock to avoid - // lock inversion. - // This will check if a socket belongs to a group and if so - // it will remember this group and keep it alive here. - CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent); -#endif ScopedLock cg(m_ConnectionLock); @@ -5824,6 +5842,16 @@ void srt::CUDT::acceptAndRespond(const sockaddr_any& agent, const sockaddr_any& throw CUDTException(MJ_SETUP, MN_REJECTED, 0); } +#if ENABLE_BONDING + // The socket and the group are only linked to each other after interpretSrtHandshake(..) has been called. + // Keep the group alive for the lifetime of this function, + // and do it BEFORE acquiring m_ConnectionLock to avoid + // lock inversion. + // This will check if a socket belongs to a group and if so + // it will remember this group and keep it alive here. + CUDTUnited::GroupKeeper group_keeper(uglobal(), m_parent); +#endif + if (!prepareBuffers(NULL)) { HLOGC(cnlog.Debug, @@ -6944,6 +6972,12 @@ bool srt::CUDT::isRcvBufferReadyNoLock() const return m_pRcvBuffer->isRcvDataReady(steady_clock::now()); } +bool srt::CUDT::isRcvBufferFull() const +{ + ScopedLock lck(m_RcvBufferLock); + return m_pRcvBuffer->full(); +} + // int by_exception: accepts values of CUDTUnited::ErrorHandling: // - 0 - by return value // - 1 - by exception @@ -7731,8 +7765,8 @@ bool srt::CUDT::updateCC(ETransmissionEvent evt, const EventVariant arg) m_iCongestionWindow = cgwindow; #if ENABLE_HEAVY_LOGGING HLOGC(rslog.Debug, - log << CONID() << "updateCC: updated values from congctl: interval=" << count_microseconds(m_tdSendInterval) << " us (" - << "tk (" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow=" + log << CONID() << "updateCC: updated values from congctl: interval=" << FormatDuration(m_tdSendInterval) + << " (cfg:" << m_CongCtl->pktSndPeriod_us() << "us) cgwindow=" << std::setprecision(3) << cgwindow); #endif } @@ -7811,40 +7845,6 @@ void srt::CUDT::releaseSynch() leaveCS(m_RecvLock); } - -#if ENABLE_BONDING -void srt::CUDT::dropToGroupRecvBase() -{ - int32_t group_recv_base = SRT_SEQNO_NONE; - if (m_parent->m_GroupOf) - { - // Check is first done before locking to avoid unnecessary - // mutex locking. The condition for this field is that it - // can be either never set, already reset, or ever set - // and possibly dangling. The re-check after lock eliminates - // the dangling case. - ScopedLock glock (uglobal().m_GlobControlLock); - - // Note that getRcvBaseSeqNo() will lock m_GroupOf->m_GroupLock, - // but this is an intended order. - if (m_parent->m_GroupOf) - group_recv_base = m_parent->m_GroupOf->getRcvBaseSeqNo(); - } - if (group_recv_base == SRT_SEQNO_NONE) - return; - - ScopedLock lck(m_RcvBufferLock); - int cnt = rcvDropTooLateUpTo(CSeqNo::incseq(group_recv_base)); - if (cnt > 0) - { - HLOGC(grlog.Debug, - log << CONID() << "dropToGroupRecvBase: dropped " << cnt << " packets before ACK: group_recv_base=" - << group_recv_base << " m_iRcvLastAck=" << m_iRcvLastAck - << " m_iRcvCurrSeqNo=" << m_iRcvCurrSeqNo << " m_bTsbPd=" << m_bTsbPd); - } -} -#endif - namespace srt { #if ENABLE_HEAVY_LOGGING static void DebugAck(string hdr, int prev, int ack) @@ -8061,10 +8061,6 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) #endif string reason; // just for "a reason" of giving particular % for ACK -#if ENABLE_BONDING - dropToGroupRecvBase(); -#endif - // The TSBPD thread may change the first lost sequence record (TLPKTDROP). // To avoid it the m_RcvBufferLock has to be acquired. UniqueLock bufflock(m_RcvBufferLock); @@ -8172,7 +8168,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) /* Newly acknowledged data, signal TsbPD thread */ CUniqueSync tslcc (m_RecvLock, m_RcvTsbPdCond); // m_bTsbPdAckWakeup is protected by m_RecvLock in the tsbpd() thread - if (m_bTsbPdAckWakeup) + if (m_bTsbPdNeedsWakeup) tslcc.notify_one(); } else @@ -8235,7 +8231,8 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size) else if (!bNeedFullAck) { // Not possible (m_iRcvCurrSeqNo+1 <% m_iRcvLastAck ?) - LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr %" << ack << " <% last %" << m_iRcvLastAck); + LOGC(xtlog.Error, log << CONID() << "sendCtrl(UMSG_ACK): IPE: curr(" << reason << ") %" + << ack << " <% last %" << m_iRcvLastAck); return nbsent; } @@ -8726,19 +8723,18 @@ void srt::CUDT::processCtrlAckAck(const CPacket& ctrlpkt, const time_point& tsAr // srt_recvfile (which doesn't make any sense), you'll have a deadlock. if (m_config.bDriftTracer) { - enterCS(m_RcvBufferLock); - const bool drift_updated SRT_ATR_UNUSED = m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); - leaveCS(m_RcvBufferLock); + //enterCS(m_RcvBufferLock); + +#if ENABLE_BONDING + ScopedLock glock(uglobal().m_GlobControlLock); // XXX not too excessive? + const bool drift_updated = +#endif + m_pRcvBuffer->addRcvTsbPdDriftSample(ctrlpkt.getMsgTimeStamp(), tsArrival, rtt); + //leaveCS(m_RcvBufferLock); #if ENABLE_BONDING if (drift_updated && m_parent->m_GroupOf) - { - ScopedLock glock(uglobal().m_GlobControlLock); - if (m_parent->m_GroupOf) - { - m_parent->m_GroupOf->synchronizeDrift(this); - } - } + m_parent->m_GroupOf->synchronizeDrift(this); #endif } @@ -10195,7 +10191,7 @@ int srt::CUDT::handleSocketPacketReception(const vector& incoming, bool& #endif } } - else if (m_pCryptoControl && m_pCryptoControl->getCryptoMode() == CSrtConfig::CIPHER_MODE_AES_GCM) + else if (m_pCryptoControl && m_pCryptoControl->m_RcvKmState == SRT_KM_S_SECURED) { // Unencrypted packets are not allowed. const int iDropCnt = m_pRcvBuffer->dropMessage(u->m_Packet.getSeqNo(), u->m_Packet.getSeqNo(), SRT_MSGNO_NONE, CRcvBuffer::DROP_EXISTING); @@ -11333,7 +11329,7 @@ int srt::CUDT::processConnectRequest(const sockaddr_any& addr, CPacket& packet) } } } - LOGC(cnlog.Note, log << CONID() << "listen ret: " << hs.m_iReqType << " - " << RequestTypeStr(hs.m_iReqType)); + LOGC(cnlog.Debug, log << CONID() << "listen ret: " << hs.m_iReqType << " - " << RequestTypeStr(hs.m_iReqType)); return RejectReasonForURQ(hs.m_iReqType); } diff --git a/srtcore/core.h b/srtcore/core.h index 10746c8c9..e24bd8152 100644 --- a/srtcore/core.h +++ b/srtcore/core.h @@ -327,6 +327,7 @@ class CUDT #endif int32_t rcvSeqNo() const { return m_iRcvCurrSeqNo; } + SRT_ATTR_REQUIRES(m_RecvAckLock) int flowWindowSize() const { return m_iFlowWindowSize; } int32_t deliveryRate() const { return m_iDeliveryRate; } int bandwidth() const { return m_iBandwidth; } @@ -388,6 +389,7 @@ class CUDT /// Returns the number of packets in flight (sent, but not yet acknowledged). /// @returns The number of packets in flight belonging to the interval [0; ...) + SRT_ATTR_REQUIRES(m_RecvAckLock) int32_t getFlightSpan() const { return getFlightSpan(m_iSndLastAck, m_iSndCurrSeqNo); @@ -697,6 +699,8 @@ class CUDT /// the receiver fresh loss list. void unlose(const CPacket& oldpacket); void dropFromLossLists(int32_t from, int32_t to); + + SRT_ATTR_REQUIRES(m_RecvAckLock) bool getFirstNoncontSequence(int32_t& w_seq, std::string& w_log_reason); SRT_ATTR_EXCLUDES(m_ConnectionLock) @@ -752,14 +756,24 @@ class CUDT SRT_ATTR_REQUIRES(m_RcvBufferLock) bool isRcvBufferReadyNoLock() const; + SRT_ATTR_EXCLUDES(m_RcvBufferLock) + bool isRcvBufferFull() const; + // TSBPD thread main function. static void* tsbpd(void* param); + enum DropReason + { + DROP_TOO_LATE, //< Drop to keep up to the live pace (TLPKTDROP). + DROP_DISCARD //< Drop because another group member already provided these packets. + }; + /// Drop too late packets (receiver side). Update loss lists and ACK positions. /// The @a seqno packet itself is not dropped. /// @param seqno [in] The sequence number of the first packets following those to be dropped. + /// @param reason A reason for dropping (see @a DropReason). /// @return The number of packets dropped. - int rcvDropTooLateUpTo(int seqno); + int rcvDropTooLateUpTo(int seqno, DropReason reason = DROP_TOO_LATE); static loss_seqs_t defaultPacketArrival(void* vself, CPacket& pkt); static loss_seqs_t groupPacketArrival(void* vself, CPacket& pkt); @@ -980,7 +994,7 @@ class CUDT sync::CThread m_RcvTsbPdThread; // Rcv TsbPD Thread handle sync::Condition m_RcvTsbPdCond; // TSBPD signals if reading is ready. Use together with m_RecvLock - bool m_bTsbPdAckWakeup; // Signal TsbPd thread on Ack sent + bool m_bTsbPdNeedsWakeup; // Signal TsbPd thread to wake up on RCV buffer state change. sync::Mutex m_RcvTsbPdStartupLock; // Protects TSBPD thread creating and joining CallbackHolder m_cbAcceptHook; @@ -1129,7 +1143,8 @@ class CUDT /// @return -2 The incoming packet exceeds the expected sequence by more than a length of the buffer (irrepairable discrepancy). int handleSocketPacketReception(const std::vector& incoming, bool& w_new_inserted, bool& w_was_sent_in_order, CUDT::loss_seqs_t& w_srt_loss_seqs); - /// Get the packet's TSBPD time. + /// Get the packet's TSBPD time - + /// the time when it is passed to the reading application. /// The @a grp passed by void* is not used yet /// and shall not be used when ENABLE_BONDING=0. time_point getPktTsbPdTime(void* grp, const CPacket& packet); @@ -1149,12 +1164,6 @@ class CUDT static void addLossRecord(std::vector& lossrecord, int32_t lo, int32_t hi); int32_t bake(const sockaddr_any& addr, int32_t previous_cookie = 0, int correction = 0); -#if ENABLE_BONDING - /// @brief Drop packets in the recv buffer behind group_recv_base. - /// Updates m_iRcvLastSkipAck if it's behind group_recv_base. - void dropToGroupRecvBase(); -#endif - void processKeepalive(const CPacket& ctrlpkt, const time_point& tsArrival); diff --git a/srtcore/group.cpp b/srtcore/group.cpp index 1539245a0..0927d085a 100644 --- a/srtcore/group.cpp +++ b/srtcore/group.cpp @@ -259,6 +259,7 @@ CUDTGroup::CUDTGroup(SRT_GROUP_TYPE gtype) , m_uOPT_MinStabilityTimeout_us(1000 * CSrtConfig::COMM_DEF_MIN_STABILITY_TIMEOUT_MS) // -1 = "undefined"; will become defined with first added socket , m_iMaxPayloadSize(-1) + , m_iAvgPayloadSize(-1) , m_bSynRecving(true) , m_bSynSending(true) , m_bTsbPd(true) @@ -869,18 +870,9 @@ void CUDTGroup::syncWithSocket(const CUDT& core, const HandshakeSide side) set_currentSchedSequence(core.ISN()); } - // XXX - // Might need further investigation as to whether this isn't - // wrong for some cases. By having this -1 here the value will be - // laziliy set from the first reading one. It is believed that - // it covers all possible scenarios, that is: - // - // - no readers - no problem! - // - have some readers and a new is attached - this is set already - // - connect multiple links, but none has read yet - you'll be the first. - // - // Previous implementation used setting to: core.m_iPeerISN - resetInitialRxSequence(); + // Only set if was not initialized to avoid problems on a running connection. + if (m_RcvBaseSeqNo == SRT_SEQNO_NONE) + m_RcvBaseSeqNo = CSeqNo::decseq(core.m_iPeerISN); // Get the latency (possibly fixed against the opposite side) // from the first socket (core.m_iTsbPdDelay_ms), @@ -2048,10 +2040,14 @@ vector CUDTGroup::recv_WaitForReadReady(const vector& } else { - // No read-readiness reported by epoll, but probably missed or not yet handled - // as the receiver buffer is read-ready. + // No read-readiness reported by epoll, but can be missed or not yet handled + // while the receiver buffer is in fact read-ready. ScopedLock lg(sock->core().m_RcvBufferLock); - if (sock->core().m_pRcvBuffer && sock->core().m_pRcvBuffer->isRcvDataReady()) + if (!sock->core().m_pRcvBuffer) + continue; + // Checking for the next packet in the RCV buffer is safer that isReadReady(tnow). + const CRcvBuffer::PacketInfo info = sock->core().m_pRcvBuffer->getFirstValidPacketInfo(); + if (info.seqno != SRT_SEQNO_NONE && !info.seq_gap) readReady.push_back(sock); } } @@ -2221,6 +2217,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } // Find the first readable packet among all member sockets. + steady_clock::time_point tnow = steady_clock::now(); CUDTSocket* socketToRead = NULL; CRcvBuffer::PacketInfo infoToRead = {-1, false, time_point()}; for (vector::const_iterator si = readySockets.begin(); si != readySockets.end(); ++si) @@ -2241,7 +2238,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } const CRcvBuffer::PacketInfo info = - ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(steady_clock::now()); + ps->core().m_pRcvBuffer->getFirstReadablePacketInfo(tnow); if (info.seqno == SRT_SEQNO_NONE) { HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": @" << ps->m_SocketID << ": Nothing to read."); @@ -2261,6 +2258,12 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) { socketToRead = ps; infoToRead = info; + + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE && ((CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) == 1)) + { + // We have the next packet. No need to check other read-ready sockets. + break; + } } } @@ -2309,6 +2312,20 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) } fillGroupData((w_mc), w_mc); + // m_RcvBaseSeqNo is expected to be set to the PeerISN with the first connected member, + // so a packet drop at the start should also be detected by this condition. + if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) + { + const int32_t iNumDropped = (CSeqNo(w_mc.pktseq) - CSeqNo(m_RcvBaseSeqNo)) - 1; + if (iNumDropped > 0) + { + m_stats.recvDrop.count(stats::BytesPackets(iNumDropped * static_cast(avgRcvPacketSize()), iNumDropped)); + LOGC(grlog.Warn, + log << "@" << m_GroupID << " GROUP RCV-DROPPED " << iNumDropped << " packet(s): seqno %" + << CSeqNo::incseq(m_RcvBaseSeqNo) << " to %" << CSeqNo::decseq(w_mc.pktseq)); + } + } + HLOGC(grlog.Debug, log << "grp/recv: $" << id() << ": Update m_RcvBaseSeqNo: %" << m_RcvBaseSeqNo << " -> %" << w_mc.pktseq); m_RcvBaseSeqNo = w_mc.pktseq; @@ -2324,7 +2341,7 @@ int CUDTGroup::recv(char* buf, int len, SRT_MSGCTRL& w_mc) ScopedLock lg(ps->core().m_RcvBufferLock); if (m_RcvBaseSeqNo != SRT_SEQNO_NONE) { - const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo)); + const int cnt = ps->core().rcvDropTooLateUpTo(CSeqNo::incseq(m_RcvBaseSeqNo), CUDT::DROP_DISCARD); if (cnt > 0) { HLOGC(grlog.Debug, diff --git a/srtcore/packet.cpp b/srtcore/packet.cpp index 668c4b5b3..180623039 100644 --- a/srtcore/packet.cpp +++ b/srtcore/packet.cpp @@ -432,38 +432,29 @@ void CPacket::pack(UDTMessageType pkttype, const int32_t* lparam, void* rparam, } } -void CPacket::toNL() +void CPacket::toNetworkByteOrder() { - // XXX USE HtoNLA! + // The payload of data packet should remain in network byte order. if (isControl()) { - for (ptrdiff_t i = 0, n = getLength() / 4; i < n; ++i) - *((uint32_t*)m_pcData + i) = htonl(*((uint32_t*)m_pcData + i)); + HtoNLA((uint32_t*) m_pcData, (const uint32_t*) m_pcData, getLength() / 4); } - // convert packet header into network order + // Convert packet header independent of packet type. uint32_t* p = m_nHeader; - for (int j = 0; j < 4; ++j) - { - *p = htonl(*p); - ++p; - } + HtoNLA(p, p, 4); } -void CPacket::toHL() +void CPacket::toHostByteOrder() { - // convert back into local host order + // Convert packet header independent of packet type. uint32_t* p = m_nHeader; - for (int k = 0; k < 4; ++k) - { - *p = ntohl(*p); - ++p; - } + NtoHLA(p, p, 4); + // The payload of data packet should remain in network byte order. if (isControl()) { - for (ptrdiff_t l = 0, n = getLength() / 4; l < n; ++l) - *((uint32_t*)m_pcData + l) = ntohl(*((uint32_t*)m_pcData + l)); + NtoHLA((uint32_t*)m_pcData, (const uint32_t*)m_pcData, getLength() / 4); } } diff --git a/srtcore/packet.h b/srtcore/packet.h index 9b757118f..5094247b5 100644 --- a/srtcore/packet.h +++ b/srtcore/packet.h @@ -331,8 +331,10 @@ class CPacket }; public: - void toNL(); - void toHL(); + /// @brief Convert the packet inline to a network byte order (Little-endian). + void toNetworkByteOrder(); + /// @brief Convert the packet inline to a host byte order. + void toHostByteOrder(); protected: // DynamicStruct is the same as array of given type and size, just it diff --git a/srtcore/queue.cpp b/srtcore/queue.cpp index a7df820ec..558b1b187 100644 --- a/srtcore/queue.cpp +++ b/srtcore/queue.cpp @@ -1121,8 +1121,8 @@ bool srt::CRendezvousQueue::qualifyToHandle(EReadStatus rst, if ((rst == RST_AGAIN || i->m_iID != iDstSockID) && tsNow <= tsRepeat) { HLOGC(cnlog.Debug, - log << "RID:@" << i->m_iID << std::fixed << count_microseconds(tsNow - tsLastReq) / 1000.0 - << " ms passed since last connection request."); + log << "RID:@" << i->m_iID << " " << FormatDuration(tsNow - tsLastReq) + << " passed since last connection request."); continue; } @@ -1436,7 +1436,7 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, ScopedLock cg(m_LSLock); if (m_pListener) { - LOGC(cnlog.Note, log << "PASSING request from: " << addr.str() << " to agent:" << m_pListener->socketID()); + LOGC(cnlog.Debug, log << "PASSING request from: " << addr.str() << " to listener:" << m_pListener->socketID()); listener_ret = m_pListener->processConnectRequest(addr, unit->m_Packet); // This function does return a code, but it's hard to say as to whether @@ -1455,8 +1455,8 @@ srt::EConnectStatus srt::CRcvQueue::worker_ProcessConnectionRequest(CUnit* unit, if (have_listener) // That is, the above block with m_pListener->processConnectRequest was executed { - LOGC(cnlog.Note, - log << CONID() << "Listener managed the connection request from: " << addr.str() + LOGC(cnlog.Debug, + log << CONID() << "Listener got the connection request from: " << addr.str() << " result:" << RequestTypeStr(UDTRequestType(listener_ret))); return listener_ret == SRT_REJ_UNKNOWN ? CONN_CONTINUE : CONN_REJECT; } diff --git a/srtcore/utilities.h b/srtcore/utilities.h index 8a1374eb7..1786cf0ae 100644 --- a/srtcore/utilities.h +++ b/srtcore/utilities.h @@ -237,17 +237,20 @@ written by #endif -// Hardware <--> Network (big endian) convention +/// Hardware --> Network (big-endian) byte order conversion +/// @param size source length in four octets inline void HtoNLA(uint32_t* dst, const uint32_t* src, size_t size) { for (size_t i = 0; i < size; ++ i) - dst[i] = htonl(src[i]); + dst[i] = htobe32(src[i]); } +/// Network (big-endian) --> Hardware byte order conversion +/// @param size source length in four octets inline void NtoHLA(uint32_t* dst, const uint32_t* src, size_t size) { for (size_t i = 0; i < size; ++ i) - dst[i] = ntohl(src[i]); + dst[i] = be32toh(src[i]); } // Hardware <--> Intel (little endian) convention