Skip to content

Commit

Permalink
docs: Extend documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
flexlixrup committed Jan 2, 2025
1 parent dfb5151 commit 66ff9ee
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 33 deletions.
97 changes: 85 additions & 12 deletions Sources/Pulsar/Documentation.docc/HowToUse.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Get started
# Get Started

This section provides an overview how to quickly get the library up and running.
Welcome to the Swift Pulsar Client! This guide will help you get started with the library quickly by walking you through the basic setup and usage for both consuming and producing messages.

## Consumer
## Consumer Example
The following example demonstrates how to create a Pulsar consumer to receive messages from a specific topic.

```swift
import Logging
Expand All @@ -12,43 +13,115 @@ import Pulsar
@main
struct PulsarExample {
static func main() async throws {
// You do not need to provide your own EventLoopGroup.
// Set up logging and event loop group
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
LoggingSystem.bootstrap { label in
var handler = StreamLogHandler.standardOutput(label: label)
handler.logLevel = .trace
return handler
}

let connector = PulsarExample()
try await connector.connect(eventLoopGroup: eventLoopGroup)
}

func connect(eventLoopGroup: EventLoopGroup) async throws {
var msgCount = 0
// Create a Pulsar client and connect to the server at localhost:6650.

// Create a Pulsar client
let client = await PulsarClient(host: "localhost", port: 6650)

// Create a consumer at the specified topic with the wanted subscription name.
let consumer = try await client.consumer(topic: "persistent://public/default/my-topic", subscription: "test", subscriptionType: .shared)
// Set up a consumer
let consumer = try await client.consumer(
topic: "persistent://public/default/my-topic",
subscription: "test",
subscriptionType: .shared
)

// Consume messages
Task {
do {
// Consume messages and do a thing, everytime you receive a message.
for try await message in consumer {
msgCount += 1
print("Received message in the exec: \(String(decoding: message.data, as: UTF8.self))")
print("Received message: \(String(decoding: message.data, as: UTF8.self))")
if msgCount == 2 {
try await consumer.close()
print("Closed consumer")
}
}
} catch {
// The consumer should never close automatically, only when you call consumer.close()
print("Whooops we closed, this should never happen automatically.")
print("Unexpected consumer closure: \(error)")
}
}

// Keep the application running
let keepAlivePromise = eventLoopGroup.next().makePromise(of: Void.self)
try await keepAlivePromise.futureResult.get()
}
}
```

## Producer Example
This example shows how to create a producer to send messages to a specific Pulsar topic.

```swift
import Foundation
import Logging
import NIO
import Pulsar

@main
struct PulsarExample {
static func main() async throws {
// Set up logging and event loop group
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
LoggingSystem.bootstrap { label in
var handler = StreamLogHandler.standardOutput(label: label)
handler.logLevel = .trace
return handler
}

let connector = PulsarExample()
try await connector.connect(eventLoopGroup: eventLoopGroup)
}

func connect(eventLoopGroup: EventLoopGroup) async throws {
let client = await PulsarClient(host: "localhost", port: 6650, reconnectLimit: 10) { error in
print("Client closed due to error: \(error)")
exit(0)
}

// Set up a producer
let producer = try await client.producer(
topic: "persistent://public/default/my-topic1",
accessMode: .shared,
schema: .string
) { _ in
print("Producer closed")
} as PulsarProducer<String>

// Send messages in a loop
Task {
while true {
do {
let message = "Hello from Swift"
try await producer.asyncSend(message: Message(payload: message))
print("Sent message: \(message)")
try await Task.sleep(for: .seconds(5))
} catch {
print("Failed to send message: \(error)")
}
}
}
// Keep the application running.

// Keep the application running
let keepAlivePromise = eventLoopGroup.next().makePromise(of: Void.self)
try await keepAlivePromise.futureResult.get()
}
}
```

## Additional Features
- **Reconnection Handling**: Configure the reconnection limit with the `reconnectLimit` parameter when initializing the `PulsarClient`.
- **Schema Support**: Specify schemas like `.string` for type-safe message handling.
- **Logging**: Use Swift's `Logging` package to customize log levels and outputs.
78 changes: 74 additions & 4 deletions Sources/Pulsar/Documentation.docc/SupportedFeatures.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,80 @@

The library is not (yet) complete. This section provides an overview of the supported features of the library.

## Client
## Client Features

## Consumer
| Feature | Supported | Notes |
|---------------------------------|-----------|-----------------------------------------------|
| **Authentication** | | |
| - Token-based Authentication | No | |
| - TLS Authentication | No | |
| - OAuth2 Authentication | No | |
| **Encryption** | | |
| - End-to-End Encryption | No | |
| **Connection Handling** | | |
| - Automatic Reconnection | Yes | |
| - Connection Timeout Config | No | |
| **Logging and Metrics** | | |
| - Built-in Logging | Yes | |
| - Metrics Collection | No | |

## Producer
## Producer Features

## Reader
| Feature | Supported | Notes |
|---------------------------------|-----------|-----------------------------------------------|
| **Message Routing** | | |
| - Key-based Routing | No | |
| - Custom Partitioning | No | |
| **Message Delivery** | | |
| - Synchronous Send | Yes | |
| - Asynchronous Send | Yes | |
| - Batch Message Publishing | No | |
| **Compression** | | |
| - LZ4 Compression | No | |
| - ZLIB Compression | No | |
| - ZSTD Compression | No | |
| - SNAPPY Compression | No | |
| **Message TTL** | | |
| - Set per Message | No | |
| **Delayed Delivery** | | |
| - Deliver After Delay | No | |
| - Deliver at Specific Time | No | |

## Consumer Features

| Feature | Supported | Notes |
|---------------------------------|-----------|-----------------------------------------------|
| **Subscription Types** | | |
| - Exclusive | Yes | |
| - Shared | Yes | |
| - Failover | Yes | |
| - Key_Shared | Yes | |
| **Message Acknowledgment** | | |
| - Individual Acknowledgment | Yes | |
| - Cumulative Acknowledgment | No | |
| - Negative Acknowledgment | No | |
| **Batch Message Consumption** | | |
| - Batch Receive | No | |
| **Dead Letter Policy** | | |
| - Dead Letter Topic Support | No | |
| **Message Redelivery** | | |
| - Delayed Redelivery | No | |
| - Max Redelivery Attempts | No | |

## Reader Features (Not implemented)

| Feature | Supported | Notes |
|---------------------------------|-----------|-----------------------------------------------|
| **Message Reading** | | |
| - Start from Specific MessageID | No | |
| - Start from Timestamp | No | |
| **Non-Durable Subscriptions** | | |
| - Ephemeral Readers | No | |

## TableView Features (Not implemented)

| Feature | Supported | Notes |
|---------------------------------|-----------|-----------------------------------------------|
| **Key-Value Access** | | |
| - Real-time Key-Value Updates | No | |
| - Snapshot of Latest Key-Values | No | |
14 changes: 0 additions & 14 deletions Sources/Pulsar/Protos.swift
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
// Copyright 2025 Felix Ruppert
//
// Licensed under the Apache License, Version 2.0 (the License );
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an AS IS BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// swift-format-ignore-file
// swiftlint:disable all
Expand Down
9 changes: 8 additions & 1 deletion Sources/Pulsar/PulsarClient/PulsarClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ import Logging
import NIO
@_exported import SchemaTypes

/// The core Pulsar Client used to connect to the server. All control of the library, like consumers and producers and its settings are controlled via the Client.
/// The core Pulsar Client used to connect to the server.
///
/// This actor manages the connection to a Pulsar server and provides functionality
/// for creating and managing producers and consumers. It also handles configuration
/// of connection parameters and retry mechanisms.
///
/// All interactions with the Pulsar messaging system, such as sending or receiving messages,
/// are controlled through this client.
public final actor PulsarClient {
let logger = Logger(label: "PulsarClient")
let group: EventLoopGroup
Expand Down
1 change: 1 addition & 0 deletions Sources/Pulsar/PulsarClientError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// The errors thrown by the library.
public enum PulsarClientError: Error, Equatable {
case clientClosed
case networkError
Expand Down
6 changes: 5 additions & 1 deletion Sources/Pulsar/PulsarConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/// A Pulsar Consumer, used to asynchronously consume messages from a topic.
/// A Pulsar consumer used to asynchronously consume messages from a specific topic.
///
/// This class provides support for consuming messages from a Pulsar topic using various subscription types.
/// It conforms to `AsyncSequence`, enabling iteration over received messages in an asynchronous context.
/// Generic `T` represents the type of payload for the messages, conforming to `PulsarPayload`.
public final class PulsarConsumer<T: PulsarPayload>: AsyncSequence, Sendable, AnyConsumer {
public let consumerID: UInt64
let autoAcknowledge: Bool
Expand Down
6 changes: 5 additions & 1 deletion Sources/Pulsar/PulsarProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/// A Pulsar producer, used to publish messages to a topic.
/// A Pulsar producer used to publish messages to a specific topic.
///
/// This component enables sending messages to a Pulsar topic. It supports configuration
/// for schema, and other publishing parameters to ensure efficient and reliable
/// message delivery.
public final class PulsarProducer<T: PulsarPayload>: Sendable, AnyProducer {
public let producerID: UInt64
let topic: String
Expand Down
1 change: 1 addition & 0 deletions Sources/Pulsar/PulsarSchema.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Supported Pulsar schemas by the library.
public enum PulsarSchema: String, Equatable, Sendable {
case bytes
case string
Expand Down

0 comments on commit 66ff9ee

Please sign in to comment.