kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject svn commit: r1396425 - in /incubator/kafka: branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Date Wed, 10 Oct 2012 00:06:28 GMT
Author: jjkoshy
Date: Wed Oct 10 00:06:28 2012
New Revision: 1396425

URL: http://svn.apache.org/viewvc?rev=1396425&view=rev
Log:
Fix wildcard consumption to work with greater than one stream; KAFKA-550; patched by Joel
Koshy; reviewed by Jun Rao and Neha Narkhede.

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1396425&r1=1396424&r2=1396425&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Wed Oct 10 00:06:28 2012
@@ -656,29 +656,27 @@ private[kafka] class ZookeeperConsumerCo
     val consumerThreadIdsPerTopic: Map[String, Set[String]] =
       topicCount.getConsumerThreadIdsPerTopic
 
-    /*
-     * This usage of map flatten breaks up consumerThreadIdsPerTopic into
-     * a set of (topic, thread-id) pairs that we then use to construct
-     * the updated (topic, thread-id) -> queues map
-     */
-    implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1,
_))
-
-    // iterator over (topic, thread-id) tuples
-    val topicThreadIds: Iterable[(String, String)] =
-      consumerThreadIdsPerTopic.flatten
-
-    // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
-    val threadQueueStreamPairs = topicCount match {
+    val allQueuesAndStreams = topicCount match {
       case wildTopicCount: WildcardTopicCount =>
-        for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
-      case statTopicCount: StaticTopicCount => {
-        require(topicThreadIds.size == queuesAndStreams.size,
-          "Mismatch between thread ID count (%d) and queue count (%d)".format(
-          topicThreadIds.size, queuesAndStreams.size))
-        topicThreadIds.zip(queuesAndStreams)
-      }
+        /*
+         * Wild-card consumption streams share the same queues, so we need to
+         * duplicate the list for the subsequent zip operation.
+         */
+        (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
+      case statTopicCount: StaticTopicCount =>
+        queuesAndStreams
     }
 
+    val topicThreadIds = consumerThreadIdsPerTopic.map {
+      case(topic, threadIds) =>
+        threadIds.map((topic, _))
+    }.flatten
+
+    require(topicThreadIds.size == allQueuesAndStreams.size,
+      "Mismatch between thread ID count (%d) and queue count (%d)"
+      .format(topicThreadIds.size, allQueuesAndStreams.size))
+    val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)
+
     threadQueueStreamPairs.foreach(e => {
       val topicThreadId = e._1
       val q = e._2._1

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1396425&r1=1396424&r2=1396425&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Wed Oct 10 00:06:28 2012
@@ -681,29 +681,27 @@ private[kafka] class ZookeeperConsumerCo
     val consumerThreadIdsPerTopic: Map[String, Set[String]] =
       topicCount.getConsumerThreadIdsPerTopic
 
-    /*
-     * This usage of map flatten breaks up consumerThreadIdsPerTopic into
-     * a set of (topic, thread-id) pairs that we then use to construct
-     * the updated (topic, thread-id) -> queues map
-     */
-    implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1,
_))
-
-    // iterator over (topic, thread-id) tuples
-    val topicThreadIds: Iterable[(String, String)] =
-      consumerThreadIdsPerTopic.flatten
-
-    // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
-    val threadQueueStreamPairs = topicCount match {
+    val allQueuesAndStreams = topicCount match {
       case wildTopicCount: WildcardTopicCount =>
-        for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
-      case statTopicCount: StaticTopicCount => {
-        require(topicThreadIds.size == queuesAndStreams.size,
-          "Mismatch between thread ID count (%d) and queue count (%d)".format(
-          topicThreadIds.size, queuesAndStreams.size))
-        topicThreadIds.zip(queuesAndStreams)
-      }
+        /*
+         * Wild-card consumption streams share the same queues, so we need to
+         * duplicate the list for the subsequent zip operation.
+         */
+        (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
+      case statTopicCount: StaticTopicCount =>
+        queuesAndStreams
     }
 
+    val topicThreadIds = consumerThreadIdsPerTopic.map {
+      case(topic, threadIds) =>
+        threadIds.map((topic, _))
+    }.flatten
+
+    require(topicThreadIds.size == allQueuesAndStreams.size,
+      "Mismatch between thread ID count (%d) and queue count (%d)"
+      .format(topicThreadIds.size, allQueuesAndStreams.size))
+    val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)
+
     threadQueueStreamPairs.foreach(e => {
       val topicThreadId = e._1
       val q = e._2._1



Mime
View raw message