Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: conditionally start file sync daemon #115

Merged
merged 2 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,18 @@
override init() {
vpn = CoderVPNService()
state = AppState(onChange: vpn.configureTunnelProviderProtocol)
fileSyncDaemon = MutagenDaemon()
if state.startVPNOnLaunch {
vpn.startWhenReady = true
}
vpn.installSystemExtension()
#if arch(arm64)
let mutagenBinary = "mutagen-darwin-arm64"

Check warning on line 44 in Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift

View workflow job for this annotation

GitHub Actions / fmt

Indent code in accordance with the scope level. (indent)
#elseif arch(x86_64)
let mutagenBinary = "mutagen-darwin-amd64"

Check warning on line 46 in Coder-Desktop/Coder-Desktop/Coder_DesktopApp.swift

View workflow job for this annotation

GitHub Actions / fmt

Indent code in accordance with the scope level. (indent)
#endif
fileSyncDaemon = MutagenDaemon(
mutagenPath: Bundle.main.url(forResource: mutagenBinary, withExtension: nil)
)
}

func applicationDidFinishLaunching(_: Notification) {
Expand Down Expand Up @@ -73,10 +80,6 @@
state.reconfigure()
}
}
// TODO: Start the daemon only once a file sync is configured
Task {
await fileSyncDaemon.start()
}
}

deinit {
Expand Down
190 changes: 165 additions & 25 deletions Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,26 @@ import Foundation
import GRPC
import NIO
import os
import Semaphore
import Subprocess
import SwiftUI

@MainActor
public protocol FileSyncDaemon: ObservableObject {
var state: DaemonState { get }
func start() async
func start() async throws(DaemonError)
func stop() async
func listSessions() async throws -> [FileSyncSession]
func createSession(with: FileSyncSession) async throws
}

public struct FileSyncSession {
public let id: String
public let name: String
public let localPath: URL
public let workspace: String
public let agent: String
public let remotePath: URL
}

@MainActor
Expand All @@ -17,7 +30,14 @@ public class MutagenDaemon: FileSyncDaemon {

@Published public var state: DaemonState = .stopped {
didSet {
logger.info("daemon state changed: \(self.state.description, privacy: .public)")
logger.info("daemon state set: \(self.state.description, privacy: .public)")
if case .failed = state {
Task {
try? await cleanupGRPC()
}
mutagenProcess?.kill()
mutagenProcess = nil
}
}
}

Expand All @@ -26,46 +46,61 @@ public class MutagenDaemon: FileSyncDaemon {
private let mutagenDataDirectory: URL
private let mutagenDaemonSocket: URL

// Non-nil when the daemon is running
private var group: MultiThreadedEventLoopGroup?
private var channel: GRPCChannel?
private var client: Daemon_DaemonAsyncClient?

public init() {
#if arch(arm64)
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-arm64", withExtension: nil)
#elseif arch(x86_64)
mutagenPath = Bundle.main.url(forResource: "mutagen-darwin-amd64", withExtension: nil)
#else
fatalError("unknown architecture")
#endif
mutagenDataDirectory = FileManager.default.urls(
for: .applicationSupportDirectory,
in: .userDomainMask
).first!.appending(path: "Coder Desktop").appending(path: "Mutagen")
private var client: DaemonClient?

// Protect start & stop transitions against re-entrancy
private let transition = AsyncSemaphore(value: 1)

public init(mutagenPath: URL? = nil,
mutagenDataDirectory: URL = FileManager.default.urls(
for: .applicationSupportDirectory,
in: .userDomainMask
).first!.appending(path: "Coder Desktop").appending(path: "Mutagen"))
{
self.mutagenPath = mutagenPath
self.mutagenDataDirectory = mutagenDataDirectory
mutagenDaemonSocket = mutagenDataDirectory.appending(path: "daemon").appending(path: "daemon.sock")
// It shouldn't be fatal if the app was built without Mutagen embedded,
// but file sync will be unavailable.
if mutagenPath == nil {
logger.warning("Mutagen not embedded in app, file sync will be unavailable")
state = .unavailable
return
}

// If there are sync sessions, the daemon should be running
Task {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
return
}
await stopIfNoSessions()
}
}

public func start() async {
public func start() async throws(DaemonError) {
if case .unavailable = state { return }

// Stop an orphaned daemon, if there is one
try? await connect()
await stop()

await transition.wait()
defer { transition.signal() }
logger.info("starting mutagen daemon")

mutagenProcess = createMutagenProcess()
// swiftlint:disable:next large_tuple
let (standardOutput, standardError, waitForExit): (Pipe.AsyncBytes, Pipe.AsyncBytes, @Sendable () async -> Void)
do {
(standardOutput, standardError, waitForExit) = try mutagenProcess!.run()
} catch {
state = .failed(DaemonError.daemonStartFailure(error))
return
throw .daemonStartFailure(error)
}

Task {
Expand All @@ -85,10 +120,11 @@ public class MutagenDaemon: FileSyncDaemon {
do {
try await connect()
} catch {
state = .failed(DaemonError.daemonStartFailure(error))
return
throw .daemonStartFailure(error)
}

try await waitForDaemonStart()

state = .running
logger.info(
"""
Expand All @@ -98,6 +134,34 @@ public class MutagenDaemon: FileSyncDaemon {
)
}

// The daemon takes a moment to open the socket, and we don't want to hog the main actor
// so poll for it on a background thread
private func waitForDaemonStart(
maxAttempts: Int = 5,
attemptInterval: Duration = .milliseconds(100)
) async throws(DaemonError) {
do {
try await Task.detached(priority: .background) {
for attempt in 0 ... maxAttempts {
do {
_ = try await self.client!.mgmt.version(
Daemon_VersionRequest(),
callOptions: .init(timeLimit: .timeout(.milliseconds(500)))
)
return
} catch {
if attempt == maxAttempts {
throw error
}
try? await Task.sleep(for: attemptInterval)
}
}
}.value
} catch {
throw .daemonStartFailure(error)
}
}

private func connect() async throws(DaemonError) {
guard client == nil else {
// Already connected
Expand All @@ -110,14 +174,17 @@ public class MutagenDaemon: FileSyncDaemon {
transportSecurity: .plaintext,
eventLoopGroup: group!
)
client = Daemon_DaemonAsyncClient(channel: channel!)
client = DaemonClient(
mgmt: Daemon_DaemonAsyncClient(channel: channel!),
sync: Synchronization_SynchronizationAsyncClient(channel: channel!)
)
logger.info(
"Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)"
)
} catch {
logger.error("Failed to connect to gRPC: \(error)")
try? await cleanupGRPC()
throw DaemonError.connectionFailure(error)
throw .connectionFailure(error)
}
}

Expand All @@ -132,6 +199,10 @@ public class MutagenDaemon: FileSyncDaemon {

public func stop() async {
if case .unavailable = state { return }
await transition.wait()
defer { transition.signal() }
logger.info("stopping mutagen daemon")

state = .stopped
guard FileManager.default.fileExists(atPath: mutagenDaemonSocket.path) else {
// Already stopped
Expand All @@ -140,7 +211,7 @@ public class MutagenDaemon: FileSyncDaemon {

// "We don't check the response or error, because the daemon
// may terminate before it has a chance to send the response."
_ = try? await client?.terminate(
_ = try? await client?.mgmt.terminate(
Daemon_TerminateRequest(),
callOptions: .init(timeLimit: .timeout(.milliseconds(500)))
)
Expand Down Expand Up @@ -175,6 +246,7 @@ public class MutagenDaemon: FileSyncDaemon {
"""
)
state = .failed(.terminatedUnexpectedly)
return
}
}

Expand All @@ -183,6 +255,55 @@ public class MutagenDaemon: FileSyncDaemon {
logger.info("\(line, privacy: .public)")
}
}

public func listSessions() async throws -> [FileSyncSession] {
guard case .running = state else {
return []
}
// TODO: Implement
return []
}

public func createSession(with _: FileSyncSession) async throws {
if case .stopped = state {
do throws(DaemonError) {
try await start()
} catch {
state = .failed(error)
return
}
}
// TODO: Add Session
}

public func deleteSession() async throws {
// TODO: Delete session
await stopIfNoSessions()
}

private func stopIfNoSessions() async {
let sessions: Synchronization_ListResponse
do {
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
req.selection = .with { selection in
selection.all = true
}
})
} catch {
state = .failed(.daemonStartFailure(error))
return
}
// If there's no configured sessions, the daemon doesn't need to be running
if sessions.sessionStates.isEmpty {
logger.info("No sync sessions found")
await stop()
}
}
}

struct DaemonClient {
let mgmt: Daemon_DaemonAsyncClient
let sync: Synchronization_SynchronizationAsyncClient
}

public enum DaemonState {
Expand All @@ -191,7 +312,7 @@ public enum DaemonState {
case failed(DaemonError)
case unavailable

var description: String {
public var description: String {
switch self {
case .running:
"Running"
Expand All @@ -203,12 +324,27 @@ public enum DaemonState {
"Unavailable"
}
}

public var color: Color {
switch self {
case .running:
.green
case .stopped:
.gray
case .failed:
.red
case .unavailable:
.gray
}
}
}

public enum DaemonError: Error {
case daemonNotRunning
case daemonStartFailure(Error)
case connectionFailure(Error)
case terminatedUnexpectedly
case grpcFailure(Error)

var description: String {
switch self {
Expand All @@ -218,6 +354,10 @@ public enum DaemonError: Error {
"Connection failure: \(error)"
case .terminatedUnexpectedly:
"Daemon terminated unexpectedly"
case .daemonNotRunning:
"The daemon must be started first"
case let .grpcFailure(error):
"Failed to communicate with daemon: \(error)"
}
}

Expand Down
6 changes: 5 additions & 1 deletion Coder-Desktop/project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ packages:
exactVersion: 1.24.2
Subprocess:
url: https://github.com/jamf/Subprocess
revision: 9d67b79
revision: 9d67b79
Semaphore:
url: https://github.com/groue/Semaphore/
exactVersion: 0.1.0

targets:
Coder Desktop:
Expand Down Expand Up @@ -276,6 +279,7 @@ targets:
product: SwiftProtobufPluginLibrary
- package: GRPC
- package: Subprocess
- package: Semaphore
- target: CoderSDK
embed: false

Expand Down
Loading