kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-3505: Fix punctuate generated record metadata
Date Fri, 08 Apr 2016 16:00:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8b9b07e5d -> 3a58407e2


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 51276f3..7c158e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -131,21 +131,21 @@ public class KTableSourceTest {
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)");
+            proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
 
             driver.process(topic1, "A", "03");
 
-            proc1.checkAndClearResult("A:(03<-null)");
+            proc1.checkAndClearProcessResult("A:(03<-null)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
 
         } finally {
             Utils.delete(stateDir);
@@ -176,21 +176,21 @@ public class KTableSourceTest {
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)");
+            proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
 
             driver.process(topic1, "A", "03");
 
-            proc1.checkAndClearResult("A:(03<-02)");
+            proc1.checkAndClearProcessResult("A:(03<-02)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)");
+            proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
 
         } finally {
             Utils.delete(stateDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 5bf1b5e..a1c07af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -60,17 +59,17 @@ public class PartitionGroupTest {
 
         // add three records with timestamp 1, 3, 5 to partition-1
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
 
         group.addRawRecords(partition1, list1);
 
         // add three records with timestamp 2, 4, 6 to partition-2
         List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L,
0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
 
         group.addRawRecords(partition2, list2);
 
@@ -82,7 +81,7 @@ public class PartitionGroupTest {
         StampedRecord record;
         PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
 
-        // get one record
+        // get one record, now the time should be advanced
         record = group.nextRecord(info);
         assertEquals(partition1, info.partition());
         assertEquals(1L, record.timestamp);
@@ -99,5 +98,72 @@ public class PartitionGroupTest {
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
         assertEquals(3L, group.timestamp());
+
+        // add two records with timestamp 2, 4 to partition-1 again
+        List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
+
+        group.addRawRecords(partition1, list3);
+
+        assertEquals(6, group.numBuffered());
+        assertEquals(4, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(3L, record.timestamp);
+        assertEquals(5, group.numBuffered());
+        assertEquals(3, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one more record, now time should be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(5L, record.timestamp);
+        assertEquals(4, group.numBuffered());
+        assertEquals(2, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(2L, record.timestamp);
+        assertEquals(3, group.numBuffered());
+        assertEquals(1, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, now time should be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition2, info.partition());
+        assertEquals(4L, record.timestamp);
+        assertEquals(2, group.numBuffered());
+        assertEquals(1, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(4L, record.timestamp);
+        assertEquals(1, group.numBuffered());
+        assertEquals(0, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition2, info.partition());
+        assertEquals(6L, record.timestamp);
+        assertEquals(0, group.numBuffered());
+        assertEquals(0, group.numBuffered(partition1));
+        assertEquals(0, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 33fa5c4..dd48947 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
@@ -46,6 +47,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class StreamTaskTest {
@@ -58,10 +60,12 @@ public class StreamTaskTest {
     private final TopicPartition partition2 = new TopicPartition("topic2", 1);
     private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
 
-    private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer,
intDeserializer);
+    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer,
intDeserializer);
+    private final MockProcessorNode<Integer, Integer>  processor = new MockProcessorNode<>(10L);
+
     private final ProcessorTopology topology = new ProcessorTopology(
-            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
+            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode)
processor),
             new HashMap<String, SourceNode>() {
                 {
                     put("topic1", source1);
@@ -94,6 +98,8 @@ public class StreamTaskTest {
     @Before
     public void setup() {
         consumer.assign(Arrays.asList(partition1, partition2));
+        source1.addChild(processor);
+        source2.addChild(processor);
     }
 
     @SuppressWarnings("unchecked")
@@ -211,6 +217,73 @@ public class StreamTaskTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMaybePunctuate() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            StreamsConfig config = createConfig(baseDir);
+            StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
topology, consumer, producer, restoreStateConsumer, config, null);
+
+            task.addRecords(partition1, records(
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+            ));
+
+            task.addRecords(partition2, records(
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
15, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+            ));
+
+            assertTrue(task.maybePunctuate());
+
+            assertEquals(5, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(0, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            assertEquals(4, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(1, source2.numReceived);
+
+            assertTrue(task.maybePunctuate());
+
+            assertEquals(3, task.process());
+            assertEquals(2, source1.numReceived);
+            assertEquals(1, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            assertEquals(2, task.process());
+            assertEquals(2, source1.numReceived);
+            assertEquals(2, source2.numReceived);
+
+            assertTrue(task.maybePunctuate());
+
+            assertEquals(1, task.process());
+            assertEquals(3, source1.numReceived);
+            assertEquals(2, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            assertEquals(0, task.process());
+            assertEquals(3, source1.numReceived);
+            assertEquals(3, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            processor.supplier.checkAndClearPunctuateResult(10L, 20L, 30L);
+
+            task.close();
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[],
byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index d3b8081..287af5a 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -143,7 +143,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public void schedule(long interval) {
-        throw new UnsupportedOperationException("schedule() not supported");
+        throw new UnsupportedOperationException("schedule() not supported.");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
new file mode 100644
index 0000000..cf8a526
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
+
+    public static final String NAME = "MOCK-PROCESS-";
+    public static final AtomicInteger INDEX = new AtomicInteger(1);
+
+    public int numReceived = 0;
+
+    public final MockProcessorSupplier<K, V> supplier;
+
+    public MockProcessorNode(long scheduleInterval) {
+        this(new MockProcessorSupplier<K, V>(scheduleInterval));
+    }
+
+    private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
+        super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet());
+
+        this.supplier = supplier;
+    }
+
+    @Override
+    public void process(K key, V value) {
+        this.numReceived++;
+        processor().process(key, value);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a58407e/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index b402525..921c365 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -30,16 +31,28 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K,
V> {
     public final ArrayList<String> processed = new ArrayList<>();
     public final ArrayList<Long> punctuated = new ArrayList<>();
 
+    private final long scheduleInterval;
+
+    public MockProcessorSupplier() {
+        this(-1L);
+    }
+
+    public MockProcessorSupplier(long scheduleInterval) {
+        this.scheduleInterval = scheduleInterval;
+    }
+
     @Override
     public Processor<K, V> get() {
         return new MockProcessor();
     }
 
-    public class MockProcessor implements Processor<K, V> {
+    public class MockProcessor extends AbstractProcessor<K, V> {
 
         @Override
         public void init(ProcessorContext context) {
-            // do nothing
+            super.init(context);
+            if (scheduleInterval > 0L)
+                context.schedule(scheduleInterval);
         }
 
         @Override
@@ -49,21 +62,30 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K,
V> {
 
         @Override
         public void punctuate(long streamTime) {
+            assertEquals(streamTime, context().timestamp());
+            assertEquals(null, context().topic());
+            assertEquals(-1, context().partition());
+            assertEquals(-1L, context().offset());
+
             punctuated.add(streamTime);
         }
+    }
 
-        @Override
-        public void close() {
-            // do nothing
+    public void checkAndClearProcessResult(String... expected) {
+        assertEquals("the number of outputs:", expected.length, processed.size());
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals("output[" + i + "]:", expected[i], processed.get(i));
         }
 
+        processed.clear();
     }
 
-    public void checkAndClearResult(String... expected) {
-        assertEquals("the number of outputs:", expected.length, processed.size());
+    public void checkAndClearPunctuateResult(long... expected) {
+        assertEquals("the number of outputs:", expected.length, punctuated.size());
 
         for (int i = 0; i < expected.length; i++) {
-            assertEquals("output[" + i + "]:", expected[i], processed.get(i));
+            assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
         }
 
         processed.clear();


Mime
View raw message