kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2056; Fix transient testRangePartitionAssignor failure; reviewed by Guozhang Wang
Date Wed, 15 Apr 2015 18:17:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9e5d481c7 -> bfbd3acbf


KAFKA-2056; Fix transient testRangePartitionAssignor failure; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bfbd3acb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bfbd3acb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bfbd3acb

Branch: refs/heads/trunk
Commit: bfbd3acbf71fa1b913de154f7ffa12aead28a2d2
Parents: 9e5d481
Author: Fangmin Lv <lvfangmin@gmail.com>
Authored: Wed Apr 15 11:16:58 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Apr 15 11:16:58 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/PartitionAssignor.scala     | 6 ++++++
 .../main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 +---
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bfbd3acb/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 4afda8b..849284a 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -112,6 +112,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
         assignmentForConsumer += (topicPartition -> threadId)
       })
     }
+
+    // assign Map.empty for the consumers which are not associated with topic partitions
+    ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId))
     partitionAssignment
   }
 }
@@ -164,6 +167,9 @@ class RangeAssignor() extends PartitionAssignor with Logging {
         }
       }
     }
+
+    // assign Map.empty for the consumers which are not associated with topic partitions
+    ctx.consumers.foreach(consumerId => partitionAssignment.getAndMaybePut(consumerId))
     partitionAssignment
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bfbd3acb/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e250b94..aa8d940 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -684,9 +684,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         releasePartitionOwnership(topicRegistry)
         val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics,
zkClient)
         val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
-        val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse(
-          mutable.HashMap.empty[TopicAndPartition, ConsumerThreadId]
-        )
+        val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
         val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
           valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
 


Mime
View raw message