
When a connection timeout occurred, fetch/2 couldn't handle the exception #233

Closed
tzzzoz opened this issue Sep 22, 2017 · 5 comments

Comments

@tzzzoz

tzzzoz commented Sep 22, 2017

Stacktrace

10:06:25.292 [error] Receiving data from broker "xxx.xxx.xxx.xxx":19093 failed with :timeout

10:06:25.292 [debug] Shutting down worker #PID<0.1605.0>

10:06:25.297 [error] GenServer #PID<0.1581.0> terminating
** (FunctionClauseError) no function clause matching in KafkaEx.Protocol.Fetch.parse_response/1
    (kafka_ex) lib/kafka_ex/protocol/fetch.ex:55: KafkaEx.Protocol.Fetch.parse_response(nil)
    (kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:170: KafkaEx.Server0P8P2.fetch/2
    (kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:77: KafkaEx.Server0P8P2.kafka_server_fetch/2
    (stdlib) gen_server.erl:615: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:647: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:fetch, %KafkaEx.Protocol.Fetch.Request{auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 1000000, min_bytes: 1, offset: 2, partition: 8, topic: "raw_token_update_requests", wait_time: 10}}

A timeout occurred while communicating with the Kafka broker, and fetch/2 could not handle the resulting nil response.
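A minimal sketch of one way to avoid the crash, under stated assumptions: the stack trace shows the raw broker response reaching `KafkaEx.Protocol.Fetch.parse_response/1` as `nil`, so the caller could check for `nil` first and return an error tuple instead of raising `FunctionClauseError`. The module `FetchGuardSketch` and function `handle_fetch_response/2` are hypothetical; they do not reflect KafkaEx's actual code or the fix discussed in #235.

```elixir
defmodule FetchGuardSketch do
  # Hypothetical helper, not part of KafkaEx. It stands in for the step in
  # fetch/2 that passes the raw broker response to a parser such as
  # KafkaEx.Protocol.Fetch.parse_response/1.

  # A timed-out socket read yields nil: return an error tuple instead of
  # calling the parser, which has no clause for nil.
  def handle_fetch_response(nil, _parser), do: {:error, :timeout}

  # A binary response is handed to the parser as usual.
  def handle_fetch_response(response, parser) when is_binary(response) do
    {:ok, parser.(response)}
  end
end
```

For example, `FetchGuardSketch.handle_fetch_response(nil, &byte_size/1)` returns `{:error, :timeout}` rather than crashing the GenServer; the caller can then decide whether to retry or reply with the error.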

@dantswain
Collaborator

Thanks, @tzzzoz. I've seen this a few times as well. It is definitely a problem, and this extra context helps!

@MiLk
Contributor

MiLk commented Nov 24, 2017

We encounter this issue consistently.
Here are the logs:

[error] GenServer #PID<0.594.0> terminating
** (FunctionClauseError) no function clause matching in KafkaEx.Protocol.Fetch.parse_response/1
    (kafka_ex) lib/kafka_ex/protocol/fetch.ex:55: KafkaEx.Protocol.Fetch.parse_response(nil)
    (kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:170: KafkaEx.Server0P8P2.fetch/2
    (kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:77: KafkaEx.Server0P8P2.kafka_server_fetch/2
    (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.593.0>): {:fetch, %KafkaEx.Protocol.Fetch.Request{auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 1000000, min_bytes: 1, offset: 4, partition: 3, topic: "dev.ts.data.reservations", wait_time: 10}}
State: %KafkaEx.Server.State{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 1, port: 9092, socket: %KafkaEx.Socket{socket: #Port<0.12163>, ssl: false}}, %KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: -1, port: 9092, socket: %KafkaEx.Socket{socket: #Port<0.12125>, ssl: false}}], consumer_group: "nexus_kafka_ex", consumer_group_update_interval: 30000, consumer_metadata: %KafkaEx.Protocol.ConsumerMetadata.Response{coordinator_host: "localhost", coordinator_id: 1, coordinator_port: 9092, error_code: :no_error}, correlation_id: 1180, event_pid: nil, metadata: %KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 1, port: 9092, socket: nil}], topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 0, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 6, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 33, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 24, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 42, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 15, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 39, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 30, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 3, replicas: [1]}, 
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 12, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 48, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 21, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 18, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 9, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 27, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 36, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 45, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 28, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 19, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 1, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 37, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 10, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 34, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 25, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 43, replicas: [1]}, 
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 7, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 16, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 31, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 22, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 13, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 4, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 40, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 49, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 46, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 14, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [...], ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{...}, ...], topic: "__consumer_offsets"}, %KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 0, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 3, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, 
partition_id: 1, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 4, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 2, replicas: [1]}], topic: "dev.ts.data.reservations"}]}, metadata_update_interval: 30000, ssl_options: [], use_ssl: false, worker_name: #PID<0.594.0>}
Client #PID<0.593.0> is alive
    (stdlib) gen.erl:169: :gen.do_call/4
    (elixir) lib/gen_server.ex:771: GenServer.call/3
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:481: KafkaEx.GenConsumer.consume/1
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:453: KafkaEx.GenConsumer.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

[error] GenServer #PID<0.588.0> terminating
** (FunctionClauseError) no function clause matching in KafkaEx.Protocol.Fetch.parse_response/1
    (kafka_ex) lib/kafka_ex/protocol/fetch.ex:55: KafkaEx.Protocol.Fetch.parse_response(nil)
    (kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:170: KafkaEx.Server0P8P2.fetch/2
    (kafka_ex) lib/kafka_ex/server_0_p_8_p_2.ex:77: KafkaEx.Server0P8P2.kafka_server_fetch/2
    (stdlib) gen_server.erl:636: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:665: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.587.0>): {:fetch, %KafkaEx.Protocol.Fetch.Request{auto_commit: false, client_id: nil, correlation_id: nil, max_bytes: 1000000, min_bytes: 1, offset: 0, partition: 2, topic: "dev.ts.data.reservations", wait_time: 10}}
State: %KafkaEx.Server.State{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 1, port: 9092, socket: %KafkaEx.Socket{socket: #Port<0.12001>, ssl: false}}, %KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: -1, port: 9092, socket: %KafkaEx.Socket{socket: #Port<0.12000>, ssl: false}}], consumer_group: "nexus_kafka_ex", consumer_group_update_interval: 30000, consumer_metadata: %KafkaEx.Protocol.ConsumerMetadata.Response{coordinator_host: "localhost", coordinator_id: 1, coordinator_port: 9092, error_code: :no_error}, correlation_id: 1160, event_pid: nil, metadata: %KafkaEx.Protocol.Metadata.Response{brokers: [%KafkaEx.Protocol.Metadata.Broker{host: "localhost", node_id: 1, port: 9092, socket: nil}], topic_metadatas: [%KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 0, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 6, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 33, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 24, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 42, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 15, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 39, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 30, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 3, replicas: [1]}, 
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 12, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 48, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 21, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 18, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 9, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 27, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 36, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 45, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 28, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 19, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 1, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 37, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 10, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 34, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 25, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 43, replicas: [1]}, 
%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 7, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 16, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 31, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 22, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 13, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 4, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 40, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 49, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 46, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 14, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [...], ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, ...}, %KafkaEx.Protocol.Metadata.PartitionMetadata{...}, ...], topic: "__consumer_offsets"}, %KafkaEx.Protocol.Metadata.TopicMetadata{error_code: :no_error, partition_metadatas: [%KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 0, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 3, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, 
partition_id: 1, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 4, replicas: [1]}, %KafkaEx.Protocol.Metadata.PartitionMetadata{error_code: :no_error, isrs: [1], leader: 1, partition_id: 2, replicas: [1]}], topic: "dev.ts.data.reservations"}]}, metadata_update_interval: 30000, ssl_options: [], use_ssl: false, worker_name: #PID<0.588.0>}
Client #PID<0.587.0> is alive
    (stdlib) gen.erl:169: :gen.do_call/4
    (elixir) lib/gen_server.ex:771: GenServer.call/3
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:481: KafkaEx.GenConsumer.consume/1
    (kafka_ex) lib/kafka_ex/gen_consumer.ex:453: KafkaEx.GenConsumer.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

We are using Kafka 0.11.0.1 from https://github.com/confluentinc/cp-docker-images
Our topic has 5 partitions, and we are using a ConsumerGroup.

In our config, we have:

config :kafka_ex,
  kafka_version: "0.11.0"

I noticed it's using Server0P8P2. I don't know whether we are missing something, whether this is expected, or whether it is a bug.

@bjhaid
Member

bjhaid commented Nov 24, 2017

This is a known problem; see #235.

@MiLk, you are not doing anything wrong; seeing Server0P8P2 is expected. The fetch implementation is the same for 0.8.2 and 0.9.0, and probably the same up to 0.11.0. See:

defdelegate kafka_server_fetch(fetch_request, state), to: Server0P8P2
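To illustrate what that `defdelegate` line does, here is a minimal sketch with hypothetical module names (`BaseServerSketch` standing in for Server0P8P2, `NewerServerSketch` for a newer server module). The newer module does not reimplement fetch; `defdelegate` defines a function that forwards its arguments unchanged to the target module. The return shape is an assumption chosen only for illustration.

```elixir
defmodule BaseServerSketch do
  # Hypothetical stand-in for Server0P8P2: owns the fetch implementation.
  def kafka_server_fetch(fetch_request, state) do
    {:reply, {:fetched, fetch_request}, state}
  end
end

defmodule NewerServerSketch do
  # Hypothetical stand-in for a newer server module: it forwards the call
  # to BaseServerSketch instead of defining its own fetch logic.
  defdelegate kafka_server_fetch(fetch_request, state), to: BaseServerSketch
end
```

Calling `NewerServerSketch.kafka_server_fetch(:req, %{})` runs the code in `BaseServerSketch`, so a bug in the shared fetch path affects every server module that delegates to it.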

@MiLk
Contributor

MiLk commented Nov 24, 2017

Ah cool, thanks then.

@bjhaid
Member

bjhaid commented Nov 24, 2017

Judging from the comments on the linked PR, the correct fix isn't complicated, so we'd appreciate it if someone could send in a PR to fix this problem. See #235 (comment) specifically.
