This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1522929 KAFKA-2334; Guard against non-monotonic offsets in the client (#5991)
1522929 is described below
commit 152292994e45d5bedda0673c142f9406e70d5d3e
Author: David Arthur <mumrah@gmail.com>
AuthorDate: Fri Dec 14 16:53:03 2018 -0500
KAFKA-2334; Guard against non-monotonic offsets in the client (#5991)
After a recent leader election, the leader's high watermark might lag behind the offset
at the beginning of the new epoch (as well as the previous leader's HW). This can lead to
offsets going backwards from a client perspective, which is confusing and leads to strange
behavior in some clients.
This change causes Partition#fetchOffsetForTimestamp to throw an exception to indicate
that the offsets are not yet available from the leader. For new clients, a new OFFSET_NOT_AVAILABLE
error code is added. For existing clients, a LEADER_NOT_AVAILABLE error is returned instead.
This is an implementation of [KIP-207](https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change).
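For illustration only (not part of this patch): since OffsetNotAvailableException is retriable and the
Fetcher treats both error codes as retryable, blocking consumer APIs such as Consumer#endOffsets keep
retrying internally and only surface a TimeoutException if the leader stays behind for the whole
per-call timeout. A minimal sketch of how an application might wrap such a lookup; the class name,
timeout, and retry policy below are illustrative assumptions, not part of this commit:

    import java.time.Duration;
    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    // Hypothetical helper: fetch the latest offsets, retrying while the newly elected
    // leader has not yet caught up. While the leader's HW lags the epoch start offset,
    // the broker answers ListOffsets with OFFSET_NOT_AVAILABLE (v5+) or
    // LEADER_NOT_AVAILABLE (older versions); both are retriable, so the consumer retries
    // internally and only throws TimeoutException if the condition outlasts the timeout.
    public class LatestOffsetLookup {
        public static Map<TopicPartition, Long> latestOffsets(Consumer<?, ?> consumer,
                                                              Collection<TopicPartition> partitions,
                                                              int maxAttempts) {
            for (int attempt = 1; ; attempt++) {
                try {
                    return consumer.endOffsets(partitions, Duration.ofSeconds(5));
                } catch (TimeoutException e) {
                    if (attempt >= maxAttempts)
                        throw e;
                    // Leader is likely still catching up after the election; back off and retry.
                    try {
                        Thread.sleep(200L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
    }

Replica fetchers are unaffected by the new check: it only applies when isolationLevel is defined,
i.e. to consumer (client) requests.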
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Dhruvil Shah <dhruvil@confluent.io>,
Jason Gustafson <jason@confluent.io>
---
.../kafka/clients/consumer/internals/Fetcher.java | 4 +-
.../common/errors/OffsetNotAvailableException.java | 29 ++++
.../org/apache/kafka/common/protocol/Errors.java | 6 +-
.../kafka/common/requests/ListOffsetRequest.java | 5 +-
.../kafka/common/requests/ListOffsetResponse.java | 6 +-
.../clients/consumer/internals/FetcherTest.java | 39 +++++
core/src/main/scala/kafka/api/ApiVersion.scala | 11 +-
core/src/main/scala/kafka/cluster/Partition.scala | 35 ++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 31 ++--
.../scala/kafka/server/ReplicaFetcherThread.scala | 4 +-
.../test/scala/unit/kafka/api/ApiVersionTest.scala | 18 +++
.../scala/unit/kafka/cluster/PartitionTest.scala | 168 ++++++++++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 20 ++-
13 files changed, 346 insertions(+), 30 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 265fc99..180fbba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -811,7 +811,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
"is before 0.10.0", topicPartition);
} else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
error == Errors.REPLICA_NOT_AVAILABLE ||
- error == Errors.KAFKA_STORAGE_ERROR) {
+ error == Errors.KAFKA_STORAGE_ERROR ||
+ error == Errors.OFFSET_NOT_AVAILABLE ||
+ error == Errors.LEADER_NOT_AVAILABLE) {
log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",
topicPartition, error);
partitionsToRetry.add(topicPartition);
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetNotAvailableException.java
new file mode 100644
index 0000000..97de3b3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetNotAvailableException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that the leader is not able to guarantee monotonically increasing offsets
+ * due to the high watermark lagging behind the epoch start offset after a recent leader election
+ */
+public class OffsetNotAvailableException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public OffsetNotAvailableException(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bd0815d..51a78f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -66,6 +66,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OperationNotAttemptedException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
@@ -290,7 +291,10 @@ public enum Errors {
UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support the compression type of given partition.",
UnsupportedCompressionTypeException::new),
STALE_BROKER_EPOCH(77, "Broker epoch has changed",
- StaleBrokerEpochException::new);
+ StaleBrokerEpochException::new),
OFFSET_NOT_AVAILABLE(78, "The leader high watermark has not caught up from a recent leader " +
+ "election so the offsets cannot be guaranteed to be monotonically increasing",
+ OffsetNotAvailableException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 5107c4e..e9fe942 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -118,9 +118,12 @@ public class ListOffsetRequest extends AbstractRequest {
ISOLATION_LEVEL,
TOPICS_V4);
+ // V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE)
+ private static final Schema LIST_OFFSET_REQUEST_V5 = LIST_OFFSET_REQUEST_V4;
+
public static Schema[] schemaVersions() {
return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2,
- LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4};
+ LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, LIST_OFFSET_REQUEST_V5};
}
private final int replicaId;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 188571b..769c850 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -52,6 +52,8 @@ import static org.apache.kafka.common.protocol.types.Type.INT64;
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} If the broker does not have metadata for a topic or partition
* - {@link Errors#KAFKA_STORAGE_ERROR} If the log directory for one of the requested partitions is offline
* - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
+ * - {@link Errors#LEADER_NOT_AVAILABLE} The leader's HW has not caught up after recent election (v4 protocol)
+ * - {@link Errors#OFFSET_NOT_AVAILABLE} The leader's HW has not caught up after recent election (v5+ protocol)
*/
public class ListOffsetResponse extends AbstractResponse {
public static final long UNKNOWN_TIMESTAMP = -1L;
@@ -125,9 +127,11 @@ public class ListOffsetResponse extends AbstractResponse {
THROTTLE_TIME_MS,
TOPICS_V4);
+ private static final Schema LIST_OFFSET_RESPONSE_V5 = LIST_OFFSET_RESPONSE_V4;
+
public static Schema[] schemaVersions() {
return new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2,
- LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4};
+ LIST_OFFSET_RESPONSE_V3, LIST_OFFSET_RESPONSE_V4, LIST_OFFSET_RESPONSE_V5};
}
public static final class PartitionData {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 52b78e3..134cbed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1126,6 +1126,45 @@ public class FetcherTest {
assertEquals(5, subscriptions.position(tp0).longValue());
}
+ /**
+ * Make sure the client behaves appropriately when receiving an exception for unavailable offsets
+ */
+ @Test
+ public void testFetchOffsetErrors() {
+ subscriptions.assignFromUser(singleton(tp0));
+ subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
+
+ // Fail with OFFSET_NOT_AVAILABLE
+ client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+ listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
+ fetcher.resetOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+ assertFalse(subscriptions.hasValidPosition(tp0));
+ assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+ assertFalse(subscriptions.isFetchable(tp0));
+
+ // Fail with LEADER_NOT_AVAILABLE
+ time.sleep(retryBackoffMs);
+ client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+ listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
+ fetcher.resetOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+ assertFalse(subscriptions.hasValidPosition(tp0));
+ assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+ assertFalse(subscriptions.isFetchable(tp0));
+
+ // Back to normal
+ time.sleep(retryBackoffMs);
+ client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+ listOffsetResponse(Errors.NONE, 1L, 5L), false);
+ fetcher.resetOffsetsIfNeeded();
+ consumerClient.pollNoWakeup();
+ assertTrue(subscriptions.hasValidPosition(tp0));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+ assertTrue(subscriptions.isFetchable(tp0));
+ assertEquals(subscriptions.position(tp0).longValue(), 5L);
+ }
+
@Test
public void testListOffsetsSendsIsolationLevel() {
for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index cf36092..a68bcf0 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -84,7 +84,9 @@ object ApiVersion {
KAFKA_2_1_IV2,
// Introduced broker generation (KIP-380), and
// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
- KAFKA_2_2_IV0
+ KAFKA_2_2_IV0,
+ // New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207)
+ KAFKA_2_2_IV1
)
// Map keys are the union of the short and full versions
@@ -289,6 +291,13 @@ case object KAFKA_2_2_IV0 extends DefaultApiVersion {
val id: Int = 20
}
+case object KAFKA_2_2_IV1 extends DefaultApiVersion {
+ val shortVersion: String = "2.2"
+ val subVersion = "IV1"
+ val recordVersion = RecordVersion.V2
+ val id: Int = 21
+}
+
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1f52bd7..ca3abbb 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -817,17 +817,36 @@ class Partition(val topicPartition: TopicPartition,
case None => localReplica.logEndOffset.messageOffset
}
- if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
- Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))
+ val epochLogString = if(currentLeaderEpoch.isPresent) {
+ s"epoch ${currentLeaderEpoch.get}"
} else {
- def allowed(timestampOffset: TimestampAndOffset): Boolean =
- timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP || timestampOffset.offset < lastFetchableOffset
+ "unknown epoch"
+ }
- val fetchedOffset = logManager.getLog(topicPartition).flatMap { log =>
- log.fetchOffsetByTimestamp(timestamp)
- }
+ // Only consider throwing an error if we get a client request (isolationLevel is defined) and the start offset
+ // is lagging behind the high watermark
+ val maybeOffsetsError: Option[ApiException] = leaderEpochStartOffsetOpt
+ .filter(epochStart => isolationLevel.isDefined && epochStart > localReplica.highWatermark.messageOffset)
+ .map(epochStart => Errors.OFFSET_NOT_AVAILABLE.exception(s"Failed to fetch offsets for " +
+ s"partition $topicPartition with leader $epochLogString as this partition's " +
+ s"high watermark (${localReplica.highWatermark.messageOffset}) is lagging behind the " +
+ s"start offset from the beginning of this epoch ($epochStart)."))
+
+ def getOffsetByTimestamp: Option[TimestampAndOffset] = {
+ logManager.getLog(topicPartition).flatMap(log => log.fetchOffsetByTimestamp(timestamp))
+ }
- fetchedOffset.filter(allowed)
+ // If we're in the lagging HW state after a leader election, throw OffsetNotAvailable for "latest" offset
+ // or for a timestamp lookup that is beyond the last fetchable offset.
+ timestamp match {
+ case ListOffsetRequest.LATEST_TIMESTAMP =>
+ maybeOffsetsError.map(e => throw e)
+ .orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))
+ case ListOffsetRequest.EARLIEST_TIMESTAMP =>
+ getOffsetByTimestamp
+ case _ =>
+ getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset)
+ .orElse(maybeOffsetsError.map(e => throw e))
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dfecfd3..6cf2403 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -834,9 +834,19 @@ class KafkaApis(val requestChannel: RequestChannel,
ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()))
} else {
+
+ def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = {
+ (topicPartition, new ListOffsetResponse.PartitionData(
+ e,
+ ListOffsetResponse.UNKNOWN_TIMESTAMP,
+ ListOffsetResponse.UNKNOWN_OFFSET,
+ Optional.empty()))
+ }
+
try {
val fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID
- val isolationLevelOpt = if (offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID)
+ val isClientRequest = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
+ val isolationLevelOpt = if (isClientRequest)
Some(offsetRequest.isolationLevel)
else
None
@@ -866,16 +876,19 @@ class KafkaApis(val requestChannel: RequestChannel,
_ : UnsupportedForMessageFormatException) =>
debug(s"Offset request with correlation id $correlationId from client $clientId
on " +
s"partition $topicPartition failed due to ${e.getMessage}")
- (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e),
- ListOffsetResponse.UNKNOWN_TIMESTAMP,
- ListOffsetResponse.UNKNOWN_OFFSET,
- Optional.empty()))
+ buildErrorResponse(Errors.forException(e))
+
+ // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
+ case e: OffsetNotAvailableException =>
+ if(request.header.apiVersion >= 5) {
+ buildErrorResponse(Errors.forException(e))
+ } else {
+ buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
+ }
+
case e: Throwable =>
error("Error while responding to offset request", e)
- (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e),
- ListOffsetResponse.UNKNOWN_TIMESTAMP,
- ListOffsetResponse.UNKNOWN_OFFSET,
- Optional.empty()))
+ buildErrorResponse(Errors.forException(e))
}
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 4452d89..4a09ebe 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.util.Optional
+import kafka.api
import kafka.api._
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
@@ -80,7 +81,8 @@ class ReplicaFetcherThread(name: String,
// Visible for testing
private[server] val listOffsetRequestVersion: Short =
- if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4
+ if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_2_IV1) 5
+ else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 4
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 2
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) 1
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 1ffa695..3bd86f5 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,10 +17,13 @@
package kafka.api
+import org.apache.commons.collections.CollectionUtils
import org.apache.kafka.common.record.RecordVersion
import org.junit.Test
import org.junit.Assert._
+import scala.collection.JavaConverters
+
class ApiVersionTest {
@Test
@@ -84,6 +87,21 @@ class ApiVersionTest {
assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0"))
assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1-IV1"))
assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1-IV2"))
+
+ assertEquals(KAFKA_2_2_IV1, ApiVersion("2.2"))
+ assertEquals(KAFKA_2_2_IV0, ApiVersion("2.2-IV0"))
+ assertEquals(KAFKA_2_2_IV1, ApiVersion("2.2-IV1"))
+ }
+
+ @Test
+ def testApiVersionUniqueIds(): Unit = {
+ val allIds: Seq[Int] = ApiVersion.allVersions.map(apiVersion => {
+ apiVersion.id
+ })
+
+ val uniqueIds: Set[Int] = allIds.toSet
+
+ assertEquals(allIds.size, uniqueIds.size)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index cfaa147..4b9f656 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -29,7 +29,7 @@ import kafka.server._
import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.ReplicaNotAvailableException
+import org.apache.kafka.common.errors.{ApiException, LeaderNotAvailableException, OffsetNotAvailableException, ReplicaNotAvailableException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -382,6 +382,172 @@ class PartitionTest {
assertEquals(Optional.of(leaderEpoch), timestampAndOffset.leaderEpoch)
}
+ /**
+ * This test checks that after a new leader election, we don't answer any ListOffsetsRequest until
+ * the HW of the new leader has caught up to its startLogOffset for this epoch. From a client
+ * perspective this helps guarantee monotonic offsets
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change">KIP-207</a>
+ */
+ @Test
+ def testMonotonicOffsetsAfterLeaderChange(): Unit = {
+ val controllerEpoch = 3
+ val leader = brokerId
+ val follower1 = brokerId + 1
+ val follower2 = brokerId + 2
+ val controllerId = brokerId + 3
+ val replicas = List[Integer](leader, follower1, follower2).asJava
+ val isr = List[Integer](leader, follower2).asJava
+ val leaderEpoch = 8
+ val batch1 = TestUtils.records(records = List(
+ new SimpleRecord(10, "k1".getBytes, "v1".getBytes),
+ new SimpleRecord(11,"k2".getBytes, "v2".getBytes)))
+ val batch2 = TestUtils.records(records = List(new SimpleRecord("k3".getBytes, "v1".getBytes),
+ new SimpleRecord(20,"k4".getBytes, "v2".getBytes),
+ new SimpleRecord(21,"k5".getBytes, "v3".getBytes)))
+ val batch3 = TestUtils.records(records = List(
+ new SimpleRecord(30,"k6".getBytes, "v1".getBytes),
+ new SimpleRecord(31,"k7".getBytes, "v2".getBytes)))
+
+ val partition = Partition(topicPartition, time, replicaManager)
+ assertTrue("Expected first makeLeader() to return 'leader changed'",
+ partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0))
+ assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
+ assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicas.map(_.brokerId))
+
+ // after makeLeader(() call, partition should know about all the replicas
+ val leaderReplica = partition.getReplica(leader).get
+ val follower1Replica = partition.getReplica(follower1).get
+ val follower2Replica = partition.getReplica(follower2).get
+
+ // append records with initial leader epoch
+ val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1, isFromClient = true).lastOffset
+ partition.appendRecordsToLeader(batch2, isFromClient = true)
+ assertEquals("Expected leader's HW not move", leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset)
+
+ // let the follower in ISR move leader's HW to move further but below LEO
+ def readResult(fetchInfo: FetchDataInfo, leaderReplica: Replica): LogReadResult = {
+ LogReadResult(info = fetchInfo,
+ highWatermark = leaderReplica.highWatermark.messageOffset,
+ leaderLogStartOffset = leaderReplica.logStartOffset,
+ leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset,
+ followerLogStartOffset = 0,
+ fetchTimeMs = time.milliseconds,
+ readSize = 10240,
+ lastStableOffset = None)
+ }
+
+ def fetchOffsetsForTimestamp(timestamp: Long, isolation: Option[IsolationLevel]): Either[ApiException, Option[TimestampAndOffset]] = {
+ try {
+ Right(partition.fetchOffsetForTimestamp(
+ timestamp = timestamp,
+ isolationLevel = isolation,
+ currentLeaderEpoch = Optional.of(partition.getLeaderEpoch),
+ fetchOnlyFromLeader = true
+ ))
+ } catch {
+ case e: ApiException => Left(e)
+ }
+ }
+
+ // Update follower 1
+ partition.updateReplicaLogReadResult(
+ follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+ partition.updateReplicaLogReadResult(
+ follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(2), batch2), leaderReplica))
+
+ // Update follower 2
+ partition.updateReplicaLogReadResult(
+ follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+ partition.updateReplicaLogReadResult(
+ follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(2), batch2), leaderReplica))
+
+ // At this point, the leader has gotten 5 writes, but followers have only fetched two
+ assertEquals(2, partition.localReplica.get.highWatermark.messageOffset)
+
+ // Get the LEO
+ fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, None) match {
+ case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
+ case Right(None) => fail("Should have seen some offsets")
+ case Left(e) => fail("Should not have seen an error")
+ }
+
+ // Get the HW
+ fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+ case Right(Some(offsetAndTimestamp)) => assertEquals(2, offsetAndTimestamp.offset)
+ case Right(None) => fail("Should have seen some offsets")
+ case Left(e) => fail("Should not have seen an error")
+ }
+
+ // Get a offset beyond the HW by timestamp, get a None
+ assertEquals(Right(None), fetchOffsetsForTimestamp(30, Some(IsolationLevel.READ_UNCOMMITTED)))
+
+ // Make into a follower
+ assertTrue(partition.makeFollower(controllerId,
+ new LeaderAndIsrRequest.PartitionState(controllerEpoch, follower2, leaderEpoch + 1, isr, 1, replicas, false), 1))
+
+ // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
+ assertTrue(partition.makeLeader(controllerId,
+ new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas, false), 2))
+
+ // Try to get offsets as a client
+ fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+ case Right(Some(offsetAndTimestamp)) => fail("Should have failed with OffsetNotAvailable")
+ case Right(None) => fail("Should have seen an error")
+ case Left(e: OffsetNotAvailableException) => // ok
+ case Left(e: ApiException) => fail(s"Expected OffsetNotAvailableException, got $e")
+ }
+
+ // If request is not from a client, we skip the check
+ fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, None) match {
+ case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
+ case Right(None) => fail("Should have seen some offsets")
+ case Left(e: ApiException) => fail(s"Got ApiException $e")
+ }
+
+ // If we request the earliest timestamp, we skip the check
+ fetchOffsetsForTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+ case Right(Some(offsetAndTimestamp)) => assertEquals(0, offsetAndTimestamp.offset)
+ case Right(None) => fail("Should have seen some offsets")
+ case Left(e: ApiException) => fail(s"Got ApiException $e")
+ }
+
+ // If we request an offset by timestamp earlier than the HW, we are ok
+ fetchOffsetsForTimestamp(11, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+ case Right(Some(offsetAndTimestamp)) =>
+ assertEquals(1, offsetAndTimestamp.offset)
+ assertEquals(11, offsetAndTimestamp.timestamp)
+ case Right(None) => fail("Should have seen some offsets")
+ case Left(e: ApiException) => fail(s"Got ApiException $e")
+ }
+
+ // Request an offset by timestamp beyond the HW, get an error now since we're in a bad state
+ fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+ case Right(Some(offsetAndTimestamp)) => fail("Should have failed")
+ case Right(None) => fail("Should have failed")
+ case Left(e: OffsetNotAvailableException) => // ok
+ case Left(e: ApiException) => fail("Should have seen OffsetNotAvailableException,
saw $e")
+ }
+
+
+ // Next fetch from replicas, HW is moved up to 5 (ahead of the LEO)
+ partition.updateReplicaLogReadResult(
+ follower1Replica, readResult(FetchDataInfo(LogOffsetMetadata(5), MemoryRecords.EMPTY), leaderReplica))
+ partition.updateReplicaLogReadResult(
+ follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(5), MemoryRecords.EMPTY), leaderReplica))
+
+ // Error goes away
+ fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+ case Right(Some(offsetAndTimestamp)) => assertEquals(5, offsetAndTimestamp.offset)
+ case Right(None) => fail("Should have seen some offsets")
+ case Left(e: ApiException) => fail(s"Got ApiException $e")
+ }
+
+ // Now we see None instead of an error for out of range timestamp
+ assertEquals(Right(None), fetchOffsetsForTimestamp(100, Some(IsolationLevel.READ_UNCOMMITTED)))
+ }
+
+
private def setupPartitionWithMocks(leaderEpoch: Int,
isLeader: Boolean,
log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f04c70f..9b4210e 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -373,9 +373,13 @@ class KafkaApisTest {
val isolationLevel = IsolationLevel.READ_UNCOMMITTED
val currentLeaderEpoch = Optional.of[Integer](15)
- EasyMock.expect(replicaManager.fetchOffsetForTimestamp(tp, ListOffsetRequest.EARLIEST_TIMESTAMP,
- Some(isolationLevel), currentLeaderEpoch, fetchOnlyFromLeader = true))
- .andThrow(error.exception)
+ EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
+ EasyMock.eq(tp),
+ EasyMock.eq(ListOffsetRequest.EARLIEST_TIMESTAMP),
+ EasyMock.eq(Some(isolationLevel)),
+ EasyMock.eq(currentLeaderEpoch),
+ fetchOnlyFromLeader = EasyMock.eq(true))
+ ).andThrow(error.exception)
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
@@ -462,9 +466,13 @@ class KafkaApisTest {
val latestOffset = 15L
val currentLeaderEpoch = Optional.empty[Integer]()
- EasyMock.expect(replicaManager.fetchOffsetForTimestamp(tp, ListOffsetRequest.LATEST_TIMESTAMP,
- Some(isolationLevel), currentLeaderEpoch, fetchOnlyFromLeader = true))
- .andReturn(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
+ EasyMock.expect(replicaManager.fetchOffsetForTimestamp(
+ EasyMock.eq(tp),
+ EasyMock.eq(ListOffsetRequest.LATEST_TIMESTAMP),
+ EasyMock.eq(Some(isolationLevel)),
+ EasyMock.eq(currentLeaderEpoch),
+ fetchOnlyFromLeader = EasyMock.eq(true))
+ ).andReturn(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
val capturedResponse = expectNoThrottling()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
|