Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add mutagen prompting gRPC #118

Merged
merged 1 commit into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public protocol FileSyncDaemon: ObservableObject {

@MainActor
public class MutagenDaemon: FileSyncDaemon {
private let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen")
let logger = Logger(subsystem: Bundle.main.bundleIdentifier!, category: "mutagen")

@Published public var state: DaemonState = .stopped {
didSet {
Expand All @@ -42,9 +42,9 @@ public class MutagenDaemon: FileSyncDaemon {
private let mutagenDaemonSocket: URL

// Non-nil when the daemon is running
var client: DaemonClient?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this internal instead of private so we can split the daemon class up into multiple extensions, across multiple files.

private var group: MultiThreadedEventLoopGroup?
private var channel: GRPCChannel?
private var client: DaemonClient?

// Protect start & stop transitions against re-entrancy
private let transition = AsyncSemaphore(value: 1)
Expand Down Expand Up @@ -171,7 +171,8 @@ public class MutagenDaemon: FileSyncDaemon {
)
client = DaemonClient(
mgmt: Daemon_DaemonAsyncClient(channel: channel!),
sync: Synchronization_SynchronizationAsyncClient(channel: channel!)
sync: Synchronization_SynchronizationAsyncClient(channel: channel!),
prompt: Prompting_PromptingAsyncClient(channel: channel!)
)
logger.info(
"Successfully connected to mutagen daemon, socket: \(self.mutagenDaemonSocket.path, privacy: .public)"
Expand Down Expand Up @@ -301,6 +302,7 @@ public class MutagenDaemon: FileSyncDaemon {
struct DaemonClient {
let mgmt: Daemon_DaemonAsyncClient
let sync: Synchronization_SynchronizationAsyncClient
let prompt: Prompting_PromptingAsyncClient
}

public enum DaemonState {
Expand Down Expand Up @@ -342,6 +344,8 @@ public enum DaemonError: Error {
case connectionFailure(Error)
case terminatedUnexpectedly
case grpcFailure(Error)
case invalidGrpcResponse(String)
case unexpectedStreamClosure

public var description: String {
switch self {
Expand All @@ -355,6 +359,10 @@ public enum DaemonError: Error {
"The daemon must be started first"
case let .grpcFailure(error):
"Failed to communicate with daemon: \(error)"
case let .invalidGrpcResponse(response):
"Invalid gRPC response: \(response)"
case .unexpectedStreamClosure:
"Unexpected stream closure"
}
}

Expand Down
53 changes: 53 additions & 0 deletions Coder-Desktop/VPNLib/FileSync/FileSyncPrompting.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import GRPC

extension MutagenDaemon {
typealias PromptStream = GRPCAsyncBidirectionalStreamingCall<Prompting_HostRequest, Prompting_HostResponse>

func host(allowPrompts: Bool = true) async throws(DaemonError) -> (PromptStream, identifier: String) {
let stream = client!.prompt.makeHostCall()

do {
try await stream.requestStream.send(.with { req in req.allowPrompts = allowPrompts })
} catch {
throw .grpcFailure(error)
}

// We can't make call `makeAsyncIterator` more than once
// (as a for-loop would do implicitly)
var iter = stream.responseStream.makeAsyncIterator()

let initResp: Prompting_HostResponse?
do {
initResp = try await iter.next()
} catch {
throw .grpcFailure(error)
}
guard let initResp else {
throw .unexpectedStreamClosure
}
try initResp.ensureValid(first: true, allowPrompts: allowPrompts)

Task.detached(priority: .background) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block is self-contained, and we're currently doing a lot on the main actor already. I was previously running into issues on startup with the VPN code getting starved by file sync code.

do {
while let msg = try await iter.next() {
try msg.ensureValid(first: false, allowPrompts: allowPrompts)
var reply: Prompting_HostRequest = .init()
if msg.isPrompt {
// Handle SSH key prompts
if msg.message.contains("yes/no/[fingerprint]") {
reply.response = "yes"
}
// Any other messages that require a non-empty response will
// cause the create op to fail, showing an error. This is ok for now.
}
try await stream.requestStream.send(reply)
}
} catch let error as GRPCStatus where error.code == .cancelled {
return
} catch {
self.logger.critical("Prompt stream failed: \(error)")
}
}
return (stream, identifier: initResp.identifier)
}
}
23 changes: 23 additions & 0 deletions Coder-Desktop/VPNLib/FileSync/MutagenConvert.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,26 @@ func accumulateErrors(from state: Synchronization_State) -> [FileSyncError] {
func humanReadableBytes(_ bytes: UInt64) -> String {
ByteCountFormatter().string(fromByteCount: Int64(bytes))
}

extension Prompting_HostResponse {
func ensureValid(first: Bool, allowPrompts: Bool) throws(DaemonError) {
if first {
if identifier.isEmpty {
throw .invalidGrpcResponse("empty prompter identifier")
}
if isPrompt {
throw .invalidGrpcResponse("unexpected message type specification")
}
if !message.isEmpty {
throw .invalidGrpcResponse("unexpected message")
}
} else {
if !identifier.isEmpty {
throw .invalidGrpcResponse("unexpected prompter identifier")
}
if isPrompt, !allowPrompts {
throw .invalidGrpcResponse("disallowed prompt message type")
}
}
}
}
Loading
Loading