kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch 2.7 updated: KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1
Date Mon, 02 Nov 2020 18:30:44 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new edc2c4f  KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets
field default to 1
edc2c4f is described below

commit edc2c4fd8d1e91237b08c8df70628e74637e4e47
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Mon Nov 2 23:39:03 2020 +0530

    KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default
to 1
    
    Couple of failures observed after KAFKA-9627: Replace ListOffset request/response with
automated protocol (https://github.com/apache/kafka/pull/8295)
    
    1. Latest consumer fails to consume from 0.10.0.1 brokers. Below system tests are failing
    kafkatest.tests.client.client_compatibility_features_test.ClientCompatibilityFeaturesTest
    kafkatest.tests.client.client_compatibility_produce_consume_test.ClientCompatibilityProduceConsumeTest
    
    Solution: Current default value for MaxNumOffsets is 0. because to this brokers are not
returning offsets for v0 request. Set default value for MaxNumOffsets field to 1.  This is
similar to previous [approach]
    (https://github.com/apache/kafka/blob/2.6/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java#L204)
    
    2. In some scenarios, latest consumer fails with below error when connecting to a Kafka
cluster which consists of newer and older (<=2.0) Kafka brokers
    `org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default
currentLeaderEpoch at version 3`
    
    Solution: After #8295, consumer can set non-default CurrentLeaderEpoch value for v3 and
below requests. One solution is to make CurrentLeaderEpoch ignorable.
    
    Author: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Reviewers: David Jacot <djacot@confluent.io>
    
    Closes #9540 from omkreddy/fix-listoffsets
    
    (cherry picked from commit 236d7dc890e82c9b146579a8be801c1c7f54feb9)
    Signed-off-by: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../common/message/ListOffsetRequest.json          |  4 +--
 .../kafka/common/requests/RequestResponseTest.java | 13 +++++-----
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 30 +++++++++++++++++-----
 3 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/resources/common/message/ListOffsetRequest.json b/clients/src/main/resources/common/message/ListOffsetRequest.json
index 259d7bf..5ecc2d6 100644
--- a/clients/src/main/resources/common/message/ListOffsetRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetRequest.json
@@ -42,11 +42,11 @@
         "about": "Each partition in the request.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index." },
-        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1",
+        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "4+", "default": "-1",
"ignorable": true,
           "about": "The current leader epoch." },
         { "name": "Timestamp", "type": "int64", "versions": "0+",
           "about": "The current timestamp." },
-        { "name": "MaxNumOffsets", "type": "int32", "versions": "0",
+        { "name": "MaxNumOffsets", "type": "int32", "versions": "0", "default": "1",
           "about": "The maximum number of offsets to report." }
       ]}
     ]}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0862e2b..71048d8 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1243,7 +1243,8 @@ public class RequestResponseTest {
                     .setPartitions(Arrays.asList(new ListOffsetPartition()
                             .setPartitionIndex(0)
                             .setTimestamp(1000000L)
-                            .setMaxNumOffsets(10)));
+                            .setMaxNumOffsets(10)
+                            .setCurrentLeaderEpoch(5)));
             return ListOffsetRequest.Builder
                     .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(Collections.singletonList(topic))
@@ -1253,7 +1254,8 @@ public class RequestResponseTest {
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)));
+                            .setTimestamp(1000000L)
+                            .setCurrentLeaderEpoch(5)));
             return ListOffsetRequest.Builder
                     .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
                     .setTargetTimes(Collections.singletonList(topic))
@@ -1261,10 +1263,9 @@ public class RequestResponseTest {
         } else if (version >= 2 && version <= 5) {
             ListOffsetPartition partition = new ListOffsetPartition()
                     .setPartitionIndex(0)
-                    .setTimestamp(1000000L);
-            if (version >= 4) {
-                partition.setCurrentLeaderEpoch(5);
-            }
+                    .setTimestamp(1000000L)
+                    .setCurrentLeaderEpoch(5);
+
             ListOffsetTopic topic = new ListOffsetTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(partition));
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index cedbf0a..ce324c7 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -143,7 +143,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionData = response.topics.asScala.find(_.name == topic).get
       .partitions.asScala.find(_.partitionIndex == partition.partition).get
 
-    (partitionData.offset, partitionData.leaderEpoch)
+    if (version == 0) {
+      if (partitionData.oldStyleOffsets().isEmpty)
+        (-1, partitionData.leaderEpoch)
+      else
+        (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch)
+    } else
+      (partitionData.offset, partitionData.leaderEpoch)
   }
 
   @Test
@@ -174,17 +180,27 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   }
 
   @Test
-  def testResponseDefaultOffsetAndLeaderEpochForLowerVersions(): Unit = {
+  def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor
= 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
     TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-    assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
-    assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
-    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
+    for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to ApiKeys.LIST_OFFSETS.latestVersion)
{
+      if (version == 0) {
+        assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP,
version.toShort))
+        assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP,
version.toShort))
+      } else if (version >= 1 && version <= 3) {
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP,
version.toShort))
+        assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP,
version.toShort))
+      } else if (version >= 4) {
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP,
version.toShort))
+        assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP,
version.toShort))
+      }
+    }
   }
 
   private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest):
Unit = {


Mime
View raw message