From a746114bd2a673b438bb88234eb7966d556288df Mon Sep 17 00:00:00 2001 From: Denis Oblogin Date: Sun, 5 May 2019 23:57:16 +0300 Subject: [PATCH 1/4] Attempt at better error handling (#190) - When kafka connection is lost, handle related errors gracefully. - Retry to recreate supervision tree with timeout. Added config sleep_for_reconnect, default 400. - Also fixed termination of ConsumerGroup.Manager to guarantee the worker is stopped. --- config/config.exs | 2 + lib/kafka_ex/consumer_group/heartbeat.ex | 5 +++ lib/kafka_ex/consumer_group/manager.ex | 49 ++++++++++++++++------ lib/kafka_ex/gen_consumer.ex | 42 ++++++++++++------- lib/kafka_ex/network_client.ex | 4 ++ lib/kafka_ex/protocol/api_versions.ex | 4 ++ lib/kafka_ex/protocol/consumer_metadata.ex | 6 +++ lib/kafka_ex/server.ex | 22 +++++++++- lib/kafka_ex/server_0_p_10_and_later.ex | 21 +++++++--- lib/kafka_ex/server_0_p_8_p_2.ex | 8 +++- lib/kafka_ex/server_0_p_9_p_0.ex | 8 +++- 11 files changed, 135 insertions(+), 36 deletions(-) diff --git a/config/config.exs b/config/config.exs index 72eb7384..2ca9f28c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -52,6 +52,8 @@ config :kafka_ex, # Threshold number of messages consumed for GenConsumer to commit offsets # to the broker. 
commit_threshold: 100, + # Interval in milliseconds to wait before reconnect to kafka + sleep_for_reconnect: 400, # This is the flag that enables use of ssl use_ssl: true, # see SSL OPTION DESCRIPTIONS - CLIENT SIDE at http://erlang.org/doc/man/ssl.html diff --git a/lib/kafka_ex/consumer_group/heartbeat.ex b/lib/kafka_ex/consumer_group/heartbeat.ex index e074649b..056d62a5 100644 --- a/lib/kafka_ex/consumer_group/heartbeat.ex +++ b/lib/kafka_ex/consumer_group/heartbeat.ex @@ -79,6 +79,11 @@ defmodule KafkaEx.ConsumerGroup.Heartbeat do %HeartbeatResponse{error_code: error_code} -> Logger.warn("Heartbeat failed, got error code #{error_code}") {:stop, {:shutdown, {:error, error_code}}, state} + + {:error, reason} -> + Logger.warn("Heartbeat failed, got error reason #{inspect reason}") + {:stop, {:shutdown, {:error, reason}}, state} + end end end diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index 6869e921..f538f54b 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -192,15 +192,30 @@ defmodule KafkaEx.ConsumerGroup.Manager do {:noreply, state} end + # If the heartbeat gets an unrecoverable error. + def handle_info( + {:EXIT, _heartbeat_timer, {:shutdown, {:error, _reason}}}, + %State{} = state + ) do + {:noreply, state} + end + # When terminating, inform the group coordinator that this member is leaving # the group so that the group can rebalance without waiting for a session # timeout. 
- def terminate(_reason, %State{generation_id: nil, member_id: nil}), do: :ok + def terminate(_reason, %State{generation_id: nil, member_id: nil} = state) do + Process.unlink(state.worker_name) + KafkaEx.stop_worker(state.worker_name) + end def terminate(_reason, %State{} = state) do {:ok, _state} = leave(state) Process.unlink(state.worker_name) KafkaEx.stop_worker(state.worker_name) + + # should be at end because of race condition (stop heartbeat while it is shutting down) + # if race condition happens, worker will be abandoned + stop_heartbeat_timer(state) end ### Helpers @@ -244,9 +259,14 @@ defmodule KafkaEx.ConsumerGroup.Manager do # crash the worker if we recieve an error, but do it with a meaningful # error message - if join_response.error_code != :no_error do - raise "Error joining consumer group #{group_name}: " <> - "#{inspect(join_response.error_code)}" + case join_response do + %{error_code: :no_error} -> :ok + %{error_code: error_code} -> + raise "Error joining consumer group #{group_name}: " <> + "#{inspect(error_code)}" + {:error, reason} -> + raise "Error joining consumer group #{group_name}: " <> + "#{inspect(reason)}" end Logger.debug(fn -> "Joined consumer group #{group_name}" end) @@ -331,7 +351,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do member_id: member_id } = state ) do - stop_heartbeat_timer(state) leave_request = %LeaveGroupRequest{ group_name: group_name, @@ -341,13 +360,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do leave_group_response = KafkaEx.leave_group(leave_request, worker_name: worker_name) - if leave_group_response.error_code == :no_error do - Logger.debug(fn -> "Left consumer group #{group_name}" end) - else - Logger.warn(fn -> - "Received error #{inspect(leave_group_response.error_code)}, " <> - "consumer group manager will exit regardless." 
- end) + case leave_group_response do + %{error_code: :no_error} -> + Logger.debug(fn -> "Left consumer group #{group_name}" end) + %{error_code: error_code} -> + Logger.warn(fn -> + "Received error #{inspect(error_code)}, " <> + "consumer group manager will exit regardless." + end) + {:error, reason} -> + Logger.warn(fn -> + "Received error #{inspect(reason)}, " <> + "consumer group manager will exit regardless." + end) end {:ok, state} diff --git a/lib/kafka_ex/gen_consumer.ex b/lib/kafka_ex/gen_consumer.ex index dcb1517d..78944921 100644 --- a/lib/kafka_ex/gen_consumer.ex +++ b/lib/kafka_ex/gen_consumer.ex @@ -627,9 +627,12 @@ defmodule KafkaEx.GenConsumer do end def handle_info(:timeout, %State{} = state) do - new_state = consume(state) - - {:noreply, new_state, 0} + case consume(state) do + {:error, reason} -> + {:stop, reason, state} + new_state -> + {:noreply, new_state, 0} + end end def handle_info( @@ -668,20 +671,23 @@ defmodule KafkaEx.GenConsumer do fetch_options: fetch_options } = state ) do - [ - %FetchResponse{ - topic: ^topic, - partitions: [ - response = %{error_code: error_code, partition: ^partition} - ] - } - ] = - KafkaEx.fetch( - topic, - partition, - Keyword.merge(fetch_options, offset: offset) - ) + response = KafkaEx.fetch( + topic, + partition, + Keyword.merge(fetch_options, offset: offset) + ) + response + |> handle_fetch_response(state) + end + defp handle_fetch_response([ + %FetchResponse{ + topic: _topic, + partitions: [ + response = %{error_code: error_code, partition: _partition} + ] + } + ], state) do state = case error_code do :offset_out_of_range -> @@ -700,6 +706,10 @@ defmodule KafkaEx.GenConsumer do end end + defp handle_fetch_response(error, _state) do + {:error, error} + end + defp handle_message_set( message_set, %State{ diff --git a/lib/kafka_ex/network_client.ex b/lib/kafka_ex/network_client.ex index 63b4a8e8..df6ba514 100644 --- a/lib/kafka_ex/network_client.ex +++ b/lib/kafka_ex/network_client.ex @@ -96,6 +96,10 @@ 
defmodule KafkaEx.NetworkClient do response end + def send_sync_request(nil, _, _) do + {:error, :no_broker} + end + @spec format_host(binary) :: [char] | :inet.ip_address() def format_host(host) do case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do diff --git a/lib/kafka_ex/protocol/api_versions.ex b/lib/kafka_ex/protocol/api_versions.ex index e03fc86b..945cddca 100644 --- a/lib/kafka_ex/protocol/api_versions.ex +++ b/lib/kafka_ex/protocol/api_versions.ex @@ -70,6 +70,10 @@ defmodule KafkaEx.Protocol.ApiVersions do } end + def parse_response(nil, _this_api_version) do + %Response{error_code: :no_response} + end + defp parse_rest_of_response(api_versions_count, data, this_api_version) do {api_versions, remaining_data} = Protocol.Common.read_array( diff --git a/lib/kafka_ex/protocol/consumer_metadata.ex b/lib/kafka_ex/protocol/consumer_metadata.ex index 346bdf01..55854d18 100644 --- a/lib/kafka_ex/protocol/consumer_metadata.ex +++ b/lib/kafka_ex/protocol/consumer_metadata.ex @@ -55,4 +55,10 @@ defmodule KafkaEx.Protocol.ConsumerMetadata do error_code: Protocol.error(error_code) } end + + def parse_response(nil) do + %Response{ + error_code: :no_response + } + end end diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 3c2db8c8..bb3fda5b 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -754,12 +754,18 @@ defmodule KafkaEx.Server do connect_broker(host, port, ssl_options, use_ssl) end - {correlation_id, metadata} = + check_brokers_sockets!(brokers) + + {correlation_id, metadata} = try do retrieve_metadata( brokers, 0, config_sync_timeout() ) + rescue e -> + sleep_for_reconnect() + Kernel.reraise(e, System.stacktrace()) + end state = %State{ metadata: metadata, @@ -783,6 +789,20 @@ defmodule KafkaEx.Server do state end + defp sleep_for_reconnect() do + Process.sleep(Application.get_env(:kafka_ex, :sleep_for_reconnect, 400)) + end + + defp check_brokers_sockets!(brokers) do + any_socket_opened = brokers + |> 
Enum.map(fn %Broker{socket: socket} -> !is_nil(socket) end) + |> Enum.reduce(&(&1 || &2)) + if !any_socket_opened do + sleep_for_reconnect() + raise "Brokers sockets are not opened" + end + end + defp connect_broker(host, port, ssl_opts, use_ssl) do %Broker{ host: host, diff --git a/lib/kafka_ex/server_0_p_10_and_later.ex b/lib/kafka_ex/server_0_p_10_and_later.ex index 4ecbe1f6..6cf2cd07 100644 --- a/lib/kafka_ex/server_0_p_10_and_later.ex +++ b/lib/kafka_ex/server_0_p_10_and_later.ex @@ -92,15 +92,22 @@ defmodule KafkaEx.Server0P10AndLater do } end) + check_brokers_sockets!(brokers) + {_, - %KafkaEx.Protocol.ApiVersions.Response{ - api_versions: api_versions, - error_code: :no_error - }, state} = kafka_api_versions(%State{brokers: brokers}) + %KafkaEx.Protocol.ApiVersions.Response{ + api_versions: api_versions, + error_code: error_code + }, state} = kafka_api_versions(%State{brokers: brokers}) + if error_code == :no_response do + sleep_for_reconnect() + raise "Brokers sockets are closed" + end + :no_error = error_code api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions) - {correlation_id, metadata} = + {correlation_id, metadata} = try do retrieve_metadata( brokers, state.correlation_id, @@ -108,6 +115,10 @@ defmodule KafkaEx.Server0P10AndLater do [], api_versions ) + rescue e -> + sleep_for_reconnect() + Kernel.reraise(e, System.stacktrace()) + end state = %State{ metadata: metadata, diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index 9b47ac17..c407cc4b 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -72,8 +72,14 @@ defmodule KafkaEx.Server0P8P2 do } end) - {correlation_id, metadata} = + check_brokers_sockets!(brokers) + + {correlation_id, metadata} = try do retrieve_metadata(brokers, 0, config_sync_timeout()) + rescue e -> + sleep_for_reconnect() + Kernel.reraise(e, System.stacktrace()) + end state = %State{ metadata: metadata, diff --git a/lib/kafka_ex/server_0_p_9_p_0.ex 
b/lib/kafka_ex/server_0_p_9_p_0.ex index 5d4af795..8d305172 100644 --- a/lib/kafka_ex/server_0_p_9_p_0.ex +++ b/lib/kafka_ex/server_0_p_9_p_0.ex @@ -98,8 +98,14 @@ defmodule KafkaEx.Server0P9P0 do } end) - {correlation_id, metadata} = + check_brokers_sockets!(brokers) + + {correlation_id, metadata} = try do retrieve_metadata(brokers, 0, config_sync_timeout()) + rescue e -> + sleep_for_reconnect() + Kernel.reraise(e, System.stacktrace()) + end state = %State{ metadata: metadata, From 34d2b7fece7727a24307581e1f6876c7acca774c Mon Sep 17 00:00:00 2001 From: Denis Oblogin Date: Sun, 12 May 2019 00:57:34 +0300 Subject: [PATCH 2/4] Fixed specs in responses where {:error, atom} is possible on Kafka shutdown --- lib/kafka_ex/protocol/heartbeat.ex | 2 +- lib/kafka_ex/protocol/join_group.ex | 2 +- lib/kafka_ex/protocol/leave_group.ex | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/kafka_ex/protocol/heartbeat.ex b/lib/kafka_ex/protocol/heartbeat.ex index 5284b4ba..81e5a602 100644 --- a/lib/kafka_ex/protocol/heartbeat.ex +++ b/lib/kafka_ex/protocol/heartbeat.ex @@ -19,7 +19,7 @@ defmodule KafkaEx.Protocol.Heartbeat do # We could just return the error code instead of having the struct, but this # keeps the code normalized defstruct error_code: nil - @type t :: %Response{error_code: atom | integer} + @type t :: %Response{error_code: atom | integer} | {:error, atom} end @spec create_request(integer, binary, Request.t()) :: binary diff --git a/lib/kafka_ex/protocol/join_group.ex b/lib/kafka_ex/protocol/join_group.ex index c5bc5a34..17fd1024 100644 --- a/lib/kafka_ex/protocol/join_group.ex +++ b/lib/kafka_ex/protocol/join_group.ex @@ -35,7 +35,7 @@ defmodule KafkaEx.Protocol.JoinGroup do leader_id: binary, member_id: binary, members: [binary] - } + } | {:error, atom} def leader?(%__MODULE__{member_id: member_id, leader_id: leader_id}) do member_id == leader_id diff --git a/lib/kafka_ex/protocol/leave_group.ex b/lib/kafka_ex/protocol/leave_group.ex index 
a4549efa..e81aa2dd 100644 --- a/lib/kafka_ex/protocol/leave_group.ex +++ b/lib/kafka_ex/protocol/leave_group.ex @@ -16,7 +16,7 @@ defmodule KafkaEx.Protocol.LeaveGroup do @type t :: %Response{ error_code: atom | integer - } + } | {:error, atom} end @spec create_request(integer, binary, Request.t()) :: binary From 60ac0a253193bc9b6b6988d6faaee1d0dbf7e133 Mon Sep 17 00:00:00 2001 From: Denis Oblogin Date: Mon, 20 May 2019 02:20:17 +0300 Subject: [PATCH 3/4] shutdown manager on unrecoverable error --- lib/kafka_ex/consumer_group/manager.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/kafka_ex/consumer_group/manager.ex b/lib/kafka_ex/consumer_group/manager.ex index f538f54b..09c04976 100644 --- a/lib/kafka_ex/consumer_group/manager.ex +++ b/lib/kafka_ex/consumer_group/manager.ex @@ -194,10 +194,10 @@ defmodule KafkaEx.ConsumerGroup.Manager do # If the heartbeat gets an unrecoverable error. def handle_info( - {:EXIT, _heartbeat_timer, {:shutdown, {:error, _reason}}}, + {:EXIT, _heartbeat_timer, {:shutdown, {:error, reason}}}, %State{} = state ) do - {:noreply, state} + {:stop, {:shutdown, {:error, reason}}, state} end # When terminating, inform the group coordinator that this member is leaving From ec9299b9fa64e4ba1e25560625f80000467469e8 Mon Sep 17 00:00:00 2001 From: Denis Oblogin Date: Mon, 20 May 2019 02:20:37 +0300 Subject: [PATCH 4/4] restart consumer group together with manager consumer group --- lib/kafka_ex/consumer_group.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kafka_ex/consumer_group.ex b/lib/kafka_ex/consumer_group.ex index 35bd6da1..dd1909a4 100644 --- a/lib/kafka_ex/consumer_group.ex +++ b/lib/kafka_ex/consumer_group.ex @@ -349,7 +349,7 @@ defmodule KafkaEx.ConsumerGroup do ) ] - supervise(children, strategy: :one_for_all) + supervise(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1) end defp call_manager(supervisor_pid, call) do