From 309fcbb70dac95f9f4b29fb839bb60962efc4d74 Mon Sep 17 00:00:00 2001 From: Andy Grundman Date: Thu, 12 Sep 2024 03:24:29 -0400 Subject: [PATCH] Improve support for high-resolution stats * This patch adds a new microsecond-resolution function call, LiGetMicroseconds(), to complement the existing LiGetMillis(). Many variables used by stats have been updated to work at this higher resolution and now provide better results when displaying e.g. sub-millisecond frametime stats. To try and avoid confusion, variables that now contain microseconds have been renamed with a suffix of 'Us', and those ending in 'Ms' contain milliseconds. I originally experimented with nanoseconds but it felt like overkill for our needs. Public API in Limelight.h: uint64_t LiGetMicroseconds(void); uint64_t LiGetMillis(void); const RTP_AUDIO_STATS* LiGetRTPAudioStats(void); // provides access to RTP data for the overlay stats const RTP_VIDEO_STATS* LiGetRTPVideoStats(void); Note: Users of this library may need to make changes. If using LiGetMillis() to track the duration of something that is shown to the user, consider switching to LiGetMicroseconds(). Remember to divide by 1000 at time of display to show in milliseconds. --- .gitignore | 1 + CMakeLists.txt | 34 ++++++++- src/AudioStream.c | 16 +++-- src/InputStream.c | 6 +- src/Limelight.h | 54 +++++++++++--- src/Misc.c | 6 +- src/Platform.c | 153 +++++++++++++++++++++++++++++++++++++--- src/Platform.h | 29 +++++++- src/RtpAudioQueue.c | 21 ++++-- src/RtpAudioQueue.h | 4 +- src/RtpVideoQueue.c | 47 ++++++------ src/RtpVideoQueue.h | 10 +-- src/RtspConnection.c | 38 +++++----- src/VideoDepacketizer.c | 44 ++++++------ src/VideoStream.c | 14 ++-- 15 files changed, 367 insertions(+), 110 deletions(-) diff --git a/.gitignore b/.gitignore index 8be66fa4..a9e46ee4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.vscode/ limelight-common/ARM/ limelight-common/Debug/ Build/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e273cad..61c4a5d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 3.1) project(moonlight-common-c LANGUAGES C) +string(TOUPPER "x${CMAKE_BUILD_TYPE}" BUILD_TYPE) set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) option(USE_MBEDTLS "Use MbedTLS instead of OpenSSL" OFF) @@ -61,7 +62,6 @@ else() target_include_directories(moonlight-common-c SYSTEM PRIVATE ${OPENSSL_INCLUDE_DIR}) endif() -string(TOUPPER "x${CMAKE_BUILD_TYPE}" BUILD_TYPE) if("${BUILD_TYPE}" STREQUAL "XDEBUG") target_compile_definitions(moonlight-common-c PRIVATE LC_DEBUG) else() @@ -74,10 +74,40 @@ else() endif() endif() +if (NOT(MSVC OR APPLE)) + include(CheckLibraryExists) + CHECK_LIBRARY_EXISTS(rt clock_gettime "" HAVE_CLOCK_GETTIME) + + if (NOT HAVE_CLOCK_GETTIME) + set(CMAKE_EXTRA_INCLUDE_FILES time.h) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + SET(CMAKE_EXTRA_INCLUDE_FILES) + endif() + + foreach(clock CLOCK_MONOTONIC CLOCK_MONOTONIC_RAW) + message(STATUS "Testing whether ${clock} can be used") + CHECK_CXX_SOURCE_COMPILES( +"#define _POSIX_C_SOURCE 200112L +#include +int main () +{ + struct timespec ts[1]; + clock_gettime (${clock}, ts); + return 0; +}" HAVE_${clock}) + if(HAVE_${clock}) + message(STATUS "Testing whether ${clock} can be used -- Success") + else() + message(STATUS "Testing whether ${clock} can be used -- Failed") + endif() + endforeach() + +endif() + target_include_directories(moonlight-common-c SYSTEM PUBLIC src) target_include_directories(moonlight-common-c PRIVATE 
${CMAKE_CURRENT_SOURCE_DIR}/reedsolomon ) -target_compile_definitions(moonlight-common-c PRIVATE HAS_SOCKLEN_T) \ No newline at end of file +target_compile_definitions(moonlight-common-c PRIVATE HAS_SOCKLEN_T) diff --git a/src/AudioStream.c b/src/AudioStream.c index d14d5725..2fcc1f0e 100644 --- a/src/AudioStream.c +++ b/src/AudioStream.c @@ -275,7 +275,7 @@ static void AudioReceiveThreadProc(void* context) { } else if (packet->header.size == 0) { // Receive timed out; try again - + if (!receivedDataFromPeer) { waitingForAudioMs += UDP_RECV_POLL_TIMEOUT_MS; } @@ -299,6 +299,8 @@ static void AudioReceiveThreadProc(void* context) { Limelog("Received first audio packet after %d ms\n", waitingForAudioMs); if (firstReceiveTime != 0) { + // XXX firstReceiveTime is never set here... + // We're already dropping 500ms of audio so this probably doesn't matter packetsToDrop += (uint32_t)(PltGetMillis() - firstReceiveTime) / AudioPacketDuration; } @@ -366,7 +368,7 @@ static void AudioReceiveThreadProc(void* context) { free(queuedPacket); } } - + // Break on exit if (queuedPacket != NULL) { break; @@ -374,7 +376,7 @@ static void AudioReceiveThreadProc(void* context) { } } } - + if (packet != NULL) { free(packet); } @@ -405,12 +407,12 @@ void stopAudioStream(void) { AudioCallbacks.stop(); PltInterruptThread(&receiveThread); - if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { + if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { // Signal threads waiting on the LBQ LbqSignalQueueShutdown(&packetQueue); PltInterruptThread(&decoderThread); } - + PltJoinThread(&receiveThread); if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { PltJoinThread(&decoderThread); @@ -474,3 +476,7 @@ int LiGetPendingAudioFrames(void) { int LiGetPendingAudioDuration(void) { return LiGetPendingAudioFrames() * AudioPacketDuration; } + +const RTP_AUDIO_STATS* LiGetRTPAudioStats(void) { + return &rtpAudioQueue.stats; +} diff --git a/src/InputStream.c b/src/InputStream.c index e61e2bee..d9e97ce0 100644 --- a/src/InputStream.c +++ b/src/InputStream.c @@ -95,7 +95,7 @@ typedef struct _PACKET_HOLDER { // Initializes the input stream int initializeInputStream(void) { memcpy(currentAesIv, StreamConfig.remoteInputAesIv, sizeof(currentAesIv)); - + // Set a high maximum queue size limit to ensure input isn't dropped // while the input send thread is blocked for short periods. LbqInitializeLinkedBlockingQueue(&packetQueue, MAX_QUEUED_INPUT_PACKETS); @@ -129,7 +129,7 @@ int initializeInputStream(void) { // Destroys and cleans up the input stream void destroyInputStream(void) { PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry; - + PltDestroyCryptoContext(cryptoContext); entry = LbqDestroyLinkedBlockingQueue(&packetQueue); @@ -740,7 +740,7 @@ int stopInputStream(void) { if (inputSock != INVALID_SOCKET) { shutdownTcpSocket(inputSock); } - + if (inputSock != INVALID_SOCKET) { closeSocket(inputSock); inputSock = INVALID_SOCKET; diff --git a/src/Limelight.h b/src/Limelight.h index b2bb5509..f052866b 100644 --- a/src/Limelight.h +++ b/src/Limelight.h @@ -69,7 +69,7 @@ typedef struct _STREAM_CONFIGURATION { // Specifies the channel configuration of the audio stream. // See AUDIO_CONFIGURATION constants and MAKE_AUDIO_CONFIGURATION() below. int audioConfiguration; - + // Specifies the mask of supported video formats. // See VIDEO_FORMAT constants below. int supportedVideoFormats; @@ -154,16 +154,15 @@ typedef struct _DECODE_UNIT { // (happens when the frame is repeated). 
uint16_t frameHostProcessingLatency; - // Receive time of first buffer. This value uses an implementation-defined epoch, - // but the same epoch as enqueueTimeMs and LiGetMillis(). - uint64_t receiveTimeMs; + // Receive time of first buffer in microseconds. + uint64_t receiveTimeUs; // Time the frame was fully assembled and queued for the video decoder to process. // This is also approximately the same time as the final packet was received, so - // enqueueTimeMs - receiveTimeMs is the time taken to receive the frame. At the + // enqueueTimeUs - receiveTimeUs is the time taken to receive the frame. At the // time the decode unit is passed to submitDecodeUnit(), the total queue delay - // can be calculated by LiGetMillis() - enqueueTimeMs. - uint64_t enqueueTimeMs; + // can be calculated. This value is in microseconds. + uint64_t enqueueTimeUs; // Presentation time in milliseconds with the epoch at the first captured frame. // This can be used to aid frame pacing or to drop old frames that were queued too @@ -512,10 +511,10 @@ void LiInitializeConnectionCallbacks(PCONNECTION_LISTENER_CALLBACKS clCallbacks) typedef struct _SERVER_INFORMATION { // Server host name or IP address in text form const char* address; - + // Text inside 'appversion' tag in /serverinfo const char* serverInfoAppVersion; - + // Text inside 'GfeVersion' tag in /serverinfo (if present) const char* serverInfoGfeVersion; @@ -825,7 +824,12 @@ int LiSendHighResScrollEvent(short scrollAmount); int LiSendHScrollEvent(signed char scrollClicks); int LiSendHighResHScrollEvent(short scrollAmount); +// This function returns a time in microseconds with an implementation-defined epoch. +// It should only ever be compared with the return value from a previous call to itself. +uint64_t LiGetMicroseconds(void); + // This function returns a time in milliseconds with an implementation-defined epoch. +// It should only ever be compared with the return value from a previous call to itself. uint64_t LiGetMillis(void); // This is a simplistic STUN function that can assist clients in getting the WAN address @@ -848,6 +852,36 @@ int LiGetPendingAudioFrames(void); // negotiated audio frame duration. int LiGetPendingAudioDuration(void); +// Returns a pointer to a struct containing various statistics about the RTP audio stream. +// The data should be considered read-only and must not be modified. +typedef struct _RTP_AUDIO_STATS { + uint32_t packetCountAudio; // total audio packets + uint32_t packetCountFec; // total packets of type FEC + uint32_t packetCountFecRecovered; // a packet was saved + uint32_t packetCountFecFailed; // tried to recover but too much was lost + uint32_t packetCountOOS; // out-of-sequence packets + uint32_t packetCountInvalid; // corrupted packets, etc + uint32_t packetCountFecInvalid; // invalid FEC packet +} RTP_AUDIO_STATS, *PRTP_AUDIO_STATS; + +const RTP_AUDIO_STATS* LiGetRTPAudioStats(void); + +// Returns a pointer to a struct containing various statistics about the RTP video stream. +// The data should be considered read-only and must not be modified. +// Right now this is mainly used to track total video and FEC packets, as there are +// many video stats already implemented at a higher level in moonlight-qt. 
+typedef struct _RTP_VIDEO_STATS { + uint32_t packetCountVideo; // total video packets + uint32_t packetCountFec; // total packets of type FEC + uint32_t packetCountFecRecovered; // a packet was saved + uint32_t packetCountFecFailed; // tried to recover but too much was lost + uint32_t packetCountOOS; // out-of-sequence packets + uint32_t packetCountInvalid; // corrupted packets, etc + uint32_t packetCountFecInvalid; // invalid FEC packet +} RTP_VIDEO_STATS, *PRTP_VIDEO_STATS; + +const RTP_VIDEO_STATS* LiGetRTPVideoStats(void); + // Port index flags for use with LiGetPortFromPortFlagIndex() and LiGetProtocolFromPortFlagIndex() #define ML_PORT_INDEX_TCP_47984 0 #define ML_PORT_INDEX_TCP_47989 1 @@ -875,7 +909,7 @@ int LiGetPendingAudioDuration(void); unsigned int LiGetPortFlagsFromStage(int stage); unsigned int LiGetPortFlagsFromTerminationErrorCode(int errorCode); -// Returns the IPPROTO_* value for the specified port index +// Returns the IPPROTO_* value for the specified port index int LiGetProtocolFromPortFlagIndex(int portFlagIndex); // Returns the port number for the specified port index diff --git a/src/Misc.c b/src/Misc.c index 4988b125..e3cd6b0f 100644 --- a/src/Misc.c +++ b/src/Misc.c @@ -99,7 +99,7 @@ int extractVersionQuadFromString(const char* string, int* quad) { nextNumber++; } } - + return 0; } @@ -148,6 +148,10 @@ uint64_t LiGetMillis(void) { return PltGetMillis(); } +uint64_t LiGetMicroseconds(void) { + return PltGetMicroseconds(); +} + uint32_t LiGetHostFeatureFlags(void) { return SunshineFeatureFlags; } diff --git a/src/Platform.c b/src/Platform.c index 4f998856..e4091892 100644 --- a/src/Platform.c +++ b/src/Platform.c @@ -464,24 +464,157 @@ void PltWaitForConditionVariable(PLT_COND* cond, PLT_MUTEX* mutex) { #endif } -uint64_t PltGetMillis(void) { +//// Begin timing functions + +// These functions return a number of microseconds or milliseconds since an opaque start time. 
+ +static bool has_monotonic_time = false; +static bool ticks_started = false; + #if defined(LC_WINDOWS) - return GetTickCount64(); -#elif defined(CLOCK_MONOTONIC) && !defined(NO_CLOCK_GETTIME) - struct timespec tv; - clock_gettime(CLOCK_MONOTONIC, &tv); +static LARGE_INTEGER start_ticks; +static LARGE_INTEGER ticks_per_second; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + BOOL rc = QueryPerformanceFrequency(&ticks_per_second); + QueryPerformanceCounter(&start_ticks); +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + LARGE_INTEGER now; + BOOL rc = QueryPerformanceCounter(&now); + return (uint64_t)(((now.QuadPart - start_ticks.QuadPart) * 1000000) / ticks_per_second.QuadPart); +} + +#elif defined(LC_DARWIN) + +static mach_timebase_info_data_t mach_base_info; +static uint64_t start; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + mach_timebase_info(&mach_base_info); + has_monotonic_time = true; + start = mach_absolute_time(); +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + const uint64_t now = mach_absolute_time(); + return (((now - start) * mach_base_info.numer) / mach_base_info.denom) / 1000; +} + +#elif defined(__vita__) + +static uint64_t start; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + start = sceKernelGetProcessTimeWide(); +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + uint64_t now = sceKernelGetProcessTimeWide(); + return (uint64_t)(now - start); +} + +#elif defined(__3DS__) + +static uint64_t start; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + start = svcGetSystemTick(); +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + uint64_t elapsed = svcGetSystemTick() - start; + return elapsed * 1000 / CPU_TICKS_PER_MSEC; +} - return ((uint64_t)tv.tv_sec * 1000) + (tv.tv_nsec / 1000000); #else - struct timeval tv; - gettimeofday(&tv, NULL); +/* Use CLOCK_MONOTONIC_RAW, if available, which is not subject to adjustment by NTP */ +#ifdef HAVE_CLOCK_GETTIME +static struct timespec start_ts; +# ifdef CLOCK_MONOTONIC_RAW +# define PLT_MONOTONIC_CLOCK CLOCK_MONOTONIC_RAW +# else +# define PLT_MONOTONIC_CLOCK CLOCK_MONOTONIC +# endif +#endif + +static struct timeval start_tv; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; +#ifdef HAVE_CLOCK_GETTIME + if (clock_gettime(PLT_MONOTONIC_CLOCK, &start_ts) == 0) { + has_monotonic_time = true; + } else +#endif + { + gettimeofday(&start_tv, NULL); + } +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + + if (has_monotonic_time) { +#ifdef HAVE_CLOCK_GETTIME + struct timespec now; + clock_gettime(PLT_MONOTONIC_CLOCK, &now); + return (uint64_t)(((int64_t)(now.tv_sec - start_ts.tv_sec) * 1000000) + ((now.tv_nsec - start_ts.tv_nsec) / 1000)); +#else + LC_ASSERT(false); + return 0; +#endif + } else { + struct timeval now; + gettimeofday(&now, NULL); + return (uint64_t)(((int64_t)(now.tv_sec - start_tv.tv_sec) * 1000000) + (now.tv_usec - start_tv.tv_usec)); + } +} - return ((uint64_t)tv.tv_sec * 1000) + (tv.tv_usec / 1000); #endif + +uint64_t PltGetMillis(void) { + return PltGetMicroseconds() / 1000; } +//// End timing functions + bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src) { LC_ASSERT(dest_size > 0); @@ -519,6 +652,8 @@ 
bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src) { int initializePlatform(void) { int err; + PltTicksInit(); + err = initializePlatformSockets(); if (err != 0) { return err; diff --git a/src/Platform.h b/src/Platform.h index 20d03f5e..d9c9ffa8 100644 --- a/src/Platform.h +++ b/src/Platform.h @@ -18,6 +18,16 @@ #include #include #include +#elif defined(__APPLE__) +#include +#include +#include +#include +#include +#include +#include +#include +#include #elif defined(__vita__) #include #include @@ -70,9 +80,19 @@ #include #include "Limelight.h" -#define Limelog(s, ...) \ +#if defined(LC_DARWIN) +// Don't give the SDL logger a chance to slow down any threads +# define Limelog(s, ...) \ + if (ListenerCallbacks.logMessage) { \ + dispatch_async(dispatch_get_main_queue(), ^{ \ + ListenerCallbacks.logMessage(s, ##__VA_ARGS__); \ + }); \ + } +#else +# define Limelog(s, ...) \ if (ListenerCallbacks.logMessage) \ ListenerCallbacks.logMessage(s, ##__VA_ARGS__) +#endif #if defined(LC_WINDOWS) #include @@ -146,6 +166,11 @@ int initializePlatform(void); void cleanupPlatform(void); +bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src); + +void PltTicksInit(void); + +uint64_t PltGetMicroseconds(void); uint64_t PltGetMillis(void); -bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src); + diff --git a/src/RtpAudioQueue.c b/src/RtpAudioQueue.c index be734d4e..0f7235b4 100644 --- a/src/RtpAudioQueue.c +++ b/src/RtpAudioQueue.c @@ -204,15 +204,19 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK if (packet->packetType == RTP_PAYLOAD_TYPE_AUDIO) { if (length < sizeof(RTP_PACKET)) { + queue->stats.packetCountInvalid++; Limelog("RTP audio data packet too small: %u\n", length); LC_ASSERT_VT(false); return NULL; } + queue->stats.packetCountAudio++; + // Remember if we've received out-of-sequence packets lately. We can use // this knowledge to more quickly give up on FEC blocks. if (!queue->synchronizing && isBefore16(packet->sequenceNumber, queue->oldestRtpBaseSequenceNumber)) { queue->lastOosSequenceNumber = packet->sequenceNumber; + queue->stats.packetCountOOS++; if (!queue->receivedOosData) { Limelog("Leaving fast audio recovery mode after OOS audio data (%u < %u)\n", packet->sequenceNumber, queue->oldestRtpBaseSequenceNumber); @@ -238,11 +242,14 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK PAUDIO_FEC_HEADER fecHeader = (PAUDIO_FEC_HEADER)(packet + 1); if (length < sizeof(RTP_PACKET) + sizeof(AUDIO_FEC_HEADER)) { + queue->stats.packetCountFecInvalid++; Limelog("RTP audio FEC packet too small: %u\n", length); LC_ASSERT_VT(false); return NULL; } + queue->stats.packetCountFec++; + // This is an FEC packet, so we can just copy (and byteswap) the FEC header fecBlockPayloadType = fecHeader->payloadType; fecBlockBaseSeqNum = BE16(fecHeader->baseSequenceNumber); @@ -252,6 +259,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK // Ensure the FEC shard index is valid to prevent OOB access // later during recovery. if (fecHeader->fecShardIndex >= RTPA_FEC_SHARDS) { + queue->stats.packetCountFecInvalid++; Limelog("Too many audio FEC shards: %u\n", fecHeader->fecShardIndex); LC_ASSERT_VT(false); return NULL; @@ -261,6 +269,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK // The FEC blocks must start on a RTPA_DATA_SHARDS boundary for our queuing logic to work. This isn't // the case for older versions of GeForce Experience (at least 3.13). 
Disable the FEC logic if this // invariant is validated. + queue->stats.packetCountFecInvalid++; Limelog("Invalid FEC block base sequence number (got %u, expected %u)\n", fecBlockBaseSeqNum, (fecBlockBaseSeqNum / RTPA_DATA_SHARDS) * RTPA_DATA_SHARDS); Limelog("Audio FEC has been disabled due to an incompatibility with your host's old software!\n"); @@ -304,6 +313,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK if (existingBlock->blockSize != blockSize) { // This can happen with older versions of GeForce Experience (3.13) and Sunshine that don't use a // constant size for audio packets. + queue->stats.packetCountFecInvalid++; Limelog("Audio block size mismatch (got %u, expected %u)\n", blockSize, existingBlock->blockSize); Limelog("Audio FEC has been disabled due to an incompatibility with your host's old software!\n"); LC_ASSERT_VT(existingBlock->blockSize == blockSize); @@ -331,7 +341,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK memset(block, 0, sizeof(*block)); - block->queueTimeMs = PltGetMillis(); + block->queueTimeUs = PltGetMicroseconds(); block->blockSize = blockSize; memset(block->marks, 1, sizeof(block->marks)); @@ -454,13 +464,15 @@ static bool completeFecBlock(PRTP_AUDIO_QUEUE queue, PRTPA_FEC_BLOCK block) { } } -#ifdef FEC_VERBOSE + if (block->dataShardsReceived != RTPA_DATA_SHARDS) { + queue->stats.packetCountFecRecovered += RTPA_DATA_SHARDS - block->dataShardsReceived; +#ifdef FEC_VERBOSE Limelog("Recovered %d audio data shards from block %d\n", RTPA_DATA_SHARDS - block->dataShardsReceived, block->fecHeader.baseSequenceNumber); - } #endif + } #ifdef FEC_VALIDATION_MODE // Check the RTP header values @@ -531,9 +543,10 @@ static void handleMissingPackets(PRTP_AUDIO_QUEUE queue) { // At this point, we know we've got a second FEC block queued up waiting on the first one to complete. // If we've never seen OOS data from this host, we'll assume the first one is lost and skip forward. // If we have seen OOS data, we'll wait for a little while longer to see if OOS packets arrive before giving up. 
- if (!queue->receivedOosData || PltGetMillis() - queue->blockHead->queueTimeMs > (uint32_t)(AudioPacketDuration * RTPA_DATA_SHARDS) + RTPQ_OOS_WAIT_TIME_MS) { + if (!queue->receivedOosData || PltGetMicroseconds() - queue->blockHead->queueTimeUs > (uint64_t)(AudioPacketDuration * RTPA_DATA_SHARDS) + (RTPQ_OOS_WAIT_TIME_MS * 1000)) { LC_ASSERT(!isBefore16(queue->nextRtpSequenceNumber, queue->blockHead->fecHeader.baseSequenceNumber)); + queue->stats.packetCountFecFailed++; Limelog("Unable to recover audio data block %u to %u (%u+%u=%u received < %u needed)\n", queue->blockHead->fecHeader.baseSequenceNumber, queue->blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS - 1, diff --git a/src/RtpAudioQueue.h b/src/RtpAudioQueue.h index 1c8ed09c..bbe562e5 100644 --- a/src/RtpAudioQueue.h +++ b/src/RtpAudioQueue.h @@ -33,7 +33,7 @@ typedef struct _RTPA_FEC_BLOCK { AUDIO_FEC_HEADER fecHeader; - uint64_t queueTimeMs; + uint64_t queueTimeUs; uint8_t dataShardsReceived; uint8_t fecShardsReceived; bool fullyReassembled; @@ -63,6 +63,8 @@ typedef struct _RTP_AUDIO_QUEUE { bool receivedOosData; bool synchronizing; bool incompatibleServer; + + RTP_AUDIO_STATS stats; } RTP_AUDIO_QUEUE, *PRTP_AUDIO_QUEUE; #define RTPQ_RET_PACKET_CONSUMED 0x1 diff --git a/src/RtpVideoQueue.c b/src/RtpVideoQueue.c index d5dbe94f..dc82a94b 100644 --- a/src/RtpVideoQueue.c +++ b/src/RtpVideoQueue.c @@ -91,7 +91,7 @@ static void removeEntryFromList(PRTPV_QUEUE_LIST list, PRTPV_QUEUE_ENTRY entry) static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) { SS_FRAME_FEC_STATUS fecStatus; - + fecStatus.frameIndex = BE32(queue->currentFrameNumber); fecStatus.highestReceivedSequenceNumber = BE16(queue->receivedHighestSequenceNumber); fecStatus.nextContiguousSequenceNumber = BE16(queue->nextContiguousSequenceNumber); @@ -103,7 +103,7 @@ static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) { fecStatus.fecPercentage = (uint8_t)queue->fecPercentage; fecStatus.multiFecBlockIndex = (uint8_t)queue->multiFecCurrentBlockNumber; fecStatus.multiFecBlockCount = (uint8_t)(queue->multiFecLastBlockNumber + 1); - + connectionSendFrameFecStatus(&fecStatus); } @@ -111,7 +111,7 @@ static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) { static bool queuePacket(PRTP_VIDEO_QUEUE queue, PRTPV_QUEUE_ENTRY newEntry, PRTP_PACKET packet, int length, bool isParity, bool isFecRecovery) { PRTPV_QUEUE_ENTRY entry; bool outOfSequence; - + LC_ASSERT(!(isFecRecovery && isParity)); LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)); @@ -195,7 +195,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { int ret; LC_ASSERT(totalPackets == U16(queue->bufferHighestSequenceNumber - queue->bufferLowestSequenceNumber) + 1U); - + #ifdef FEC_VALIDATION_MODE // We'll need an extra packet to run in FEC validation mode, because we will // be "dropping" one below and recovering it using parity. However, some frames @@ -263,9 +263,9 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { ret = -2; goto cleanup; } - + rs = reed_solomon_new(queue->bufferDataPackets, queue->bufferParityPackets); - + // This could happen in an OOM condition, but it could also mean the FEC data // that we fed to reed_solomon_new() is bogus, so we'll assert to get a better look. 
LC_ASSERT(rs != NULL); @@ -273,9 +273,9 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { ret = -3; goto cleanup; } - + memset(marks, 1, sizeof(char) * (totalPackets)); - + int receiveSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE; int packetBufferSize = receiveSize + sizeof(RTPV_QUEUE_ENTRY); @@ -307,7 +307,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { packets[index] = (unsigned char*) entry->packet; marks[index] = 0; - + //Set padding to zero if (entry->length < receiveSize) { memset(&packets[index][entry->length], 0, receiveSize - entry->length); @@ -326,9 +326,9 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { } } } - + ret = reed_solomon_reconstruct(rs, packets, marks, totalPackets, receiveSize); - + // We should always provide enough parity to recover the missing data successfully. // If this fails, something is probably wrong with our FEC state. LC_ASSERT(ret == 0); @@ -339,7 +339,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { queue->bufferDataPackets - queue->receivedDataPackets, queue->currentFrameNumber); #endif - + // Report the final FEC status if we needed to perform a recovery reportFinalFrameFecStatus(queue); } @@ -355,7 +355,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { rtpPacket->header = queue->pendingFecBlockList.head->packet->header; rtpPacket->timestamp = queue->pendingFecBlockList.head->packet->timestamp; rtpPacket->ssrc = queue->pendingFecBlockList.head->packet->ssrc; - + int dataOffset = sizeof(*rtpPacket); if (rtpPacket->header & FLAG_EXTENSION) { dataOffset += 4; // 2 additional fields @@ -457,7 +457,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { if (marks != NULL) free(marks); - + return ret; } @@ -497,8 +497,8 @@ static void stageCompleteFecBlock(PRTP_VIDEO_QUEUE queue) { // and use the first packet's receive time for all packets. This ends up // actually being better for the measurements that the depacketizer does, // since it properly handles out of order packets. - LC_ASSERT(queue->bufferFirstRecvTimeMs != 0); - entry->receiveTimeMs = queue->bufferFirstRecvTimeMs; + LC_ASSERT(queue->bufferFirstRecvTimeUs != 0); + entry->receiveTimeUs = queue->bufferFirstRecvTimeUs; // Move this packet to the completed FEC block list insertEntryIntoList(&queue->completedFecBlockList, entry); @@ -631,7 +631,7 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ queue->bufferDataPackets); } } - + // We must either start on the current FEC block number for the current frame, // or block 0 of a new frame. uint8_t expectedFecBlockNumber = (queue->currentFrameNumber == nvPacket->frameIndex ? queue->multiFecCurrentBlockNumber : 0); @@ -689,8 +689,8 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ // Tell the control stream logic about this frame, even if we don't end up // being able to reconstruct a full frame from it. 
connectionSawFrame(queue->currentFrameNumber); - - queue->bufferFirstRecvTimeMs = PltGetMillis(); + + queue->bufferFirstRecvTimeUs = PltGetMicroseconds(); queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex); queue->nextContiguousSequenceNumber = queue->bufferLowestSequenceNumber; queue->receivedDataPackets = 0; @@ -706,6 +706,9 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ queue->bufferHighestSequenceNumber = U16(queue->bufferFirstParitySequenceNumber + queue->bufferParityPackets - 1); queue->multiFecCurrentBlockNumber = fecCurrentBlockNumber; queue->multiFecLastBlockNumber = (nvPacket->multiFecBlocks >> 6) & 0x3; + + queue->stats.packetCountVideo += queue->bufferDataPackets; + queue->stats.packetCountFec += queue->bufferParityPackets; } // Reject packets above our FEC queue valid sequence number range @@ -762,18 +765,18 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ queue->receivedParityPackets++; LC_ASSERT(queue->receivedParityPackets <= queue->bufferParityPackets); } - + // Try to submit this frame. If we haven't received enough packets, // this will fail and we'll keep waiting. if (reconstructFrame(queue) == 0) { // Stage the complete FEC block for use once reassembly is complete stageCompleteFecBlock(queue); - + // stageCompleteFecBlock() should have consumed all pending FEC data LC_ASSERT(queue->pendingFecBlockList.head == NULL); LC_ASSERT(queue->pendingFecBlockList.tail == NULL); LC_ASSERT(queue->pendingFecBlockList.count == 0); - + // If we're not yet at the last FEC block for this frame, move on to the next block. // Otherwise, the frame is complete and we can move on to the next frame. if (queue->multiFecCurrentBlockNumber < queue->multiFecLastBlockNumber) { diff --git a/src/RtpVideoQueue.h b/src/RtpVideoQueue.h index ec42c04c..7c3a3d25 100644 --- a/src/RtpVideoQueue.h +++ b/src/RtpVideoQueue.h @@ -6,8 +6,8 @@ typedef struct _RTPV_QUEUE_ENTRY { struct _RTPV_QUEUE_ENTRY* next; struct _RTPV_QUEUE_ENTRY* prev; PRTP_PACKET packet; - uint64_t receiveTimeMs; - uint32_t presentationTimeMs; + uint64_t receiveTimeUs; + uint64_t presentationTimeMs; int length; bool isParity; } RTPV_QUEUE_ENTRY, *PRTPV_QUEUE_ENTRY; @@ -22,7 +22,7 @@ typedef struct _RTP_VIDEO_QUEUE { RTPV_QUEUE_LIST pendingFecBlockList; RTPV_QUEUE_LIST completedFecBlockList; - uint64_t bufferFirstRecvTimeMs; + uint64_t bufferFirstRecvTimeUs; uint32_t bufferLowestSequenceNumber; uint32_t bufferHighestSequenceNumber; uint32_t bufferFirstParitySequenceNumber; @@ -43,8 +43,10 @@ typedef struct _RTP_VIDEO_QUEUE { uint8_t multiFecCurrentBlockNumber; uint8_t multiFecLastBlockNumber; - uint32_t lastOosFramePresentationTimestamp; + uint64_t lastOosFramePresentationTimestamp; bool receivedOosData; + + RTP_VIDEO_STATS stats; // the above values are short-lived, this tracks stats for the life of the queue } RTP_VIDEO_QUEUE, *PRTP_VIDEO_QUEUE; #define RTPF_RET_QUEUED 0 diff --git a/src/RtspConnection.c b/src/RtspConnection.c index 6f2a1830..d8f1b351 100644 --- a/src/RtspConnection.c +++ b/src/RtspConnection.c @@ -267,19 +267,19 @@ static bool transactRtspMessageEnet(PRTSP_MESSAGE request, PRTSP_MESSAGE respons payloadLength = request->payloadLength; request->payload = NULL; request->payloadLength = 0; - + // Serialize the RTSP message into a message buffer serializedMessage = serializeRtspMessage(request, &messageLen); if (serializedMessage == NULL) { goto Exit; } - + // Create the reliable packet that describes our outgoing message packet = 
enet_packet_create(serializedMessage, messageLen, ENET_PACKET_FLAG_RELIABLE); if (packet == NULL) { goto Exit; } - + // Send the message if (enet_peer_send(peer, 0, packet) < 0) { enet_packet_destroy(packet); @@ -299,10 +299,10 @@ static bool transactRtspMessageEnet(PRTSP_MESSAGE request, PRTSP_MESSAGE respons enet_packet_destroy(packet); goto Exit; } - + enet_host_flush(client); } - + // Wait for a reply if (serviceEnetHost(client, &event, RTSP_RECEIVE_TIMEOUT_SEC * 1000) <= 0 || event.type != ENET_EVENT_TYPE_RECEIVE) { @@ -343,7 +343,7 @@ static bool transactRtspMessageEnet(PRTSP_MESSAGE request, PRTSP_MESSAGE respons offset += (int) event.packet->dataLength; enet_packet_destroy(event.packet); } - + if (parseRtspMessage(response, responseBuffer, offset) == RTSP_ERROR_SUCCESS) { // Successfully parsed response ret = true; @@ -583,7 +583,7 @@ static bool setupStream(PRTSP_MESSAGE response, char* target, int* error) { else { transportValue = " "; } - + if (addOption(&request, "Transport", transportValue) && addOption(&request, "If-Modified-Since", "Thu, 01 Jan 1970 00:00:00 GMT")) { @@ -992,21 +992,21 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { rtspClientVersion = 14; break; } - + // Setup ENet if required by this GFE version if (useEnet) { ENetAddress address; ENetEvent event; - + enet_address_set_address(&address, (struct sockaddr *)&RemoteAddr, AddrLen); enet_address_set_port(&address, RtspPortNumber); - + // Create a client that can use 1 outgoing connection and 1 channel client = enet_host_create(RemoteAddr.ss_family, NULL, 1, 1, 0, 0); if (client == NULL) { return -1; } - + // Connect to the host peer = enet_host_connect(client, &address, 1, 0); if (peer == NULL) { @@ -1014,7 +1014,7 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { client = NULL; return -1; } - + // Wait for the connect to complete if (serviceEnetHost(client, &event, RTSP_CONNECT_TIMEOUT_SEC * 1000) <= 0 || event.type != ENET_EVENT_TYPE_CONNECT) { @@ -1072,7 +1072,7 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { ret = -1; goto Exit; } - + if ((StreamConfig.supportedVideoFormats & VIDEO_FORMAT_MASK_AV1) && strstr(response.payload, "AV1/90000")) { if ((serverInfo->serverCodecModeSupport & SCM_AV1_HIGH10_444) && (StreamConfig.supportedVideoFormats & VIDEO_FORMAT_AV1_HIGH10_444)) { NegotiatedVideoFormat = VIDEO_FORMAT_AV1_HIGH10_444; @@ -1205,10 +1205,10 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { } // Given there is a non-null session id, get the - // first token of the session until ";", which + // first token of the session until ";", which // resolves any 454 session not found errors on // standard RTSP server implementations. 
- // (i.e - sessionId = "DEADBEEFCAFE;timeout = 90") + // (i.e - sessionId = "DEADBEEFCAFE;timeout = 90") sessionIdString = strdup(strtok_r(sessionId, ";", &strtokCtx)); if (sessionIdString == NULL) { Limelog("Failed to duplicate session ID string\n"); @@ -1262,7 +1262,7 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { freeMessage(&response); } - + if (AppVersionQuad[0] >= 5) { RTSP_MESSAGE response; int error = -1; @@ -1389,9 +1389,9 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { } } - + ret = 0; - + Exit: // Cleanup the ENet stuff if (useEnet) { @@ -1399,7 +1399,7 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { enet_peer_disconnect_now(peer, 0); peer = NULL; } - + if (client != NULL) { enet_host_destroy(client); client = NULL; diff --git a/src/VideoDepacketizer.c b/src/VideoDepacketizer.c index 33faf735..00478180 100644 --- a/src/VideoDepacketizer.c +++ b/src/VideoDepacketizer.c @@ -17,10 +17,10 @@ static bool decodingFrame; static int frameType; static uint16_t lastPacketPayloadLength; static bool strictIdrFrameWait; -static uint64_t syntheticPtsBase; +static uint64_t syntheticPtsBaseUs; static uint16_t frameHostProcessingLatency; -static uint64_t firstPacketReceiveTime; -static unsigned int firstPacketPresentationTime; +static uint64_t firstPacketReceiveTimeUs; +static uint64_t firstPacketPresentationTime; static bool dropStatePending; static bool idrFrameProcessed; @@ -68,9 +68,9 @@ void initializeVideoDepacketizer(int pktSize) { waitingForRefInvalFrame = false; lastPacketInStream = UINT32_MAX; decodingFrame = false; - syntheticPtsBase = 0; + syntheticPtsBaseUs = 0; frameHostProcessingLatency = 0; - firstPacketReceiveTime = 0; + firstPacketReceiveTimeUs = 0; firstPacketPresentationTime = 0; lastPacketPayloadLength = 0; dropStatePending = false; @@ -483,9 +483,9 @@ static void reassembleFrame(int frameNumber) { qdu->decodeUnit.frameType = frameType; qdu->decodeUnit.frameNumber = frameNumber; qdu->decodeUnit.frameHostProcessingLatency = frameHostProcessingLatency; - qdu->decodeUnit.receiveTimeMs = firstPacketReceiveTime; + qdu->decodeUnit.receiveTimeUs = firstPacketReceiveTimeUs; qdu->decodeUnit.presentationTimeMs = firstPacketPresentationTime; - qdu->decodeUnit.enqueueTimeMs = LiGetMillis(); + qdu->decodeUnit.enqueueTimeUs = PltGetMicroseconds(); // These might be wrong for a few frames during a transition between SDR and HDR, // but the effects shouldn't very noticable since that's an infrequent operation. @@ -710,16 +710,16 @@ static void processAvcHevcRtpPayloadSlow(PBUFFER_DESC currentPos, PLENTRY_INTERN void requestDecoderRefresh(void) { // Wait for the next IDR frame waitingForIdrFrame = true; - + // Flush the decode unit queue freeDecodeUnitList(LbqFlushQueueItems(&decodeUnitQueue)); - + // Request the receive thread drop its state // on the next call. We can't do it here because // it may be trying to queue DUs and we'll nuke // the state out from under it. 
dropStatePending = true; - + // Request the IDR frame LiRequestIdrFrame(); } @@ -736,7 +736,7 @@ static bool isFirstPacket(uint8_t flags, uint8_t fecBlockNumber) { // Process an RTP Payload // The caller will free *existingEntry unless we NULL it static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, - uint64_t receiveTimeMs, unsigned int presentationTimeMs, + uint64_t receiveTimeUs, uint64_t presentationTimeMs, PLENTRY_INTERNAL* existingEntry) { BUFFER_DESC currentPos; uint32_t frameIndex; @@ -764,7 +764,7 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, LC_ASSERT_VT((flags & ~(FLAG_SOF | FLAG_EOF | FLAG_CONTAINS_PIC_DATA)) == 0); streamPacketIndex = videoPacket->streamPacketIndex; - + // Drop packets from a previously corrupt frame if (isBefore32(frameIndex, nextFrameNumber)) { return; @@ -787,10 +787,10 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, } return; } - + // Verify that we didn't receive an incomplete frame LC_ASSERT(firstPacket ^ decodingFrame); - + // Check sequencing of this frame to ensure we didn't // miss one in between if (firstPacket) { @@ -819,16 +819,16 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, // We're now decoding a frame decodingFrame = true; frameType = FRAME_TYPE_PFRAME; - firstPacketReceiveTime = receiveTimeMs; - + firstPacketReceiveTimeUs = receiveTimeUs; + // Some versions of Sunshine don't send a valid PTS, so we will // synthesize one using the receive time as the time base. - if (!syntheticPtsBase) { - syntheticPtsBase = receiveTimeMs; + if (!syntheticPtsBaseUs) { + syntheticPtsBaseUs = receiveTimeUs; } - + if (!presentationTimeMs && frameIndex > 0) { - firstPacketPresentationTime = (unsigned int)(receiveTimeMs - syntheticPtsBase); + firstPacketPresentationTime = (receiveTimeUs - syntheticPtsBaseUs) / 1000; } else { firstPacketPresentationTime = presentationTimeMs; @@ -1150,7 +1150,7 @@ void queueRtpPacket(PRTPV_QUEUE_ENTRY queueEntryPtr) { RTPV_QUEUE_ENTRY queueEntry = *queueEntryPtr; LC_ASSERT(!queueEntry.isParity); - LC_ASSERT(queueEntry.receiveTimeMs != 0); + LC_ASSERT(queueEntry.receiveTimeUs != 0); dataOffset = sizeof(*queueEntry.packet); if (queueEntry.packet->header & FLAG_EXTENSION) { @@ -1169,7 +1169,7 @@ void queueRtpPacket(PRTPV_QUEUE_ENTRY queueEntryPtr) { processRtpPayload((PNV_VIDEO_PACKET)(((char*)queueEntry.packet) + dataOffset), queueEntry.length - dataOffset, - queueEntry.receiveTimeMs, + queueEntry.receiveTimeUs, queueEntry.presentationTimeMs, &existingEntry); diff --git a/src/VideoStream.c b/src/VideoStream.c index a2def84e..402b0566 100644 --- a/src/VideoStream.c +++ b/src/VideoStream.c @@ -154,7 +154,7 @@ static void VideoReceiveThreadProc(void* context) { break; } } - + // Receive timed out; try again continue; } @@ -168,9 +168,7 @@ static void VideoReceiveThreadProc(void* context) { #ifndef LC_FUZZING if (!receivedFullFrame) { - uint64_t now = PltGetMillis(); - - if (now - firstDataTimeMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) { + if (PltGetMillis() - firstDataTimeMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) { Limelog("Terminating connection due to lack of a successful video frame\n"); ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_FRAME); break; @@ -286,7 +284,7 @@ void stopVideoStream(void) { // Wake up client code that may be waiting on the decode unit queue stopVideoDepacketizer(); - + PltInterruptThread(&udpPingThread); PltInterruptThread(&receiveThread); if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | 
CAPABILITY_PULL_RENDERER)) == 0) { @@ -302,7 +300,7 @@ void stopVideoStream(void) { if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) { PltJoinThread(&decoderThread); } - + if (firstFrameSocket != INVALID_SOCKET) { closeSocket(firstFrameSocket); firstFrameSocket = INVALID_SOCKET; @@ -415,3 +413,7 @@ int startVideoStream(void* rendererContext, int drFlags) { return 0; } + +const RTP_VIDEO_STATS* LiGetRTPVideoStats(void) { + return &rtpQueue.stats; +}
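
---

Example client usage (not part of the patch): a minimal sketch of how a consumer of this
library might migrate from LiGetMillis() to LiGetMicroseconds() and read the new RTP stats
for an overlay. The onFrameDecoded()/updateStatsOverlay() hooks below are hypothetical
client-side helpers; only the functions and struct fields declared in Limelight.h above are
assumed to exist.

    #include <stdio.h>
    #include <inttypes.h>
    #include "Limelight.h"

    // Hypothetical per-frame hook in a client decoder: measure decode time at
    // microsecond resolution instead of the old millisecond-resolution LiGetMillis().
    void onFrameDecoded(uint64_t decodeStartUs) {
        uint64_t decodeTimeUs = LiGetMicroseconds() - decodeStartUs;

        // Divide by 1000 at display time to show sub-millisecond frametimes
        printf("Decode time: %.3f ms\n", decodeTimeUs / 1000.0);
    }

    // Hypothetical overlay refresh: the returned stats are read-only counters
    // accumulated over the life of the RTP queues.
    void updateStatsOverlay(char* buffer, size_t size) {
        const RTP_VIDEO_STATS* video = LiGetRTPVideoStats();
        const RTP_AUDIO_STATS* audio = LiGetRTPAudioStats();

        snprintf(buffer, size,
                 "Video packets: %" PRIu32 " (FEC %" PRIu32 ", recovered %" PRIu32 ")\n"
                 "Audio packets: %" PRIu32 " (FEC %" PRIu32 ", failed %" PRIu32 ")",
                 video->packetCountVideo, video->packetCountFec, video->packetCountFecRecovered,
                 audio->packetCountAudio, audio->packetCountFec, audio->packetCountFecFailed);
    }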