kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] kafka git commit: KAFKA-4148; Support ListOffsetRequest v1 and search offsets by timestamp in consumer (KIP-79)
Date Tue, 20 Sep 2016 01:38:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cf8f4a713 -> eaaa433fc


http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 78e00df..1b0a127 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -24,17 +24,16 @@ import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{KAFKA_0_9_0, KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1}
+import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
-
-import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
-import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive,
Selector, Mode}
-import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend,
AbstractRequest, ListOffsetRequest}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive,
Selectable, Selector}
+import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest,
ListOffsetResponse, RequestSend}
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{JavaConverters, Map}
@@ -263,15 +262,23 @@ class ReplicaFetcherThread(name: String,
   }
 
   private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long,
consumerId: Int): Long = {
-    val partitions = Map(
-      topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)
-    )
-    val request = new ListOffsetRequest(consumerId, partitions.asJava)
-    val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, None, request)
+    val (request, apiVersion) =
+      if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
+        val partitions = Map(topicPartition -> earliestOrLatest)
+        (new ListOffsetRequest(partitions.asJava, consumerId), 1)
+      } else {
+        val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest,
1))
+        (new ListOffsetRequest(consumerId, partitions.asJava), 0)
+      }
+    val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, Some(apiVersion.toShort), request)
     val response = new ListOffsetResponse(clientResponse.responseBody)
     val partitionData = response.responseData.get(topicPartition)
     Errors.forCode(partitionData.errorCode) match {
-      case Errors.NONE => partitionData.offsets.asScala.head
+      case Errors.NONE =>
+        if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2)
+          partitionData.offset
+        else
+          partitionData.offsets.get(0)
       case errorCode => throw errorCode.exception
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b449a69..c83b54f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,7 +14,8 @@ package kafka.api
 
 
 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties, Locale}
+
 import java.util.regex.Pattern
 
 import kafka.log.LogConfig
@@ -26,7 +27,7 @@ import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeseria
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidTopicException}
+import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.Assert._
@@ -34,7 +35,6 @@ import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
-import java.util.Locale
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build
time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
@@ -973,6 +973,84 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testOffsetsForTimes() {
+    val numParts = 2
+    val topic1 = "part-test-topic-1"
+    val topic2 = "part-test-topic-2"
+    val topic3 = "part-test-topic-3"
+    val props = new Properties()
+    props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
+    TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
+    // Topic2 is in old message format.
+    TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers, props)
+    TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)
+
+    val consumer = this.consumers.head
+
+    // Test negative target time
+    intercept[IllegalArgumentException](
+      consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic1, 0), -1)))
+
+    val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]()
+    var i = 0
+    for (topic <- List(topic1, topic2, topic3)) {
+      for (part <- 0 until numParts) {
+        val tp = new TopicPartition(topic, part)
+        // In sendRecords(), each message will have key, value and timestamp equal to the
sequence number.
+        sendRecords(100, tp)
+        timestampsToSearch.put(tp, i * 20)
+        i += 1
+      }
+    }
+    // The timestampsToSearch map should contain:
+    // (topic1Partition0 -> 0,
+    //  topic1Partition1 -> 20,
+    //  topic2Partition0 -> 40,
+    //  topic2Partition1 -> 60,
+    //  topic3Partition0 -> 80,
+    //  topic3Partition1 -> 100)
+    val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch)
+    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).offset())
+    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).timestamp())
+    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).offset())
+    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).timestamp())
+    assertEquals("null should be returned when message format is 0.9.0",
+      null, timestampOffsets.get(new TopicPartition(topic2, 0)))
+    assertEquals("null should be returned when message format is 0.9.0",
+      null, timestampOffsets.get(new TopicPartition(topic2, 1)))
+    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).offset())
+    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).timestamp())
+    assertEquals(null, timestampOffsets.get(new TopicPartition(topic3, 1)))
+  }
+
+  @Test
+  def testEarliestOrLatestOffsets() {
+    val topic0 = "topicWithNewMessageFormat"
+    val topic1 = "topicWithOldMessageFormat"
+    createTopicAndSendRecords(topicName = topic0, numPartitions = 2, recordsPerPartition
= 100)
+    val props = new Properties()
+    props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
+    TestUtils.createTopic(this.zkUtils, topic1, numPartitions = 1, replicationFactor = 1,
this.servers, props)
+    sendRecords(100, new TopicPartition(topic1, 0))
+
+    val t0p0 = new TopicPartition(topic0, 0)
+    val t0p1 = new TopicPartition(topic0, 1)
+    val t1p0 = new TopicPartition(topic1, 0)
+    val partitions = Set(t0p0, t0p1, t1p0).asJava
+    val consumer = this.consumers.head
+
+    val earliests = consumer.beginningOffsets(partitions)
+    assertEquals(0L, earliests.get(t0p0))
+    assertEquals(0L, earliests.get(t0p1))
+    assertEquals(0L, earliests.get(t1p0))
+
+    val latests = consumer.endOffsets(partitions)
+    assertEquals(100L, latests.get(t0p0))
+    assertEquals(100L, latests.get(t0p1))
+    assertEquals(100L, latests.get(t1p0))
+  }
+
+  @Test
   def testUnsubscribeTopic() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout
quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 49feebd..04d46de 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -199,17 +199,17 @@ class LogSegmentTest {
 
     assertEquals(490, seg.largestTimestamp)
     // Search for an indexed timestamp
-    assertEquals(42, seg.findOffsetByTimestamp(420).get)
-    assertEquals(43, seg.findOffsetByTimestamp(421).get)
+    assertEquals(42, seg.findOffsetByTimestamp(420).get.offset)
+    assertEquals(43, seg.findOffsetByTimestamp(421).get.offset)
     // Search for an un-indexed timestamp
-    assertEquals(43, seg.findOffsetByTimestamp(430).get)
-    assertEquals(44, seg.findOffsetByTimestamp(431).get)
+    assertEquals(43, seg.findOffsetByTimestamp(430).get.offset)
+    assertEquals(44, seg.findOffsetByTimestamp(431).get.offset)
     // Search beyond the last timestamp
-    assertEquals(50, seg.findOffsetByTimestamp(491).get)
+    assertEquals(None, seg.findOffsetByTimestamp(491))
     // Search before the first indexed timestamp
-    assertEquals(41, seg.findOffsetByTimestamp(401).get)
+    assertEquals(41, seg.findOffsetByTimestamp(401).get.offset)
     // Search before the first timestamp
-    assertEquals(40, seg.findOffsetByTimestamp(399).get)
+    assertEquals(40, seg.findOffsetByTimestamp(399).get.offset)
   }
 
   /**
@@ -251,7 +251,7 @@ class LogSegmentTest {
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
-      assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset)
+      assertEquals(i, seg.read(i, Some(i + 1), 1024).messageSet.head.offset)
   }
 
   /**
@@ -267,8 +267,9 @@ class LogSegmentTest {
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100) {
-      assertEquals(i, seg.findOffsetByTimestamp(i * 10).get)
-      assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get)
+      assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
+      if (i < 99)
+        assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get.offset)
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e496853..9ecb651 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -625,9 +625,9 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds
+ i * 10))
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds
+ i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
@@ -701,9 +701,9 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds
+ i * 10))
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds
+ i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
     }
     log.close()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d8c2b4e..629babb 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -20,8 +20,30 @@
 <h3><a id="upgrade" href="#upgrade">1.5 Upgrading From Previous Versions</a></h3>
 
 <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.10.0.X to 0.10.1.0</a></h4>
-0.10.1.0 is compatible with 0.10.0.X in terms of wire protocol. The upgrade can be done one
broker at a time by simply bringing it down, updating the code, and restarting it.
+0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade.
 However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes
in 0.10.1.0</a> before upgrade.
+<br>
+Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters
before upgrading your clients.
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties file on all brokers and add the following properties:
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or
0.9.0.0).</li>
+            <li>log.message.format.version=CURRENT_KAFKA_VERSION  (See <a href="#upgrade_10_performance_impact">potential
performance impact following the upgrade</a> for the details on what this configuration
does.)</li>
+        </ul>
+    </li>
+    <li> Upgrade the brokers. This can be done a broker at a time by simply bringing
it down, updating the code, and restarting it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing
inter.broker.protocol.version and setting it to 0.10.1.0. NOTE: If your previous message format
version is before 0.10.0, you shouldn't touch log.message.format.version yet - this parameter
should only change once all consumers have been upgraded to 0.10.0.0 or later. </li>
+    <li> Restart the brokers one by one for the new protocol version to take effect.
</li>
+    <li> Once all consumers have been upgraded to 0.10.0, change log.message.format.version
to 0.10.1 on each broker and restart them one by one.
+    </li>
+</ol>
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take
all the brokers down, update the code and start all of them. They will start with the new
protocol by default.
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done
any time after the brokers were upgraded. It does not have to be immediately after.
 
 <h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking
changes in 0.10.1.0</a></h5>
 <ul>
@@ -30,6 +52,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential
breaking c
     <li> The open file handlers of 0.10.0 will increase by ~33% because of the addition
of time index files for each segment.</li>
     <li> The time index and offset index share the same index size configuration. Since
each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes
to avoid potential frequent log rolling. </li>
     <li> Due to the increased number of index files, on some brokers with a large number
of log segments (e.g. >15K), the log loading process during the broker startup could be
longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may
reduce the log loading time. </li>
+    <li> ListOffsetRequest v1 is introduced and used by default to support accurate
offset search based on timestamp. </li>
 </ul>
 
 <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes
in 0.10.1.0</a></h5>
@@ -37,6 +60,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential
breaking c
     <li> The new Java consumer is no longer in beta and we recommend it for all new
development. The old Scala consumers are still supported, but they will be deprecated in the
next release
          and will be removed in a future major release. </li>
     <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a
bug, a broker would only be in this state briefly before transitioning out of it and hence
the impact of the removal should be minimal. The recommended way to detect if a given broker
is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount
metric. </li>
+    <li> The new Java Consumer now allows users to search offsets by timestamp on partitions. </li>
 </ul>
 
 <h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4>


Mime
View raw message