kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: MINOR: Expose window store sequence number
Date Wed, 15 Jun 2016 19:09:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 84ca88729 -> 17668e81c


MINOR: Expose window store sequence number

guozhangwang mjsax enothereska

Currently, Kafka Streams does not have a util to get access to the sequence number added to
the key of window state store changelogs.  I'm interested in exposing it so that the contents
of a changelog topic can be 1) inspected for debugging purposes and 2) saved to a text file
and loaded from a text file.

Author: Roger Hoover <roger.hoover@gmail.com>

Reviewers: Eno Thereska <eno.thereska@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1501 from theduderog/expose-seq-num


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/17668e81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/17668e81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/17668e81

Branch: refs/heads/trunk
Commit: 17668e81c95d89f6543657351abc0a18004ecde5
Parents: 84ca887
Author: Roger Hoover <roger.hoover@gmail.com>
Authored: Wed Jun 15 12:09:03 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Jun 15 12:09:03 2016 -0700

----------------------------------------------------------------------
 .../state/internals/WindowStoreUtils.java       |  4 ++
 .../state/internals/WindowStoreUtilsTest.java   | 44 ++++++++++++++++++++
 2 files changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/17668e81/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index 30693e7..309c9c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -62,4 +62,8 @@ public class WindowStoreUtils {
     public static long timestampFromBinaryKey(byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
     }
+
+    public static int sequenceNumberFromBinaryKey(byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
+    }
 }
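
The offsets used above imply a key layout of serialized key bytes, then a timestamp, then a sequence number at the tail. The following is a minimal standalone sketch of that layout and of the "dump to text" use case from the commit message. It is not the WindowStoreUtils implementation: the class name BinaryKeyLayoutSketch, the toTextLine helper, the UTF-8 encoding of a String key, and the sizes of 8 bytes for the timestamp and 4 bytes for the sequence number are assumptions for illustration (the sizes are inferred from the getLong and getInt reads).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BinaryKeyLayoutSketch {
    // Assumed sizes: a long timestamp and an int sequence number.
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Assumed layout: [key bytes][8-byte timestamp][4-byte sequence number].
    static byte[] toBinaryKey(String key, long timestamp, int seqnum) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
        buf.put(keyBytes);
        buf.putLong(timestamp);
        buf.putInt(seqnum);
        return buf.array();
    }

    // Everything before the fixed-size suffix is the serialized key.
    static String keyFromBinaryKey(byte[] binaryKey) {
        int keyLength = binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
        return new String(binaryKey, 0, keyLength, StandardCharsets.UTF_8);
    }

    // The timestamp sits just before the trailing sequence number.
    static long timestampFromBinaryKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // The sequence number occupies the last SEQNUM_SIZE bytes.
    static int sequenceNumberFromBinaryKey(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
    }

    // Hypothetical helper: render one changelog key as a tab-separated text line.
    static String toTextLine(byte[] binaryKey) {
        return keyFromBinaryKey(binaryKey) + "\t"
                + timestampFromBinaryKey(binaryKey) + "\t"
                + sequenceNumberFromBinaryKey(binaryKey);
    }

    public static void main(String[] args) {
        byte[] bytes = toBinaryKey("key1", 99L, 3);
        // Prints the key, timestamp, and sequence number separated by tabs.
        System.out.println(toTextLine(bytes));
    }
}
```

With all three fields recoverable, a text dump of a changelog key can be turned back into the same binary key, which is the round trip the commit message describes.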

http://git-wip-us.apache.org/repos/asf/kafka/blob/17668e81/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
new file mode 100644
index 0000000..e0cb3ae
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowStoreUtilsTest {
+    protected StateSerdes<String, String> serdes = new StateSerdes<>("dummy", new Serdes.StringSerde(), new Serdes.StringSerde());
+
+    @Test
+    public void testSerialization() throws Exception {
+        final String key = "key1";
+        final long timestamp = 99L;
+        final int seqNum = 3;
+        byte[] bytes = WindowStoreUtils.toBinaryKey(key, timestamp, seqNum, serdes);
+        final String parsedKey = WindowStoreUtils.keyFromBinaryKey(bytes, serdes);
+        final long parsedTs = WindowStoreUtils.timestampFromBinaryKey(bytes);
+        final int parsedSeqNum = WindowStoreUtils.sequenceNumberFromBinaryKey(bytes);
+        assertEquals(key, parsedKey);
+        assertEquals(timestamp, parsedTs);
+        assertEquals(seqNum, parsedSeqNum);
+    }
+
+}
\ No newline at end of file

