kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3108: custom StreamPartitioner for Windowed key
Date Fri, 15 Jan 2016 01:20:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a3d3d5379 -> 37be6d98d


KAFKA-3108: custom StreamPartitioner for Windowed key

guozhangwang

When ```WindowedSerializer``` is specified in ```to(...)``` or ```through(...)``` for a key,
we use ```WindowedStreamPartitioner```.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #779 from ymatsuda/partitioner


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37be6d98
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37be6d98
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37be6d98

Branch: refs/heads/trunk
Commit: 37be6d98da842512367ab0b31d8f0244afafda92
Parents: a3d3d53
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Thu Jan 14 17:20:08 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jan 14 17:20:08 2016 -0800

----------------------------------------------------------------------
 .../streams/kstream/internals/KStreamImpl.java  | 10 ++-
 .../kstream/internals/WindowedSerializer.java   |  6 +-
 .../internals/WindowedStreamPartitioner.java    | 52 ++++++++++++
 .../WindowedStreamPartitionerTest.java          | 84 ++++++++++++++++++++
 4 files changed, 150 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/37be6d98/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index f53c0d0..2459f0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
 import org.apache.kafka.streams.state.Serdes;
 
@@ -210,11 +211,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topic, null, null);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
         String name = topology.newName(SINK_NAME);
+        StreamPartitioner<K, V> streamPartitioner = null;
+
+        if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
+            WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
+            streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+        }
 
-        topology.addSink(name, topic, keySerializer, valSerializer, this.name);
+        topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/37be6d98/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index 4407a5b..0afcad1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -49,9 +49,13 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
         return buf.array();
     }
 
-
     @Override
     public void close() {
         inner.close();
     }
+
+    public byte[] serializeBaseKey(String topic, Windowed<T> data) {
+        return inner.serialize(topic, data.value());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/37be6d98/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
new file mode 100644
index 0000000..10e69cc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+
+public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
+
+    private final WindowedSerializer<K> serializer;
+
+    public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
+        this.serializer = serializer;
+    }
+
+    /**
+     * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
+     * and the current number of partitions. The partition number is determined by the original key of the windowed key
+     * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
+     *
+     * @param windowedKey the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
+        byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+
+        // hash the keyBytes to choose a partition
+        return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+    }
+
+    private static int toPositive(int number) {
+        return number & 0x7fffffff;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/37be6d98/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
new file mode 100644
index 0000000..1b8cbb8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowedStreamPartitionerTest {
+
+    private String topicName = "topic";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+
+    private List<PartitionInfo> infos = Arrays.asList(
+            new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]),
+            new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
+    );
+
+    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+
+    @Test
+    public void testCopartitioning() {
+
+        Random rand = new Random();
+
+        DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
+
+        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+        WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
+
+        for (int k = 0; k < 10; k++) {
+            Integer key = rand.nextInt();
+            byte[] keyBytes = keySerializer.serialize(topicName, key);
+
+            String value = key.toString();
+            byte[] valueBytes = valSerializer.serialize(topicName, value);
+
+            Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
+
+            for (int w = 0; w < 10; w++) {
+                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+
+                Windowed<Integer> windowedKey = new Windowed<>(key, window);
+                Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
+
+                assertEquals(expected, actual);
+            }
+        }
+    }
+
+}


Mime
View raw message