kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: do not call partitioner if num partitions is non-positive
Date Fri, 30 Sep 2016 16:36:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 43c30b8c3 -> 429224300


HOTFIX: do not call partitioner if num partitions is non-positive

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Eno Thereska, Damian Guy

Closes #1929 from guozhangwang/KMinor-check-zero-num-partitions and squashes the following
commits:

c8edc2d [Guozhang Wang] fix unit test
3127f9c [Guozhang Wang] do not call partitioner if num partitions is non-positive


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42922430
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42922430
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42922430

Branch: refs/heads/trunk
Commit: 4292243009c066c9ab5477ce886d8eb00534587e
Parents: 43c30b8
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Fri Sep 30 09:36:46 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Sep 30 09:36:46 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/RecordCollector.java  | 2 +-
 .../java/org/apache/kafka/test/ProcessorTopologyTestDriver.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/42922430/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 45687c8..cd5ee1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -73,7 +73,7 @@ public class RecordCollector {
         Integer partition = record.partition();
         if (partition == null && partitioner != null) {
             List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
-            if (partitions != null)
+            if (partitions != null && partitions.size() > 0)
                 partition = partitioner.partition(record.key(), record.value(), partitions.size());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/42922430/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 0db69be..83a9092 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -158,7 +158,7 @@ public class ProcessorTopologyTestDriver {
         producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer)
{
             @Override
             public List<PartitionInfo> partitionsFor(String topic) {
-                return Collections.emptyList();
+                return Collections.singletonList(new PartitionInfo(topic, 0, null, null,
null));
             }
         };
         restoreStateConsumer = createRestoreConsumer(id, storeNames);


Mime
View raw message