kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3887 Follow-up: add unit test for null checking in KTable aggregates
Date Mon, 11 Jul 2016 20:57:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c43926822 -> 136a8fabc


http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
deleted file mode 100644
index 4e6dcb2..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-/**
- * End-to-end integration test that reads data from an input topic and writes the same data as-is to
- * a new output topic, using an embedded Kafka cluster.
- */
-public class PassThroughIntegrationTest {
-    @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
-    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
-
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
-        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-    }
-
-    @Test
-    public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
-        List<String> inputValues = Arrays.asList(
-            "hello world",
-            "the world is not enough",
-            "the world of the stock market is coming to an end"
-        );
-
-        //
-        // Step 1: Configure and start the processor topology.
-        //
-        KStreamBuilder builder = new KStreamBuilder();
-
-        Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-        // Write the input data as-is to the output topic.
-        builder.stream(DEFAULT_INPUT_TOPIC).to(DEFAULT_OUTPUT_TOPIC);
-
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
-
-        //
-        // Step 2: Produce some input data to the input topic.
-        //
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
-
-        //
-        // Step 3: Verify the application's output data.
-        //
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "pass-through-integration-test-standard-consumer");
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
-            DEFAULT_OUTPUT_TOPIC, inputValues.size());
-        streams.close();
-        assertThat(actualValues, equalTo(inputValues));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
deleted file mode 100644
index af51dca..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.test.TestUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-
-
-/**
- * End-to-end integration test based on a simple word count example, using an embedded Kafka
- * cluster.
- */
-public class WordCountIntegrationTest {
-    @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
-    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
-    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
-
-    @BeforeClass
-    public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC, 2, 1);
-        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-    }
-
-    @Test
-    public void shouldCountWords() throws Exception {
-        List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
-        List<KeyValue<String, Long>> expectedWordCounts = Arrays.asList(
-            new KeyValue<>("hello", 1L),
-            new KeyValue<>("hello", 2L),
-            new KeyValue<>("world", 1L),
-            new KeyValue<>("world", 2L),
-            new KeyValue<>("world", 3L)
-        );
-
-        //
-        // Step 1: Configure and start the processor topology.
-        //
-        final Serde<String> stringSerde = Serdes.String();
-        final Serde<Long> longSerde = Serdes.Long();
-
-        Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
-
-        KStream<String, Long> wordCounts = textLines
-            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                @Override
-                public Iterable<String> apply(String value) {
-                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                }
-            }).groupBy(new KeyValueMapper<String, String, String>() {
-                @Override
-                public String apply(final String key, final String value) {
-                    return value;
-                }
-            }).count("Counts")
-            .toStream();
-
-        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
-
-        // Remove any state from previous test runs
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
-        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
-        streams.start();
-        
-        //
-        // Step 2: Produce some input data to the input topic.
-        //
-        Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
-
-        //
-        // Step 3: Verify the application's output data.
-        //
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-integration-test-standard-consumer");
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
-            DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size());
-        Collections.sort(actualWordCounts, new Comparator<KeyValue<String, Long>>() {
-            @Override
-            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
-                int keyComparison = o1.key.compareTo(o2.key);
-                if (keyComparison == 0) {
-                    return o1.value.compareTo(o2.value);
-                }
-                return keyComparison;
-            }
-        });
-        streams.close();
-        assertThat(actualWordCounts, equalTo(expectedWordCounts));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
deleted file mode 100644
index fc0451a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class KGroupedTableImplTest {
-
-    private File stateDir;
-
-    @Before
-    public void setUp() throws IOException {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testGroupedCountOccurences() throws IOException {
-        final KStreamBuilder builder = new KStreamBuilder();
-        final String input = "count-test-input";
-        final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>();
-
-        builder.table(Serdes.String(), Serdes.String(), input)
-                .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
-                    @Override
-                    public KeyValue<String, String> apply(final String key, final String value) {
-                        return new KeyValue<>(value, value);
-                    }
-                }, Serdes.String(), Serdes.String())
-                .count("count")
-                .toStream()
-                .process(processorSupplier);
-
-
-        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir);
-
-
-        driver.process(input, "A", "green");
-        driver.process(input, "B", "green");
-        driver.process(input, "A", "blue");
-        driver.process(input, "C", "yellow");
-        driver.process(input, "D", "green");
-
-        final List<String> expected = Arrays.asList("green:1", "green:2", "blue:1", "green:1", "yellow:1", "green:2");
-        final List<String> actual = processorSupplier.processed;
-        assertEquals(expected, actual);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 75e007d..e5864ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -62,7 +62,8 @@ public class KTableAggregateTest {
     @Test
     public void testAggBasic() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
+        final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
@@ -74,8 +75,7 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        table2.toStream().process(proc);
 
         driver = new KStreamTestDriver(builder, stateDir);
 
@@ -96,24 +96,25 @@ public class KTableAggregateTest {
                 "C:0+5",
                 "D:0+6",
                 "B:0+2+4-2+7", "B:0+2+4-2+7-4",
-                "C:0+5+8", "C:0+5+8-5"), proc2.processed);
+                "C:0+5+8", "C:0+5+8-5"), proc.processed);
     }
 
     @Test
     public void testAggRepartition() throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
-        String topic1 = "topic1";
+        final String topic1 = "topic1";
+        final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
         KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
         KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
             @Override
                 public KeyValue<String, String> apply(String key, String value) {
                     if (key.equals("null")) {
-                        return KeyValue.pair(null, value + "s");
+                        return KeyValue.pair(null, value);
                     } else if (key.equals("NULL")) {
                         return null;
                     } else {
-                        return KeyValue.pair(value, value + "s");
+                        return KeyValue.pair(value, value);
                     }
                 }
             },
@@ -126,12 +127,13 @@ public class KTableAggregateTest {
                 stringSerde,
                 "topic1-Canonized");
 
-        MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
+        table2.toStream().process(proc);
 
         driver = new KStreamTestDriver(builder, stateDir);
 
         driver.process(topic1, "A", "1");
+        driver.process(topic1, "A", null);
+        driver.process(topic1, "A", "1");
         driver.process(topic1, "B", "2");
         driver.process(topic1, "null", "3");
         driver.process(topic1, "B", "4");
@@ -139,11 +141,36 @@ public class KTableAggregateTest {
         driver.process(topic1, "B", "7");
 
         assertEquals(Utils.mkList(
-                "1:0+1s",
-                "2:0+2s",
-                "4:0+4s",
-                "2:0+2s-2s",
-                "7:0+7s",
-                "4:0+4s-4s"), proc2.processed);
+                "1:0+1",
+                "1:0+1-1",
+                "1:0+1-1+1",
+                "2:0+2",
+                "4:0+4",
+                "2:0+2-2",
+                "7:0+7",
+                "4:0+4-4"), proc.processed);
+    }
+
+    @Test
+    public void testCount() throws IOException {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String input = "count-test-input";
+        final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
+
+        builder.table(Serdes.String(), Serdes.String(), input)
+                .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
+                .count("count")
+                .toStream()
+                .process(proc);
+
+        final KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir);
+
+        driver.process(input, "A", "green");
+        driver.process(input, "B", "green");
+        driver.process(input, "A", "blue");
+        driver.process(input, "C", "yellow");
+        driver.process(input, "D", "green");
+
+        assertEquals(Utils.mkList("green:1", "green:2", "blue:1", "green:1", "yellow:1", "green:2"), proc.processed);
     }
 }
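
For context on the behavior the new assertions pin down: in KTable.groupBy, a mapper that returns null, or a KeyValue whose key is null, silently drops the record from the repartitioned aggregate instead of throwing, and a null value arriving on the source topic acts as a delete and fires the subtractor (hence the "1:0+1-1" entry above). A minimal sketch of that contract against the same KStreamBuilder API the tests use; the class, topic, and store names here are made up for illustration:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KeyValueMapper;

    public class NullHandlingSketch {                       // hypothetical class name
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            builder.table(Serdes.String(), Serdes.String(), "source-topic")  // hypothetical topic
                    .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                        @Override
                        public KeyValue<String, String> apply(String key, String value) {
                            // Returning null, or a pair whose key is null, drops the
                            // record from the downstream aggregate rather than failing.
                            return "skip".equals(key) ? null : KeyValue.pair(value, value);
                        }
                    }, Serdes.String(), Serdes.String())
                    .count("Counts");                       // hypothetical store name
        }
    }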

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index aaa6cc7..84bfdd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -125,7 +125,7 @@ public class KTableSourceTest {
     }
 
     @Test
-    public void testNotSedingOldValue() throws IOException {
+    public void testNotSendingOldValue() throws IOException {
         final KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";
@@ -160,7 +160,7 @@ public class KTableSourceTest {
     }
 
     @Test
-    public void testSedingOldValue() throws IOException {
+    public void testSendingOldValue() throws IOException {
         final KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";

http://git-wip-us.apache.org/repos/asf/kafka/blob/136a8fab/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
index 769ee71..4861e7c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
@@ -23,14 +23,35 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 public class MockKeyValueMapper {
 
     private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
-
         @Override
         public KeyValue<K, V> apply(K key, V value) {
             return KeyValue.pair(key, value);
         }
     }
 
+    private static class SelectValueKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<V, V>> {
+        @Override
+        public KeyValue<V, V> apply(K key, V value) {
+            return KeyValue.pair(value, value);
+        }
+    }
+
+    private static class SelectValueMapper<K, V> implements KeyValueMapper<K, V, V> {
+        @Override
+        public V apply(K key, V value) {
+            return value;
+        }
+    }
+
     public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() {
         return new NoOpKeyValueMapper<>();
     }
+
+    public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> SelectValueKeyValueMapper() {
+        return new SelectValueKeyValueMapper<>();
+    }
+
+    public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() {
+        return new SelectValueMapper<>();
+    }
 }
\ No newline at end of file
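
A quick usage note on the two helpers added above: SelectValueKeyValueMapper re-keys a record by its value (it is what the new testCount passes to groupBy), while SelectValueMapper merely projects the value. A small sketch of what they return, assuming it runs inside a test in this package; the variable names are made up:

    KeyValueMapper<String, String, KeyValue<String, String>> rekey =
            MockKeyValueMapper.<String, String>SelectValueKeyValueMapper();
    KeyValue<String, String> kv = rekey.apply("A", "green");
    // kv equals KeyValue.pair("green", "green"): the value becomes the new key,
    // which is exactly the grouping that testCount relies on.

    KeyValueMapper<String, String, String> project =
            MockKeyValueMapper.<String, String>SelectValueMapper();
    String v = project.apply("A", "green");                 // yields "green"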

