diff --git a/CHANGELOG.md b/CHANGELOG.md index e62b644b2..609b38784 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,29 @@ +# librdkafka v2.6.2 + +librdkafka v2.6.2 is a maintenance release: + +* Fixes to allow to migrate partitions to leaders with same leader epoch, + or NULL leader epoch (#4901). + + +## Fixes + +### Consumer fixes + +* Issues: #4796. + Fix to allow to migrate partitions to leaders with NULL leader epoch. + NULL leader epoch can happen during a cluster roll with an upgrade to a + version supporting KIP-320. + Happening since v2.1.0 (#4901). +* Issues: #4804. + Fix to allow to migrate partitions to leaders with same leader epoch. + Same leader epoch can happen when partition is + temporarily migrated to the internal broker (#4804), or if broker implementation + never bumps it, as it's not needed to validate the offsets. + Happening since v2.4.0 (#4901). + + + # librdkafka v2.6.1 librdkafka v2.6.1 is a maintenance release: diff --git a/src/rdkafka_metadata.c b/src/rdkafka_metadata.c index 26a989c0f..b54337b02 100644 --- a/src/rdkafka_metadata.c +++ b/src/rdkafka_metadata.c @@ -2067,7 +2067,8 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) { .partitions[part] .leader_epoch; - if (current_leader_epoch >= mdpi->leader_epoch) { + if (mdpi->leader_epoch != -1 && + current_leader_epoch > mdpi->leader_epoch) { rd_kafka_broker_destroy(rkb); rd_kafka_dbg( rk, METADATA, "METADATAUPDATE", diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index b28000051..ee0d34ea8 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -538,7 +538,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart, rd_kafka_mock_committed_offset_t * rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, const rd_kafkap_str_t *group, - int64_t offset, + rd_kafka_fetch_pos_t pos, const rd_kafkap_str_t *metadata) { rd_kafka_mock_committed_offset_t *coff; @@ -561,12 +561,13 @@ 
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, coff->metadata = rd_kafkap_str_copy(metadata); - coff->offset = offset; + coff->pos = pos; rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK", - "Topic %s [%" PRId32 "] committing offset %" PRId64 + "Topic %s [%" PRId32 + "] committing offset %s" " for group %.*s", - mpart->topic->name, mpart->id, offset, + mpart->topic->name, mpart->id, rd_kafka_fetch_pos2str(pos), RD_KAFKAP_STR_PR(group)); return coff; diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 45626b538..2b029fcd0 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -290,13 +290,26 @@ void rd_kafka_mock_Fetch_reply_tags_partition_write( rd_kafka_mock_partition_t *mpart) { switch (tagtype) { case 1: /* CurrentLeader */ + { + int32_t leader_id = mpart->leader->id, + leader_epoch = mpart->leader_epoch; + rd_kafka_mock_partition_leader_t *mpart_leader = + rd_kafka_mock_partition_next_leader_response(mpart); + if (mpart_leader) { + leader_id = mpart_leader->leader_id; + leader_epoch = mpart_leader->leader_epoch; + rd_kafka_mock_partition_leader_destroy(mpart, + mpart_leader); + } + /* Leader id */ - rd_kafka_buf_write_i32(rkbuf, mpart->leader->id); + rd_kafka_buf_write_i32(rkbuf, leader_id); /* Leader epoch */ - rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch); + rd_kafka_buf_write_i32(rkbuf, leader_epoch); /* Field tags */ rd_kafka_buf_write_tags_empty(rkbuf); break; + } default: break; } @@ -917,12 +930,13 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn, mpart, &GroupId); /* Response: CommittedOffset */ - rd_kafka_buf_write_i64(resp, coff ? coff->offset : -1); + rd_kafka_buf_write_i64(resp, + coff ? coff->pos.offset : -1); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) { /* Response: CommittedLeaderEpoch */ rd_kafka_buf_write_i32( - resp, mpart ? mpart->leader_epoch : -1); + resp, coff ? 
coff->pos.leader_epoch : -1); } /* Response: Metadata */ @@ -939,10 +953,11 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn, rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", "Topic %s [%" PRId32 "] returning " - "committed offset %" PRId64 + "committed offset %s" " for group %s", mtopic->name, mpart->id, - coff->offset, coff->group); + rd_kafka_fetch_pos2str(coff->pos), + coff->group); else rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", "Topic %.*s [%" PRId32 @@ -1070,6 +1085,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_partition_t *mpart = NULL; rd_kafka_resp_err_t err = all_err; int64_t CommittedOffset; + int32_t CommittedLeaderEpoch = -1; rd_kafkap_str_t Metadata; rd_kafka_buf_read_i32(rkbuf, &Partition); @@ -1087,7 +1103,6 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_i64(rkbuf, &CommittedOffset); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 6) { - int32_t CommittedLeaderEpoch; rd_kafka_buf_read_i32(rkbuf, &CommittedLeaderEpoch); @@ -1106,9 +1121,11 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_skip_tags(rkbuf); if (!err) - rd_kafka_mock_commit_offset(mpart, &GroupId, - CommittedOffset, - &Metadata); + rd_kafka_mock_commit_offset( + mpart, &GroupId, + RD_KAFKA_FETCH_POS(CommittedOffset, + CommittedLeaderEpoch), + &Metadata); /* Response: ErrorCode */ rd_kafka_buf_write_i16(resp, err); diff --git a/src/rdkafka_mock_int.h b/src/rdkafka_mock_int.h index 4ea6df2a5..b91d0ca18 100644 --- a/src/rdkafka_mock_int.h +++ b/src/rdkafka_mock_int.h @@ -231,7 +231,7 @@ typedef struct rd_kafka_mock_committed_offset_s { /**< mpart.committed_offsets */ TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link; char *group; /**< Allocated along with the struct */ - int64_t offset; /**< Committed offset */ + rd_kafka_fetch_pos_t pos; /**< Committed position */ rd_kafkap_str_t *metadata; /**< Metadata, allocated 
separately */ } rd_kafka_mock_committed_offset_t; @@ -481,7 +481,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart, rd_kafka_mock_committed_offset_t * rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart, const rd_kafkap_str_t *group, - int64_t offset, + rd_kafka_fetch_pos_t pos, const rd_kafkap_str_t *metadata); const rd_kafka_mock_msgset_t * diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index 3da38117a..cf21d60c5 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -900,7 +900,21 @@ static void rd_kafka_offset_validate_tmr_cb(rd_kafka_timers_t *rkts, rd_kafka_toppar_t *rktp = arg; rd_kafka_toppar_lock(rktp); - rd_kafka_offset_validate(rktp, "retrying offset validation"); + /* Retry validation only when it's still needed. + * Even if validation can be started in fetch states ACTIVE and + * VALIDATE_EPOCH_WAIT, its retry should be done only + * in fetch state VALIDATE_EPOCH_WAIT. */ + if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + rd_kafka_offset_validate(rktp, "retrying offset validation"); + else { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: skipping offset " + "validation retry in fetch state %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_fetch_states[rktp->rktp_fetch_state]); + } rd_kafka_toppar_unlock(rktp); } @@ -923,6 +937,9 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, rd_kafka_topic_partition_t *rktpar; int64_t end_offset; int32_t end_offset_leader_epoch; + rd_kafka_toppar_lock(rktp); + rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_VALIDATING; + rd_kafka_toppar_unlock(rktp); if (err == RD_KAFKA_RESP_ERR__DESTROY) { rd_kafka_toppar_destroy(rktp); /* Drop refcnt */ @@ -1142,12 +1159,10 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) 
{ rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE", "%.*s [%" PRId32 "]: unable to perform offset " - "validation: partition leader not available", + "validation: partition leader not available. " + "Retrying when available", RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition); - - rd_kafka_toppar_set_fetch_state(rktp, - RD_KAFKA_TOPPAR_FETCH_ACTIVE); return; } @@ -1169,8 +1184,21 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) { return; } + if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_VALIDATING) { + rd_kafka_dbg( + rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE", + "%.*s [%" PRId32 + "]: skipping offset " + "validation for %s: validation is already ongoing", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos)); + return; + } + rd_kafka_toppar_set_fetch_state( rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT); + rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_VALIDATING; /* Construct and send OffsetForLeaderEpochRequest */ parts = rd_kafka_topic_partition_list_new(1); diff --git a/src/rdkafka_partition.h b/src/rdkafka_partition.h index 98ff43176..152d3877e 100644 --- a/src/rdkafka_partition.h +++ b/src/rdkafka_partition.h @@ -433,6 +433,8 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */ #define RD_KAFKA_TOPPAR_F_ASSIGNED \ 0x2000 /**< Toppar is part of the consumer \ * assignment. */ +#define RD_KAFKA_TOPPAR_F_VALIDATING \ + 0x4000 /**< Toppar is currently requesting validation. 
*/ /* * Timers diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index fd3a17536..5dc545f07 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -663,7 +663,8 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, int32_t leader_epoch) { rd_kafka_toppar_t *rktp; rd_bool_t need_epoch_validation = rd_false; - int r = 0; + rd_bool_t fetching_from_follower; + int r = 0; rktp = rd_kafka_toppar_get(rkt, partition, 0); if (unlikely(!rktp)) { @@ -681,7 +682,7 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rd_kafka_toppar_lock(rktp); - if (leader_epoch < rktp->rktp_leader_epoch) { + if (leader_epoch != -1 && leader_epoch < rktp->rktp_leader_epoch) { rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", "%s [%" PRId32 "]: ignoring outdated metadata update with " @@ -691,68 +692,61 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, leader_epoch, rktp->rktp_leader_epoch); - if (rktp->rktp_fetch_state != - RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) { - rd_kafka_toppar_unlock(rktp); - rd_kafka_toppar_destroy(rktp); /* from get() */ - return 0; - } + rd_kafka_toppar_unlock(rktp); + rd_kafka_toppar_destroy(rktp); /* from get() */ + return 0; } - if (rktp->rktp_leader_epoch == -1 || - leader_epoch > rktp->rktp_leader_epoch) { - rd_bool_t fetching_from_follower; - rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", - "%s [%" PRId32 "]: leader %" PRId32 - " epoch %" PRId32 " -> leader %" PRId32 - " epoch %" PRId32, - rktp->rktp_rkt->rkt_topic->str, - rktp->rktp_partition, rktp->rktp_leader_id, - rktp->rktp_leader_epoch, leader_id, leader_epoch); - if (leader_epoch > rktp->rktp_leader_epoch) - rktp->rktp_leader_epoch = leader_epoch; + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32 + " -> leader %" PRId32 " epoch %" PRId32, + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rktp->rktp_leader_id, 
rktp->rktp_leader_epoch, leader_id, + leader_epoch); + + if (leader_epoch > rktp->rktp_leader_epoch || + rktp->rktp_fetch_state == + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) { + /* Epoch increased and needs to be validated (leader_epoch > -1) + * or we need to complete the validation. */ need_epoch_validation = rd_true; + } + rktp->rktp_leader_epoch = leader_epoch; - fetching_from_follower = - leader != NULL && rktp->rktp_broker != NULL && - rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && - rktp->rktp_broker != leader; - - if (fetching_from_follower && - rktp->rktp_leader_id == leader_id) { - rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", - "Topic %s [%" PRId32 "]: leader %" PRId32 - " unchanged, " - "not migrating away from preferred " - "replica %" PRId32, - rktp->rktp_rkt->rkt_topic->str, - rktp->rktp_partition, leader_id, - rktp->rktp_broker_id); - r = 0; + fetching_from_follower = + leader != NULL && rktp->rktp_broker != NULL && + rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL && + rktp->rktp_broker != leader; - } else { + if (fetching_from_follower && rktp->rktp_leader_id == leader_id) { + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", + "Topic %s [%" PRId32 "]: leader %" PRId32 + " unchanged, " + "not migrating away from preferred " + "replica %" PRId32, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, leader_id, + rktp->rktp_broker_id); + r = 0; - if (rktp->rktp_leader_id != leader_id || - rktp->rktp_leader != leader) { - /* Update leader if it has changed */ - rktp->rktp_leader_id = leader_id; - if (rktp->rktp_leader) - rd_kafka_broker_destroy( - rktp->rktp_leader); - if (leader) - rd_kafka_broker_keep(leader); - rktp->rktp_leader = leader; - } + } else { - /* Update handling broker */ - r = rd_kafka_toppar_broker_update( - rktp, leader_id, leader, "leader updated"); + if (rktp->rktp_leader_id != leader_id || + rktp->rktp_leader != leader) { + /* Update leader if it has changed */ + rktp->rktp_leader_id = leader_id; + if 
(rktp->rktp_leader) + rd_kafka_broker_destroy(rktp->rktp_leader); + if (leader) + rd_kafka_broker_keep(leader); + rktp->rktp_leader = leader; } - } else if (rktp->rktp_fetch_state == - RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) - need_epoch_validation = rd_true; + /* Update handling broker */ + r = rd_kafka_toppar_broker_update(rktp, leader_id, leader, + "leader updated"); + } if (need_epoch_validation) { /* Set offset validation position, diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index f6f9271ee..942aacaea 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -192,14 +192,14 @@ static void do_test_permanent_error_retried(rd_kafka_resp_err_t err) { rktpar = rd_kafka_topic_partition_list_add(rktpars, topic, 0); rktpar->offset = 0; - /* Will validate the offset at start fetching again + /* Will validate the offset and start fetching again * from offset 0. */ rd_kafka_topic_partition_set_leader_epoch(rktpar, 0); rd_kafka_seek_partitions(c1, rktpars, -1); rd_kafka_topic_partition_list_destroy(rktpars); /* Read all messages after seek to zero. - * In case of permanent error instead it reset to latest and + * In case of permanent error, instead, it resets to latest and * gets an EOF. 
*/ test_consumer_poll("MSG_ALL", c1, testid, 0, 0, 5, NULL); @@ -424,6 +424,412 @@ static void do_test_store_offset_without_leader_epoch(void) { SUB_TEST_PASS(); } +static rd_bool_t is_broker_fetch_request(rd_kafka_mock_request_t *request, + void *opaque) { + return rd_kafka_mock_request_id(request) == *(int *)(opaque) && + rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Fetch; +} + +static rd_bool_t +is_offset_for_leader_epoch_request(rd_kafka_mock_request_t *request, + void *opaque) { + return rd_kafka_mock_request_id(request) == *(int *)(opaque) && + rd_kafka_mock_request_api_key(request) == + RD_KAFKAP_OffsetForLeaderEpoch; +} + +static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request, + void *opaque) { + return rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Metadata; +} + +/** + * @brief A second leader change is triggered after first one switches + * to a leader supporting KIP-320, the second leader either: + * + * - variation 0: doesn't support KIP-320 (leader epoch -1). + * This can happen during a cluster roll for upgrading the cluster. + * See #4796. + * - variation 1: the leader epoch is the same as previous leader. + * This can happen when the broker doesn't require a validation + * to be performed after a leader change. + * + * In both cases no validation should be performed + * and it should continue fetching messages on the new leader.
+ */ +static void do_test_leader_change_no_validation(int variation) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + int msg_cnt = 5; + uint64_t testid = test_id_generate(); + rd_kafka_conf_t *conf; + int i, leader = 1; + size_t matching_requests; + /* No KIP-320 support on second leader change */ + int32_t leader_epoch = -1; + if (variation == 1) { + /* Same leader epoch on second leader change */ + leader_epoch = 2; + } + + SUB_TEST_QUICK("variation: %d", variation); + + mcluster = test_mock_cluster_new(2, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + test_consumer_subscribe(c1, topic); + + rd_kafka_mock_start_request_tracking(mcluster); + TEST_SAY("Consume initial messages and join the group, etc.\n"); + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + + TEST_SAY("Wait Fetch request to broker 1\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_broker_fetch_request, &leader); + TEST_ASSERT_LATER(matching_requests > 0, + "Expected at least one Fetch request to broker 1"); + + /* No validation is performed on first fetch. 
*/ + TEST_SAY("Wait no OffsetForLeaderEpoch request to broker 1\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 0, 1000, is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER(matching_requests == 0, + "Expected no OffsetForLeaderEpoch request" + " to broker 1, got %" PRIusz, + matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + /* The leader will change from 1->2, and the OffsetForLeaderEpoch will + * be sent to broker 2. Leader epoch becomes 1. */ + rd_kafka_mock_start_request_tracking(mcluster); + TEST_SAY("Changing leader to broker 2\n"); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + leader = 2; + rd_kafka_poll(c1, 1000); + + TEST_SAY("Wait Fetch request to broker 2\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_broker_fetch_request, &leader); + TEST_ASSERT_LATER(matching_requests > 0, + "Expected at least one fetch request to broker 2"); + + TEST_SAY("Wait OffsetForLeaderEpoch request to broker 2\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER(matching_requests == 1, + "Expected one OffsetForLeaderEpoch request" + " to broker 2, got %" PRIusz, + matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + /* Reset leader, set leader epoch to `leader_epoch` + * to trigger this special case. 
*/ + TEST_SAY("Changing leader to broker 1\n"); + for (i = 0; i < 5; i++) { + rd_kafka_mock_partition_push_leader_response(mcluster, topic, 0, + 1, leader_epoch); + } + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + leader = 1; + rd_kafka_mock_start_request_tracking(mcluster); + rd_kafka_poll(c1, 1000); + + TEST_SAY("Wait Fetch request to broker 1\n"); + /* 0 is correct here as second parameter as we don't wait to receive + * at least one Fetch request, given in the failure case it'll take more + * than 1s and it's possible a OffsetForLeaderEpoch is received after + * that, because we ran out of overridden leader responses. */ + matching_requests = test_mock_wait_matching_requests( + mcluster, 0, 1000, is_broker_fetch_request, &leader); + TEST_ASSERT_LATER(matching_requests > 0, + "Expected at least one fetch request to broker 1"); + + /* Given same leader epoch, or -1, is returned, + * no validation is performed */ + TEST_SAY("Wait no OffsetForLeaderEpoch request to broker 1\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 0, 1000, is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER(matching_requests == 0, + "Expected no OffsetForLeaderEpoch request" + " to broker 1, got %" PRIusz, + matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + rd_kafka_destroy(c1); + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} + +static int +is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) { + /* Ignore UNKNOWN_TOPIC_OR_PART errors. */ + TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason); + if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || + err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) + return 0; + return 1; +} + +/** + * @brief Test partition validation when it's temporarily delegated to + * the internal broker. 
There are two variations: + * + * variation 0: leader epoch bump is simultaneous to the partition + * delegation returning from the internal broker to the + * new leader. + * variation 1: leader epoch bump is triggered immediately by KIP-951 + * and validation fails, later metadata request fails + * and partition is delegated to the internal broker. + * When partition is delegated back to the leader, + * it finds the same leader epoch but validation must + * be completed as state is still VALIDATE_EPOCH_WAIT. + * + * In both cases, fetch must continue with the new leader and + * after the validation is completed. + * + * See #4804. + */ +static void do_test_leader_change_from_internal_broker(int variation) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + int msg_cnt = 5; + uint64_t testid = test_id_generate(); + rd_kafka_conf_t *conf; + int leader = 1; + size_t matching_requests, expected_offset_for_leader_epoch_requests = 1; + + SUB_TEST_QUICK("variation: %d", variation); + + mcluster = test_mock_cluster_new(2, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + test_consumer_subscribe(c1, topic); + test_curr->is_fatal_cb = is_fatal_cb; + + rd_kafka_mock_start_request_tracking(mcluster); + TEST_SAY("Consume initial messages and join the group, etc.\n"); + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + +
TEST_SAY("Wait Fetch request to broker 1\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_broker_fetch_request, &leader); + TEST_ASSERT_LATER(matching_requests > 0, + "Expected at least one Fetch request to broker 1"); + + /* No validation is performed on first fetch. */ + TEST_SAY("Wait no OffsetForLeaderEpoch request to broker 1\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 0, 1000, is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER(matching_requests == 0, + "Expected no OffsetForLeaderEpoch request" + " to broker 1, got %" PRIusz, + matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + /* The leader will change from 1->2, and the OffsetForLeaderEpoch will + * be sent to broker 2. Leader epoch becomes 1. */ + rd_kafka_mock_start_request_tracking(mcluster); + TEST_SAY("Changing leader to broker 2\n"); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + leader = 2; + rd_kafka_poll(c1, 1000); + + TEST_SAY("Wait Fetch request to broker 2\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_broker_fetch_request, &leader); + TEST_ASSERT_LATER(matching_requests > 0, + "Expected at least one fetch request to broker 2"); + + TEST_SAY("Wait OffsetForLeaderEpoch request to broker 2\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER(matching_requests == 1, + "Expected one OffsetForLeaderEpoch request" + " to broker 2, got %" PRIusz, + matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + /* Reset leader, Metadata request fails in between and delegates + * the partition to the internal broker. */ + TEST_SAY("Changing leader to broker 1\n"); + if (variation == 0) { + /* Fail Fetch request too, otherwise KIP-951 mechanism is faster + * than the Metadata request. 
*/ + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_Fetch, 1, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART); + } else if (variation == 1) { + /* First OffsetForLeaderEpoch is triggered by KIP-951, + * it updates leader epoch, then it fails, triggers metadata + * refresh, + * Metadata fails too and partition is delegated to the internal + * broker. + * Validation is retried three times during this period + * and it should fail because we want to see what happens + * next when partition isn't delegated to the internal + * broker anymore. */ + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_OffsetForLeaderEpoch, 3, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART); + } + + /* This causes a Metadata request error. */ + rd_kafka_mock_topic_set_error(mcluster, topic, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + leader = 1; + rd_kafka_mock_start_request_tracking(mcluster); + rd_kafka_poll(c1, 1000); + + TEST_SAY( + "Wait a Metadata request that fails and delegates partition to" + " the internal broker.\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_metadata_request, NULL); + TEST_ASSERT_LATER(matching_requests > 0, + "Expected at least one Metadata request"); + TEST_SAY( + "Reset partition error status." + " Partition is delegated to broker 1.\n"); + rd_kafka_mock_topic_set_error(mcluster, topic, + RD_KAFKA_RESP_ERR_NO_ERROR); + + TEST_SAY("Wait Fetch request to broker 1\n"); + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 2000, is_broker_fetch_request, &leader); + TEST_ASSERT_LATER(matching_requests > 0, + "Expected at least one fetch request to broker 1"); + + TEST_SAY("Wait OffsetForLeaderEpoch request to broker 1\n"); + if (variation == 1) { + /* There's three OffsetForLeaderEpoch requests more in + * variation 1. See previous comment. 
*/ + expected_offset_for_leader_epoch_requests += 3; + } + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER( + matching_requests == expected_offset_for_leader_epoch_requests, + "Expected %" PRIusz + " OffsetForLeaderEpoch request" + " to broker 1, got %" PRIusz, + expected_offset_for_leader_epoch_requests, matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + rd_kafka_destroy(c1); + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); + test_curr->is_fatal_cb = NULL; +} + +/** + * @brief Test that a committed offset is validated before starting to + * fetch messages. + */ +static void do_test_offset_validation_on_offset_fetch(void) { + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + const char *bootstraps; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + int msg_count = 5, leader = 2; + uint64_t testid = test_id_generate(); + size_t matching_requests; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(3, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_count, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_conf_init(&conf, NULL, 60); + + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "5000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "enable.auto.offset.store", "true"); + test_conf_set(conf, "enable.partition.eof", "true"); + + c1 = test_create_consumer(c1_groupid, NULL, rd_kafka_conf_dup(conf), + NULL); + test_consumer_subscribe(c1, topic); + + /* 5 messages because of reset to earliest */ + test_consumer_poll("MSG_ALL", c1, testid, 0, 0, 5, NULL); 
+ TEST_CALL_ERR__(rd_kafka_commit(c1, NULL, rd_false)); + rd_kafka_destroy(c1); + + /* Leader changes to 2 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + rd_kafka_mock_start_request_tracking(mcluster); + + /* Destroy conf this time */ + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + test_consumer_subscribe(c1, topic); + + /* EOF because it starts from the committed offset after validation */ + test_consumer_poll("MSG_EOF", c1, testid, 1, 0, 0, NULL); + + /* Ensure offset has been validated */ + matching_requests = test_mock_wait_matching_requests( + mcluster, 1, 1000, is_offset_for_leader_epoch_request, &leader); + TEST_ASSERT_LATER(matching_requests == 1, + "Expected 1" + " OffsetForLeaderEpoch request" + " to broker 1, got %" PRIusz, + matching_requests); + rd_kafka_mock_stop_request_tracking(mcluster); + + rd_kafka_destroy(c1); + + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} int main_0139_offset_validation_mock(int argc, char **argv) { @@ -438,5 +844,13 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_store_offset_without_leader_epoch(); + do_test_leader_change_no_validation(0); + do_test_leader_change_no_validation(1); + + do_test_leader_change_from_internal_broker(0); + do_test_leader_change_from_internal_broker(1); + + do_test_offset_validation_on_offset_fetch(); + return 0; }