-
Notifications
You must be signed in to change notification settings - Fork 162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NetworkClient should not send nil in case of failure #275
Conversation
lib/kafka_ex/server_0_p_8_p_2.ex
Outdated
response = broker | ||
|> NetworkClient.send_sync_request(offset_commit_request_payload, config_sync_timeout()) | ||
|> OffsetCommit.parse_response | ||
response = case NetworkClient.send_sync_request(broker, offset_commit_request_payload, config_sync_timeout()) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long (max is 80, was 116).
4fee1be
to
5ca5132
Compare
lib/kafka_ex/server.ex
Outdated
@@ -526,7 +532,10 @@ defmodule KafkaEx.Server do | |||
defp first_broker_response(request, brokers, sync_timeout) do | |||
Enum.find_value(brokers, fn(broker) -> | |||
if Broker.connected?(broker) do | |||
NetworkClient.send_sync_request(broker, request, sync_timeout) | |||
case NetworkClient.send_sync_request(broker, request, sync_timeout) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function body is nested too deep (max depth is 2, was 3).
lib/kafka_ex/server.ex
Outdated
@@ -526,7 +532,10 @@ defmodule KafkaEx.Server do | |||
defp first_broker_response(request, brokers, sync_timeout) do | |||
Enum.find_value(brokers, fn(broker) -> | |||
if Broker.connected?(broker) do | |||
NetworkClient.send_sync_request(broker, request, sync_timeout) | |||
case NetworkClient.send_sync_request(broker, request, sync_timeout) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function body is nested too deep (max depth is 2, was 3).
lib/kafka_ex/server.ex
Outdated
@@ -304,8 +304,12 @@ defmodule KafkaEx.Server do | |||
0 -> NetworkClient.send_async_request(broker, produce_request_data) | |||
_ -> | |||
response = broker | |||
|> NetworkClient.send_sync_request(produce_request_data, config_sync_timeout()) | |||
|> Produce.parse_response | |||
|> NetworkClient.send_sync_request(produce_request_data, config_sync_timeout()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long (max is 80, was 95).
lib/kafka_ex/server.ex
Outdated
@@ -334,8 +338,12 @@ defmodule KafkaEx.Server do | |||
{:topic_not_found, state} | |||
_ -> | |||
response = broker | |||
|> NetworkClient.send_sync_request(offset_request, config_sync_timeout()) | |||
|> Offset.parse_response | |||
|> NetworkClient.send_sync_request(offset_request, config_sync_timeout()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long (max is 80, was 87).
lib/kafka_ex/server.ex
Outdated
do: module.parse_response(response), | ||
else: nil | ||
response = broker | ||
|> NetworkClient.send_sync_request(wire_request, config_sync_timeout()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long (max is 80, was 85).
lib/kafka_ex/server.ex
Outdated
@@ -526,7 +532,10 @@ defmodule KafkaEx.Server do | |||
defp first_broker_response(request, brokers, sync_timeout) do | |||
Enum.find_value(brokers, fn(broker) -> | |||
if Broker.connected?(broker) do | |||
NetworkClient.send_sync_request(broker, request, sync_timeout) | |||
case NetworkClient.send_sync_request(broker, request, sync_timeout) do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line is too long (max is 80, was 82).
5ca5132
to
b4b8a00
Compare
lib/kafka_ex/server_0_p_8_p_2.ex
Outdated
config_sync_timeout()) | ||
|> case do | ||
{:error, reason} -> {:error, reason} | ||
response -> OffsetFetch.parse_response(response) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be OffsetCommit.parse_response
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. My Bad. I have updated the PR. Thanks.
b4b8a00
to
7e85668
Compare
7e85668
to
38c8633
Compare
Ebert has finished reviewing this Pull Request and has found:
You can see more details about this review at https://ebertapp.io/github/kafkaex/kafka_ex/pulls/275. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me.
@joshuawscott When can we get this merged? |
Going to give the other maintainers a chance to weigh in and then merge tomorrow U.S. time |
Thanks again for the PR @ashwini709 ! |
NetworkClient should not send nil in case of failure
As pointed out by @dantswain Swallowing
nil
in #272 was not a good idea. It leads to further crashes in hereresponse |> hd |> Map.get(:partitions) |> hd |> Map.get(:last_offset)
If we return the

{:error, reason}
in case of network_client'ssend_sync_reuqest
failure, it leads to the genserver crashing but with a valid reason.I would like to fix the issues related to #233 as much as possible.
@joshuawscott @dantswain would appreciate the guidance/suggestions to fix this behaviour. Thank you.
genconsumer's consume/1
handle the result and reply with error?While we are at it, I would like to send
{:ok, response}
in case of success if thats ok.