Skip to content

Commit

Permalink
moved sleep_for_reconnect() to supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
ukrbublik committed May 18, 2019
1 parent 34d2b7f commit 0cb335d
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 31 deletions.
14 changes: 2 additions & 12 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -756,16 +756,12 @@ defmodule KafkaEx.Server do

check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
{correlation_id, metadata} =
retrieve_metadata(
brokers,
0,
config_sync_timeout()
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand All @@ -789,18 +785,12 @@ defmodule KafkaEx.Server do
state
end

defp sleep_for_reconnect() do
Process.sleep(Application.get_env(:kafka_ex, :sleep_for_reconnect, 400))
end

defp check_brokers_sockets!(brokers) do
any_socket_opened = brokers
|> Enum.map(fn %Broker{socket: socket} -> !is_nil(socket) end)
|> Enum.reduce(&(&1 || &2))
if !any_socket_opened do
sleep_for_reconnect()
if !any_socket_opened, do:
raise "Brokers sockets are not opened"
end
end

defp connect_broker(host, port, ssl_opts, use_ssl) do
Expand Down
10 changes: 2 additions & 8 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,20 @@ defmodule KafkaEx.Server0P10AndLater do
api_versions: api_versions,
error_code: error_code
}, state} = kafka_api_versions(%State{brokers: brokers})
if error_code == :no_response do
sleep_for_reconnect()
if error_code == :no_response, do:
raise "Brokers sockets are closed"
end
:no_error = error_code

api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions)

{correlation_id, metadata} = try do
{correlation_id, metadata} =
retrieve_metadata(
brokers,
state.correlation_id,
config_sync_timeout(),
[],
api_versions
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down
6 changes: 1 addition & 5 deletions lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,8 @@ defmodule KafkaEx.Server0P8P2 do

check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
{correlation_id, metadata} =
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down
6 changes: 1 addition & 5 deletions lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,8 @@ defmodule KafkaEx.Server0P9P0 do

check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
{correlation_id, metadata} =
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down
11 changes: 10 additions & 1 deletion lib/kafka_ex/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ defmodule KafkaEx.Supervisor do
end

def start_child(opts) do
Supervisor.start_child(__MODULE__, opts)
start_result = Supervisor.start_child(__MODULE__, opts)
case start_result do
{:error, _error} -> sleep_for_reconnect()
_ -> :ok
end
start_result
end

defp sleep_for_reconnect() do
:timer.sleep(Application.get_env(:kafka_ex, :sleep_for_reconnect, 400))
end

def stop_child(child) do
Expand Down

0 comments on commit 0cb335d

Please sign in to comment.