Skip to content

Commit

Permalink
Copy supervisor3 from klarna/supervisor3 into brod (kafka4beam#556)
Browse files Browse the repository at this point in the history
* Copy supervisor3 from klarna/supervisor3

I ran the following command:
```
curl https://raw.githubusercontent.com/klarna/supervisor3/master/src/supervisor3.erl -o src/brod_supervisor3.erl
```

* Remove supervisor3 as a dependency

* replace all supervisor3 usage for brod_supervisor3

* Fixed stuck supervisor when received try_again_restart with delayed_restart in state

This fix has been copied from [1]. The pull request contains more
explanation about the problem. Full credit of the fix to the author
in github egertak

[1] klarna/supervisor3#24

* Fix linter errors

Include some refactor of the functions to avoid linter errors

* Fix rebar.lock syntax

---------

Co-authored-by: Gonzalo Bella Fernandez <[email protected]>
Co-authored-by: ieQu1 <[email protected]>
Co-authored-by: Zaiming (Stone) Shi <[email protected]>
  • Loading branch information
4 people authored Jun 8, 2023
1 parent 5a78a14 commit 0582dbb
Show file tree
Hide file tree
Showing 7 changed files with 1,620 additions and 34 deletions.
3 changes: 1 addition & 2 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{deps, [ {supervisor3, "1.1.12"}
, {kafka_protocol, "4.1.2"}
{deps, [ {kafka_protocol, "4.1.2"}
, {snappyer, "1.2.9"}
]}.
{project_plugins, [{rebar3_lint, "~> 1.0.2"}]}.
Expand Down
6 changes: 2 additions & 4 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
{"1.2.0",
[{<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"0.1.8">>},1},
{<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.2">>},0},
{<<"snappyer">>,{pkg,<<"snappyer">>,<<"1.2.9">>},0},
{<<"supervisor3">>,{pkg,<<"supervisor3">>,<<"1.1.12">>},0}]}.
{<<"snappyer">>,{pkg,<<"snappyer">>,<<"1.2.9">>},0}]}.
[
{pkg_hash,[
{<<"crc32cer">>, <<"C6C2275C5FB60A95F4935D414F30B50EE9CFED494081C9B36EBB02EDFC2F48DB">>},
{<<"kafka_protocol">>, <<"EE45DB88BD83526B1D3079DE19D5331AD7D755E549C00EC6F2CAA47C591E9588">>},
{<<"snappyer">>, <<"9CC58470798648CE34C662CA0AA6DAAE31367667714C9A543384430A3586E5D3">>},
{<<"supervisor3">>, <<"2FAB1AF26BB9F8AE07692BB30EF79D5F1940E1587EFF9C14C6C8B04B16B400A8">>}]},
{<<"snappyer">>, <<"9CC58470798648CE34C662CA0AA6DAAE31367667714C9A543384430A3586E5D3">>}]},
{pkg_hash_ext,[
{<<"crc32cer">>, <<"251499085482920DEB6C9B7AADABF9FB4C432F96ADD97AB42AEE4501E5B6F591">>},
{<<"kafka_protocol">>, <<"04AD1A8CD2A57479907AA049489149FEBCCDDAD247338718A1A77E64F50E4B07">>},
Expand Down
2 changes: 1 addition & 1 deletion src/brod.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[{description,"Apache Kafka Erlang client library"},
{vsn,"git"},
{registered,[]},
{applications,[kernel,stdlib,kafka_protocol,supervisor3,snappyer]},
{applications,[kernel,stdlib,kafka_protocol,snappyer]},
{env,[]},
{mod, {brod, []}},
{modules,[]},
Expand Down
18 changes: 9 additions & 9 deletions src/brod_consumers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
%%%=============================================================================

-module(brod_consumers_sup).
-behaviour(supervisor3).
-behaviour(brod_supervisor3).

-export([ init/1
, post_init/1
Expand All @@ -46,21 +46,21 @@
%% @doc Start a root consumers supervisor.
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor3:start_link(?MODULE, ?TOPICS_SUP).
brod_supervisor3:start_link(?MODULE, ?TOPICS_SUP).

%% @doc Dynamically start a per-topic supervisor.
-spec start_consumer(pid(), pid(), brod:topic(), brod:consumer_config()) ->
{ok, pid()} | {error, any()}.
start_consumer(SupPid, ClientPid, TopicName, Config) ->
Spec = consumers_sup_spec(ClientPid, TopicName, Config),
supervisor3:start_child(SupPid, Spec).
brod_supervisor3:start_child(SupPid, Spec).


%% @doc Dynamically stop a per-topic supervisor.
-spec stop_consumer(pid(), brod:topic()) -> ok | {error, any()}.
stop_consumer(SupPid, TopicName) ->
supervisor3:terminate_child(SupPid, TopicName),
supervisor3:delete_child(SupPid, TopicName).
brod_supervisor3:terminate_child(SupPid, TopicName),
brod_supervisor3:delete_child(SupPid, TopicName).

%% @doc Find a brod_consumer process pid running under ?PARTITIONS_SUP
-spec find_consumer(pid(), brod:topic(), brod:partition()) ->
Expand All @@ -69,14 +69,14 @@ stop_consumer(SupPid, TopicName) ->
| {consumer_not_found, brod:topic(), brod:partition()}
| {consumer_down, any()}.
find_consumer(SupPid, Topic, Partition) ->
case supervisor3:find_child(SupPid, Topic) of
case brod_supervisor3:find_child(SupPid, Topic) of
[] ->
%% no such topic worker started,
%% check sys.config or brod:start_link_client args
{error, {consumer_not_found, Topic}};
[PartitionsSupPid] ->
try
case supervisor3:find_child(PartitionsSupPid, Partition) of
case brod_supervisor3:find_child(PartitionsSupPid, Partition) of
[] ->
%% no such partition?
{error, {consumer_not_found, Topic, Partition}};
Expand All @@ -88,7 +88,7 @@ find_consumer(SupPid, Topic, Partition) ->
end
end.

%% @doc supervisor3 callback.
%% @doc brod_supervisor3 callback.
init(?TOPICS_SUP) ->
{ok, {{one_for_one, 0, 1}, []}};
init({?PARTITIONS_SUP, _ClientPid, _Topic, _Config}) ->
Expand Down Expand Up @@ -132,7 +132,7 @@ consumers_sup_spec(ClientPid, TopicName, Config0) ->
Config = proplists:delete(topic_restart_delay_seconds, Config0),
Args = [?MODULE, {?PARTITIONS_SUP, ClientPid, TopicName, Config}],
{ _Id = TopicName
, _Start = {supervisor3, start_link, Args}
, _Start = {brod_supervisor3, start_link, Args}
, _Restart = {permanent, DelaySecs}
, _Shutdown = infinity
, _Type = supervisor
Expand Down
20 changes: 10 additions & 10 deletions src/brod_producers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
%%%=============================================================================

-module(brod_producers_sup).
-behaviour(supervisor3).
-behaviour(brod_supervisor3).

-export([ init/1
, post_init/1
Expand All @@ -35,7 +35,7 @@
-define(TOPICS_SUP, brod_producers_sup).
-define(PARTITIONS_SUP, brod_producers_sup2).

%% Minimum delay seconds to work with supervisor3
%% Minimum delay seconds to work with brod_supervisor3
-define(MIN_SUPERVISOR3_DELAY_SECS, 1).

%% By default, restart ?PARTITIONS_SUP after a 10-seconds delay
Expand All @@ -51,20 +51,20 @@
%% @end
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor3:start_link(?MODULE, ?TOPICS_SUP).
brod_supervisor3:start_link(?MODULE, ?TOPICS_SUP).

%% @doc Dynamically start a per-topic supervisor
-spec start_producer(pid(), pid(), brod:topic(), brod:producer_config()) ->
{ok, pid()} | {error, any()}.
start_producer(SupPid, ClientPid, TopicName, Config) ->
Spec = producers_sup_spec(ClientPid, TopicName, Config),
supervisor3:start_child(SupPid, Spec).
brod_supervisor3:start_child(SupPid, Spec).

%% @doc Dynamically stop a per-topic supervisor
-spec stop_producer(pid(), brod:topic()) -> ok | {}.
stop_producer(SupPid, TopicName) ->
supervisor3:terminate_child(SupPid, TopicName),
supervisor3:delete_child(SupPid, TopicName).
brod_supervisor3:terminate_child(SupPid, TopicName),
brod_supervisor3:delete_child(SupPid, TopicName).

%% @doc Find a brod_producer process pid running under ?PARTITIONS_SUP.
-spec find_producer(pid(), brod:topic(), brod:partition()) ->
Expand All @@ -73,14 +73,14 @@ stop_producer(SupPid, TopicName) ->
| {producer_not_found, brod:topic(), brod:partition()}
| {producer_down, any()}.
find_producer(SupPid, Topic, Partition) ->
case supervisor3:find_child(SupPid, Topic) of
case brod_supervisor3:find_child(SupPid, Topic) of
[] ->
%% no such topic worker started,
%% check sys.config or brod:start_link_client args
{error, {producer_not_found, Topic}};
[PartitionsSupPid] ->
try
case supervisor3:find_child(PartitionsSupPid, Partition) of
case brod_supervisor3:find_child(PartitionsSupPid, Partition) of
[] ->
%% no such partition?
{error, {producer_not_found, Topic, Partition}};
Expand All @@ -92,7 +92,7 @@ find_producer(SupPid, Topic, Partition) ->
end
end.

%% @doc supervisor3 callback.
%% @doc brod_supervisor3 callback.
init(?TOPICS_SUP) ->
{ok, {{one_for_one, 0, 1}, []}};
init({?PARTITIONS_SUP, _ClientPid, _Topic, _Config}) ->
Expand All @@ -119,7 +119,7 @@ producers_sup_spec(ClientPid, TopicName, Config0) ->
?DEFAULT_PARTITIONS_SUP_RESTART_DELAY),
Args = [?MODULE, {?PARTITIONS_SUP, ClientPid, TopicName, Config}],
{ _Id = TopicName
, _Start = {supervisor3, start_link, Args}
, _Start = {brod_supervisor3, start_link, Args}
, _Restart = {permanent, DelaySecs}
, _Shutdown = infinity
, _Type = supervisor
Expand Down
16 changes: 8 additions & 8 deletions src/brod_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
%%%=============================================================================

-module(brod_sup).
-behaviour(supervisor3).
-behaviour(brod_supervisor3).

-export([ init/1
, post_init/1
Expand Down Expand Up @@ -101,28 +101,28 @@
%% @end
-spec start_link() -> {ok, pid()}.
start_link() ->
supervisor3:start_link({local, ?SUP}, ?MODULE, clients_sup).
brod_supervisor3:start_link({local, ?SUP}, ?MODULE, clients_sup).

-spec start_client([brod:endpoint()],
brod:client_id(),
brod:client_config()) -> ok | {error, any()}.
start_client(Endpoints, ClientId, Config) ->
ClientSpec = client_spec(Endpoints, ClientId, Config),
case supervisor3:start_child(?SUP, ClientSpec) of
case brod_supervisor3:start_child(?SUP, ClientSpec) of
{ok, _Pid} -> ok;
Error -> Error
end.

-spec stop_client(brod:client_id()) -> ok | {error, any()}.
stop_client(ClientId) ->
_ = supervisor3:terminate_child(?SUP, ClientId),
supervisor3:delete_child(?SUP, ClientId).
_ = brod_supervisor3:terminate_child(?SUP, ClientId),
brod_supervisor3:delete_child(?SUP, ClientId).

-spec find_client(brod:client_id()) -> [pid()].
find_client(Client) ->
supervisor3:find_child(?SUP, Client).
brod_supervisor3:find_child(?SUP, Client).

%% @doc supervisor3 callback
%% @doc brod_supervisor3 callback
init(clients_sup) ->
%% start and link it to root supervisor
{ok, _} = brod_kafka_apis:start_link(),
Expand All @@ -139,7 +139,7 @@ init(clients_sup) ->
%% before supervisor tries to restart it.
{ok, {{one_for_one, 0, 1}, ClientSpecs}}.

%% @doc supervisor3 callback.
%% @doc brod_supervisor3 callback.
post_init(_) ->
ignore.

Expand Down
Loading

0 comments on commit 0582dbb

Please sign in to comment.