From a70892a1a7cce19af005ae30cdd64222038cbf12 Mon Sep 17 00:00:00 2001 From: ashwini Date: Wed, 14 Feb 2018 13:42:30 +0900 Subject: [PATCH 1/3] Avoid parsing nil response in case of connection timeout --- lib/kafka_ex/server.ex | 8 ++---- lib/kafka_ex/server_0_p_8_p_2.ex | 45 +++++++++----------------------- 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 444d79fc..b40a9af3 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -486,12 +486,8 @@ defmodule KafkaEx.Server do |> client_request(updated_state) |> module.create_request - response = broker - |> NetworkClient.send_sync_request( - wire_request, - config_sync_timeout() - ) - |> module.parse_response + response = NetworkClient.send_sync_request(broker, wire_request, config_sync_timeout()) + response = if response != nil, do: module.parse_response(response), else: nil state_out = State.increment_correlation_id(updated_state) diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index d10bbbe4..bc8b61b1 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -102,9 +102,8 @@ defmodule KafkaEx.Server0P8P2 do Logger.log(:error, "Coordinator for topic #{offset_fetch.topic} is not available") {:topic_not_found, state} _ -> - response = broker - |> NetworkClient.send_sync_request(offset_fetch_request, config_sync_timeout()) - |> OffsetFetch.parse_response + response = NetworkClient.send_sync_request(broker, offset_fetch_request, config_sync_timeout()) + response = if response != nil, do: OffsetFetch.parse_response(response), else: nil {response, %{state | correlation_id: state.correlation_id + 1}} end @@ -155,40 +154,22 @@ defmodule KafkaEx.Server0P8P2 do end end - defp fetch(fetch_request, state) do - true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state) - fetch_data = Fetch.create_request(%FetchRequest{ - fetch_request | - client_id: @client_id, - correlation_id: state.correlation_id, - }) - {broker, state} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, fetch_request.topic, fetch_request.partition) do - nil -> - updated_state = update_metadata(state) - {MetadataResponse.broker_for_topic(updated_state.metadata, updated_state.brokers, fetch_request.topic, fetch_request.partition), updated_state} - broker -> {broker, state} - end - - case broker do - nil -> - Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available") - {:topic_not_found, state} - _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) - |> Fetch.parse_response - state = %{state | correlation_id: state.correlation_id + 1} + defp fetch(request, state) do + true = consumer_group_if_auto_commit?(request.auto_commit, state) + case network_request(request, Fetch, state) do + {{:error, error}, state_out} -> {error, state_out} + {response, state_out} -> last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset) - if last_offset != nil && fetch_request.auto_commit do + if last_offset != nil && request.auto_commit do offset_commit_request = %OffsetCommit.Request{ - topic: fetch_request.topic, + topic: request.topic, offset: last_offset, - partition: fetch_request.partition, - consumer_group: state.consumer_group} - {_, state} = offset_commit(state, offset_commit_request) + partition: request.partition, + consumer_group: state_out.consumer_group} + {_, state} = offset_commit(state_out, offset_commit_request) {response, state} else - {response, state} + {response, state_out} end end end From d4562f3a151148571d6b783238e0db4cc0ef9464 Mon Sep 17 00:00:00 2001 From: ashwini Date: Wed, 14 Feb 2018 14:20:03 +0900 Subject: [PATCH 2/3] Fix code style voilations --- lib/kafka_ex/server.ex | 9 +++++++-- lib/kafka_ex/server_0_p_8_p_2.ex | 9 +++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index b40a9af3..c2faae38 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -486,8 +486,13 @@ defmodule KafkaEx.Server do |> client_request(updated_state) |> module.create_request - response = NetworkClient.send_sync_request(broker, wire_request, config_sync_timeout()) - response = if response != nil, do: module.parse_response(response), else: nil + response = NetworkClient.send_sync_request( + broker, + wire_request, + config_sync_timeout()) + response = if response != nil, + do: module.parse_response(response), + else: nil state_out = State.increment_correlation_id(updated_state) diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index bc8b61b1..d3b02821 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -102,8 +102,13 @@ defmodule KafkaEx.Server0P8P2 do Logger.log(:error, "Coordinator for topic #{offset_fetch.topic} is not available") {:topic_not_found, state} _ -> - response = NetworkClient.send_sync_request(broker, offset_fetch_request, config_sync_timeout()) - response = if response != nil, do: OffsetFetch.parse_response(response), else: nil + response = NetworkClient.send_sync_request( + broker, + offset_fetch_request, + config_sync_timeout()) + response = if response != nil, + do: OffsetFetch.parse_response(response), + else: nil {response, %{state | correlation_id: state.correlation_id + 1}} end From 48803f7f74f5360c5bea8733afbd0cbb6c1bf397 Mon Sep 17 00:00:00 2001 From: ashwini Date: Wed, 14 Feb 2018 14:26:35 +0900 Subject: [PATCH 3/3] Fix credo issues, unused aliases --- lib/kafka_ex/server_0_p_8_p_2.ex | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index d3b02821..73479920 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -17,9 +17,7 @@ defmodule KafkaEx.Server0P8P2 do alias KafkaEx.Protocol.ConsumerMetadata alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse alias KafkaEx.Protocol.Fetch - alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest alias KafkaEx.Protocol.Metadata.Broker - alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse alias KafkaEx.Protocol.OffsetFetch alias KafkaEx.Protocol.OffsetCommit alias KafkaEx.Server.State