Skip to content

Commit

Permalink
docs: improve docs for various types
Browse files Browse the repository at this point in the history
  • Loading branch information
flexlixrup committed Feb 15, 2025
1 parent 46800e9 commit d3ee53a
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 19 deletions.
6 changes: 3 additions & 3 deletions Sources/Pulsar/Documentation.docc/HowToUse.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct PulsarExample {
}

// Set up a consumer
let consumer = try await client.consumer(
let consumer: PulsarProducer<String> = try await client.consumer(
topic: "persistent://public/default/my-topic",
subscription: "test",
subscriptionType: .shared
Expand Down Expand Up @@ -108,13 +108,13 @@ struct PulsarExample {
}

// Set up a producer
let producer = try await client.producer(
let producer: PulsarProducer<String> = try await client.producer(
topic: "persistent://public/default/my-topic1",
accessMode: .shared,
schema: .string
) { _ in
print("Producer closed")
} as PulsarProducer<String>
}

// Send messages in a loop
Task {
Expand Down
40 changes: 34 additions & 6 deletions Sources/Pulsar/PulsarClient/PulsarClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,42 @@ import NIO
import NIOSSL
@_exported import SchemaTypes

/// The core Pulsar Client used to connect to the server.
/// The core Pulsar Client used to establish and manage connections to an Apache Pulsar server.
///
/// This actor manages the connection to a Pulsar server and provides functionality
/// for creating and managing producers and consumers. It also handles configuration
/// of connection parameters and retry mechanisms.
/// This actor is responsible for handling communication with the Pulsar server, including
/// establishing and managing connections, handling authentication, and providing an interface
/// for creating producers and consumers. It also implements reconnection logic, secure TLS handling,
/// and resource management for active connections.
///
/// All interactions with the Pulsar messaging system, such as sending or receiving messages,
/// are controlled through this client.
/// ## Features
/// - Supports secure (TLS) and non-secure connections.
/// - Manages a pool of active connections.
/// - Handles automatic reconnections in case of network failures.
/// - Supports configuration of connection parameters including hostname, port, and reconnection limits.
/// - Provides an event-driven interface for message producers and consumers.
/// - Closes all active channels gracefully when the client shuts down.
///
/// ## Usage
/// ```swift
/// let config = PulsarClientConfiguration(host: "pulsar.example.com", port: 6650)
/// let client = try await PulsarClient(configuration: config) { error in
/// print("Client closed: \(error)")
/// }
/// ```
///
/// Once initialized, the `PulsarClient` can be used to create producers and consumers to send and receive messages.
///
/// ## Connection Management
/// - The client maintains a `connectionPool` to track open connections.
/// - If the connection is lost, it attempts to reconnect based on the `reconnectLimit` configuration.
/// - TLS settings can be specified through `PulsarClientConfiguration` to establish a secure connection.
///
/// ## Closing the Client
/// When the client is no longer needed, it should be closed using:
/// ```swift
/// try await client.close()
/// ```
/// This ensures that all resources are released and all connections are closed cleanly.
public final actor PulsarClient {
let logger = Logger(label: "PulsarClient")
let group: EventLoopGroup
Expand Down
51 changes: 48 additions & 3 deletions Sources/Pulsar/PulsarConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,54 @@

/// A Pulsar consumer used to asynchronously consume messages from a specific topic.
///
/// This class provides support for consuming messages from a Pulsar topic using various subscription types.
/// It conforms to `AsyncSequence`, enabling iteration over received messages in an asynchronous context.
/// Generic `T` represents the type of payload for the messages, conforming to `PulsarPayload`.
/// This class provides functionality for consuming messages from an Apache Pulsar topic.
/// It supports different subscription types and conforms to `AsyncSequence`, allowing
/// messages to be iterated over in an asynchronous context using Swift's `for await` syntax.
///
/// ## Features:
/// - Conforms to `AsyncSequence`, enabling structured and idiomatic message consumption.
/// - Handles message acknowledgment automatically (if `autoAcknowledge` is enabled).
/// - Supports schema-based payload deserialization.
/// - Provides explicit error handling mechanisms.
///
/// ## Usage Example:
/// ```swift
/// let consumer = PulsarConsumer<MyPayload>(
/// autoAck: true,
/// handler: myHandler,
/// consumerID: 67890,
/// topic: "persistent://public/default/my-topic",
/// subscriptionName: "my-subscription",
/// subscriptionType: .shared,
/// subscriptionMode: .durable,
/// schema: mySchema
/// )
///
/// for await message in consumer {
/// print("Received message: \(message.payload)")
/// }
///
/// try await consumer.close() // Close the consumer when done
/// ```
///
/// ## Lifecycle:
/// - The consumer is initialized with a handler, topic, subscription details, and schema.
/// - Messages are received and decoded using the specified schema.
/// - The consumer continuously yields messages via `AsyncThrowingStream<Message<T>, Error>`.
/// - The consumer can be explicitly closed using `close()`, ensuring proper resource cleanup.
///
/// ## Error Handling:
/// - If message deserialization fails, the consumer will call `fail(error:)`, terminating the stream.
/// - If an error occurs while handling messages, the stream finishes with the provided error.
/// - Closing the consumer ensures proper detachment from the Pulsar client.
///
/// - Note: This class is designed to be `Sendable`, meaning it can be safely used in concurrent contexts.
///
/// - Parameters:
/// - T: A type conforming to ``PulsarPayload``, representing the message payload.
///
/// - SeeAlso: ``PulsarProducer`` for message publishing.
///
public final class PulsarConsumer<T: PulsarPayload>: AsyncSequence, Sendable, AnyConsumer {
public let consumerID: UInt64
let autoAcknowledge: Bool
Expand Down
48 changes: 45 additions & 3 deletions Sources/Pulsar/PulsarProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,51 @@

/// A Pulsar producer used to publish messages to a specific topic.
///
/// This component enables sending messages to a Pulsar topic. It supports configuration
/// for schema, and other publishing parameters to ensure efficient and reliable
/// message delivery.
/// This class provides a mechanism for sending messages to an Apache Pulsar topic.
/// It supports both synchronous and asynchronous message publishing, allowing
/// developers to choose between guaranteed delivery with broker acknowledgment
/// (`syncSend`) and a fire-and-forget approach (`asyncSend`). The producer
/// is designed to handle different schema types and manage its lifecycle efficiently.
///
/// ## Features:
/// - Publishes messages to a specified Pulsar topic.
/// - Supports synchronous (`syncSend`) and asynchronous (`asyncSend`) message delivery.
/// - Manages producer state via `ProducerStateManager`.
/// - Configurable access mode and schema.
/// - Provides a closure (`onClosed`) to handle producer shutdown events.
///
/// ## Usage Example:
/// ```swift
/// let producer = PulsarProducer<MyPayload>(
/// handler: myHandler,
/// producerAccessMode: .exclusive,
/// producerID: 12345,
/// schema: mySchema,
/// topic: "persistent://public/default/my-topic"
/// )
///
/// try await producer.syncSend(message: myMessage) // Waits for broker response
/// try await producer.asyncSend(message: myMessage) // Fire-and-forget
/// try await producer.close() // Closes the producer
/// ```
///
/// ## Lifecycle:
/// - The producer is initialized with a handler, schema, topic, and other configurations.
/// - Messages can be sent using `syncSend` (awaits broker acknowledgment) or `asyncSend` (does not wait).
/// - The producer can be explicitly closed using `close()`, triggering the `onClosed` handler if provided.
///
/// ## Error Handling:
/// - `syncSend` throws an error if a broker acknowledgment is not received within a timeout.
/// - `asyncSend` does not throw errors for timeout issues but will throw for major failures.
/// - `close()` ensures a graceful shutdown of the producer.
///
/// - Note: This class is designed to be `Sendable`, meaning it can be used safely in concurrent contexts.
///
/// - Parameters:
/// - T: A type conforming to ``PulsarPayload``, representing the payload schema.
///
/// - SeeAlso: ``PulsarConsumer`` for message consumtion.
///
public final class PulsarProducer<T: PulsarPayload>: Sendable, AnyProducer {
public let producerID: UInt64
let topic: String
Expand Down
8 changes: 4 additions & 4 deletions Sources/PulsarExample/PulsarExample.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ struct PulsarExample {
let client = try await PulsarClient(configuration: config) { error in
print("Error: \(error)")
}
let consumer =
let consumer: PulsarConsumer<String> =
try await client.consumer(
topic: "persistent://public/default/my-topic2",
subscription: "test",
subscriptionType: .shared,
schema: .string
) as PulsarConsumer<String>
)
Task {
do {
for try await message in consumer {
Expand All @@ -91,10 +91,10 @@ struct PulsarExample {
}
}

let producer =
let producer: PulsarProducer<String> =
try await client.producer(topic: "persistent://public/default/my-topic1", accessMode: .shared, schema: .string) { _ in
print("Produer closed")
} as PulsarProducer<String>
}
Task {
while true {
do {
Expand Down

0 comments on commit d3ee53a

Please sign in to comment.