-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathFileSyncDaemon.swift
225 lines (195 loc) · 6.75 KB
/
FileSyncDaemon.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import Foundation
import GRPC
import NIO
import os
import Subprocess
/// Abstraction over a background file-sync daemon whose lifecycle can be controlled.
///
/// Conformers are isolated to the main actor and are `ObservableObject`s,
/// so their changes can be observed.
@MainActor
public protocol FileSyncDaemon: ObservableObject {
/// The daemon's current lifecycle state.
var state: DaemonState { get }
/// Starts the daemon.
func start() async
/// Stops the daemon.
func stop() async
}
/// `FileSyncDaemon` that launches the Mutagen binary embedded in the app bundle
/// (`mutagen daemon run`) and talks to it over a gRPC channel on a Unix domain socket.
///
/// If the binary is not found in the bundle, the daemon is permanently `.unavailable`
/// and `start()` / `stop()` become no-ops.
@MainActor
public class MutagenDaemon: FileSyncDaemon {
    private let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen")

    /// Current lifecycle state. Every change is written to the log.
    @Published public var state: DaemonState = .stopped {
        didSet {
            logger.info("daemon state changed: \(self.state.description, privacy: .public)")
        }
    }

    /// The child process most recently created by `start()`, if any.
    private var mutagenProcess: Subprocess?
    /// Incremented for every launch attempt so that a termination handler belonging to
    /// an earlier process can recognise that it is stale.
    private var launchGeneration = 0
    /// Location of the embedded Mutagen binary; `nil` when the app was built without it.
    private let mutagenPath: URL!
    private let mutagenDataDirectory: URL
    private let mutagenDaemonSocket: URL

    private var group: MultiThreadedEventLoopGroup?
    private var channel: GRPCChannel?
    private var client: Daemon_DaemonAsyncClient?

    /// Resolves the embedded binary for the current architecture and derives the data
    /// directory and socket path under Application Support.
    public init() {
        #if arch(arm64)
            mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil)
        #elseif arch(x86_64)
            mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil)
        #else
            fatalError("unknown architecture")
        #endif
        mutagenDataDirectory = FileManager.default.urls(
            for: .applicationSupportDirectory,
            in: .userDomainMask
        ).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")
        mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock")
        // It shouldn't be fatal if the app was built without Mutagen embedded,
        // but file sync will be unavailable.
        if mutagenPath == nil {
            logger.warning("Mutagen not embedded in app, file sync will be unavailable")
            state = .unavailable
        }
    }

    /// Launches the Mutagen daemon and connects to it.
    ///
    /// Any daemon that is already reachable is stopped first. On success `state`
    /// becomes `.running`; if the process cannot be launched or the gRPC client
    /// cannot be created, `state` becomes `.failed(.daemonStartFailure(_))`.
    public func start() async {
        if case .unavailable = state { return }

        // Stop an orphaned daemon, if there is one
        try? await connect()
        await stop()

        // Tag this launch so that a termination handler from a previous launch
        // cannot mistake the new process for its own.
        launchGeneration += 1
        let generation = launchGeneration

        let process = createMutagenProcess()
        mutagenProcess = process
        // swiftlint:disable:next large_tuple
        let (standardOutput, standardError, waitForExit): (Pipe.AsyncBytes, Pipe.AsyncBytes, @Sendable () async -> Void)
        do {
            (standardOutput, standardError, waitForExit) = try process.run()
        } catch {
            // The process never launched; don't keep it around for a later `stop()` to kill.
            mutagenProcess = nil
            state = .failed(DaemonError.daemonStartFailure(error))
            return
        }

        Task {
            await streamHandler(io: standardOutput)
            logger.info("standard output stream closed")
        }

        Task {
            await streamHandler(io: standardError)
            logger.info("standard error stream closed")
        }

        Task {
            await terminationHandler(generation: generation, waitForExit: waitForExit)
        }

        do {
            try await connect()
        } catch {
            state = .failed(DaemonError.daemonStartFailure(error))
            return
        }

        state = .running
        logger.info(
            """
            mutagen daemon started, pid:
            \(self.mutagenProcess?.pid.description ?? "unknown", privacy: .public)
            """
        )
    }

    /// Creates the event loop group, channel and client for the daemon socket.
    ///
    /// Does nothing when a client already exists.
    /// - Throws: `DaemonError.connectionFailure` when the channel cannot be created;
    ///   any partially created gRPC state is torn down first.
    private func connect() async throws(DaemonError) {
        guard client == nil else {
            // Already connected
            return
        }
        let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        group = eventLoopGroup
        do {
            let newChannel = try GRPCChannelPool.with(
                target: .unixDomainSocket(mutagenDaemonSocket.path),
                transportSecurity: .plaintext,
                eventLoopGroup: eventLoopGroup
            )
            channel = newChannel
            client = Daemon_DaemonAsyncClient(channel: newChannel)
            logger.info(
                "Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)"
            )
        } catch {
            // Marked public like the other log lines here; otherwise the error text is redacted.
            logger.error("Failed to connect to gRPC: \(String(describing: error), privacy: .public)")
            await cleanupGRPC()
            throw DaemonError.connectionFailure(error)
        }
    }

    /// Closes the channel, shuts down the event loop group and clears all gRPC references.
    /// Failures while closing are ignored (best effort).
    private func cleanupGRPC() async {
        try? await channel?.close().get()
        try? await group?.shutdownGracefully()
        client = nil
        channel = nil
        group = nil
    }

    /// Asks the daemon to terminate, tears down the gRPC connection and kills the child process.
    ///
    /// `state` is set to `.stopped` up front so that the termination handler treats the
    /// subsequent process exit as expected.
    public func stop() async {
        if case .unavailable = state { return }
        state = .stopped
        // NOTE(review): when the socket file is missing this returns before the gRPC
        // teardown and `kill()` below; an existing client is then reused by the next
        // `connect()`. Confirm this is intended.
        guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else {
            // Already stopped
            return
        }

        // "We don't check the response or error, because the daemon
        // may terminate before it has a chance to send the response."
        _ = try? await client?.terminate(
            Daemon_TerminateRequest(),
            callOptions: .init(timeLimit: .timeout(.milliseconds(500)))
        )

        await cleanupGRPC()

        mutagenProcess?.kill()
        mutagenProcess = nil
        logger.info("Daemon stopped and gRPC connection closed")
    }

    /// Builds (but does not launch) the `mutagen daemon run` subprocess with
    /// `MUTAGEN_DATA_DIRECTORY` pointing at the app's Mutagen data directory.
    private func createMutagenProcess() -> Subprocess {
        let process = Subprocess([mutagenPath.path, "daemon", "run"])
        process.environment = [
            "MUTAGEN_DATA_DIRECTORY": mutagenDataDirectory.path,
        ]
        logger.info("setting mutagen data directory: \(self.mutagenDataDirectory.path, privacy: .public)")
        return process
    }

    /// Waits for a launched process to exit and records an unexpected exit as a failure.
    ///
    /// - Parameters:
    ///   - generation: The `launchGeneration` value assigned when the process was started.
    ///   - waitForExit: Suspends until the process has exited.
    private func terminationHandler(generation: Int, waitForExit: @Sendable () async -> Void) async {
        await waitForExit()

        // `start()` always calls `stop()` before bumping the generation, so a mismatch
        // means this process was deliberately replaced. Without this check the exit of
        // the old process could mark the freshly started daemon as failed.
        guard generation == launchGeneration else {
            logger.info("mutagen daemon stopped")
            return
        }

        switch state {
        case .stopped:
            logger.info("mutagen daemon stopped")
        case .running, .failed, .unavailable:
            logger.error(
                """
                mutagen daemon exited unexpectedly with code:
                \(self.mutagenProcess?.exitCode.description ?? "unknown", privacy: .public)
                """
            )
            state = .failed(.terminatedUnexpectedly)
        }
    }

    /// Forwards every line the child process writes to the given pipe into the log.
    private func streamHandler(io: Pipe.AsyncBytes) async {
        for await line in io.lines {
            logger.info("\(line, privacy: .public)")
        }
    }
}
/// Lifecycle states a file sync daemon can report.
public enum DaemonState {
    /// The daemon has been started and a client connection was set up.
    case running
    /// The daemon is not running.
    case stopped
    /// The daemon could not be started or exited unexpectedly; carries the reason.
    case failed(DaemonError)
    /// File sync cannot be used at all.
    case unavailable

    /// Short human-readable label for the state.
    var description: String {
        switch self {
        case .running:
            return "Running"
        case .stopped:
            return "Stopped"
        case .unavailable:
            return "Unavailable"
        case .failed(let reason):
            return "Failed: \(reason)"
        }
    }
}
/// Reasons a file sync daemon can end up in a failed state.
///
/// Conforms to `CustomStringConvertible` so that string interpolation (`"\(error)"`)
/// yields the human-readable text below instead of the reflected case name, and to
/// `LocalizedError` so that `localizedDescription` returns the same text when the
/// value is accessed as `any Error`.
public enum DaemonError: Error, CustomStringConvertible, LocalizedError {
    /// The daemon process could not be launched, or connecting to it after launch failed.
    case daemonStartFailure(Error)
    /// The gRPC connection to the daemon could not be set up.
    case connectionFailure(Error)
    /// The daemon process exited without having been asked to stop.
    case terminatedUnexpectedly

    /// Human-readable description of the failure, including the underlying error if any.
    public var description: String {
        switch self {
        case let .daemonStartFailure(error):
            "Daemon start failure: \(error)"
        case let .connectionFailure(error):
            "Connection failure: \(error)"
        case .terminatedUnexpectedly:
            "Daemon terminated unexpectedly"
        }
    }

    /// `LocalizedError` hook; backs `localizedDescription` for type-erased access.
    public var errorDescription: String? { description }

    var localizedDescription: String { description }
}