kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8442; Include ISR in Metadata response even if there is no leader (#6836)
Date Tue, 30 Jul 2019 15:44:28 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c6286b2  KAFKA-8442; Include ISR in Metadata response even if there is no leader
(#6836)
c6286b2 is described below

commit c6286b2b3e0d573773844f9b0a2931e9c2058f58
Author: huxi <huxi_2b@hotmail.com>
AuthorDate: Tue Jul 30 23:44:11 2019 +0800

    KAFKA-8442; Include ISR in Metadata response even if there is no leader (#6836)
    
    Currently the Metadata response returns an empty ISR if there is no active leader. The
behavior is inconsistent since other fields such as the replica list and offline replicas
are included. This patch changes the behavior to return the current known ISR. This fixes
a problem with the topic describe command which fails to report ISR when a leader is offline.
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 core/src/main/scala/kafka/admin/TopicCommand.scala                 | 5 ++++-
 core/src/main/scala/kafka/server/MetadataCache.scala               | 7 +++----
 .../scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala   | 2 +-
 core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala      | 2 +-
 4 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f2e197a..a25e546 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -120,7 +120,10 @@ object TopicCommand extends Logging {
       opts.reportUnavailablePartitions && hasUnavailablePartitions(partitionDescription)
     }
     private def hasUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
-      partitionDescription.isr.size < partitionDescription.minIsrCount
+      if (partitionDescription.leader.isDefined)
+        partitionDescription.isr.size < partitionDescription.minIsrCount
+      else
+        partitionDescription.minIsrCount > 0
     }
     private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
       partitionDescription.isr.size == partitionDescription.minIsrCount
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 8b8e159..7d10bb0 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -82,6 +82,8 @@ class MetadataCache(brokerId: Int) extends Logging {
         val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
         val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala,
listenerName, errorUnavailableEndpoints)
 
+        val isr = partitionState.basePartitionState.isr.asScala
+        val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
         maybeLeader match {
           case None =>
             val error = if (!snapshot.aliveBrokers.contains(brokerId)) { // we are already
holding the read lock
@@ -92,13 +94,10 @@ class MetadataCache(brokerId: Int) extends Logging {
               if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
             }
             new MetadataResponse.PartitionMetadata(error, partitionId.toInt, Node.noNode(),
-              Optional.empty(), replicaInfo.asJava, java.util.Collections.emptyList(),
+              Optional.empty(), replicaInfo.asJava, isrInfo.asJava,
               offlineReplicaInfo.asJava)
 
           case Some(leader) =>
-            val isr = partitionState.basePartitionState.isr.asScala
-            val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
-
             if (replicaInfo.size < replicas.size) {
               debug(s"Error while fetching metadata for $topicPartition: replica information
not available for " +
                 s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 8813b59..a04ef0d 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -602,7 +602,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Logging
           topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName,
"--unavailable-partitions"))))
       val rows = output.split("\n")
       assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
-      assertTrue(rows(0).endsWith("Leader: none\tReplicas: 0\tIsr: "))
+      assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"))
     } finally {
       restartDeadBrokers()
     }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 5ddabc0..9f73fbe 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -182,7 +182,7 @@ class MetadataCacheTest {
     val partitionMetadata = partitionMetadatas.get(0)
     assertEquals(0, partitionMetadata.partition)
     assertEquals(expectedError, partitionMetadata.error)
-    assertTrue(partitionMetadata.isr.isEmpty)
+    assertFalse(partitionMetadata.isr.isEmpty)
     assertEquals(1, partitionMetadata.replicas.size)
     assertEquals(0, partitionMetadata.replicas.get(0).id)
   }


Mime
View raw message