@@ -2,13 +2,26 @@ import Foundation
2
2
import GRPC
3
3
import NIO
4
4
import os
5
+ import Semaphore
5
6
import Subprocess
7
+ import SwiftUI
6
8
7
9
@MainActor
8
10
public protocol FileSyncDaemon : ObservableObject {
9
11
var state : DaemonState { get }
10
- func start( ) async
12
+ func start( ) async throws ( DaemonError )
11
13
func stop( ) async
14
+ func listSessions( ) async throws -> [ FileSyncSession ]
15
+ func createSession( with: FileSyncSession ) async throws
16
+ }
17
+
18
+ public struct FileSyncSession {
19
+ public let id : String
20
+ public let name : String
21
+ public let localPath : URL
22
+ public let workspace : String
23
+ public let agent : String
24
+ public let remotePath : URL
12
25
}
13
26
14
27
@MainActor
@@ -17,7 +30,14 @@ public class MutagenDaemon: FileSyncDaemon {
17
30
18
31
@Published public var state : DaemonState = . stopped {
19
32
didSet {
20
- logger. info ( " daemon state changed: \( self . state. description, privacy: . public) " )
33
+ logger. info ( " daemon state set: \( self . state. description, privacy: . public) " )
34
+ if case . failed = state {
35
+ Task {
36
+ try ? await cleanupGRPC ( )
37
+ }
38
+ mutagenProcess? . kill ( )
39
+ mutagenProcess = nil
40
+ }
21
41
}
22
42
}
23
43
@@ -26,46 +46,61 @@ public class MutagenDaemon: FileSyncDaemon {
26
46
private let mutagenDataDirectory : URL
27
47
private let mutagenDaemonSocket : URL
28
48
49
+ // Non-nil when the daemon is running
29
50
private var group : MultiThreadedEventLoopGroup ?
30
51
private var channel : GRPCChannel ?
31
- private var client : Daemon_DaemonAsyncClient ?
32
-
33
- public init ( ) {
34
- #if arch(arm64)
35
- mutagenPath = Bundle . main. url ( forResource: " mutagen-darwin-arm64 " , withExtension: nil )
36
- #elseif arch(x86_64)
37
- mutagenPath = Bundle . main. url ( forResource: " mutagen-darwin-amd64 " , withExtension: nil )
38
- #else
39
- fatalError ( " unknown architecture " )
40
- #endif
41
- mutagenDataDirectory = FileManager . default. urls (
42
- for: . applicationSupportDirectory,
43
- in: . userDomainMask
44
- ) . first!. appending ( path: " Coder Desktop " ) . appending ( path: " Mutagen " )
52
+ private var client : DaemonClient ?
53
+
54
+ // Protect start & stop transitions against re-entrancy
55
+ private let transition = AsyncSemaphore ( value: 1 )
56
+
57
+ public init ( mutagenPath: URL ? = nil ,
58
+ mutagenDataDirectory: URL = FileManager . default. urls (
59
+ for: . applicationSupportDirectory,
60
+ in: . userDomainMask
61
+ ) . first!. appending ( path: " Coder Desktop " ) . appending ( path: " Mutagen " ) )
62
+ {
63
+ self . mutagenPath = mutagenPath
64
+ self . mutagenDataDirectory = mutagenDataDirectory
45
65
mutagenDaemonSocket = mutagenDataDirectory. appending ( path: " daemon " ) . appending ( path: " daemon.sock " )
46
66
// It shouldn't be fatal if the app was built without Mutagen embedded,
47
67
// but file sync will be unavailable.
48
68
if mutagenPath == nil {
49
69
logger. warning ( " Mutagen not embedded in app, file sync will be unavailable " )
50
70
state = . unavailable
71
+ return
72
+ }
73
+
74
+ // If there are sync sessions, the daemon should be running
75
+ Task {
76
+ do throws ( DaemonError) {
77
+ try await start ( )
78
+ } catch {
79
+ state = . failed( error)
80
+ return
81
+ }
82
+ await stopIfNoSessions ( )
51
83
}
52
84
}
53
85
54
- public func start( ) async {
86
+ public func start( ) async throws ( DaemonError ) {
55
87
if case . unavailable = state { return }
56
88
57
89
// Stop an orphaned daemon, if there is one
58
90
try ? await connect ( )
59
91
await stop ( )
60
92
93
+ await transition. wait ( )
94
+ defer { transition. signal ( ) }
95
+ logger. info ( " starting mutagen daemon " )
96
+
61
97
mutagenProcess = createMutagenProcess ( )
62
98
// swiftlint:disable:next large_tuple
63
99
let ( standardOutput, standardError, waitForExit) : ( Pipe . AsyncBytes , Pipe . AsyncBytes , @Sendable ( ) async -> Void )
64
100
do {
65
101
( standardOutput, standardError, waitForExit) = try mutagenProcess!. run ( )
66
102
} catch {
67
- state = . failed( DaemonError . daemonStartFailure ( error) )
68
- return
103
+ throw . daemonStartFailure( error)
69
104
}
70
105
71
106
Task {
@@ -85,10 +120,11 @@ public class MutagenDaemon: FileSyncDaemon {
85
120
do {
86
121
try await connect ( )
87
122
} catch {
88
- state = . failed( DaemonError . daemonStartFailure ( error) )
89
- return
123
+ throw . daemonStartFailure( error)
90
124
}
91
125
126
+ try await waitForDaemonStart ( )
127
+
92
128
state = . running
93
129
logger. info (
94
130
"""
@@ -98,6 +134,34 @@ public class MutagenDaemon: FileSyncDaemon {
98
134
)
99
135
}
100
136
137
+ // The daemon takes a moment to open the socket, and we don't want to hog the main actor
138
+ // so poll for it on a background thread
139
+ private func waitForDaemonStart(
140
+ maxAttempts: Int = 5 ,
141
+ attemptInterval: Duration = . milliseconds( 100 )
142
+ ) async throws ( DaemonError) {
143
+ do {
144
+ try await Task . detached ( priority: . background) {
145
+ for attempt in 0 ... maxAttempts {
146
+ do {
147
+ _ = try await self . client!. mgmt. version (
148
+ Daemon_VersionRequest ( ) ,
149
+ callOptions: . init( timeLimit: . timeout( . milliseconds( 500 ) ) )
150
+ )
151
+ return
152
+ } catch {
153
+ if attempt == maxAttempts {
154
+ throw error
155
+ }
156
+ try ? await Task . sleep ( for: attemptInterval)
157
+ }
158
+ }
159
+ } . value
160
+ } catch {
161
+ throw . daemonStartFailure( error)
162
+ }
163
+ }
164
+
101
165
private func connect( ) async throws ( DaemonError) {
102
166
guard client == nil else {
103
167
// Already connected
@@ -110,14 +174,17 @@ public class MutagenDaemon: FileSyncDaemon {
110
174
transportSecurity: . plaintext,
111
175
eventLoopGroup: group!
112
176
)
113
- client = Daemon_DaemonAsyncClient ( channel: channel!)
177
+ client = DaemonClient (
178
+ mgmt: Daemon_DaemonAsyncClient ( channel: channel!) ,
179
+ sync: Synchronization_SynchronizationAsyncClient ( channel: channel!)
180
+ )
114
181
logger. info (
115
182
" Successfully connected to mutagen daemon, socket: \( self . mutagenDaemonSocket. path, privacy: . public) "
116
183
)
117
184
} catch {
118
185
logger. error ( " Failed to connect to gRPC: \( error) " )
119
186
try ? await cleanupGRPC ( )
120
- throw DaemonError . connectionFailure ( error)
187
+ throw . connectionFailure( error)
121
188
}
122
189
}
123
190
@@ -132,6 +199,10 @@ public class MutagenDaemon: FileSyncDaemon {
132
199
133
200
public func stop( ) async {
134
201
if case . unavailable = state { return }
202
+ await transition. wait ( )
203
+ defer { transition. signal ( ) }
204
+ logger. info ( " stopping mutagen daemon " )
205
+
135
206
state = . stopped
136
207
guard FileManager . default. fileExists ( atPath: mutagenDaemonSocket. path) else {
137
208
// Already stopped
@@ -140,7 +211,7 @@ public class MutagenDaemon: FileSyncDaemon {
140
211
141
212
// "We don't check the response or error, because the daemon
142
213
// may terminate before it has a chance to send the response."
143
- _ = try ? await client? . terminate (
214
+ _ = try ? await client? . mgmt . terminate (
144
215
Daemon_TerminateRequest ( ) ,
145
216
callOptions: . init( timeLimit: . timeout( . milliseconds( 500 ) ) )
146
217
)
@@ -175,6 +246,7 @@ public class MutagenDaemon: FileSyncDaemon {
175
246
"""
176
247
)
177
248
state = . failed( . terminatedUnexpectedly)
249
+ return
178
250
}
179
251
}
180
252
@@ -183,6 +255,55 @@ public class MutagenDaemon: FileSyncDaemon {
183
255
logger. info ( " \( line, privacy: . public) " )
184
256
}
185
257
}
258
+
259
+ public func listSessions( ) async throws -> [ FileSyncSession ] {
260
+ guard case . running = state else {
261
+ return [ ]
262
+ }
263
+ // TODO: Implement
264
+ return [ ]
265
+ }
266
+
267
+ public func createSession( with _: FileSyncSession ) async throws {
268
+ if case . stopped = state {
269
+ do throws ( DaemonError) {
270
+ try await start ( )
271
+ } catch {
272
+ state = . failed( error)
273
+ return
274
+ }
275
+ }
276
+ // TODO: Add Session
277
+ }
278
+
279
+ public func deleteSession( ) async throws {
280
+ // TODO: Delete session
281
+ await stopIfNoSessions ( )
282
+ }
283
+
284
+ private func stopIfNoSessions( ) async {
285
+ let sessions : Synchronization_ListResponse
286
+ do {
287
+ sessions = try await client!. sync. list ( Synchronization_ListRequest . with { req in
288
+ req. selection = . with { selection in
289
+ selection. all = true
290
+ }
291
+ } )
292
+ } catch {
293
+ state = . failed( . daemonStartFailure( error) )
294
+ return
295
+ }
296
+ // If there's no configured sessions, the daemon doesn't need to be running
297
+ if sessions. sessionStates. isEmpty {
298
+ logger. info ( " No sync sessions found " )
299
+ await stop ( )
300
+ }
301
+ }
302
+ }
303
+
304
+ struct DaemonClient {
305
+ let mgmt : Daemon_DaemonAsyncClient
306
+ let sync : Synchronization_SynchronizationAsyncClient
186
307
}
187
308
188
309
public enum DaemonState {
@@ -191,7 +312,7 @@ public enum DaemonState {
191
312
case failed( DaemonError )
192
313
case unavailable
193
314
194
- var description : String {
315
+ public var description : String {
195
316
switch self {
196
317
case . running:
197
318
" Running "
@@ -203,12 +324,27 @@ public enum DaemonState {
203
324
" Unavailable "
204
325
}
205
326
}
327
+
328
+ public var color : Color {
329
+ switch self {
330
+ case . running:
331
+ . green
332
+ case . stopped:
333
+ . gray
334
+ case . failed:
335
+ . red
336
+ case . unavailable:
337
+ . gray
338
+ }
339
+ }
206
340
}
207
341
208
342
public enum DaemonError : Error {
343
+ case daemonNotRunning
209
344
case daemonStartFailure( Error )
210
345
case connectionFailure( Error )
211
346
case terminatedUnexpectedly
347
+ case grpcFailure( Error )
212
348
213
349
var description : String {
214
350
switch self {
@@ -218,6 +354,10 @@ public enum DaemonError: Error {
218
354
" Connection failure: \( error) "
219
355
case . terminatedUnexpectedly:
220
356
" Daemon terminated unexpectedly "
357
+ case . daemonNotRunning:
358
+ " The daemon must be started first "
359
+ case let . grpcFailure( error) :
360
+ " Failed to communicate with daemon: \( error) "
221
361
}
222
362
}
223
363
0 commit comments