Skip to content

Commit

Permalink
docs: Document new client connection
Browse files Browse the repository at this point in the history
  • Loading branch information
flexlixrup committed Jan 6, 2025
1 parent 2e25415 commit 991cab2
Showing 1 changed file with 70 additions and 51 deletions.
121 changes: 70 additions & 51 deletions Sources/Pulsar/Documentation.docc/HowToUse.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,25 @@ struct PulsarExample {
func connect(eventLoopGroup: EventLoopGroup) async throws {
var msgCount = 0

// Configure the Pulsar client
let config = PulsarClientConfiguration(
host: "localhost",
port: 6650,
group: eventLoopGroup,
reconnectionLimit: 10
)

// Create a Pulsar client
let client = await PulsarClient(host: "localhost", port: 6650)
let client = try await PulsarClient(configuration: config) { error in
print("Error: \(error)")
}

// Set up a consumer
let consumer = try await client.consumer(
topic: "persistent://public/default/my-topic",
subscription: "test",
subscriptionType: .shared
)
// Set up a consumer
let consumer = try await client.consumer(
topic: "persistent://public/default/my-topic",
subscription: "test",
subscriptionType: .shared
)

// Consume messages
Task {
Expand Down Expand Up @@ -86,9 +96,15 @@ struct PulsarExample {
}

func connect(eventLoopGroup: EventLoopGroup) async throws {
let client = await PulsarClient(host: "localhost", port: 6650, reconnectLimit: 10) { error in
print("Client closed due to error: \(error)")
exit(0)
// Configure the Pulsar client
let config = PulsarClientConfiguration(
host: "localhost",
port: 6650,
group: eventLoopGroup,
reconnectionLimit: 10
)
let client = try await PulsarClient(configuration: config) { error in
print("Error: \(error)")
}

// Set up a producer
Expand Down Expand Up @@ -126,49 +142,52 @@ struct PulsarExample {
The library supports mTLS encryption as well as mTLS authentication.

```swift
var clientCertPath: String? {
// Get all the nescessary certs
Bundle.module.path(forResource: "client-cert", ofType: "pem")
}

var clientKeyPath: String? {
Bundle.module.path(forResource: "client-key", ofType: "pem")
}

var caCertPath: String? {
Bundle.module.path(forResource: "ca-cert", ofType: "pem")
}

// Build the NIOSSLCertificates
let clientCertificate = try NIOSSLCertificate(file: clientCertPath!, format: .pem)
let clientPrivateKey = try NIOSSLPrivateKey(file: clientKeyPath!, format: .pem)
let caCertificate = try NIOSSLCertificate(file: caCertPath!, format: .pem)

// Make a NIO client TLS configuration.
var tlsConfig = TLSConfiguration.makeClientConfiguration()
tlsConfig.certificateVerification = .fullVerification
tlsConfig.trustRoots = .certificates([caCertificate])
tlsConfig.privateKey = .privateKey(clientPrivateKey)
tlsConfig.certificateChain = [.certificate(clientCertificate)]
tlsConfig.certificateVerification = .fullVerification
import Foundation
import NIO
import NIOSSL
import Pulsar

// Wrap it into TLSConnection and define if the cluster only uses TLS encryption or also authentication
let auth = TLSConnection(tlsConfiguration: tlsConfig, clientCA: clientCertificate, authenticationRequired: true)
func createSecureClient(eventLoopGroup: EventLoopGroup) async throws -> PulsarClient {
// Load certificates
guard
let clientCertPath = Bundle.module.path(forResource: "client-cert", ofType: "pem"),
let clientKeyPath = Bundle.module.path(forResource: "client-key", ofType: "pem"),
let caCertPath = Bundle.module.path(forResource: "ca-cert", ofType: "pem")
else {
fatalError("Certificates not found")
}

let client = try await PulsarClient(
host: "localhost",
port: 6651,
tlsConfiguration: auth,
group: eventLoopGroup,
reconnectLimit: 10
) { error in
do {
throw error
} catch {
print("Client closed")
exit(0)
}
}
let clientCertificate = try NIOSSLCertificate(file: clientCertPath, format: .pem)
let clientPrivateKey = try NIOSSLPrivateKey(file: clientKeyPath, format: .pem)
let caCertificate = try NIOSSLCertificate(file: caCertPath, format: .pem)

// Configure TLS
var tlsConfig = TLSConfiguration.makeClientConfiguration()
tlsConfig.certificateVerification = .fullVerification
tlsConfig.trustRoots = .certificates([caCertificate])
tlsConfig.privateKey = .privateKey(clientPrivateKey)
tlsConfig.certificateChain = [.certificate(clientCertificate)]

// Create TLS connection configuration
let auth = TLSConnection(
tlsConfiguration: tlsConfig,
clientCA: clientCertificate,
authenticationRequired: true
)

// Configure the Pulsar client with TLS
let config = PulsarClientConfiguration(
host: "localhost",
port: 6651,
tlsConfiguration: auth,
group: eventLoopGroup,
reconnectionLimit: 10
)

return try await PulsarClient(configuration: config) { error in
print("Error: \(error)")
}
}
```

## Additional Features
Expand Down

0 comments on commit 991cab2

Please sign in to comment.