@@ -50,7 +50,7 @@ public class WebSocketTransport: @unchecked Sendable {
50
50
private var queue : [ Int : String ] = [ : ]
51
51
52
52
@Atomic
53
- private var subscribers = [ String : ( Result < JSONObject , any Error > ) -> Void ] ( )
53
+ private var subscribers = [ String : ( Result < JSONObject , any Error > ) async -> Void ] ( )
54
54
@Atomic
55
55
private var subscriptions : [ String : String ] = [ : ]
56
56
let processingQueue = DispatchQueue ( label: " com.apollographql.WebSocketTransport " )
@@ -187,8 +187,8 @@ public class WebSocketTransport: @unchecked Sendable {
187
187
return websocket. write ( ping: data, completion: completionHandler)
188
188
}
189
189
190
- private func processMessage( text: String ) {
191
- OperationMessage ( serialized: text) . parse { parseHandler in
190
+ private func processMessage( text: String ) async {
191
+ await OperationMessage ( serialized: text) . parse { parseHandler in
192
192
guard
193
193
let type = parseHandler. type,
194
194
let messageType = OperationMessage . Types ( rawValue: type) else {
@@ -215,14 +215,14 @@ public class WebSocketTransport: @unchecked Sendable {
215
215
// subscriber probably unsubscribed.
216
216
if let responseHandler = subscribers [ id] {
217
217
if let payload = parseHandler. payload {
218
- responseHandler ( . success( payload) )
218
+ await responseHandler ( . success( payload) )
219
219
} else if let error = parseHandler. error {
220
- responseHandler ( . failure( error) )
220
+ await responseHandler ( . failure( error) )
221
221
} else {
222
222
let websocketError = WebSocketError ( payload: parseHandler. payload,
223
223
error: parseHandler. error,
224
224
kind: . neitherErrorNorPayloadReceived)
225
- responseHandler ( . failure( websocketError) )
225
+ await responseHandler ( . failure( websocketError) )
226
226
}
227
227
}
228
228
case . complete:
@@ -266,8 +266,10 @@ public class WebSocketTransport: @unchecked Sendable {
266
266
}
267
267
268
268
private func notifyErrorAllHandlers( _ error: any Error ) {
269
- for (_, handler) in subscribers {
270
- handler ( . failure( error) )
269
+ Task {
270
+ for (_, handler) in subscribers {
271
+ await handler ( . failure( error) )
272
+ }
271
273
}
272
274
}
273
275
@@ -335,10 +337,13 @@ public class WebSocketTransport: @unchecked Sendable {
335
337
self . websocket. delegate = nil
336
338
}
337
339
338
- func sendHelper< Operation: GraphQLOperation > ( operation: Operation , resultHandler: @escaping @Sendable ( _ result: Result < JSONObject , any Error > ) -> Void ) -> String ? {
339
- let body = config. requestBodyCreator. requestBody ( for: operation,
340
- sendQueryDocument: true ,
341
- autoPersistQuery: false )
340
+ func sendHelper< Operation: GraphQLOperation > ( operation: Operation , resultHandler: @escaping @Sendable ( _ result: Result < JSONObject , any Error > ) async -> Void ) -> String ? {
341
+ let body = config. requestBodyCreator. requestBody (
342
+ for: operation,
343
+ sendQueryDocument: true ,
344
+ autoPersistQuery: false ,
345
+ clientAwarenessMetadata: config. clientAwarenessMetadata
346
+ )
342
347
let identifier = config. operationMessageIdCreator. requestId ( )
343
348
344
349
let messageType : OperationMessage . Types
@@ -460,36 +465,40 @@ extension WebSocketTransport: NetworkTransport {
460
465
461
466
#warning("TODO: stream never finishes. WebSocketTask does not report subscription termination.")
462
467
return AsyncThrowingStream { continuation in
463
- let task = WebSocketTask ( self , operation) { [ weak store, contextIdentifier] result in
464
- Task {
468
+ let wsTask = WebSocketTask ( self , operation) { [ weak store, contextIdentifier] result in
469
+ do {
465
470
try Task . checkCancellation ( )
466
-
471
+
467
472
let jsonBody = try result. get ( )
468
473
let response = GraphQLResponse ( operation: operation, body: jsonBody)
469
-
474
+
470
475
if let store = store {
471
476
let ( graphQLResult, parsedRecords) = try await response. parseResult ( )
472
-
477
+
473
478
try Task . checkCancellation ( )
474
-
479
+
475
480
guard let records = parsedRecords else {
476
481
continuation. yield ( graphQLResult)
477
482
return
478
483
}
479
-
484
+
480
485
try await store. publish ( records: records, identifier: contextIdentifier)
481
486
continuation. yield ( graphQLResult)
482
-
487
+
483
488
} else {
484
489
let graphQLResult = try await response. parseResultFast ( )
485
490
try Task . checkCancellation ( )
486
-
491
+
487
492
continuation. yield ( graphQLResult)
488
493
}
494
+ } catch {
495
+ continuation. finish ( throwing: error)
489
496
}
490
- }
497
+ }
491
498
492
- continuation. onTermination = { _ in task. cancel ( ) }
499
+ continuation. onTermination = {
500
+ _ in wsTask. cancel ( )
501
+ }
493
502
}
494
503
}
495
504
}
@@ -592,7 +601,9 @@ extension WebSocketTransport: WebSocketClientDelegate {
592
601
}
593
602
594
603
public func websocketDidReceiveMessage( socket: any WebSocketClient , text: String ) {
595
- self . processMessage ( text: text)
604
+ Task {
605
+ await self . processMessage ( text: text)
606
+ }
596
607
}
597
608
598
609
public func websocketDidReceiveData( socket: any WebSocketClient , data: Data ) {
0 commit comments