kafka-commits mailing list archives

From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-2334; Guard against non-monotonic offsets in the client (#5991)
Date Fri, 14 Dec 2018 21:53:19 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1522929  KAFKA-2334; Guard against non-monotonic offsets in the client (#5991)
1522929 is described below

commit 152292994e45d5bedda0673c142f9406e70d5d3e
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Fri Dec 14 16:53:03 2018 -0500

    KAFKA-2334; Guard against non-monotonic offsets in the client (#5991)
    
    After a recent leader election, the leader's high watermark might lag behind the offset at
    the beginning of the new epoch (as well as the previous leader's HW). This can make offsets
    appear to go backwards from a client's perspective, which is confusing and causes strange
    behavior in some clients.
    
    This change causes Partition#fetchOffsetForTimestamp to throw an exception to indicate that
    the offsets are not yet available from the leader. For new clients, a new OFFSET_NOT_AVAILABLE
    error code is added. For existing clients, LEADER_NOT_AVAILABLE is returned instead.
    
    This is an implementation of [KIP-207](https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change).
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Dhruvil Shah <dhruvil@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/clients/consumer/internals/Fetcher.java  |   4 +-
 .../common/errors/OffsetNotAvailableException.java |  29 ++++
 .../org/apache/kafka/common/protocol/Errors.java   |   6 +-
 .../kafka/common/requests/ListOffsetRequest.java   |   5 +-
 .../kafka/common/requests/ListOffsetResponse.java  |   6 +-
 .../clients/consumer/internals/FetcherTest.java    |  39 +++++
 core/src/main/scala/kafka/api/ApiVersion.scala     |  11 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |  35 ++++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  31 ++--
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   4 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |  18 +++
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 168 ++++++++++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  20 ++-
 13 files changed, 346 insertions(+), 30 deletions(-)
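
For application code the change is mostly invisible: both OFFSET_NOT_AVAILABLE and the
LEADER_NOT_AVAILABLE fallback map to retriable exceptions, so a consumer that asks for offsets
right after a leader election simply blocks while the Fetcher retries, until the new leader's
high watermark catches up or the request times out. A minimal sketch of what that looks like
from the outside (topic name, bootstrap address, and timeout are illustrative, not part of
this commit):

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class EndOffsetsAfterLeaderChange {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
            try {
                // Right after a leader election the broker may answer OFFSET_NOT_AVAILABLE
                // (or LEADER_NOT_AVAILABLE for pre-v5 requests); both are retriable, so this
                // call keeps retrying internally and only fails if the timeout elapses.
                Map<TopicPartition, Long> end =
                    consumer.endOffsets(Collections.singleton(tp), Duration.ofSeconds(30));
                System.out.println("Latest offset: " + end.get(tp));
            } catch (TimeoutException e) {
                System.out.println("Offsets still unavailable after leader change: " + e.getMessage());
            }
        }
    }
}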

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 265fc99..180fbba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -811,7 +811,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         "is before 0.10.0", topicPartition);
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
-                       error == Errors.KAFKA_STORAGE_ERROR) {
+                       error == Errors.KAFKA_STORAGE_ERROR ||
+                       error == Errors.OFFSET_NOT_AVAILABLE ||
+                       error == Errors.LEADER_NOT_AVAILABLE) {
                 log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
                         topicPartition, error);
                 partitionsToRetry.add(topicPartition);
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetNotAvailableException.java
new file mode 100644
index 0000000..97de3b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetNotAvailableException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that the leader is not able to guarantee monotonically increasing offsets
+ * due to the high watermark lagging behind the epoch start offset after a recent leader election
+ */
+public class OffsetNotAvailableException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public OffsetNotAvailableException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bd0815d..51a78f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -66,6 +66,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.OffsetNotAvailableException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.OperationNotAttemptedException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -290,7 +291,10 @@ public enum Errors {
     UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support the compression type of given partition.",
             UnsupportedCompressionTypeException::new),
     STALE_BROKER_EPOCH(77, "Broker epoch has changed",
-            StaleBrokerEpochException::new);
+            StaleBrokerEpochException::new),
+    OFFSET_NOT_AVAILABLE(78, "The leader high watermark has not caught up from a recent leader " +
+            "election so the offsets cannot be guaranteed to be monotonically increasing",
+            OffsetNotAvailableException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
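As a side note, the new code slots into the existing Errors machinery; a quick sanity-check
sketch (not part of the commit) using only helpers that this commit itself relies on elsewhere
(Errors#exception and Errors#forException):

import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.protocol.Errors;

public class OffsetNotAvailableMappingCheck {
    public static void main(String[] args) {
        // The enum constant added above carries code 78 and builds the new retriable exception.
        System.out.println(Errors.OFFSET_NOT_AVAILABLE.code());                                              // 78
        System.out.println(Errors.OFFSET_NOT_AVAILABLE.exception() instanceof OffsetNotAvailableException);  // true

        // The reverse mapping used when building error responses resolves it back as well.
        System.out.println(Errors.forException(new OffsetNotAvailableException("HW lagging"))
                == Errors.OFFSET_NOT_AVAILABLE);                                                             // true
    }
}
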
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 5107c4e..e9fe942 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -118,9 +118,12 @@ public class ListOffsetRequest extends AbstractRequest {
             ISOLATION_LEVEL,
             TOPICS_V4);
 
+    // V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE)
+    private static final Schema LIST_OFFSET_REQUEST_V5 = LIST_OFFSET_REQUEST_V4;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2,
-            LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4};
+            LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, LIST_OFFSET_REQUEST_V5};
     }
 
     private final int replicaId;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 188571b..769c850 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -52,6 +52,8 @@ import static org.apache.kafka.common.protocol.types.Type.INT64;
  * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} If the broker does not have metadata for a topic or partition
  * - {@link Errors#KAFKA_STORAGE_ERROR} If the log directory for one of the requested partitions is offline
  * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
+ * - {@link Errors#LEADER_NOT_AVAILABLE} The leader's HW has not caught up after recent election (v4 protocol)
+ * - {@link Errors#OFFSET_NOT_AVAILABLE} The leader's HW has not caught up after recent election (v5+ protocol)
  */
 public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
@@ -125,9 +127,11 @@ public class ListOffsetResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             TOPICS_V4);
 
+    private static final Schema LIST_OFFSET_RESPONSE_V5 = LIST_OFFSET_RESPONSE_V4;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2,
-            LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4};
+            LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4, LIST_OFFSET_RESPONSE_V5};
     }
 
     public static final class PartitionData {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 52b78e3..134cbed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1126,6 +1126,45 @@ public class FetcherTest {
         assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
+    /**
+     * Make sure the client behaves appropriately when receiving an exception for unavailable offsets
+     */
+    @Test
+    public void testFetchOffsetErrors() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+        // Fail with OFFSET_NOT_AVAILABLE
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.isFetchable(tp0));
+
+        // Fail with LEADER_NOT_AVAILABLE
+        time.sleep(retryBackoffMs);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertFalse(subscriptions.hasValidPosition(tp0));
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.isFetchable(tp0));
+
+        // Back to normal
+        time.sleep(retryBackoffMs);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, 1L, 5L), false);
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+        assertTrue(subscriptions.hasValidPosition(tp0));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(subscriptions.position(tp0).longValue(), 5L);
+    }
+
     @Test
     public void testListOffsetsSendsIsolationLevel() {
         for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index cf36092..a68bcf0 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -84,7 +84,9 @@ object ApiVersion {
     KAFKA_2_1_IV2,
     // Introduced broker generation (KIP-380), and
     // LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
-    KAFKA_2_2_IV0
+    KAFKA_2_2_IV0,
+    // New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207)
+    KAFKA_2_2_IV1
   )
 
   // Map keys are the union of the short and full versions
@@ -289,6 +291,13 @@ case object KAFKA_2_2_IV0 extends DefaultApiVersion {
   val id: Int = 20
 }
 
+case object KAFKA_2_2_IV1 extends DefaultApiVersion {
+  val shortVersion: String = "2.2"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 21
+}
+
 object ApiVersionValidator extends Validator {
 
   override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1f52bd7..ca3abbb 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -817,17 +817,36 @@ class Partition(val topicPartition: TopicPartition,
       case None => localReplica.logEndOffset.messageOffset
     }
 
-    if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
-      Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))
+    val epochLogString = if(currentLeaderEpoch.isPresent) {
+      s"epoch ${currentLeaderEpoch.get}"
     } else {
-      def allowed(timestampOffset: TimestampAndOffset): Boolean =
-        timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP || timestampOffset.offset < lastFetchableOffset
+      "unknown epoch"
+    }
 
-      val fetchedOffset = logManager.getLog(topicPartition).flatMap { log =>
-        log.fetchOffsetByTimestamp(timestamp)
-      }
+    // Only consider throwing an error if we get a client request (isolationLevel is defined) and the start offset
+    // is lagging behind the high watermark
+    val maybeOffsetsError: Option[ApiException] = leaderEpochStartOffsetOpt
+      .filter(epochStart => isolationLevel.isDefined && epochStart > localReplica.highWatermark.messageOffset)
+      .map(epochStart => Errors.OFFSET_NOT_AVAILABLE.exception(s"Failed to fetch offsets for " +
+        s"partition $topicPartition with leader $epochLogString as this partition's " +
+        s"high watermark (${localReplica.highWatermark.messageOffset}) is lagging behind the " +
+        s"start offset from the beginning of this epoch ($epochStart)."))
+
+    def getOffsetByTimestamp: Option[TimestampAndOffset] = {
+      logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp))
+    }
 
-      fetchedOffset.filter(allowed)
+    // If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
+    // or for a timestamp lookup that is beyond the last fetchable offset.
+    timestamp match {
+      case ListOffsetRequest.LATEST_TIMESTAMP =>
+        maybeOffsetsError.map(e => throw e)
+          .orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))
+      case ListOffsetRequest.EARLIEST_TIMESTAMP =>
+        getOffsetByTimestamp
+      case _ =>
+        getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset)
+          .orElse(maybeOffsetsError.map(e => throw e))
     }
   }
 
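The core of the guard above, distilled into a standalone sketch (Java here for illustration;
the committed code is the Scala shown above): a client "latest" lookup, or a timestamp lookup
beyond the last fetchable offset, is refused while the leader's high watermark still trails
the start offset of the current leader epoch.

import java.util.OptionalLong;

public class Kip207GuardSketch {
    // Availability check mirroring the filter on leaderEpochStartOffsetOpt above:
    // offsets are unavailable to clients while epochStartOffset > highWatermark.
    static boolean offsetsAvailable(OptionalLong epochStartOffset, long highWatermark, boolean isClientRequest) {
        return !(isClientRequest
                && epochStartOffset.isPresent()
                && epochStartOffset.getAsLong() > highWatermark);
    }

    public static void main(String[] args) {
        // New epoch starts at offset 5 but the HW is still at 2: not yet available to clients.
        System.out.println(offsetsAvailable(OptionalLong.of(5), 2, true));   // false
        // Once the HW catches up to the epoch start offset, lookups succeed again.
        System.out.println(offsetsAvailable(OptionalLong.of(5), 5, true));   // true
        // Non-client requests (e.g. replica fetchers) always bypass the check.
        System.out.println(offsetsAvailable(OptionalLong.of(5), 2, false));  // true
    }
}
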
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dfecfd3..6cf2403 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -834,9 +834,19 @@ class KafkaApis(val requestChannel: RequestChannel,
           ListOffsetResponse.UNKNOWN_OFFSET,
           Optional.empty()))
       } else {
+
+        def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = {
+          (topicPartition, new ListOffsetResponse.PartitionData(
+            e,
+            ListOffsetResponse.UNKNOWN_TIMESTAMP,
+            ListOffsetResponse.UNKNOWN_OFFSET,
+            Optional.empty()))
+        }
+
         try {
           val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isolationLevelOpt = if (offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID)
+          val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
+          val isolationLevelOpt = if (isClientRequest)
             Some(offsetRequest.isolationLevel)
           else
             None
@@ -866,16 +876,19 @@ class KafkaApis(val requestChannel: RequestChannel,
                     _ : UnsupportedForMessageFormatException) =>
            debug(s"Offset request with correlation id $correlationId from client $clientId on " +
                 s"partition $topicPartition failed due to ${e.getMessage}")
-            (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e),
-              ListOffsetResponse.UNKNOWN_TIMESTAMP,
-              ListOffsetResponse.UNKNOWN_OFFSET,
-              Optional.empty()))
+            buildErrorResponse(Errors.forException(e))
+
+          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
+          case e: OffsetNotAvailableException =>
+            if(request.header.apiVersion >= 5) {
+              buildErrorResponse(Errors.forException(e))
+            } else {
+              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
+            }
+
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e),
-              ListOffsetResponse.UNKNOWN_TIMESTAMP,
-              ListOffsetResponse.UNKNOWN_OFFSET,
-              Optional.empty()))
+            buildErrorResponse(Errors.forException(e))
         }
       }
     }
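
On the wire, the downgrade above means the error a client sees depends only on the ListOffsets
request version it sent. A tiny sketch of that mapping (not the committed Scala, which is shown
above): only v5+ clients understand the new code, so older request versions get the
pre-existing retriable LEADER_NOT_AVAILABLE instead.

import org.apache.kafka.common.protocol.Errors;

public class ListOffsetErrorDowngradeSketch {
    // Version gate assumed from the KafkaApis change above: v5+ gets the new error code.
    static Errors listOffsetErrorFor(short apiVersion) {
        return apiVersion >= 5 ? Errors.OFFSET_NOT_AVAILABLE : Errors.LEADER_NOT_AVAILABLE;
    }

    public static void main(String[] args) {
        System.out.println(listOffsetErrorFor((short) 4)); // LEADER_NOT_AVAILABLE
        System.out.println(listOffsetErrorFor((short) 5)); // OFFSET_NOT_AVAILABLE
    }
}
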
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 4452d89..4a09ebe 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.util.Optional
 
+import kafka.api
 import kafka.api._
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
@@ -80,7 +81,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val listOffsetRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) 1
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 1ffa695..3bd86f5 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,10 +17,13 @@
 
 package kafka.api
 
+import org.apache.commons.collections.CollectionUtils
 import org.apache.kafka.common.record.RecordVersion
 import org.junit.Test
 import org.junit.Assert._
 
+import scala.collection.JavaConverters
+
 class ApiVersionTest {
 
   @Test
@@ -84,6 +87,21 @@ class ApiVersionTest {
     assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0"))
     assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1-IV1"))
     assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1-IV2"))
+
+    assertEquals(KAFKA_2_2_IV1, ApiVersion("2.2"))
+    assertEquals(KAFKA_2_2_IV0, ApiVersion("2.2-IV0"))
+    assertEquals(KAFKA_2_2_IV1, ApiVersion("2.2-IV1"))
+  }
+
+  @Test
+  def testApiVersionUniqueIds(): Unit = {
+    val allIds: Seq[Int] = ApiVersion.allVersions.map(apiVersion => {
+      apiVersion.id
+    })
+
+    val uniqueIds: Set[Int] = allIds.toSet
+
+    assertEquals(allIds.size, uniqueIds.size)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index cfaa147..4b9f656 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -29,7 +29,7 @@ import kafka.server._
 import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.errors.{ApiException, LeaderNotAvailableException, OffsetNotAvailableException, ReplicaNotAvailableException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -382,6 +382,172 @@ class PartitionTest {
     assertEquals(Optional.of(leaderEpoch), timestampAndOffset.leaderEpoch)
   }
 
+  /**
+    * This test checks that after a new leader election, we don't answer any ListOffsetsRequest until
+    * the HW of the new leader has caught up to its startLogOffset for this epoch. From a client
+    * perspective this helps guarantee monotonic offsets
+    *
+    * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change">KIP-207</a>
+    */
+  @Test
+  def testMonotonicOffsetsAfterLeaderChange(): Unit = {
+    val controllerEpoch = 3
+    val leader = brokerId
+    val follower1 = brokerId + 1
+    val follower2 = brokerId + 2
+    val controllerId = brokerId + 3
+    val replicas = List[Integer](leader, follower1, follower2).asJava
+    val isr = List[Integer](leader, follower2).asJava
+    val leaderEpoch = 8
+    val batch1 = TestUtils.records(records = List(
+      new SimpleRecord(10, "k1".getBytes, "v1".getBytes),
+      new SimpleRecord(11,"k2".getBytes, "v2".getBytes)))
+    val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes),
+      new SimpleRecord(20,"k4".getBytes, "v2".getBytes),
+      new SimpleRecord(21,"k5".getBytes, "v3".getBytes)))
+    val batch3 = TestUtils.records(records = List(
+      new SimpleRecord(30,"k6".getBytes, "v1".getBytes),
+      new SimpleRecord(31,"k7".getBytes, "v2".getBytes)))
+
+    val partition = Partition(topicPartition, time, replicaManager)
+    assertTrue("Expected first makeLeader() to return 'leader changed'",
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+    assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
+    assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+
+    // after makeLeader(() call, partition should know about all the replicas
+    val leaderReplica = partition.getReplica(leader).get
+    val follower1Replica = partition.getReplica(follower1).get
+    val follower2Replica = partition.getReplica(follower2).get
+
+    // append records with initial leader epoch
+    val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset
+    partition.appendRecordsToLeader(batch2, isFromClient = true)
+    assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset)
+
+    // let the follower in ISR move leader's HW to move further but below LEO
+    def readResult(fetchInfo: FetchDataInfo, leaderReplica: Replica): LogReadResult = {
+      LogReadResult(info = fetchInfo,
+        highWatermark = leaderReplica.highWatermark.messageOffset,
+        leaderLogStartOffset = leaderReplica.logStartOffset,
+        leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset,
+        followerLogStartOffset = 0,
+        fetchTimeMs = time.milliseconds,
+        readSize = 10240,
+        lastStableOffset = None)
+    }
+
+    def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
+      try {
+        Right(partition.fetchOffsetForTimestamp(
+          timestamp = timestamp,
+          isolationLevel = isolation,
+          currentLeaderEpoch = Optional.of(partition.getLeaderEpoch),
+          fetchOnlyFromLeader = true
+        ))
+      } catch {
+        case e: ApiException => Left(e)
+      }
+    }
+
+    // Update follower 1
+    partition.updateReplicaLogReadResult(
+      follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+    partition.updateReplicaLogReadResult(
+      follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(2), batch2), leaderReplica))
+
+    // Update follower 2
+    partition.updateReplicaLogReadResult(
+      follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+    partition.updateReplicaLogReadResult(
+      follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(2), batch2), leaderReplica))
+
+    // At this point, the leader has gotten 5 writes, but followers have only fetched two
+    assertEquals(2, partition.localReplica.get.highWatermark.messageOffset)
+
+    // Get the LEO
+    fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, None) match {
+      case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
+      case Right(None) => fail("Should have seen some offsets")
+      case Left(e) => fail("Should not have seen an error")
+    }
+
+    // Get the HW
+    fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+      case Right(Some(offsetAndTimestamp)) => assertEquals(2, offsetAndTimestamp.offset)
+      case Right(None) => fail("Should have seen some offsets")
+      case Left(e) => fail("Should not have seen an error")
+    }
+
+    // Get a offset beyond the HW by timestamp, get a None
+    assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED)))
+
+    // Make into a follower
+    assertTrue(partition.makeFollower(controllerId,
+      new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1))
+
+    // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
+    assertTrue(partition.makeLeader(controllerId,
+      new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr,
1, replicas, false), 2))
+
+    // Try to get offsets as a client
+    fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+      case Right(Some(offsetAndTimestamp)) => fail("Should have failed with OffsetNotAvailable")
+      case Right(None) => fail("Should have seen an error")
+      case Left(e: OffsetNotAvailableException) => // ok
+      case Left(e: ApiException) => fail(s"Expected OffsetNotAvailableException, got $e")
+    }
+
+    // If request is not from a client, we skip the check
+    fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, None) match {
+      case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
+      case Right(None) => fail("Should have seen some offsets")
+      case Left(e: ApiException) => fail(s"Got ApiException $e")
+    }
+
+    // If we request the earliest timestamp, we skip the check
+    fetchOffsetsForTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+      case Right(Some(offsetAndTimestamp)) => assertEquals(0, offsetAndTimestamp.offset)
+      case Right(None) => fail("Should have seen some offsets")
+      case Left(e: ApiException) => fail(s"Got ApiException $e")
+    }
+
+    // If we request an offset by timestamp earlier than the HW, we are ok
+    fetchOffsetsForTimestamp(11, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+      case Right(Some(offsetAndTimestamp)) =>
+        assertEquals(1, offsetAndTimestamp.offset)
+        assertEquals(11, offsetAndTimestamp.timestamp)
+      case Right(None) => fail("Should have seen some offsets")
+      case Left(e: ApiException) => fail(s"Got ApiException $e")
+    }
+
+    // Request an offset by timestamp beyond the HW, get an error now since we're in a bad state
+    fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+      case Right(Some(offsetAndTimestamp)) => fail("Should have failed")
+      case Right(None) => fail("Should have failed")
+      case Left(e: OffsetNotAvailableException) => // ok
+      case Left(e: ApiException) => fail("Should have seen OffsetNotAvailableException, saw $e")
+    }
+
+
+    // Next fetch from replicas, HW is moved up to 5 (ahead of the LEO)
+    partition.updateReplicaLogReadResult(
+      follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(5), MemoryRecords.EMPTY), leaderReplica))
+    partition.updateReplicaLogReadResult(
+      follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(5), MemoryRecords.EMPTY), leaderReplica))
+
+    // Error goes away
+    fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+      case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
+      case Right(None) => fail("Should have seen some offsets")
+      case Left(e: ApiException) => fail(s"Got ApiException $e")
+    }
+
+    // Now we see None instead of an error for out of range timestamp
+    assertEquals(Right(None), fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)))
+  }
+
+
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
                                      log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f04c70f..9b4210e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -373,9 +373,13 @@ class KafkaApisTest {
     val isolationLevel = IsolationLevel.READ_UNCOMMITTED
     val currentLeaderEpoch = Optional.of[Integer](15)
 
-    EasyMock.expect(replicaManager.fetchOffsetForTimestamp(tp, ListOffsetRequest.EARLIEST_TIMESTAMP,
-      Some(isolationLevel), currentLeaderEpoch, fetchOnlyFromLeader = true))
-      .andThrow(error.exception)
+    EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
+      EasyMock.eq(tp),
+      EasyMock.eq(ListOffsetRequest.EARLIEST_TIMESTAMP),
+      EasyMock.eq(Some(isolationLevel)),
+      EasyMock.eq(currentLeaderEpoch),
+      fetchOnlyFromLeader = EasyMock.eq(true))
+    ).andThrow(error.exception)
 
     val capturedResponse = expectNoThrottling()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
@@ -462,9 +466,13 @@ class KafkaApisTest {
     val latestOffset = 15L
     val currentLeaderEpoch = Optional.empty[Integer]()
 
-    EasyMock.expect(replicaManager.fetchOffsetForTimestamp(tp, ListOffsetRequest.LATEST_TIMESTAMP,
-      Some(isolationLevel), currentLeaderEpoch, fetchOnlyFromLeader = true))
-      .andReturn(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
+    EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
+      EasyMock.eq(tp),
+      EasyMock.eq(ListOffsetRequest.LATEST_TIMESTAMP),
+      EasyMock.eq(Some(isolationLevel)),
+      EasyMock.eq(currentLeaderEpoch),
+      fetchOnlyFromLeader = EasyMock.eq(true))
+    ).andReturn(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
 
     val capturedResponse = expectNoThrottling()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)

