Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]Report join call telemetry #612

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Sources/StreamVideo/WebRTC/v2/SFU/SFUAdapter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,18 @@ final class SFUAdapter: ConnectionStateDelegate, CustomStringConvertible, @unche
func sendStats(
_ report: CallStatsReport?,
for sessionId: String,
thermalState: ProcessInfo.ThermalState? = nil
thermalState: ProcessInfo.ThermalState? = nil,
telemetry: Stream_Video_Sfu_Signal_Telemetry? = nil
) async throws {
statusCheck()
guard let report else { return }
var statsRequest = Stream_Video_Sfu_Signal_SendStatsRequest()
statsRequest.sessionID = sessionId
statsRequest.sdk = "stream-ios"
statsRequest.sdkVersion = SystemEnvironment.version
statsRequest.webrtcVersion = SystemEnvironment.webRTCVersion
statsRequest.publisherStats = report.publisherRawStats?.jsonString ?? ""
statsRequest.subscriberStats = report.subscriberRawStats?.jsonString ?? ""
statsRequest.publisherStats = report?.publisherRawStats?.jsonString ?? ""
statsRequest.subscriberStats = report?.subscriberRawStats?.jsonString ?? ""
statsRequest.deviceState = .init(thermalState)
statsRequest.telemetry = telemetry ?? .init()

let task = Task { [statsRequest, signalService] in
try Task.checkCancellation()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ extension WebRTCCoordinator.StateMachine.Stage {
WebRTCCoordinator.StateMachine.Stage,
@unchecked Sendable
{
private enum FlowType { case regular, fast, rejoin, migrate }
private let disposableBag = DisposableBag()
private let startTime = Date()
private var flowType = FlowType.regular

/// Initializes a new instance of `JoiningStage`.
/// - Parameter context: The context for the joining stage.
Expand All @@ -50,15 +53,19 @@ extension WebRTCCoordinator.StateMachine.Stage {
) -> Self? {
switch previousStage.id {
case .connected where context.isRejoiningFromSessionID != nil:
flowType = .rejoin
executeRejoining()
return self
case .connected:
flowType = .regular
execute(isFastReconnecting: false)
return self
case .fastReconnected:
flowType = .fast
execute(isFastReconnecting: true)
return self
case .migrated:
flowType = .migrate
executeMigration()
return self
default:
Expand Down Expand Up @@ -334,6 +341,64 @@ extension WebRTCCoordinator.StateMachine.Stage {
try Task.checkCancellation()

try await coordinator.stateAdapter.restoreScreenSharing()

try Task.checkCancellation()

reportTelemetry(
sessionId: await coordinator.stateAdapter.sessionID,
sfuAdapter: sfuAdapter
)
}

/// Reports telemetry data to the SFU (Selective Forwarding Unit) to monitor and analyze the
/// connection lifecycle.
///
/// This method collects relevant metrics based on the flow type of the connection, such as
/// connection time or reconnection details, and sends them to the SFU for logging and diagnostics.
/// The telemetry data provides insights into the connection's performance and the strategies used
/// during rejoining, fast reconnecting, or migration.
///
/// The reported data includes:
/// - Connection time in seconds for a regular flow.
/// - Reconnection strategies (e.g., fast reconnect, rejoin, or migration) and their duration.
private func reportTelemetry(
sessionId: String,
sfuAdapter: SFUAdapter
) {
Task {
var telemetry = Stream_Video_Sfu_Signal_Telemetry()
let duration = Float(Date().timeIntervalSince(startTime))
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.timeSeconds = duration

telemetry.data = {
switch flowType {
case .regular:
return .connectionTimeSeconds(duration)
case .fast:
var reconnection = Stream_Video_Sfu_Signal_Reconnection()
reconnection.strategy = .fast
return .reconnection(reconnection)
case .rejoin:
reconnection.strategy = .rejoin
return .reconnection(reconnection)
case .migrate:
reconnection.strategy = .migrate
return .reconnection(reconnection)
}
}()

do {
try await sfuAdapter.sendStats(
nil,
for: sessionId,
telemetry: telemetry
)
log.debug("Join call completed in \(duration) seconds.")
} catch {
log.error(error)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,41 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromConnectedSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.reconnectAttempts = 11
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .connected,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTAssertTrue(true)
case .reconnection:
XCTFail()
case .none:
XCTFail()
}
}

// MARK: - transition from connected with isRejoiningFromSessionID != nil

func test_transition_fromConnectedWithRejoinWithoutCoordinator_transitionsToDisconnected() async throws {
Expand Down Expand Up @@ -693,6 +728,41 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromConnectedWithRejoinSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.isRejoiningFromSessionID = .unique
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .connected,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTFail()
case let .reconnection(reconnection):
XCTAssertEqual(reconnection.strategy, .rejoin)
case .none:
XCTFail()
}
}

// MARK: - transition from fastReconnected

func test_transition_fromFastReconnected_doeNotConfigurePeerConnections() async throws {
Expand Down Expand Up @@ -857,6 +927,41 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromFastReconnectedWithSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.reconnectAttempts = 11
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .fastReconnected,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTFail()
case let .reconnection(reconnection):
XCTAssertEqual(reconnection.strategy, .fast)
case .none:
XCTFail()
}
}

// MARK: - transition from migrated

func test_transition_fromMigratedWithoutCoordinator_updatesReconnectionStategy() async throws {
Expand Down Expand Up @@ -1199,6 +1304,42 @@ final class WebRTCCoordinatorStateMachine_JoiningStageTests: XCTestCase, @unchec
cancellable.cancel()
}

func test_transition_fromMigratedWithSFUConnected_reportsTelemetry() async throws {
subject.context.coordinator = mockCoordinatorStack.coordinator
subject.context.reconnectAttempts = 11
subject.context.migratingFromSFU = "test-sfu"
await mockCoordinatorStack
.coordinator
.stateAdapter
.set(sfuAdapter: mockCoordinatorStack.sfuStack.adapter)
mockCoordinatorStack.webRTCAuthenticator.stubbedFunction[.waitForConnect] = Result<Void, Error>.success(())
let cancellable = receiveEvent(
.sfuEvent(.joinResponse(Stream_Video_Sfu_Event_JoinResponse())),
every: 0.3
)

try await assertTransition(
from: .migrated,
expectedTarget: .joined,
subject: subject
) { _ in }

cancellable.cancel()

let mockSignalService = try XCTUnwrap(mockCoordinatorStack?.sfuStack.service)
await fulfillment { mockSignalService.sendStatsWasCalledWithRequest?.telemetry != nil }
let telemetry = try XCTUnwrap(mockSignalService.sendStatsWasCalledWithRequest?.telemetry)

switch telemetry.data {
case .connectionTimeSeconds:
XCTFail()
case let .reconnection(reconnection):
XCTAssertEqual(reconnection.strategy, .migrate)
case .none:
XCTFail()
}
}

// MARK: - Private helpers

private func assertTransition(
Expand Down
Loading