
Group Coordination over distributed nodes #379

Open

rocveralfre opened this issue Apr 1, 2020 · 34 comments

Comments

@rocveralfre commented Apr 1, 2020

Hello,
We've been using this project for a long time. Our configuration is the following:

We have one Erlang node running a brod_client and a brod_group_subscriber (started via brod:start_link_group_subscriber) with the following parameters:

brod version 3.10.0

group_config:

[
{rejoin_delay_seconds, crypto:rand_uniform(5, 360)},
{offset_commit_policy, commit_to_kafka_v2},
{offset_commit_interval_seconds, 5}
]

consumer_config:

[
{begin_offset, latest},
{offset_reset_policy, reset_to_earliest},
{max_wait_time, 5000},
{sleep_timeout, 3000}
]

We have just one topic with 4 partitions.

When we have just one node running, everything works correctly and the group_coordinator assigns all the partitions to the single consumer.

However, when we start another node with the very same configuration, the two coordinators start looping and each tries to acquire all partitions exclusively:

re-joining group, reason:unknown_member_id
[08:54:48.786I||<0.1555.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1555.0>,cb=<0.1554.0>,generation=39437):
elected=true
[08:54:48.789I||<0.1555.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1555.0>,cb=<0.1554.0>,generation=39437):
assignments received:
  socrates:
    partition=0 begin_offset=9268
    partition=1 begin_offset=8667
    partition=2 begin_offset=8663
    partition=3 begin_offset=2326257
[08:54:53.784I||<0.1555.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1555.0>,cb=<0.1554.0>,generation=39437):
re-joining group, reason:unknown_member_id
[08:54:53.786I||<0.1555.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1555.0>,cb=<0.1554.0>,generation=39439):
elected=true
[08:54:53.789I||<0.1555.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1555.0>,cb=<0.1554.0>,generation=39439):
assignments received:
  socrates:
    partition=0 begin_offset=9268
    partition=1 begin_offset=8667
    partition=2 begin_offset=8663
    partition=3 begin_offset=2326257

It's worth noting that each node's clientId is different, while the groupId is the same for all the Mnesia-distributed nodes.

We also noticed, however, that if we stop and restart ALL our custom applications with an rpc call, e.g.

rpc:multicall(application, stop, [...]).
...
rpc:multicall(application, start, [...]).

Then the coordinators are able to reach a shared stable state where partitions are correctly distributed.

Are we misconfiguring the group_subscriber? Is our initial assumption (that when a new member joins the group, all members rebalance and stabilize once more) incorrect?

This is really important for our project and we would be happy to hear your feedback.

Thanks in advance

@zmstone (Contributor) commented Apr 1, 2020

Hi. I noticed in your logs that the generation number jumped from 39437 to 39439.
It could be that in generation 39438 the other node was elected as leader, but each keeps failing to see the other member in the group (it's highly unlikely that two leaders were elected in the same generation).

crypto:rand_uniform(5, 360) might be the issue.
The rejoin delay should be set (a lot) lower than the session timeout.
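
For illustration, a group_config along those lines could look like this (the numbers are only an example to show the relationship between the two settings, not a recommendation):

[
  %% keep the rejoin delay well below the session timeout, so a member
  %% can rejoin before the broker expires its session
  {rejoin_delay_seconds, 2},
  {session_timeout_seconds, 30},
  {offset_commit_policy, commit_to_kafka_v2},
  {offset_commit_interval_seconds, 5}
]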

@rocveralfre (Author) commented Apr 1, 2020

Hello, thanks for the overview. I removed the option entirely, so we now use the default values.

We're still looping, however. Here is the log of the two nodes racing each other:
Node 1:

[12:49:32.206I||<0.1560.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1560.0>,cb=<0.1559.0>,generation=40408):
re-joining group, reason:unknown_member_id
[12:49:32.209I||<0.1560.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1560.0>,cb=<0.1559.0>,generation=40410):
elected=true
[12:49:32.211I||<0.1560.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1560.0>,cb=<0.1559.0>,generation=40410):
assignments received:
  socrates:
    partition=0 begin_offset=9268
    partition=1 begin_offset=8667
    partition=2 begin_offset=8663
    partition=3 begin_offset=2326257
[12:49:37.207I||<0.1560.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1560.0>,cb=<0.1559.0>,generation=40410):
re-joining group, reason:unknown_member_id
[12:49:37.210I||<0.1560.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1560.0>,cb=<0.1559.0>,generation=40412):
elected=true
[12:49:37.213I||<0.1560.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1560.0>,cb=<0.1559.0>,generation=40412):
assignments received:
  socrates:
    partition=0 begin_offset=9268
    partition=1 begin_offset=8667
    partition=2 begin_offset=8663
    partition=3 begin_offset=2326257

Node 2:

[12:49:33.716I||<0.1633.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1633.0>,cb=<0.1632.0>,generation=40411):
assignments received:
  socrates:
    partition=0 begin_offset=9268
    partition=1 begin_offset=8667
    partition=2 begin_offset=8663
    partition=3 begin_offset=2326257
[12:49:38.681I||<0.1633.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1633.0>,cb=<0.1632.0>,generation=40411):
re-joining group, reason:unknown_member_id
[12:49:38.683I||<0.1633.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1633.0>,cb=<0.1632.0>,generation=40413):
elected=true
[12:49:38.686I||<0.1633.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1633.0>,cb=<0.1632.0>,generation=40413):
assignments received:
  socrates:
    partition=0 begin_offset=9268
    partition=1 begin_offset=8667
    partition=2 begin_offset=8663
    partition=3 begin_offset=2326257
[12:49:43.681I||<0.1633.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1633.0>,cb=<0.1632.0>,generation=40413):
re-joining group, reason:unknown_member_id
[12:49:43.684I||<0.1633.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1633.0>,cb=<0.1632.0>,generation=40415):
elected=true
[12:49:43.687I||<0.1633.0>] Group member (socrates-groupid-shared-by-all-members,coor=<0.1633.0>,cb=<0.1632.0>,generation=40415):
assignments received:
  socrates:
    partition=0 begin_offset=9268
    partition=1 begin_offset=8667
    partition=2 begin_offset=8663
    partition=3 begin_offset=2326257

Should I instead raise the session_timeout? (It is currently set to the default of 10 s.)

@zmstone (Contributor) commented Apr 1, 2020

yes, try with a larger session timeout.

@rocveralfre (Author)

I tried with

{session_timeout_seconds, 20},

in the group_config; however, we're still racing. By the way, why is re-joining performed on a 5-second basis?

[12:49:33.716I||<0.1633.0>] Group member 
[12:49:38.686I||<0.1633.0>] Group member

@zmstone (Contributor) commented Apr 1, 2020

Ah, the sync group request has a hard-coded 5-second limit on waiting for the response.
If it takes longer than 5 seconds for the other member to rejoin (typically to revoke its old assignments),
the waiting member will give up waiting.

It should be fixed to use the session timeout as the sync-request timeout.
This can happen more often when the subscriber is blocked processing a large amount of messages,
i.e. it is not responsive to the rebalance callbacks.

Before we can fix it, you can try async processing in the group subscriber,
or, if it is due to a large batch (rather than a single large message),
try slowing brod_consumer down a bit by decreasing max_bytes, prefetch_bytes and prefetch_count in the consumer config.
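
As a rough sketch of the async-processing idea (the module below is made up for illustration; brod_group_subscriber:ack/4 and the handle_message/4 callback are the brod parts, everything else is an assumption):

-module(my_async_subscriber).  %% hypothetical example module
-behaviour(brod_group_subscriber).
-export([init/2, handle_message/4]).

-include_lib("brod/include/brod.hrl").

init(_GroupId, _InitArg) -> {ok, #{}}.

%% Returning {ok, State} (without the 'ack' tag) means the message is not
%% acknowledged synchronously; a worker process acks it once the slow work
%% is done, so the subscriber stays responsive to rebalance callbacks.
handle_message(Topic, Partition, #kafka_message{offset = Offset} = Msg, State) ->
  Subscriber = self(),
  spawn_link(fun() ->
               do_slow_work(Msg),  %% placeholder for the real processing
               ok = brod_group_subscriber:ack(Subscriber, Topic, Partition, Offset)
             end),
  {ok, State}.

do_slow_work(_Msg) -> ok.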

@zmstone (Contributor) commented Apr 1, 2020

This was done only for the join group request; the same should be done for the sync group request:

  %% send join group request and wait for response
  %% as long as the session timeout config
  RspBody = send_sync(Connection, Req, SessionTimeout),
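
The analogous change for the sync group request would presumably look roughly like this (only a sketch; the variable names around the call are assumptions, not the actual brod_group_coordinator code):

  %% send sync group request and wait for the response for as long as the
  %% session timeout, instead of the hard-coded 5-second limit
  SyncRspBody = send_sync(Connection, SyncGroupRequest, SessionTimeout),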

@rocveralfre (Author) commented Apr 1, 2020

I'm trying the following configuration (I use the div function on the default values), but there is no improvement:

[
 {max_rejoin_attempts, 30},
 {session_timeout_seconds, 40},
 {offset_commit_policy, commit_to_kafka_v2},
 {offset_commit_interval_seconds, 5}
],
[
 {max_bytes, 1048576 div 2},
 {prefetch_bytes, 102400 div 2},
 {prefetch_count, 10 div 2},
 {begin_offset, latest},
 {offset_reset_policy, reset_to_earliest},
 {max_wait_time, 5000},
 {sleep_timeout, 3000}
],

@zmstone (Contributor) commented Apr 1, 2020

Try my branch in the PR?

@zmstone (Contributor) commented Apr 1, 2020

It very much depends on how big your messages and message sets are,
and how long it takes for the subscriber to complete one batch.
Currently, if that takes longer than 5 seconds, the group will not be able to reach balance.

@rocveralfre (Author)

I am trying to deploy your patch; I will let you know in a few minutes.

@rocveralfre (Author)

Unfortunately, even with your last patch, we're still looping.

@zmstone (Contributor) commented Apr 1, 2020

Anything different in the logs?

@rocveralfre (Author)

Unfortunately not; we're still looping every 5 seconds with the very same log as before.

Also, as a side note, this time we flushed the whole Kafka queue before starting the second node, and we are not producing any more messages to the topic.

@rocveralfre (Author) commented Apr 1, 2020

For completeness' sake:
we start both the brod_client and the brod_group_subscriber as children of our Erlang application; these are the child specifications for the two:

#{
  id => socrates_brod_client,
  modules => [?MODULE],
  restart => permanent,
  shutdown => 1000,
  start => {brod_client, start_link, [[
    {"broker.kafka.l4lb.thisdcos.directory", 9092}
  ], ClientID, []]}
},
#{
  id => socrates_brod_group,
  modules => [socrates_multiplex],
  restart => temporary,
  shutdown => 1000,
  type => worker,
  start => {brod, start_link_group_subscriber, [
    ClientID,
    GroupId,
    [TOPIC],
    [
      {max_rejoin_attempts, 30},
      {session_timeout_seconds, 40},
      {offset_commit_policy, commit_to_kafka_v2},
      {offset_commit_interval_seconds, 5}
    ],
    [
      {max_bytes, 1048576 div 2},
      {prefetch_bytes, 102400 div 2},
      {prefetch_count, 10 div 2},
      {begin_offset, latest},
      {offset_reset_policy, reset_to_earliest},
      {max_wait_time, 5000},
      {sleep_timeout, 3000}
    ],
    message,
    socrates_multiplex,
    #{}
  ]}
}

where

TOPIC = <<"socrates">>,
GroupId = <<TOPIC/binary, "-groupid-shared-by-all-members">>,
ClientID = list_to_atom(lists:flatten(io_lib:format("socrates_brod_client~s", [node()]))),

So each node shares the same TopicId and GroupId but not the ClientId.

@zmstone (Contributor) commented Apr 1, 2020

Could you try a different offset_commit_interval_seconds? 5 is the same as the heartbeat rate, which makes it hard to tell which request triggered the 'unknown_member_id' error code.

@zmstone (Contributor) commented Apr 1, 2020

What's your Kafka version, btw?

@rocveralfre (Author) commented Apr 1, 2020

We are using Kafka version 2.12-2.3.0. We also tried the following:

  • offset_commit_interval_seconds set to 10 -> still looping every 5 seconds
  • a new, empty topic (called "topic1"), starting the clients and subscribers from the console -> the clients and also the group_subscribers crash

@rocveralfre (Author) commented Apr 2, 2020

Hello again, I would like to ask another question:
shouldn't the group_coordinator pid crash after N failed attempts to stabilize, where N is max_rejoin_attempts? I ask because, looking at the log, the pid in this print:

[08:54:44.395I||<0.23712.0>] Group member (topic1-groupid-shared-by-all-members,coor=<0.23712.0>,cb=<0.23711.0>,generation=9):
assignments received:

is always the same, even though after 5 attempts it should terminate (and terminate the parent group_subscriber), if I understood correctly.

Could this be the cause of the stabilization issues?

@zmstone (Contributor) commented Apr 2, 2020

There is actually a (re)join failure log: https://github.com/klarna/brod/blob/3.10.0/src/brod_group_coordinator.erl#L513
If you do not see a log like the one below, it means the member actually joined the group successfully:

failed to join group
reason ...

Given that you have a 10-second offset commit interval and a 5-second heartbeat rate (which is the default), and you are seeing unknown_member_id every 5 seconds,
it suggests that Kafka considers the session expired and hence sends unknown_member_id in the heartbeat response.
Otherwise it would have been an illegal_generation error, instructing the member to re-join with its old member ID.

I can try to reproduce the issue with kafka 2.3.0, but can't give an ETA now.

In the meantime, you can perhaps try 3 things:

  1. Try a faster heartbeat rate (e.g. 2 seconds); see the example config after this list.
  2. Double check your config, e.g. take the pid from the log (coor=<0.1633.0>) and inspect the process state in the Erlang console:

     rr(brod_group_coordinator).
     sys:get_state(pid(0,1633,0)).

  3. Check if Kafka is logging errors etc.
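
For reference, a group_config with a faster heartbeat could look like this (heartbeat_rate_seconds is the only new knob here, shown with an illustrative value of 2; the other entries simply mirror the config already posted above):

[
  {heartbeat_rate_seconds, 2},  %% heartbeat every 2 seconds instead of the default 5
  {max_rejoin_attempts, 30},
  {session_timeout_seconds, 40},
  {offset_commit_policy, commit_to_kafka_v2},
  {offset_commit_interval_seconds, 10}
]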

@rocveralfre (Author)

Thanks, we will try following your suggestions.

Is there any particular Kafka version you would recommend we use?

@zmstone (Contributor) commented Apr 2, 2020

@k32 knows better which versions are proven to work fine with brod_group_coordinator.

@k32 commented Apr 2, 2020

We've been running fine with all major versions up to 2.2.*

@rocveralfre (Author)

As you pointed out, it's a version compatibility issue with Kafka version >2.2.*

As soon as we downgraded to the correct version, the grouping started behaving as expected. Thanks for all the help.

Should we now rename the issue to "Support Kafka versions > 2.2.*"?

@zmstone (Contributor) commented Apr 2, 2020

seems to be working fine with 2.4.1 too.

@zmstone (Contributor) commented Apr 2, 2020

2.3.1 is also good.

@zmstone (Contributor) commented Apr 2, 2020

I had a quick look at the 2.3.1 release notes (https://downloads.apache.org/kafka/2.3.1/RELEASE_NOTES.html)
but couldn't relate this issue to any of the resolved ticket titles.

@rocveralfre could you check whether my findings are correct,
i.e. that only 2.3.0 has this issue?
If that is the case, I do not think it is worth the effort to investigate further.

@rocveralfre (Author)

Here is the list of what we have tried so far (we cannot test all versions because we're bound to a package manager that does not offer them all):

Kafka Version  Works
2.2.1          yes
2.3.0          no
2.4.0          no

@zmstone (Contributor) commented Apr 13, 2020

I did not find such an issue with Kafka 2.3.1 or 2.4.0.
It would be great if someone from Klarna could help allocate some time to investigate/fix.
ping @k32 @mikpe @robertoaloi

@k32 commented Apr 14, 2020

Sorry for not answering. I stepped down from the "acting maintainer" role that I had volunteered for. It's not yet decided who will take over.

@zmstone (Contributor) commented Apr 14, 2020

No worries @k32, I just pinged all the members I have interacted with in the last couple of weeks to make sure this issue gets some attention.

@fjuan commented Apr 14, 2020

We are trying to set up an environment where we can reproduce the issue, so that we can verify the behaviour against different versions of Kafka. Will report back!

@dszoboszlay (Contributor)

There is one interesting thing in the timing of the logged events sent by @rocveralfre:

Timestamp     Node  Event
12:49:37.207  1     receive unknown_member_id error
12:49:37.210  1     elected as generation 40412 group leader
12:49:38.681  2     receive unknown_member_id error
12:49:38.683  2     elected as generation 40413 group leader

This means the Kafka group coordinator elected node 1 as the generation 40412 group leader somewhere between 12:49:37.207 and 12:49:37.210, but by 12:49:38.683 it had already decided node 1 was inactive and elected node 2 as the generation 40413 group leader instead. That is less than 1.5 seconds, so it's very unlikely that the root cause is a too-low timeout value, or that the brod_group_coordinator process on node 1 goes into some blocking call and fails to respond quickly enough.

@zmstone (Contributor) commented Apr 16, 2020

@dszoboszlay spotted the issue very well!
I wish I had had this insight at the beginning.

With such a short leader re-election interval, I can only think of two possible causes:

  1. a misconfigured session timeout (we are using the v0 APIs, so rebalance-timeout = session-timeout)
  2. a broker-side misinterpretation of the timeout, or some other bug

With regards to reproducing the issue:
our docker-based test bed (started with make test-env) can be modified to run kafka-2.3 and kafka-2.4.
This is how I did my quick tests, by running brod_demo_group_subscriber_koc:bootstrap().
In my tests, it could not reach balance only with Kafka 2.3.0,
but I did not try to kill a pid etc. to cause a rejoin.

@dszoboszlay (Contributor)

I managed to reproduce the bug too with brod_demo_group_subscriber_koc and Kafka 2.3.0, but not with 2.3.1 or 2.4.0.

This looks like a Kafka regression in 2.3.0 that was fixed in 2.3.1; see KAFKA-8653.
