diff --git a/lib/kafka_ex/server.ex b/lib/kafka_ex/server.ex index 444d79fc..c2faae38 100644 --- a/lib/kafka_ex/server.ex +++ b/lib/kafka_ex/server.ex @@ -486,12 +486,13 @@ defmodule KafkaEx.Server do |> client_request(updated_state) |> module.create_request - response = broker - |> NetworkClient.send_sync_request( + response = NetworkClient.send_sync_request( + broker, wire_request, - config_sync_timeout() - ) - |> module.parse_response + config_sync_timeout()) + response = if response != nil, + do: module.parse_response(response), + else: nil state_out = State.increment_correlation_id(updated_state) diff --git a/lib/kafka_ex/server_0_p_8_p_2.ex b/lib/kafka_ex/server_0_p_8_p_2.ex index d10bbbe4..73479920 100644 --- a/lib/kafka_ex/server_0_p_8_p_2.ex +++ b/lib/kafka_ex/server_0_p_8_p_2.ex @@ -17,9 +17,7 @@ defmodule KafkaEx.Server0P8P2 do alias KafkaEx.Protocol.ConsumerMetadata alias KafkaEx.Protocol.ConsumerMetadata.Response, as: ConsumerMetadataResponse alias KafkaEx.Protocol.Fetch - alias KafkaEx.Protocol.Fetch.Request, as: FetchRequest alias KafkaEx.Protocol.Metadata.Broker - alias KafkaEx.Protocol.Metadata.Response, as: MetadataResponse alias KafkaEx.Protocol.OffsetFetch alias KafkaEx.Protocol.OffsetCommit alias KafkaEx.Server.State @@ -102,9 +100,13 @@ defmodule KafkaEx.Server0P8P2 do Logger.log(:error, "Coordinator for topic #{offset_fetch.topic} is not available") {:topic_not_found, state} _ -> - response = broker - |> NetworkClient.send_sync_request(offset_fetch_request, config_sync_timeout()) - |> OffsetFetch.parse_response + response = NetworkClient.send_sync_request( + broker, + offset_fetch_request, + config_sync_timeout()) + response = if response != nil, + do: OffsetFetch.parse_response(response), + else: nil {response, %{state | correlation_id: state.correlation_id + 1}} end @@ -155,40 +157,22 @@ defmodule KafkaEx.Server0P8P2 do end end - defp fetch(fetch_request, state) do - true = consumer_group_if_auto_commit?(fetch_request.auto_commit, state) - fetch_data = Fetch.create_request(%FetchRequest{ - fetch_request | - client_id: @client_id, - correlation_id: state.correlation_id, - }) - {broker, state} = case MetadataResponse.broker_for_topic(state.metadata, state.brokers, fetch_request.topic, fetch_request.partition) do - nil -> - updated_state = update_metadata(state) - {MetadataResponse.broker_for_topic(updated_state.metadata, updated_state.brokers, fetch_request.topic, fetch_request.partition), updated_state} - broker -> {broker, state} - end - - case broker do - nil -> - Logger.log(:error, "Leader for topic #{fetch_request.topic} is not available") - {:topic_not_found, state} - _ -> - response = broker - |> NetworkClient.send_sync_request(fetch_data, config_sync_timeout()) - |> Fetch.parse_response - state = %{state | correlation_id: state.correlation_id + 1} + defp fetch(request, state) do + true = consumer_group_if_auto_commit?(request.auto_commit, state) + case network_request(request, Fetch, state) do + {{:error, error}, state_out} -> {error, state_out} + {response, state_out} -> last_offset = response |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset) - if last_offset != nil && fetch_request.auto_commit do + if last_offset != nil && request.auto_commit do offset_commit_request = %OffsetCommit.Request{ - topic: fetch_request.topic, + topic: request.topic, offset: last_offset, - partition: fetch_request.partition, - consumer_group: state.consumer_group} - {_, state} = offset_commit(state, offset_commit_request) + partition: request.partition, + consumer_group: state_out.consumer_group} + {_, state} = offset_commit(state_out, offset_commit_request) {response, state} else - {response, state} + {response, state_out} end end end