Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: manage mutagen daemon lifecycle #98

Merged
merged 18 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .swiftlint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# TODO: Remove this once the grpc-swift-protobuf generator adds a lint disable comment
excluded:
- "**/*.pb.swift"
- "**/*.grpc.swift"
2 changes: 1 addition & 1 deletion Coder Desktop/.swiftformat
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
--selfrequired log,info,error,debug,critical,fault
--exclude **.pb.swift
--exclude **.pb.swift,**.grpc.swift
--condassignment always
16 changes: 14 additions & 2 deletions Coder Desktop/Coder Desktop/Coder_DesktopApp.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import FluidMenuBarExtra
import NetworkExtension
import SwiftUI
import VPNLib

@main
struct DesktopApp: App {
Expand Down Expand Up @@ -30,10 +31,12 @@ class AppDelegate: NSObject, NSApplicationDelegate {
private var menuBar: MenuBarController?
let vpn: CoderVPNService
let state: AppState
let fileSyncDaemon: MutagenDaemon

override init() {
vpn = CoderVPNService()
state = AppState(onChange: vpn.configureTunnelProviderProtocol)
fileSyncDaemon = MutagenDaemon()
}

func applicationDidFinishLaunching(_: Notification) {
Expand All @@ -56,14 +59,23 @@ class AppDelegate: NSObject, NSApplicationDelegate {
state.reconfigure()
}
}
// TODO: Start the daemon only once a file sync is configured
Task {
await fileSyncDaemon.start()
}
}

// This function MUST eventually call `NSApp.reply(toApplicationShouldTerminate: true)`
// or return `.terminateNow`
func applicationShouldTerminate(_: NSApplication) -> NSApplication.TerminateReply {
if !state.stopVPNOnQuit { return .terminateNow }
Task {
await vpn.stop()
async let vpnTask: Void = {
if await self.state.stopVPNOnQuit {
await self.vpn.stop()
}
}()
async let fileSyncTask: Void = self.fileSyncDaemon.stop()
_ = await (vpnTask, fileSyncTask)
NSApp.reply(toApplicationShouldTerminate: true)
}
return .terminateLater
Expand Down
1 change: 1 addition & 0 deletions Coder Desktop/Coder Desktop/State.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import KeychainAccess
import NetworkExtension
import SwiftUI

@MainActor
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by fix: I believe this is implicit in the ObservableObject conformance, but can't hurt to make it explicit.

class AppState: ObservableObject {
let appId = Bundle.main.bundleIdentifier!

Expand Down
Empty file.
225 changes: 225 additions & 0 deletions Coder Desktop/VPNLib/FileSync/FileSyncDaemon.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import Foundation
import GRPC
import NIO
import os
import Subprocess

@MainActor
public protocol FileSyncDaemon: ObservableObject {
var state: DaemonState { get }
func start() async
func stop() async
}

@MainActor
public class MutagenDaemon: FileSyncDaemon {
private let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen")

@Published public var state: DaemonState = .stopped {
didSet {
logger.info("daemon state changed: \(self.state.description, privacy: .public)")
}
}

private var mutagenProcess: Subprocess?
private let mutagenPath: URL!
private let mutagenDataDirectory: URL
private let mutagenDaemonSocket: URL

private var group: MultiThreadedEventLoopGroup?
private var channel: GRPCChannel?
private var client: Daemon_DaemonAsyncClient?

public init() {
#if arch(arm64)
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil)
#elseif arch(x86_64)
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil)
#else
fatalError("unknown architecture")
#endif
mutagenDataDirectory = FileManager.default.urls(
for: .applicationSupportDirectory,
in: .userDomainMask
).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")
mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock")
// It shouldn't be fatal if the app was built without Mutagen embedded,
// but file sync will be unavailable.
if mutagenPath == nil {
logger.warning("Mutagen not embedded in app, file sync will be unavailable")
state = .unavailable
}
}

public func start() async {
if case .unavailable = state { return }

// Stop an orphaned daemon, if there is one
try? await connect()
await stop()

mutagenProcess = createMutagenProcess()
// swiftlint:disable:next large_tuple
let (standardOutput, standardError, waitForExit): (Pipe.AsyncBytes, Pipe.AsyncBytes, @Sendable () async -> Void)
do {
(standardOutput, standardError, waitForExit) = try mutagenProcess!.run()
} catch {
state = .failed(DaemonError.daemonStartFailure(error))
return
}

Task {
await streamHandler(io: standardOutput)
logger.info("standard output stream closed")
}

Task {
await streamHandler(io: standardError)
logger.info("standard error stream closed")
}

Task {
await terminationHandler(waitForExit: waitForExit)
}

do {
try await connect()
} catch {
state = .failed(DaemonError.daemonStartFailure(error))
return
}

state = .running
logger.info(
"""
mutagen daemon started, pid:
\(self.mutagenProcess?.pid.description ?? "unknown", privacy: .public)
"""
)
}

private func connect() async throws(DaemonError) {
guard client == nil else {
// Already connected
return
}
group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
do {
channel = try GRPCChannelPool.with(
target: .unixDomainSocket(mutagenDaemonSocket.path),
transportSecurity: .plaintext,
eventLoopGroup: group!
)
client = Daemon_DaemonAsyncClient(channel: channel!)
logger.info(
"Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)"
)
} catch {
logger.error("Failed to connect to gRPC: \(error)")
try? await cleanupGRPC()
throw DaemonError.connectionFailure(error)
}
}

private func cleanupGRPC() async throws {
try? await channel?.close().get()
try? await group?.shutdownGracefully()

client = nil
channel = nil
group = nil
}

public func stop() async {
if case .unavailable = state { return }
state = .stopped
guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else {
// Already stopped
return
}

// "We don't check the response or error, because the daemon
// may terminate before it has a chance to send the response."
_ = try? await client?.terminate(
Daemon_TerminateRequest(),
callOptions: .init(timeLimit: .timeout(.milliseconds(500)))
)

try? await cleanupGRPC()

mutagenProcess?.kill()
mutagenProcess = nil
logger.info("Daemon stopped and gRPC connection closed")
}

private func createMutagenProcess() -> Subprocess {
let process = Subprocess([mutagenPath.path, "daemon", "run"])
process.environment = [
"MUTAGEN_DATA_DIRECTORY": mutagenDataDirectory.path,
]
logger.info("setting mutagen data directory: \(self.mutagenDataDirectory.path, privacy: .public)")
return process
}

private func terminationHandler(waitForExit: @Sendable () async -> Void) async {
await waitForExit()

switch state {
case .stopped:
logger.info("mutagen daemon stopped")
default:
logger.error(
"""
mutagen daemon exited unexpectedly with code:
\(self.mutagenProcess?.exitCode.description ?? "unknown")
"""
)
state = .failed(.terminatedUnexpectedly)
}
}

private func streamHandler(io: Pipe.AsyncBytes) async {
for await line in io.lines {
logger.info("\(line, privacy: .public)")
}
}
}

public enum DaemonState {
case running
case stopped
case failed(DaemonError)
case unavailable

var description: String {
switch self {
case .running:
"Running"
case .stopped:
"Stopped"
case let .failed(error):
"Failed: \(error)"
case .unavailable:
"Unavailable"
}
}
}

public enum DaemonError: Error {
case daemonStartFailure(Error)
case connectionFailure(Error)
case terminatedUnexpectedly

var description: String {
switch self {
case let .daemonStartFailure(error):
"Daemon start failure: \(error)"
case let .connectionFailure(error):
"Connection failure: \(error)"
case .terminatedUnexpectedly:
"Daemon terminated unexpectedly"
}
}

var localizedDescription: String { description }
}
Loading
Loading