Skip to content

Commit 1745053

Browse files
committed
Add key support
1 parent c9a356c commit 1745053

File tree

7 files changed

+89
-23
lines changed

7 files changed

+89
-23
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,17 @@ end
2727
## Getting started
2828

2929
Check the [Getting started](https://hexdocs.pm/kafkaesque/getting-started.html) guide in Hexdocs.
30+
31+
## Updating versions
32+
To go from 1.0.0-rc.1 to 1.0.0-rc.2, an additional migration is needed:
33+
34+
```elixir
35+
defmodule MyApp.Migrations.BumpKafkaesque do
36+
use Ecto.Migration
37+
38+
def up, do: Kafkaesque.Migrations.up(:v1, :v2)
39+
def down, do: Kafkaesque.Migrations.down(:v2, :v1)
40+
end
41+
```
42+
43+
No extra steps are required if 1.0.0-rc.2 or a newer version was installed.

lib/kafkaesque.ex

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ defmodule Kafkaesque do
5050
use the library. See the documentation of the `Kafkaesque` module for more
5151
information.
5252
"""
53-
@spec publish(Ecto.Repo.t(), String.t(), term(), String.t()) ::
54-
{:ok, Message.t()} | {:error, atom()}
55-
def publish(repo, topic, partition, payload) do
56-
message = Message.new(topic, partition, payload)
53+
@spec publish(Ecto.Repo.t(), String.t(), term(), String.t(), String.t()) ::
54+
{:ok, Message.t()} | {:error, Ecto.Changeset.t()}
55+
def publish(repo, topic, partition, key, payload) do
56+
message = Message.new(topic, partition, key, payload)
5757
repo.insert(message)
5858
end
5959

@@ -108,20 +108,23 @@ defmodule Kafkaesque do
108108
{repo, _opts} = Keyword.pop!(opts, :repo)
109109

110110
quote do
111-
@spec publish(String.t(), term()) :: {:ok, Kafkaesque.Message.t()} | {:error, atom()}
112-
def publish(topic, body) do
111+
@on_definition {Kafkaesque.CompileHooks, :on_def}
112+
113+
@spec publish(String.t(), String.t(), term()) ::
114+
{:ok, Kafkaesque.Message.t()} | {:error, Ecto.Changeset.t()}
115+
def publish(topic, key \\ "", body) do
113116
payload = encode(body)
114-
partition = partition(topic, body)
115-
Kafkaesque.publish(unquote(repo), topic, partition, payload)
117+
partition = partition(topic, key, body)
118+
Kafkaesque.publish(unquote(repo), topic, partition, key, payload)
116119
end
117120

118121
@spec encode(term()) :: String.t()
119122
def encode(body) do
120123
body
121124
end
122125

123-
@spec partition(String.t(), term()) :: integer()
124-
def partition(_topic, _body) do
126+
@spec partition(String.t(), String.t(), term()) :: integer()
127+
def partition(_topic, _key, _body) do
125128
0
126129
end
127130

@@ -137,7 +140,7 @@ defmodule Kafkaesque do
137140
Kafkaesque.child_spec(opts)
138141
end
139142

140-
defoverridable encode: 1, partition: 2
143+
defoverridable encode: 1, partition: 3
141144
end
142145
end
143146
end

lib/kafkaesque/compile_hooks.ex

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
defmodule Kafkaesque.CompileHooks do
2+
@moduledoc false
3+
4+
# We need to raise a compile error if partition/2 is defined due to a breaking
5+
# change: outbox modules are now expected to implement partition/3, not partition/2
6+
#
7+
# This way we turn the breaking change into a compile error instead of runtime
8+
# misbehaviour
9+
def on_def(_env, :def, :partition, args, _guards, _body) do
10+
if Enum.count(args) == 3 do
11+
:ok
12+
else
13+
raise "partition function is expected to have 3 arguments"
14+
end
15+
end
16+
17+
def on_def(_env, _kind, _name, _args, _guards, _body) do
18+
:ok
19+
end
20+
end

lib/kafkaesque/message.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ defmodule Kafkaesque.Message do
3434
)
3535

3636
field(:partition, :integer)
37+
field(:key, :string, default: "")
3738
field(:body, :string)
3839
field(:attempt, :integer, default: 0)
3940
field(:attempted_by, :string)
@@ -44,12 +45,13 @@ defmodule Kafkaesque.Message do
4445
timestamps()
4546
end
4647

47-
@spec new(String.t(), String.t(), String.t()) :: Ecto.Changeset.t()
48-
def new(topic, partition, body) do
48+
@spec new(String.t(), String.t(), String.t(), String.t()) :: Ecto.Changeset.t()
49+
def new(topic, partition, key, body) do
4950
%__MODULE__{}
50-
|> cast(%{topic: topic, partition: partition, body: body}, [
51+
|> cast(%{topic: topic, partition: partition, body: body, key: key}, [
5152
:topic,
5253
:partition,
54+
:key,
5355
:body
5456
])
5557
|> validate_required([:topic, :partition, :body])

lib/kafkaesque/migrations.ex

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
defmodule Kafkaesque.Migrations do
22
use Ecto.Migration
33

4-
def up do
4+
# Functions independent from version
5+
def up() do
56
create table(:kafkaesque_messages, primary_key: false) do
67
add(:id, :bigserial, primary_key: true)
78
add(:state, :string, null: false, default: "pending")
89
add(:topic, :string, null: false)
910
add(:partition, :integer, null: false)
11+
add(:key, :binary, default: "")
1012
add(:body, :binary)
1113
add(:attempt, :integer, null: false, default: 0)
1214
add(:attempted_by, :string)
@@ -17,7 +19,23 @@ defmodule Kafkaesque.Migrations do
1719
end
1820
end
1921

20-
def down do
22+
def down() do
2123
drop(table(:kafkaesque_messages))
2224
end
25+
26+
def up(current, next)
27+
28+
def up(:v1, :v2) do
29+
alter table(:kafkaesque_messages) do
30+
add(:key, :binary, default: "", null: false)
31+
end
32+
end
33+
34+
def down(current, previous)
35+
36+
def down(:v2, :v1) do
37+
alter table(:kafkaesque_messages) do
38+
drop(:key)
39+
end
40+
end
2341
end

test/kafkaesque/message_test.exs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,22 @@ defmodule Kafkaesque.MessageTest do
88
topic = "sample"
99
body = "body"
1010
partition = 0
11+
key = "some"
1112

1213
assert %Ecto.Changeset{
1314
errors: [],
14-
changes: %{topic: ^topic, body: ^body, partition: ^partition}
15-
} = Message.new(topic, partition, body)
15+
changes: %{topic: ^topic, body: ^body, partition: ^partition, key: ^key}
16+
} = Message.new(topic, partition, key, body)
1617
end
1718

1819
test "returns invalid changeset for invalid input" do
1920
topic = 1
2021
body = {1, 2}
22+
key = 2
2123
partition = "notanumber"
2224

23-
assert %Ecto.Changeset{errors: [topic: _, partition: _, body: _]} =
24-
Message.new(topic, partition, body)
25+
assert %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]} =
26+
Message.new(topic, partition, key, body)
2527
end
2628
end
2729
end

test/kafkaesque_test.exs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,23 @@ defmodule KafkaesqueTest do
55

66
describe "publish/4" do
77
test "inserts valid messages" do
8-
{:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "content")
8+
{:ok, %Kafkaesque.Message{}} = Kafkaesque.publish(Repo, "test_topic", 0, "", "content")
99
end
1010

1111
test "errors for invalid messages" do
1212
invalid_topic = 1
1313
invalid_body = {1, 2}
14+
invalid_key = 2
1415
invalid_partition = "notanumber"
1516

16-
assert {:error, %Ecto.Changeset{}} =
17-
Kafkaesque.publish(Repo, invalid_topic, invalid_partition, invalid_body)
17+
assert {:error, %Ecto.Changeset{errors: [topic: _, partition: _, key: _, body: _]}} =
18+
Kafkaesque.publish(
19+
Repo,
20+
invalid_topic,
21+
invalid_partition,
22+
invalid_key,
23+
invalid_body
24+
)
1825
end
1926
end
2027
end

0 commit comments

Comments
 (0)