-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathHTTPClientTransport.swift
298 lines (251 loc) · 10.7 KB
/
HTTPClientTransport.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
import Foundation
import Logging
#if canImport(FoundationNetworking)
import FoundationNetworking
#endif
public actor HTTPClientTransport: Actor, Transport {
public let endpoint: URL
private let session: URLSession
public private(set) var sessionID: String?
private let streaming: Bool
private var streamingTask: Task<Void, Never>?
private var lastEventID: String?
public nonisolated let logger: Logger
private var isConnected = false
private let messageStream: AsyncThrowingStream<Data, Swift.Error>
private let messageContinuation: AsyncThrowingStream<Data, Swift.Error>.Continuation
public init(
endpoint: URL,
configuration: URLSessionConfiguration = .default,
streaming: Bool = false,
logger: Logger? = nil
) {
self.init(
endpoint: endpoint,
session: URLSession(configuration: configuration),
streaming: streaming,
logger: logger
)
}
internal init(
endpoint: URL,
session: URLSession,
streaming: Bool = false,
logger: Logger? = nil
) {
self.endpoint = endpoint
self.session = session
self.streaming = streaming
// Create message stream
var continuation: AsyncThrowingStream<Data, Swift.Error>.Continuation!
self.messageStream = AsyncThrowingStream { continuation = $0 }
self.messageContinuation = continuation
self.logger =
logger
?? Logger(
label: "mcp.transport.http.client",
factory: { _ in SwiftLogNoOpLogHandler() }
)
}
/// Establishes connection with the transport
public func connect() async throws {
guard !isConnected else { return }
isConnected = true
if streaming {
// Start listening to server events
streamingTask = Task { await startListeningForServerEvents() }
}
logger.info("HTTP transport connected")
}
/// Disconnects from the transport
public func disconnect() async {
guard isConnected else { return }
isConnected = false
// Cancel streaming task if active
streamingTask?.cancel()
streamingTask = nil
// Cancel any in-progress requests
session.invalidateAndCancel()
// Clean up message stream
messageContinuation.finish()
logger.info("HTTP clienttransport disconnected")
}
/// Sends data through an HTTP POST request
public func send(_ data: Data) async throws {
guard isConnected else {
throw MCPError.internalError("Transport not connected")
}
var request = URLRequest(url: endpoint)
request.httpMethod = "POST"
request.addValue("application/json, text/event-stream", forHTTPHeaderField: "Accept")
request.addValue("application/json", forHTTPHeaderField: "Content-Type")
request.httpBody = data
// Add session ID if available
if let sessionID = sessionID {
request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
}
let (responseData, response) = try await session.data(for: request)
guard let httpResponse = response as? HTTPURLResponse else {
throw MCPError.internalError("Invalid HTTP response")
}
// Process the response based on content type and status code
let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? ""
// Extract session ID if present
if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
self.sessionID = newSessionID
logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"])
}
// Handle different response types
switch httpResponse.statusCode {
case 200, 201, 202:
// For SSE, the processing happens in the streaming task
if contentType.contains("text/event-stream") {
logger.debug("Received SSE response, processing in streaming task")
// The streaming is handled by the SSE task if active
return
}
// For JSON responses, deliver the data directly
if contentType.contains("application/json") && !responseData.isEmpty {
logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"])
messageContinuation.yield(responseData)
}
case 404:
// If we get a 404 with a session ID, it means our session is invalid
if sessionID != nil {
logger.warning("Session has expired")
sessionID = nil
throw MCPError.internalError("Session expired")
}
throw MCPError.internalError("Endpoint not found")
default:
throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
}
}
/// Receives data in an async sequence
public func receive() -> AsyncThrowingStream<Data, Swift.Error> {
return messageStream
}
// MARK: - SSE
/// Starts listening for server events using SSE
private func startListeningForServerEvents() async {
guard isConnected else { return }
// Retry loop for connection drops
while isConnected && !Task.isCancelled {
do {
try await connectToEventStream()
} catch {
if !Task.isCancelled {
logger.error("SSE connection error: \(error)")
// Wait before retrying
try? await Task.sleep(nanoseconds: 1_000_000_000) // 1 second
}
}
}
}
#if canImport(FoundationNetworking)
private func connectToEventStream() async throws {
logger.warning("SSE is not supported on this platform")
}
#else
/// Establishes an SSE connection to the server
private func connectToEventStream() async throws {
guard isConnected else { return }
var request = URLRequest(url: endpoint)
request.httpMethod = "GET"
request.addValue("text/event-stream", forHTTPHeaderField: "Accept")
// Add session ID if available
if let sessionID = sessionID {
request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id")
}
// Add Last-Event-ID header for resumability if available
if let lastEventID = lastEventID {
request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID")
}
logger.debug("Starting SSE connection")
// Create URLSession task for SSE
let (stream, response) = try await session.bytes(for: request)
guard let httpResponse = response as? HTTPURLResponse else {
throw MCPError.internalError("Invalid HTTP response")
}
// Check response status
guard httpResponse.statusCode == 200 else {
throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)")
}
// Extract session ID if present
if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") {
self.sessionID = newSessionID
}
// Process the SSE stream
var buffer = ""
var eventType = ""
var eventID: String?
var eventData = ""
for try await byte in stream {
if Task.isCancelled { break }
guard let char = String(bytes: [byte], encoding: .utf8) else { continue }
buffer.append(char)
// Process complete lines
while let newlineIndex = buffer.firstIndex(of: "\n") {
let line = buffer[..<newlineIndex]
buffer = String(buffer[buffer.index(after: newlineIndex)...])
// Empty line marks the end of an event
if line.isEmpty || line == "\r" || line == "\n" || line == "\r\n" {
if !eventData.isEmpty {
// Process the event
if eventType == "id" {
lastEventID = eventID
} else {
// Default event type is "message" if not specified
if let data = eventData.data(using: .utf8) {
logger.debug(
"SSE event received",
metadata: [
"type": "\(eventType.isEmpty ? "message" : eventType)",
"id": "\(eventID ?? "none")",
])
messageContinuation.yield(data)
}
}
// Reset for next event
eventType = ""
eventData = ""
}
continue
}
// Lines starting with ":" are comments
if line.hasPrefix(":") { continue }
// Parse field: value format
if let colonIndex = line.firstIndex(of: ":") {
let field = String(line[..<colonIndex])
var value = String(line[line.index(after: colonIndex)...])
// Trim leading space
if value.hasPrefix(" ") {
value = String(value.dropFirst())
}
// Process based on field
switch field {
case "event":
eventType = value
case "data":
if !eventData.isEmpty {
eventData.append("\n")
}
eventData.append(value)
case "id":
if !value.contains("\0") { // ID must not contain NULL
eventID = value
lastEventID = value
}
case "retry":
// Retry timing not implemented
break
default:
// Unknown fields are ignored per SSE spec
break
}
}
}
}
}
#endif
}