-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: manage mutagen daemon lifecycle #98
Merged
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
0f362f8
chore: manage mutagen daemon lifecycle
ethanndickson 513ccd8
handle missing mutagen
ethanndickson 3291e73
fixup
ethanndickson 9be6173
gitkeep resources
ethanndickson b0cbab8
fixup
ethanndickson ebcadbe
only import swiftprotobuf in vpnlib to debug runtime crash
ethanndickson 5ed3893
extract to seperate lib
ethanndickson c1947aa
move back to vpnlib
ethanndickson b13a44f
remove duplicate framework
ethanndickson c9cba6d
logging
ethanndickson 2b673b8
review
ethanndickson 76abed5
error handling
ethanndickson f2fc365
log privacy
ethanndickson 21bb169
fixup
ethanndickson 2bf41aa
fixup
ethanndickson 16a7263
process -> subprocess package
ethanndickson 382fd9b
use async let
ethanndickson ad1b24d
fixup
ethanndickson File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# TODO: Remove this once the grpc-swift-protobuf generator adds a lint disable comment | ||
excluded: | ||
- "**/*.pb.swift" | ||
- "**/*.grpc.swift" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
--selfrequired log,info,error,debug,critical,fault | ||
--exclude **.pb.swift | ||
--exclude **.pb.swift,**.grpc.swift | ||
--condassignment always |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,225 @@ | ||
import Foundation | ||
import GRPC | ||
import NIO | ||
import os | ||
import Subprocess | ||
|
||
@MainActor | ||
public protocol FileSyncDaemon: ObservableObject { | ||
var state: DaemonState { get } | ||
func start() async | ||
func stop() async | ||
} | ||
|
||
@MainActor | ||
public class MutagenDaemon: FileSyncDaemon { | ||
private let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen") | ||
|
||
@Published public var state: DaemonState = .stopped { | ||
didSet { | ||
logger.info("daemon state changed: \(self.state.description, privacy: .public)") | ||
} | ||
} | ||
|
||
private var mutagenProcess: Subprocess? | ||
private let mutagenPath: URL! | ||
private let mutagenDataDirectory: URL | ||
private let mutagenDaemonSocket: URL | ||
|
||
private var group: MultiThreadedEventLoopGroup? | ||
private var channel: GRPCChannel? | ||
private var client: Daemon_DaemonAsyncClient? | ||
|
||
public init() { | ||
#if arch(arm64) | ||
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil) | ||
#elseif arch(x86_64) | ||
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil) | ||
#else | ||
fatalError("unknown architecture") | ||
#endif | ||
mutagenDataDirectory = FileManager.default.urls( | ||
for: .applicationSupportDirectory, | ||
in: .userDomainMask | ||
).first!.appending(path: "Coder Desktop").appending(path: "Mutagen") | ||
mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock") | ||
// It shouldn't be fatal if the app was built without Mutagen embedded, | ||
// but file sync will be unavailable. | ||
if mutagenPath == nil { | ||
logger.warning("Mutagen not embedded in app, file sync will be unavailable") | ||
state = .unavailable | ||
} | ||
} | ||
|
||
public func start() async { | ||
if case .unavailable = state { return } | ||
ThomasK33 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Stop an orphaned daemon, if there is one | ||
try? await connect() | ||
await stop() | ||
|
||
mutagenProcess = createMutagenProcess() | ||
// swiftlint:disable:next large_tuple | ||
let (standardOutput, standardError, waitForExit): (Pipe.AsyncBytes, Pipe.AsyncBytes, @Sendable () async -> Void) | ||
do { | ||
(standardOutput, standardError, waitForExit) = try mutagenProcess!.run() | ||
} catch { | ||
state = .failed(DaemonError.daemonStartFailure(error)) | ||
return | ||
} | ||
|
||
Task { | ||
await streamHandler(io: standardOutput) | ||
logger.info("standard output stream closed") | ||
} | ||
|
||
Task { | ||
await streamHandler(io: standardError) | ||
logger.info("standard error stream closed") | ||
} | ||
|
||
Task { | ||
await terminationHandler(waitForExit: waitForExit) | ||
} | ||
|
||
do { | ||
try await connect() | ||
} catch { | ||
state = .failed(DaemonError.daemonStartFailure(error)) | ||
return | ||
} | ||
|
||
state = .running | ||
logger.info( | ||
""" | ||
mutagen daemon started, pid: | ||
\(self.mutagenProcess?.pid.description ?? "unknown", privacy: .public) | ||
""" | ||
) | ||
} | ||
|
||
private func connect() async throws(DaemonError) { | ||
guard client == nil else { | ||
// Already connected | ||
return | ||
} | ||
group = MultiThreadedEventLoopGroup(numberOfThreads: 1) | ||
do { | ||
channel = try GRPCChannelPool.with( | ||
target: .unixDomainSocket(mutagenDaemonSocket.path), | ||
transportSecurity: .plaintext, | ||
eventLoopGroup: group! | ||
) | ||
client = Daemon_DaemonAsyncClient(channel: channel!) | ||
logger.info( | ||
"Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)" | ||
) | ||
} catch { | ||
logger.error("Failed to connect to gRPC: \(error)") | ||
try? await cleanupGRPC() | ||
throw DaemonError.connectionFailure(error) | ||
} | ||
} | ||
|
||
private func cleanupGRPC() async throws { | ||
try? await channel?.close().get() | ||
try? await group?.shutdownGracefully() | ||
|
||
client = nil | ||
channel = nil | ||
group = nil | ||
} | ||
|
||
public func stop() async { | ||
if case .unavailable = state { return } | ||
state = .stopped | ||
guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else { | ||
// Already stopped | ||
return | ||
} | ||
|
||
// "We don't check the response or error, because the daemon | ||
// may terminate before it has a chance to send the response." | ||
_ = try? await client?.terminate( | ||
Daemon_TerminateRequest(), | ||
callOptions: .init(timeLimit: .timeout(.milliseconds(500))) | ||
) | ||
|
||
try? await cleanupGRPC() | ||
|
||
mutagenProcess?.kill() | ||
mutagenProcess = nil | ||
logger.info("Daemon stopped and gRPC connection closed") | ||
} | ||
|
||
private func createMutagenProcess() -> Subprocess { | ||
let process = Subprocess([mutagenPath.path, "daemon", "run"]) | ||
process.environment = [ | ||
"MUTAGEN_DATA_DIRECTORY": mutagenDataDirectory.path, | ||
] | ||
logger.info("setting mutagen data directory: \(self.mutagenDataDirectory.path, privacy: .public)") | ||
return process | ||
} | ||
|
||
private func terminationHandler(waitForExit: @Sendable () async -> Void) async { | ||
await waitForExit() | ||
|
||
switch state { | ||
case .stopped: | ||
logger.info("mutagen daemon stopped") | ||
default: | ||
logger.error( | ||
""" | ||
mutagen daemon exited unexpectedly with code: | ||
\(self.mutagenProcess?.exitCode.description ?? "unknown") | ||
""" | ||
) | ||
state = .failed(.terminatedUnexpectedly) | ||
} | ||
} | ||
|
||
private func streamHandler(io: Pipe.AsyncBytes) async { | ||
for await line in io.lines { | ||
logger.info("\(line, privacy: .public)") | ||
} | ||
} | ||
} | ||
|
||
public enum DaemonState { | ||
case running | ||
case stopped | ||
case failed(DaemonError) | ||
case unavailable | ||
|
||
var description: String { | ||
switch self { | ||
case .running: | ||
"Running" | ||
case .stopped: | ||
"Stopped" | ||
case let .failed(error): | ||
"Failed: \(error)" | ||
case .unavailable: | ||
"Unavailable" | ||
} | ||
} | ||
} | ||
|
||
public enum DaemonError: Error { | ||
case daemonStartFailure(Error) | ||
case connectionFailure(Error) | ||
case terminatedUnexpectedly | ||
|
||
var description: String { | ||
switch self { | ||
case let .daemonStartFailure(error): | ||
"Daemon start failure: \(error)" | ||
case let .connectionFailure(error): | ||
"Connection failure: \(error)" | ||
case .terminatedUnexpectedly: | ||
"Daemon terminated unexpectedly" | ||
} | ||
} | ||
|
||
var localizedDescription: String { description } | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drive-by fix: I believe this is implicit in the
ObservableObject
conformance, but can't hurt to make it explicit.