Skip to content

Commit

Permalink
chore: standardization and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Jun 11, 2024
1 parent 37f067f commit 5fc9a4e
Show file tree
Hide file tree
Showing 19 changed files with 440 additions and 207 deletions.
74 changes: 30 additions & 44 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,58 +45,44 @@ config :klife, Klife.MyCluster,
]
],
topics: [
%{
[
name: "benchmark_topic_0",
producer: :benchmark_producer,
num_partitions: 30,
replication_factor: 2
},
%{
default_producer: :benchmark_producer
],
[
name: "benchmark_topic_1",
producer: :benchmark_producer,
num_partitions: 30,
replication_factor: 2
},
%{
default_producer: :benchmark_producer
],
[
name: "benchmark_topic_2",
producer: :benchmark_producer,
num_partitions: 30,
replication_factor: 2
},
%{
default_producer: :benchmark_producer
],
[
name: "benchmark_topic_in_flight",
producer: :benchmark_producer_in_flight,
num_partitions: 30,
replication_factor: 2
},
%{
default_producer: :benchmark_producer_in_flight
],
[
name: "benchmark_topic_in_flight_linger",
producer: :benchmark_producer_in_flight_linger,
num_partitions: 30,
replication_factor: 2
},
%{
default_producer: :benchmark_producer_in_flight_linger
],
[
name: "test_batch_topic",
enable_produce: true,
producer: :test_batch_producer
},
%{
default_producer: :test_batch_producer
],
[
name: "test_compression_topic",
producer: :test_batch_compressed_producer
},
%{
name: "test_no_batch_topic",
enable_produce: true
},
%{
default_producer: :test_batch_compressed_producer
],
[
name: "test_no_batch_topic"
],
[
name: "test_no_batch_topic_2",
enable_produce: true,
partitioner: Klife.TestCustomPartitioner
},
%{
name: "test_async_topic",
enable_produce: true
}
default_partitioner: Klife.TestCustomPartitioner
],
[
name: "test_async_topic"
]
]

if config_env() == :dev do
Expand Down
36 changes: 36 additions & 0 deletions guides/examples/cluster_configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Cluster configuration

Here are some cluster configuration examples.

## Simplest configuration

```elixir
config :my_app, MyApp.Cluster,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
topics: [[name: "my_topic_0"]]
```

This cluster will connect to brokers using non ssl connection and produce messages only to topic `my_topic` using the default producer and default partitioner.

## SSL and custom socket opts

```elixir
config :my_app, MyApp.Cluster,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: true,
connect_opts: [
verify: :verify_peer,
cacertfile: Path.relative("test/compose_files/ssl/ca.crt")
],
socket_opts: [delay_send: true]
],
topics: [[name: "my_topic_0"]]
```

This cluster will connect to brokers using ssl connection, `connect_opts` and `socket_opts` are forwarded to erlang module `:ssl` in order to proper configure the socket. See the documentation for more details.

## TODO: DO MORE EXAMPLES
28 changes: 9 additions & 19 deletions lib/klife.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
defmodule Klife do
@moduledoc """
Main functions to interact with clusters.
Usually you will not need to call any function here directly
but instead use them through a module that use `Klife.Cluster`.
"""

alias Klife.Record
alias Klife.Producer
alias Klife.TxnProducerPool
Expand Down Expand Up @@ -29,7 +36,7 @@ defmodule Klife do

def produce_batch_txn([%Record{} | _] = records, cluster, opts \\ []) do
transaction(
fn -> records |> produce_batch(cluster, opts) |> verify_batch() end,
fn -> records |> produce_batch(cluster, opts) |> Record.verify_batch() end,
cluster,
opts
)
Expand All @@ -39,24 +46,7 @@ defmodule Klife do
TxnProducerPool.run_txn(cluster, get_txn_pool(opts), fun)
end

def verify_batch(produce_resps) do
case Enum.group_by(produce_resps, &elem(&1, 0), &elem(&1, 1)) do
%{error: error_list} ->
{:error, error_list}

%{ok: resp} ->
{:ok, resp}
end
end

def verify_batch!(produce_resps) do
case verify_batch(produce_resps) do
{:ok, resp} -> resp
{:error, errors} -> raise "Error on batch verification. #{inspect(errors)}"
end
end

defp get_txn_pool(opts), do: Keyword.get(opts, :txn_pool, :klife_txn_pool)
defp get_txn_pool(opts), do: Keyword.get(opts, :txn_pool, Klife.Cluster.default_txn_pool_name())

defp maybe_add_partition(%Record{} = record, cluster, opts) do
case record do
Expand Down
Loading

0 comments on commit 5fc9a4e

Please sign in to comment.