kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] 01/01: Fix test and minor tweaks
Date Sun, 08 Dec 2019 17:29:48 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch hachikuji/KAFKA-9212
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cfb88f4f116e0e2932c5dd2e416841824a915257
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Sun Dec 8 09:27:59 2019 -0800

    Fix test and minor tweaks
---
 clients/src/test/java/org/apache/kafka/clients/MetadataTest.java | 7 ++++++-
 core/src/main/scala/kafka/controller/KafkaController.scala       | 8 ++++----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index fc51957..7067e88 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -158,6 +158,11 @@ public class MetadataTest {
         assertEquals(0, metadata.timeToNextUpdate(now + 1));
     }
 
+    /**
+     * Prior to Kafka version 2.4 (which coincides with Metadata version 9), the broker does not propagate leader epoch
+     * information accurately while a reassignment is in progress, so we cannot rely on it. This is explained in more
+     * detail in MetadataResponse's constructor.
+     */
     @Test
     public void testIgnoreLeaderEpochInOlderMetadataResponse() {
         TopicPartition tp = new TopicPartition("topic", 0);
@@ -196,7 +201,7 @@ public class MetadataTest {
             assertEquals(-1, info.epoch());
         }
 
-        for (short version = 9; version <= ApiKeys.METADATA.oldestVersion(); version++) {
+        for (short version = 9; version <= ApiKeys.METADATA.latestVersion(); version++) {
             Struct struct = data.toStruct(version);
             MetadataResponse response = new MetadataResponse(struct, version);
             assertTrue(response.hasReliableLeaderEpochs());
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6133cc6..444e74d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1081,15 +1081,15 @@ class KafkaController(val config: KafkaConfig,
           val UpdateLeaderAndIsrResult(finishedUpdates, _) =
            zkClient.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch, controllerContext.epochZkVersion)
 
-          finishedUpdates.get(partition).exists {
-            case Right(leaderAndIsr) =>
+          finishedUpdates.get(partition) match {
+            case Some(Right(leaderAndIsr)) =>
              val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
               controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
               finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch)
               info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
               true
-            case Left(e) =>
-              throw e
+            case Some(Left(e)) => throw e
+            case None => false
           }
         case None =>
          throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +


Mime
View raw message