kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Tweak implementation of `FetchRequest.shuffle` and upgrade.html improvements
Date Tue, 04 Oct 2016 01:18:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7115c66ae -> cf0bf7c7a


MINOR: Tweak implementation of `FetchRequest.shuffle` and upgrade.html improvements

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1955 from ijuma/kip-74-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cf0bf7c7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cf0bf7c7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cf0bf7c7

Branch: refs/heads/trunk
Commit: cf0bf7c7a24aa5a19069b6e8456353c9b498dffc
Parents: 7115c66
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Oct 4 02:18:19 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Oct 4 02:18:19 2016 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/FetchRequest.scala | 10 +++-
 .../scala/unit/kafka/api/FetchRequestTest.scala | 63 ++++++++++++++++++++
 docs/upgrade.html                               | 18 ++++--
 3 files changed, 85 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cf0bf7c7/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f100d4b..3c380c9 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -65,8 +65,14 @@ object FetchRequest {
     FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, maxBytes,
Vector(pairs:_*))
   }
 
-  def shuffle(requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]): Seq[(TopicAndPartition,
PartitionFetchInfo)] =
-    random.shuffle(requestInfo)
+  def shuffle(requestInfo: Seq[(TopicAndPartition, PartitionFetchInfo)]): Seq[(TopicAndPartition,
PartitionFetchInfo)] = {
+    val groupedByTopic = requestInfo.groupBy { case (tp, _) => tp.topic }.map { case (topic,
values) =>
+      topic -> random.shuffle(values)
+    }
+    random.shuffle(groupedByTopic.toSeq).flatMap { case (topic, partitions) =>
+      partitions.map { case (tp, fetchInfo) => tp -> fetchInfo }
+    }
+  }
 
   def batchByTopic[T](s: Seq[(TopicAndPartition, T)]): Seq[(String, Seq[(Int, T)])] = {
     val result = new ArrayBuffer[(String, ArrayBuffer[(Int, T)])]

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf0bf7c7/core/src/test/scala/unit/kafka/api/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/api/FetchRequestTest.scala
new file mode 100644
index 0000000..c2bdf49
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/api/FetchRequestTest.scala
@@ -0,0 +1,63 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.api
+
+import kafka.common.TopicAndPartition
+import org.junit.Assert.{assertEquals, assertNotEquals}
+import org.junit.Test
+
+class FetchRequestTest {
+
+  @Test
+  def testShuffle() {
+    val seq = (0 to 100).map { i =>
+      val topic = s"topic${i % 10}"
+      (TopicAndPartition(topic, i / 10), PartitionFetchInfo(i, 50))
+    }
+    val shuffled = FetchRequest.shuffle(seq)
+    assertEquals(seq.size, shuffled.size)
+    assertNotEquals(seq, shuffled)
+
+    seq.foreach { case (tp1, fetchInfo1) =>
+      shuffled.foreach { case (tp2, fetchInfo2) =>
+        if (tp1 == tp2)
+          assertEquals(fetchInfo1, fetchInfo2)
+      }
+    }
+
+    val topics = seq.map { case (TopicAndPartition(t, _), _) => t }.distinct
+    topics.foreach { topic =>
+      val startIndex = shuffled.indexWhere { case (tp, _) => tp.topic == topic }
+      val endIndex = shuffled.lastIndexWhere { case (tp, _) => tp.topic == topic }
+      // all partitions for a given topic should appear in sequence
+      assertEquals(Set(topic), shuffled.slice(startIndex, endIndex + 1).map { case (tp, _)
=> tp.topic }.toSet)
+    }
+
+    val shuffled2 = FetchRequest.shuffle(seq)
+    assertNotEquals(shuffled, shuffled2)
+    assertNotEquals(seq, shuffled2)
+  }
+
+  @Test
+  def testShuffleWithSingleTopic() {
+    val seq = (0 to 50).map(i => (TopicAndPartition("topic", i), PartitionFetchInfo(i,
70)))
+    val shuffled = FetchRequest.shuffle(seq)
+    assertEquals(seq.size, shuffled.size)
+    assertNotEquals(seq, shuffled)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cf0bf7c7/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 372bf14..4f9d4f2 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -30,12 +30,12 @@ Note: Because new protocols are introduced, it is important to upgrade
your Kafk
 <ol>
     <li> Update server.properties file on all brokers and add the following properties:
         <ul>
-            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or
0.9.0.0).</li>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0.0
or 0.10.0.0).</li>
             <li>log.message.format.version=CURRENT_KAFKA_VERSION  (See <a href="#upgrade_10_performance_impact">potential
performance impact following the upgrade</a> for the details on what this configuration
does.)
         </ul>
     </li>
     <li> Upgrade the brokers. This can be done a broker at a time by simply bringing
it down, updating the code, and restarting it. </li>
-    <li> Once the entire cluster is upgraded, bump the protocol version by editing
inter.broker.protocol.version and setting it to 0.10.1.0. NOTE: If your previous message format
version is before 0.10.0, you shouldn't touch log.message.format.version yet - this parameter
should only change once all consumers have been upgraded to on or above 0.10.0.0 </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing
inter.broker.protocol.version and setting it to 0.10.1.0. NOTE: If your previous message format
version is before 0.10.0, you shouldn't touch log.message.format.version yet - this parameter
should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
     <li> Restart the brokers one by one for the new protocol version to take effect.
</li>
     <li> Once all consumers have been upgraded to 0.10.0, change log.message.format.version
to 0.10.1 on each broker and restart them one by one.
     </li>
@@ -69,12 +69,22 @@ Note: Because new protocols are introduced, it is important to upgrade
your Kafk
          error code will be returned. This may cause unexpected timeouts or delays when using
the producer and consumer since
          Kafka clients will typically retry automatically on unknown topic errors. You should
consult the client logs if you
          suspect this could be happening.</li>
+    <li> Fetch responses have a size limit by default (50 MB for consumers and 10 MB
for replication). The existing per partition limits also apply (1 MB for consumers
+         and replication). Note that neither of these limits is an absolute maximum as explained
in the next point. </li>
+    <li> Consumers and replicas can make progress if a message larger than the response/partition
size limit is found. More concretely, if the first message in the
+         first non-empty partition of the fetch is larger than either or both limits, the
message will still be returned. </li>
+    <li> Overloaded constructors were added to <code>kafka.api.FetchRequest</code>
and <code>kafka.javaapi.FetchRequest</code> to allow the caller to specify the
+         order of the partitions (since order is significant in v3). The previously existing
constructors were deprecated and the partitions are shuffled before
+         the request is sent to avoid starvation issues. </li>
 </ul>
 
 <h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New
Protocol Versions</a></h5>
 <ul>
-    <li> ListOffsetRequest v1 is introduced and used by default to support accurate
offset search based on timestamp.
-    <li> MetadataRequest/Response v2 has been introduced. v2 adds a new field "cluster_id"
to MetadataResponse.
+    <li> ListOffsetRequest v1 supports accurate offset search based on timestamps.
</li>
+    <li> MetadataResponse v2 introduces a new field: "cluster_id". </li>
+    <li> FetchRequest v3 supports limiting the response size (in addition to the existing
per partition limit). It returns messages
+         bigger than the limits if required to make progress, and the order of partitions
in the request is now significant. </li>
+    <li> JoinGroup v1 introduces a new field: "rebalance_timeout". </li>
 </ul>
 
 <h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4>


Mime
View raw message