Skip to content

Commit

Permalink
chore: add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Jun 18, 2024
1 parent fe71f83 commit 2622f25
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 43 deletions.
4 changes: 3 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ config :klife, MyClient,
[
name: "test_async_topic"
],
[name: "my_topic"]
[name: "my_topic_1"],
[name: "my_topic_2"],
[name: "my_topic_3"]
]

if config_env() == :dev do
Expand Down
2 changes: 1 addition & 1 deletion example/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ config :example, Example.MySimplestClient,
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
topics: [[name: "my_topic"]]
topics: [[name: "my_topic_1"]]
1 change: 0 additions & 1 deletion example/lib/example.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ defmodule Example do
def produce_batch(client, recs, opts \\ []), do: client.produce_batch(recs, opts)
def produce_batch_txn(client, recs, opts \\ []), do: client.produce_batch_txn(recs, opts)
def transaction(client, fun, opts \\ []), do: client.transaction(fun, opts)
def in_txn?(client), do: client.in_txn?()
end
2 changes: 1 addition & 1 deletion example/test/example_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule ExampleTest do
test "simplest client" do
client = Example.MySimplestClient

rec = %Record{value: :rand.bytes(10), topic: "my_topic"}
rec = %Record{value: :rand.bytes(10), topic: "my_topic_1"}
{:ok, %Record{partition: partition, offset: offset}} = Example.produce(client, rec)

KlifeTest.assert_offset(client, rec, offset, partition: partition)
Expand Down
34 changes: 32 additions & 2 deletions lib/klife.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,38 @@ defmodule Klife do
alias Klife.TxnProducerPool
alias Klife.Producer.Controller, as: PController

@produce_opts [
producer: [
type: :atom,
required: false,
doc:
"Producer's name that will override the `default_producer` configuration. Ignored inside transactions."
],
async: [
type: :boolean,
required: false,
default: false,
doc:
"Makes the produce asynchronous. When `true` the return value will be `:ok`. Ignored inside transactions."
],
partitioner: [
type: :atom,
required: false,
doc: "Module that will override `default_partitioner` configuration."
]
]

@txn_opts [
pool_name: [
type: :atom,
required: false,
doc: "Txn pool's name that will override the `default_txn_pool` configuration."
]
]

def get_produce_opts(), do: @produce_opts
def get_txn_opts(), do: @txn_opts

def produce(%Record{} = record, client, opts \\ []) do
case produce_batch([record], client, opts) do
[resp] -> resp
Expand Down Expand Up @@ -46,8 +78,6 @@ defmodule Klife do
TxnProducerPool.run_txn(client, get_txn_pool(client, opts), fun)
end

def in_txn?(client), do: TxnProducerPool.in_txn?(client)

defp get_txn_pool(client, opts) do
case Keyword.get(opts, :pool_name) do
nil -> apply(client, :get_default_txn_pool, [])
Expand Down
141 changes: 122 additions & 19 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Klife.Client do
But it will look somehting like this:
```elixir
config :my_app, MyApp.Client,
config :my_app, MyApp.MyClient,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
Expand All @@ -126,7 +126,7 @@ defmodule Klife.Client do
You can see more configuration examples on the ["Client configuration examples"](guides/examples/client_configuration.md) section.
Configuration options:
### Configuration options:
#{NimbleOptions.docs(@input_options)}
Expand All @@ -149,7 +149,7 @@ defmodule Klife.Client do
end
```
## Interacting with Producer API
## Producer API overview
In order to interact with the producer API you will work with `Klife.Record` module
as your main input and output data structure.
Expand All @@ -176,7 +176,7 @@ defmodule Klife.Client do
```elixir
rec = %Klife.Record{value: "some_val", topic: "my_topic"}
rec = %Klife.Record{value: "some_val", topic: "my_topic_1"}
{:ok, %Klife.Record{offset: offset, partition: partition}} = MyClient.produce(rec)
```
Expand All @@ -187,35 +187,139 @@ defmodule Klife.Client do
Produce a single record.
It expects a `Klife.Record` struct containg at least `:value` and `:topic` and returns
an ok/error tuple along side with the enriched version of the input record.
an ok/error tuple along side with the enriched version of the input record as described
in ["Producer API Overview"](m:Klife.Client#producer-api-overview).
The record's enriched attribute may be:
- :offset, when the record is successfully produced
- :partition, when it is not set on the input
- :error_code, kafka's server error code
## Options
## Examples
#{NimbleOptions.docs(Klife.get_produce_opts())}
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic"}
iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec)
iex> true = is_number(enriched_rec.offset)
iex> true = is_number(enriched_rec.partition)
## Examples
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic_1"}
iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec)
iex> true = is_number(enriched_rec.offset)
iex> true = is_number(enriched_rec.partition)
"""
@callback produce(record, opts :: Keyword.t()) :: {:ok, record} | {:error, record}

@doc group: "Producer API"
@doc """
Produce a batch of records.
It expects a list of `Klife.Record` structs containg at least `:value` and `:topic` and returns
a list of ok/error tuples along side with the enriched version of the input record as described
in ["Producer API Overview"](m:Klife.Client#producer-api-overview).
The order of the response tuples on the returning list is the same as the input list. That means
the first response tuple will be related to the first record on the input and so on.
> #### Semantics and guarantees {: .info}
>
> This functions is semantically equivalent to call [`produce/2`](c:produce/2) multiple times and
> wait for all responses. Which means that 2 records sent on the same batch may succeed or
> fail independently.
>
> In other words that is no atomicity guarentees. If you need it see [`produce_batch_txn/2`](c:produce_batch_txn/2).
>
> The input list may contain records related to any topic/partition, for records of the same
> topic/partition the order between them is guaranteed to be the same of the input, for records
> of different topic/partition no order is guaranteed between them.
>
## Options
#{NimbleOptions.docs(Klife.get_produce_opts())}
## Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> [{:ok, _resp1}, {:ok, _resp2}, {:ok, _resp3}] = MyClient.produce_batch([rec1, rec2, rec3])
In order to facilitate the response handling you can use `Klife.Record.verify_batch/1` or
`Klife.Record.verify_batch!/1` functions.
## Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> input = [rec1, rec2, rec3]
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.produce_batch(input) |> Klife.Record.verify_batch()
"""
@callback produce_batch(list_of_records, opts :: Keyword.t()) :: list({:ok | :error, record})

@doc group: "Transaction API"
@callback produce_batch_txn(list_of_records, opts :: Keyword.t()) ::
{:ok, list_of_records} | {:error, list_of_records}
@doc """
Runs the given function inside a transaction.
@doc group: "Transaction API"
Every produce API call made inside the given function will be part of a transaction
that will only commit if the returning value of fun is `:ok` or `{:ok, _any}`, any
other return value will abort all records produced inside the given function.
> #### Beware of performance costs {: .warning}
> Each produce call inside the input function may have 1 extra network roundtrip to the broker
> than a normal non transactional call.
>
> At the end of the transaction another round trip is needed in order to commit or abort
> the transaction.
> #### Produce semantics inside transaction {: .info}
> All produce API calls keeps the same semantics as they have outside a transaction. This means
> that records produced using `produce_batch/2` may still succeed/fail independently and a
> `produce/2` call may still fail. Therefore it is user's responsability to verify and abort
> the transaction if needed.
## Options
#{NimbleOptions.docs(Klife.get_txn_opts())}
## Examples
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.transaction(fn ->
...> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
...> {:ok, resp1} = MyClient.produce(rec1)
...> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
...> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
...> [resp2, resp3] = MyClient.produce_batch([rec2, rec3])
...> {:ok, [resp1, resp2, resp3]}
...> end)
"""
@callback transaction(fun :: function(), opts :: Keyword.t()) :: any()

@doc group: "Transaction API"
@callback in_txn?() :: true | false
@doc """
Transactionally produce a batch of records.
It expects a list of `Klife.Record` structs containg at least `:value` and `:topic` and returns
a tuple ok/error tuple along side with the enriched version of the input records as described
in ["Producer API Overview"](m:Klife.Client#producer-api-overview).
The order of the response tuples on the returning list is the same as the input list. That means
the first response tuple will be related to the first record on the input and so on.
> #### Beware of performance costs {: .warning}
> Each `produce_batch_txn/2` will have 2 extra network roundtrips to the broker than a non
> transactional `produce_batch/2`. One for adding topic/partitions to the transaction and other
> to commit or abort it.
## Options
#{NimbleOptions.docs(Klife.get_txn_opts())}
## Examples
iex> rec1 = %Klife.Record{value: "my_val_1", topic: "my_topic_1"}
iex> rec2 = %Klife.Record{value: "my_val_2", topic: "my_topic_2"}
iex> rec3 = %Klife.Record{value: "my_val_3", topic: "my_topic_3"}
iex> {:ok, [_resp1, _resp2, _resp3]} = MyClient.produce_batch_txn([rec1, rec2, rec3])
"""
@callback produce_batch_txn(list_of_records, opts :: Keyword.t()) ::
{:ok, list_of_records} | {:error, list_of_records}

defmacro __using__(opts) do
input_opts = @input_options
Expand Down Expand Up @@ -311,7 +415,6 @@ defmodule Klife.Client do
def produce_batch(recs, opts \\ []), do: Klife.produce_batch(recs, __MODULE__, opts)
def produce_batch_txn(recs, opts \\ []), do: Klife.produce_batch_txn(recs, __MODULE__, opts)
def transaction(fun, opts \\ []), do: Klife.transaction(fun, __MODULE__, opts)
def in_txn?(), do: Klife.in_txn?(__MODULE__)
end
end
end
2 changes: 1 addition & 1 deletion lib/klife/connection/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ defmodule Klife.Connection.Controller do
type: :boolean,
required: false,
default: false,
doc: "Specify the underlying socket module. Use :ssl if true and :gen_tcp if false."
doc: "Specify the underlying socket module. Use `:ssl` if true and `:gen_tcp` if false."
],
connect_opts: [
type: {:list, :any},
Expand Down
2 changes: 2 additions & 0 deletions lib/mix/tasks/benchmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ if Mix.env() in [:dev] do
opts = [strategy: :one_for_one, name: Benchmark.Supervisor]
{:ok, _} = Supervisor.start_link([MyClient], opts)

:ok = Klife.TestUtils.wait_producer(MyClient)

Process.sleep(1_000)
apply(Mix.Tasks.Benchmark, :do_run_bench, args)
end
Expand Down
Loading

0 comments on commit 2622f25

Please sign in to comment.