Skip to content

Commit

Permalink
[Feature]Report join call telemetry (#612)
Browse files Browse the repository at this point in the history
  • Loading branch information
ipavlidakis authored Dec 3, 2024
1 parent 3dfc2e2 commit 850a602
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 5 deletions.
10 changes: 5 additions & 5 deletions Sources/StreamVideo/WebRTC/v2/SFU/SFUAdapter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,18 @@ final class SFUAdapter: ConnectionStateDelegate, CustomStringConvertible, @unche
func sendStats(
_ report: CallStatsReport?,
for sessionId: String,
thermalState: ProcessInfo.ThermalState? = nil
thermalState: ProcessInfo.ThermalState? = nil,
telemetry: Stream_Video_Sfu_Signal_Telemetry? = nil
) async throws {
statusCheck()
guard let report else { return }
var statsRequest = Stream_Video_Sfu_Signal_SendStatsRequest()
statsRequest.sessionID = sessionId
statsRequest.sdk = "stream-ios"
statsRequest.sdkVersion = SystemEnvironment.version
statsRequest.webrtcVersion = SystemEnvironment.webRTCVersion
statsRequest.publisherStats = report.publisherRawStats?.jsonString ?? ""
statsRequest.subscriberStats = report.subscriberRawStats?.jsonString ?? ""
statsRequest.publisherStats = report?.publisherRawStats?.jsonString ?? ""
statsRequest.subscriberStats = report?.subscriberRawStats?.jsonString ?? ""
statsRequest.deviceState = .init(thermalState)
statsRequest.telemetry = telemetry ?? .init()

let task = Task { [statsRequest, signalService] in
try Task.checkCancellation()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ extension WebRTCCoordinator.StateMachine.Stage {
WebRTCCoordinator.StateMachine.Stage,
@unchecked Sendable
{
private enum FlowType { case regular, fast, rejoin, migrate }
private let disposableBag = DisposableBag()
private let startTime = Date()
private var flowType = FlowType.regular

/// Initializes a new instance of `JoiningStage`.
/// - Parameter context: The context for the joining stage.
Expand All @@ -50,15 +53,19 @@ extension WebRTCCoordinator.StateMachine.Stage {
) -> Self? {
switch previousStage.id {
case .connected where context.isRejoiningFromSessionID != nil:
flowType = .rejoin
executeRejoining()
return self
case .connected:
flowType = .regular
execute(isFastReconnecting: false)
return self
case .fastReconnected:
flowType = .fast
execute(isFastReconnecting: true)
return self
case .migrated:
flowType = .migrate
executeMigration()
return self
default:
Expand Down Expand Up @@ -334,6 +341,64 @@ extension WebRTCCoordinator.StateMachine.Stage {
try Task.checkCancellation()

try await coordinator.stateAdapter.restoreScreenSharing()

try Task.checkCancellation()

reportTelemetry(
sessionId: await coordinator.stateAdapter.sessionID,
sfuAdapter: sfuAdapter
)
}

/// Reports telemetry data to the SFU (Selective Forwarding Unit) to monitor and analyze the
/// connection lifecycle.
///
/// This method collects relevant metrics based on the flow type of the connection, such as
/// connection time or reconnection details, and sends them to the SFU for logging and diagnostics.
/// The telemetry data provides insights into the connection's performance and the strategies used
/// during rejoining, fast reconnecting, or migration.
///
/// The reported data includes:
/// - Connection time in seconds for a regular flow.
/// - Reconnection strategies (e.g., fast reconnect, rejoin, or migration) and their duration.
private func reportTelemetry(
sessionId: String,
sfuAdapter: SFUAdapter
) {
Task {
var telemetry = Stream_Video_Sfu_Signal_Telemetry()
let duration = Float(Date().timeIntervalSince(startTime))
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.timeSeconds = duration

telemetry.data = {
switch flowType {
case .regular:
return .connectionTimeSeconds(duration)
case .fast:
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.strategy = .fast
return .reconnection(reconnection)
case .rejoin:
reconnection.strategy = .rejoin
return .reconnection(reconnection)
case .migrate:
reconnection.strategy = .migrate
return .reconnection(reconnection)
}
}()

do {
try await sfuAdapter.sendStats(
nil,
for: sessionId,
telemetry: telemetry
)
log.debug("Join call completed in \(duration) seconds.")
} catch {
log.error(error)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,41 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromConnectedSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.reconnectAttempts = 11
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .connected,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTAssertTrue(true)
case .reconnection:
XCTFail()
case .none:
XCTFail()
}
}

// MARK: - transition from connected with isRejoiningFromSessionID != nil

func test_transition_fromConnectedWithRejoinWithoutCoordinator_transitionsToDisconnected() async throws {
Expand Down Expand Up @@ -693,6 +728,41 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromConnectedWithRejoinSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.isRejoiningFromSessionID = .unique
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .connected,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTFail()
case let .reconnection(reconnection):
XCTAssertEqual(reconnection.strategy, .rejoin)
case .none:
XCTFail()
}
}

// MARK: - transition from fastReconnected

func test_transition_fromFastReconnected_doeNotConfigurePeerConnections() async throws {
Expand Down Expand Up @@ -857,6 +927,41 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromFastReconnectedWithSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.reconnectAttempts = 11
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .fastReconnected,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTFail()
case let .reconnection(reconnection):
XCTAssertEqual(reconnection.strategy, .fast)
case .none:
XCTFail()
}
}

// MARK: - transition from migrated

func test_transition_fromMigratedWithoutCoordinator_updatesReconnectionStategy() async throws {
Expand Down Expand Up @@ -1199,6 +1304,42 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromMigratedWithSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.reconnectAttempts = 11
subject.context.migratingFromSFU = "test-sfu"
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .migrated,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTFail()
case let .reconnection(reconnection):
XCTAssertEqual(reconnection.strategy, .migrate)
case .none:
XCTFail()
}
}

// MARK: - Private helpers

private func assertTransition(
Expand Down

0 comments on commit 850a602

Please sign in to comment.