kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [3/3] kafka git commit: KAFKA-4144: Allow per stream/table timestamp extractor
Date: Sat, 13 May 2017 04:39:09 GMT
KAFKA-4144: Allow per stream/table timestamp extractor

Author: Jeyhun Karimov <je.karimov@gmail.com>

Reviewers: Damian Guy, Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2466 from jeyhunkarimov/KAFKA-4144
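
For illustration only (not part of this commit): a minimal sketch of how the new per-source overloads introduced below might be used, assuming a Kafka Streams build that contains this change. Topic names, the store name, and the serdes are placeholders, and WallclockTimestampExtractor merely stands in for any TimestampExtractor implementation.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class PerSourceExtractorSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "per-source-extractor-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // application-wide default, used by every source that does not set its own extractor
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

        final KStreamBuilder builder = new KStreamBuilder();

        // this source overrides the default with its own per-stream extractor instance
        final KStream<String, String> stream = builder.stream(
                new WallclockTimestampExtractor(),
                Serdes.String(), Serdes.String(),
                "input-topic");
        stream.to(Serdes.String(), Serdes.String(), "output-topic");

        // this source sets no extractor, so it falls back to the configured default
        final KTable<String, String> table = builder.table("table-topic", "table-store");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        // streams.start(); // once the topics and a broker at localhost:9092 exist
    }
}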


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9198467e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9198467e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9198467e

Branch: refs/heads/trunk
Commit: 9198467eb293435385edcef9028a59fa41a3828a
Parents: a1c8e7d
Author: Jeyhun Karimov <je.karimov@gmail.com>
Authored: Fri May 12 21:38:49 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 12 21:38:49 2017 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |   2 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   2 +-
 .../kafka/streams/examples/pipe/PipeDemo.java   |   4 +-
 .../examples/wordcount/WordCountDemo.java       |   4 +-
 .../wordcount/WordCountProcessorDemo.java       |   4 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java | 117 ++++-
 .../kafka/streams/kstream/KGroupedTable.java    |   2 +-
 .../kafka/streams/kstream/KStreamBuilder.java   | 434 ++++++++++++++++---
 .../streams/processor/TimestampExtractor.java   |   1 +
 .../streams/processor/TopologyBuilder.java      | 317 ++++++++++----
 .../processor/internals/PartitionGroup.java     |   9 +-
 .../streams/processor/internals/SourceNode.java |  14 +-
 .../streams/processor/internals/StreamTask.java |   8 +-
 .../apache/kafka/streams/StreamsConfigTest.java |  16 +-
 .../integration/FanoutIntegrationTest.java      |   2 +-
 .../GlobalKTableIntegrationTest.java            |   2 +-
 .../InternalTopicIntegrationTest.java           |   8 +-
 .../integration/JoinIntegrationTest.java        |   4 +-
 .../KStreamKTableJoinIntegrationTest.java       |   4 +-
 .../KTableKTableJoinIntegrationTest.java        |   4 +-
 .../QueryableStateIntegrationTest.java          |   4 +-
 .../integration/ResetIntegrationTest.java       |   4 +-
 .../streams/kstream/KStreamBuilderTest.java     |  54 +++
 .../internals/GlobalKTableJoinsTest.java        |   2 +-
 .../kstream/internals/KStreamImplTest.java      |   8 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |   4 +-
 .../streams/processor/TopologyBuilderTest.java  |  64 ++-
 .../internals/InternalTopicManagerTest.java     |   2 +-
 .../processor/internals/PartitionGroupTest.java |   2 +-
 .../internals/ProcessorTopologyTest.java        |   6 +-
 .../processor/internals/StandbyTaskTest.java    |   2 +-
 .../internals/StreamPartitionAssignorTest.java  |   2 +-
 .../processor/internals/StreamTaskTest.java     |  17 +-
 .../processor/internals/StreamThreadTest.java   |  21 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   6 +-
 .../StreamThreadStateStoreProviderTest.java     |   5 +-
 .../streams/tests/BrokerCompatibilityTest.java  |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 .../org/apache/kafka/test/StreamsTestUtils.java |   4 +-
 40 files changed, 947 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index f235044..bae7ed6 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -83,7 +83,7 @@ public class PageViewTypedDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 39e84e8..ed05f77 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -59,7 +59,7 @@ public class PageViewUntypedDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
+        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
index 62b52c0..86182a3 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
@@ -41,8 +41,8 @@ public class PipeDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 616228a..50e906f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -49,8 +49,8 @@ public class WordCountDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         // Note: To re-run the demo, you need to use the offset reset tool:

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 9ef24f0..f2c79d6 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -104,8 +104,8 @@ public class WordCountProcessorDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bc2a433..74f5fc1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -99,8 +99,8 @@ import static org.apache.kafka.common.utils.Utils.getPort;
  * Map<String, Object> props = new HashMap<>();
  * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
  * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- * props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- * props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  * StreamsConfig config = new StreamsConfig(props);
  *
  * KStreamBuilder builder = new KStreamBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 35e6e3d..c1fd2d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 
@@ -134,9 +135,23 @@ public class StreamsConfig extends AbstractConfig {
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
     private static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
 
+    /** {@code default key.serde} */
+    public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
+    private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>Serde</code> interface.";
+
+    /** {@code default timestamp.extractor} */
+    public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
+    private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+
+    /** {@code default value.serde} */
+    public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
+    private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>Serde</code> interface.";
+
     /** {@code key.serde} */
+    @Deprecated
     public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
-    private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface.";
+    @Deprecated
+    private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the <code>Serde</code> interface. This config is deprecated, use \"default.key.serde\" instead";
 
     /** {@code metadata.max.age.ms} */
     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
@@ -216,12 +231,16 @@ public class StreamsConfig extends AbstractConfig {
     private static final String STATE_DIR_DOC = "Directory location for state store.";
 
     /** {@code timestamp.extractor} */
+    @Deprecated
     public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
-    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+    @Deprecated
+    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface. This config is deprecated, use \"default.timestamp.extractor\" instead";
 
     /** {@code value.serde} */
+    @Deprecated
     public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
-    private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface.";
+    @Deprecated
+    private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface. This config is deprecated, use \"default.value.serde\" instead";
 
     /** {@code windowstore.changelog.additional.retention.ms} */
     public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
@@ -267,24 +286,39 @@ public class StreamsConfig extends AbstractConfig {
                     REPLICATION_FACTOR_DOC)
             .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                     Type.CLASS,
-                    FailOnInvalidTimestamp.class.getName(),
+                    null,
                     Importance.MEDIUM,
                     TIMESTAMP_EXTRACTOR_CLASS_DOC)
+            .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                        Type.CLASS,
+                        FailOnInvalidTimestamp.class.getName(),
+                        Importance.MEDIUM,
+                        DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
             .define(PARTITION_GROUPER_CLASS_CONFIG,
                     Type.CLASS,
                     DefaultPartitionGrouper.class.getName(),
                     Importance.MEDIUM,
                     PARTITION_GROUPER_CLASS_DOC)
             .define(KEY_SERDE_CLASS_CONFIG,
-                    Type.CLASS,
-                    Serdes.ByteArraySerde.class.getName(),
-                    Importance.MEDIUM,
-                    KEY_SERDE_CLASS_DOC)
+                        Type.CLASS,
+                        null,
+                        Importance.MEDIUM,
+                        KEY_SERDE_CLASS_DOC)
+            .define(DEFAULT_KEY_SERDE_CLASS_CONFIG,
+                        Type.CLASS,
+                        Serdes.ByteArraySerde.class.getName(),
+                        Importance.MEDIUM,
+                        DEFAULT_KEY_SERDE_CLASS_DOC)
             .define(VALUE_SERDE_CLASS_CONFIG,
                     Type.CLASS,
-                    Serdes.ByteArraySerde.class.getName(),
+                    null,
                     Importance.MEDIUM,
                     VALUE_SERDE_CLASS_DOC)
+            .define(DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+                        Type.CLASS,
+                        Serdes.ByteArraySerde.class.getName(),
+                        Importance.MEDIUM,
+                        DEFAULT_VALUE_SERDE_CLASS_DOC)
             .define(COMMIT_INTERVAL_MS_CONFIG,
                     Type.LONG,
                     30000,
@@ -590,14 +624,20 @@ public class StreamsConfig extends AbstractConfig {
 
     /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde
-     * class}.
+     * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead.
      *
      * @return an configured instance of key Serde class
      */
+    @Deprecated
     public Serde keySerde() {
         try {
-            final Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
-            serde.configure(originals(), true);
+            Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
+            // the default value of deprecated key serde is null
+            if (serde == null) {
+                serde = defaultKeySerde();
+            } else {
+                serde.configure(originals(), true);
+            }
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e);
@@ -605,15 +645,37 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde
+     * class}.
+     *
+     * @return an configured instance of key Serde class
+     */
+    public Serde defaultKeySerde() {
+        try {
+            Serde<?> serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
+            serde.configure(originals(), true);
+            return serde;
+        } catch (final Exception e) {
+            throw new StreamsException(String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e);
+        }
+    }
+
+    /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #VALUE_SERDE_CLASS_CONFIG value
-     * Serde class}.
+     * Serde class}. This method is deprecated. Use {@link #defaultValueSerde()} instead.
      *
      * @return an configured instance of value Serde class
      */
+    @Deprecated
     public Serde valueSerde() {
         try {
-            final Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
-            serde.configure(originals(), false);
+            Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            // the default value of deprecated value serde is null
+            if (serde == null) {
+                serde = defaultValueSerde();
+            } else {
+                serde.configure(originals(), false);
+            }
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e);
@@ -621,6 +683,31 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG value
+     * Serde class}.
+     *
+     * @return an configured instance of value Serde class
+     */
+    public Serde defaultValueSerde() {
+        try {
+            Serde<?> serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            serde.configure(originals(), false);
+            return serde;
+        } catch (final Exception e) {
+            throw new StreamsException(String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
+        }
+    }
+
+
+    public TimestampExtractor defaultTimestampExtractor() {
+        TimestampExtractor timestampExtractor = getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        if (timestampExtractor == null) {
+            return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
+        }
+        return timestampExtractor;
+    }
+
+    /**
      * Override any client properties in the original configs with overrides
      *
      * @param configNames The given set of configuration names.
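
Again for illustration only (not from the commit): a short sketch of the fallback behaviour the StreamsConfig changes above introduce. The deprecated "key.serde", "value.serde", and "timestamp.extractor" keys now default to null, and their accessors fall back to the new "default.*" keys when the deprecated ones are unset; the values in the comments assume exactly the properties set here.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class DefaultConfigFallbackSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fallback-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // only the new default configs are set; the deprecated keys stay at their null default
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsConfig config = new StreamsConfig(props);

        // the deprecated accessors resolve via the new defaults when the old keys are unset
        System.out.println(config.keySerde().getClass().getSimpleName());   // StringSerde
        System.out.println(config.valueSerde().getClass().getSimpleName()); // StringSerde
        // no "timestamp.extractor" is set either, so the built-in default applies
        System.out.println(config.defaultTimestampExtractor().getClass().getSimpleName()); // FailOnInvalidTimestamp
    }
}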

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index d14e600..2079860 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -353,7 +353,7 @@ public interface KGroupedTable<K, V> {
      * Records with {@code null} key are ignored.
      * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
      * for example, allows the result to have a different type than the input values.
-     * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
+     * If the result value type does not match the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value
      * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
      * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
      * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 0e02c8f..fb05e4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.internals.KTableSource;
 import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
@@ -52,10 +53,10 @@ public class KStreamBuilder extends TopologyBuilder {
 
     /**
      * Create a {@link KStream} from the specified topics.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
+     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+     * deserializers as specified in the {@link StreamsConfig config} are used.
      * <p>
-     * If multiple topics are specified there are no ordering guaranteed for records from different topics.
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the date before any key based operation
@@ -65,14 +66,15 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public <K, V> KStream<K, V> stream(final String... topics) {
-        return stream(null, null, null, topics);
+        return stream(null, null, null, null, topics);
     }
 
     /**
      * Create a {@link KStream} from the specified topics.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
      * <p>
-     * If multiple topics are specified there are no ordering guaranteed for records from different topics.
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the date before any key based operation
@@ -85,13 +87,14 @@ public class KStreamBuilder extends TopologyBuilder {
      */
     public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
                                        final String... topics) {
-        return stream(offsetReset, null, null, topics);
+        return stream(offsetReset, null, null, null, topics);
     }
 
+
     /**
      * Create a {@link KStream} from the specified topic pattern.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
+     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+     * deserializers as specified in the {@link StreamsConfig config} are used.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
      * them and there is no ordering guarantee between records from different topics.
@@ -104,12 +107,13 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for topics matching the regex pattern.
      */
     public <K, V> KStream<K, V> stream(final Pattern topicPattern) {
-        return stream(null, null, null, topicPattern);
+        return stream(null, null,  null, null, topicPattern);
     }
 
     /**
      * Create a {@link KStream} from the specified topic pattern.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
+     * {@link StreamsConfig config} are used.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
      * them and there is no ordering guarantee between records from different topics.
@@ -124,14 +128,15 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for topics matching the regex pattern.
      */
     public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset, final Pattern topicPattern) {
-        return stream(offsetReset, null, null, topicPattern);
+        return stream(offsetReset, null, null, null, topicPattern);
     }
 
     /**
      * Create a {@link KStream} from the specified topics.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the
+     * {@link StreamsConfig config} are used.
      * <p>
-     * If multiple topics are specified there are no ordering guaranteed for records from different topics.
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the date before any key based operation
@@ -145,14 +150,15 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public <K, V> KStream<K, V> stream(final Serde<K> keySerde, final Serde<V> valSerde, final String... topics) {
-        return stream(null, keySerde, valSerde, topics);
+        return stream(null, null, keySerde, valSerde, topics);
     }
 
 
     /**
      * Create a {@link KStream} from the specified topics.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * <p>
-     * If multiple topics are specified there are no ordering guaranteed for records from different topics.
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the date before any key based operation
@@ -171,9 +177,65 @@ public class KStreamBuilder extends TopologyBuilder {
                                        final Serde<K> keySerde,
                                        final Serde<V> valSerde,
                                        final String... topics) {
+        return stream(offsetReset, null, keySerde, valSerde, topics);
+    }
+
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream}, if not specified the default
+     *                           serde defined in the configs will be used
+     * @param valSerde           value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topics             the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
+                                       final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final String... topics) {
+        return stream(null, timestampExtractor, keySerde, valSerde, topics);
+    }
+
+
+    /**
+     * Create a {@link KStream} from the specified topics.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topics
+     *                           if no valid committed offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param valSerde           value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topics             the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
+                                       final TimestampExtractor timestampExtractor,
+                                       final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final String... topics) {
         final String name = newName(KStreamImpl.SOURCE_NAME);
 
-        addSource(offsetReset, name,  keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
+        addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
@@ -181,7 +243,8 @@ public class KStreamBuilder extends TopologyBuilder {
 
     /**
      * Create a {@link KStream} from the specified topic pattern.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
+     * as specified in the {@link StreamsConfig config} are used.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
      * them and there is no ordering guarantee between records from different topics.
@@ -200,11 +263,12 @@ public class KStreamBuilder extends TopologyBuilder {
     public <K, V> KStream<K, V> stream(final Serde<K> keySerde,
                                        final Serde<V> valSerde,
                                        final Pattern topicPattern) {
-        return stream(null, keySerde, valSerde, topicPattern);
+        return stream(null, null, keySerde, valSerde, topicPattern);
     }
 
     /**
      * Create a {@link KStream} from the specified topic pattern.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
      * them and there is no ordering guarantee between records from different topics.
@@ -226,20 +290,78 @@ public class KStreamBuilder extends TopologyBuilder {
                                        final Serde<K> keySerde,
                                        final Serde<V> valSerde,
                                        final Pattern topicPattern) {
+        return stream(offsetReset, null, keySerde, valSerde, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param valSerde           value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topicPattern       the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
+                                       final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final Pattern topicPattern) {
+        return stream(null, timestampExtractor, keySerde, valSerde, topicPattern);
+    }
+
+
+    /**
+     * Create a {@link KStream} from the specified topic pattern.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
+     * them and there is no ordering guarantee between records from different topics.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case it is the user's responsibility to repartition the date before any key based operation
+     * (like aggregation or join) is applied to the returned {@link KStream}.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the matched topics if no valid
+     *                           committed  offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param valSerde           value serde used to read this source {@link KStream},
+     *                           if not specified the default serde defined in the configs will be used
+     * @param topicPattern       the pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
+                                       final TimestampExtractor timestampExtractor,
+                                       final Serde<K> keySerde,
+                                       final Serde<V> valSerde,
+                                       final Pattern topicPattern) {
         final String name = newName(KStreamImpl.SOURCE_NAME);
 
-        addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+        addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
+
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
+     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and
+     * default key and value deserializers as specified in the {@link StreamsConfig config} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
-     * Note that the specified input topics must be partitioned by key.
+     * Note that the specified input topic must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
@@ -264,7 +386,7 @@ public class KStreamBuilder extends TopologyBuilder {
      */
     public <K, V> KTable<K, V> table(final String topic,
                                      final String queryableStoreName) {
-        return table(null, null, null, topic, queryableStoreName);
+        return table(null, null,  null, null, topic, queryableStoreName);
     }
 
     /**
@@ -298,7 +420,7 @@ public class KStreamBuilder extends TopologyBuilder {
      */
     public <K, V> KTable<K, V> table(final String topic,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return table(null, null, null, topic, storeSupplier);
+        return table(null, null, null, null, topic, storeSupplier);
     }
 
 
@@ -319,7 +441,7 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final String topic) {
-        return table(null, null, null, topic, (String) null);
+        return table(null, null, null, null, topic, (String) null);
     }
 
     /**
@@ -355,15 +477,16 @@ public class KStreamBuilder extends TopologyBuilder {
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final String topic,
                                      final String queryableStoreName) {
-        return table(offsetReset, null, null, topic, queryableStoreName);
+        return table(offsetReset, null, null, null, topic, queryableStoreName);
     }
 
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * The default {@link TimestampExtractor} and default key and value deserializers
+     * as specified in the {@link StreamsConfig config} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
-     * Note that the specified input topics must be partitioned by key.
+     * Note that the specified input topic must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
@@ -391,7 +514,7 @@ public class KStreamBuilder extends TopologyBuilder {
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final String topic,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return table(offsetReset, null, null, topic, storeSupplier);
+        return table(offsetReset, null, null, null, topic, storeSupplier);
     }
 
     /**
@@ -414,15 +537,93 @@ public class KStreamBuilder extends TopologyBuilder {
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
                                      final String topic) {
-        return table(offsetReset, null, null, topic, (String) null);
+        return table(offsetReset, null, null, null, topic, (String) null);
     }
 
+
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
+     * as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param storeName          the state store name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
+                                     final String topic,
+                                     final String storeName) {
+        return table(null, timestampExtractor, null, null, topic, storeName);
+    }
+
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
+     *                    offsets are available
+     * @param topic       the topic name; cannot be {@code null}
+     * @param storeName   the state store name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final TimestampExtractor timestampExtractor,
+                                     final String topic,
+                                     final String storeName) {
+        return table(offsetReset, timestampExtractor, null, null, topic, storeName);
+    }
+
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
+     * as specified in the {@link StreamsConfig config} are used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
-     * Note that the specified input topics must be partitioned by key.
+     * Note that the specified input topic must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
@@ -453,15 +654,16 @@ public class KStreamBuilder extends TopologyBuilder {
                                      final Serde<V> valSerde,
                                      final String topic,
                                      final String queryableStoreName) {
-        return table(null, keySerde, valSerde, topic, queryableStoreName);
+        return table(null, null, keySerde, valSerde, topic, queryableStoreName);
     }
 
     /**
      * Create a {@link KTable} for the specified topic.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
-     * Note that the specified input topics must be partitioned by key.
+     * Note that the specified input topic must be partitioned by key.
      * If this is not the case the returned {@link KTable} will be corrupted.
      * <p>
      * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
@@ -492,7 +694,7 @@ public class KStreamBuilder extends TopologyBuilder {
                                      final Serde<V> valSerde,
                                      final String topic,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return table(null, keySerde, valSerde, topic, storeSupplier);
+        return table(null, null, keySerde, valSerde, topic, storeSupplier);
     }
 
     /**
@@ -518,12 +720,13 @@ public class KStreamBuilder extends TopologyBuilder {
     public <K, V> KTable<K, V> table(final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic) {
-        return table(null, keySerde, valSerde, topic, (String) null);
+        return table(null, null, keySerde, valSerde, topic, (String) null);
     }
 
     private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
                                         final Serde<K> keySerde,
                                         final Serde<V> valSerde,
+                                        final TimestampExtractor timestampExtractor,
                                         final String topic,
                                         final StateStoreSupplier<KeyValueStore> storeSupplier,
                                         final boolean isQueryable) {
@@ -531,7 +734,7 @@ public class KStreamBuilder extends TopologyBuilder {
         final String name = newName(KTableImpl.SOURCE_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
 
-        addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(),
+        addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
                 valSerde == null ? null : valSerde.deserializer(),
                 topic);
         addProcessor(name, processorSupplier, source);
@@ -582,6 +785,96 @@ public class KStreamBuilder extends TopologyBuilder {
                                      final Serde<V> valSerde,
                                      final String topic,
                                      final String queryableStoreName) {
+        return table(offsetReset, null, keySerde, valSerde, topic, queryableStoreName);
+    }
+
+
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valSerde           value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param storeName          the state store name; cannot be {@code null}
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final String storeName) {
+        return table(null, timestampExtractor, keySerde, valSerde, topic, storeName);
+    }
+
+
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topic must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topic if no valid
+     *                           committed offsets are available
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valSerde           value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param queryableStoreName the state store name; if {@code null} this is the equivalent of {@link KStreamBuilder#table(AutoOffsetReset, Serde, Serde, String)}.
+     * @return a {@link KTable} for the specified topic
+     */
+    public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final TimestampExtractor timestampExtractor,
+                                     final Serde<K> keySerde,
+                                     final Serde<V> valSerde,
+                                     final String topic,
+                                     final String queryableStoreName) {
         final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
         final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
                 keySerde,
@@ -589,7 +882,7 @@ public class KStreamBuilder extends TopologyBuilder {
                 false,
                 Collections.<String, String>emptyMap(),
                 true);
-        return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, queryableStoreName != null);
+        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
     }
 
     /**
@@ -618,7 +911,7 @@ public class KStreamBuilder extends TopologyBuilder {
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic) {
-        return table(offsetReset, keySerde, valSerde, topic, (String) null);
+        return table(offsetReset, null, keySerde, valSerde, topic, (String) null);
     }
     /**
      * Create a {@link KTable} for the specified topic.
@@ -654,12 +947,13 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KTable} for the specified topic
      */
     public <K, V> KTable<K, V> table(final AutoOffsetReset offsetReset,
+                                     final TimestampExtractor timestampExtractor,
                                      final Serde<K> keySerde,
                                      final Serde<V> valSerde,
                                      final String topic,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return doTable(offsetReset, keySerde, valSerde, topic, storeSupplier, true);
+        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
     }
 
     /**
@@ -689,7 +983,7 @@ public class KStreamBuilder extends TopologyBuilder {
      */
     public <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                  final String queryableStoreName) {
-        return globalTable(null, null, topic, queryableStoreName);
+        return globalTable(null, null, null, topic, queryableStoreName);
     }
 
     /**
@@ -709,13 +1003,15 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link GlobalKTable} for the specified topic
      */
     public <K, V> GlobalKTable<K, V> globalTable(final String topic) {
-        return globalTable(null, null, topic, (String) null);
+        return globalTable(null, null, null, topic, (String) null);
     }
 
+
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * The default {@link TimestampExtractor} and default key and value deserializers as specified in
+     * the {@link StreamsConfig config} are used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
      * {@code queryableStoreName}.
@@ -744,10 +1040,11 @@ public class KStreamBuilder extends TopologyBuilder {
     @SuppressWarnings("unchecked")
     public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
                                                  final Serde<V> valSerde,
+                                                 final TimestampExtractor timestampExtractor,
                                                  final String topic,
                                                  final String queryableStoreName) {
         final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        return doGlobalTable(keySerde, valSerde, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+        return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
                             keySerde,
                             valSerde,
                     false,
@@ -757,8 +1054,8 @@ public class KStreamBuilder extends TopologyBuilder {
 
     /**
      * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
      * <p>
      * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
      * {@code queryableStoreName}.
@@ -789,11 +1086,44 @@ public class KStreamBuilder extends TopologyBuilder {
                                                  final Serde<V> valSerde,
                                                  final String topic,
                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return doGlobalTable(keySerde, valSerde, topic, storeSupplier);
+        return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
     }
 
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * Input {@link KeyValue} pairs with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with the given
+     * {@code storeName}.
+     * However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ...
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long valueForKey = localStore.get(key);
+     * }</pre>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param keySerde           key serde used to send key-value pairs,
+     *                           if not specified the default key serde defined in the configuration will be used
+     * @param valSerde           value serde used to send key-value pairs,
+     *                           if not specified the default value serde defined in the configuration will be used
+     * @param timestampExtractor the stateless timestamp extractor used for this source {@link GlobalKTable},
+     *                           if not specified the default extractor defined in the configs will be used
+     * @param topic              the topic name; cannot be {@code null}
+     * @param storeSupplier      user defined state store supplier; cannot be {@code null}
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    @SuppressWarnings("unchecked")
     private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
                                                     final Serde<V> valSerde,
+                                                    final TimestampExtractor timestampExtractor,
                                                     final String topic,
                                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -805,7 +1135,7 @@ public class KStreamBuilder extends TopologyBuilder {
         final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
         final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
 
-        addGlobalStore(storeSupplier, sourceName, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+        addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
         return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
     }
 
@@ -834,13 +1164,13 @@ public class KStreamBuilder extends TopologyBuilder {
                                                  final Serde<V> valSerde,
                                                  final String topic) {
 
-        return globalTable(keySerde, valSerde, topic, (String) null);
+        return globalTable(keySerde, valSerde, null, topic, (String) null);
     }
 
     /**
      * Create a new instance of {@link KStream} by merging the given {@link KStream}s.
      * <p>
-     * There are nor ordering guaranteed for records from different {@link KStream}s.
+     * There is no ordering guarantee for records from different {@link KStream}s.
      *
      * @param streams the {@link KStream}s to be merged
      * @return a {@link KStream} containing all records of the given streams

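For reference (not part of the patch above), a minimal usage sketch of the new overloads: it wires one custom TimestampExtractor into a source KTable and a GlobalKTable while all other sources keep the extractor configured in StreamsConfig. The topic and store names are hypothetical, and the anonymous extractor assumes the two-argument extract(ConsumerRecord, long) signature from KIP-93.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PerSourceTimestampExtractorExample {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        // hypothetical extractor: use the record's own (producer/broker) timestamp
        final TimestampExtractor recordTimeExtractor = new TimestampExtractor() {
            @Override
            public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
                return record.timestamp();
            }
        };

        // new overload: per-table extractor, default "auto.offset.reset" strategy
        final KTable<String, Long> counts =
            builder.table(recordTimeExtractor, Serdes.String(), Serdes.Long(), "counts-topic", "counts-store");

        // new overload: per-global-table extractor
        final GlobalKTable<String, Long> lookup =
            builder.globalTable(Serdes.String(), Serdes.Long(), recordTimeExtractor, "lookup-topic", "lookup-store");
    }
}

Passing null for the extractor, as the delegating overloads in the diff do, falls back to the default extractor defined in the configs.
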
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index 51c156a..f5e0e1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -28,6 +28,7 @@ public interface TimestampExtractor {
     /**
      * Extracts a timestamp from a record. The timestamp must be positive to be considered a valid timestamp.
      * Returning a negative timestamp will cause the record not to be processed but rather silently skipped.
+     * The timestamp extractor implementation must be stateless.
      * <p>
      * The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC.
      * <p>


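To illustrate the statelessness requirement added above (again, not part of the patch), a conforming extractor keeps no mutable fields and derives the timestamp from the current record only; the value-as-epoch-millis convention below is a hypothetical example.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Long) {
            // hypothetical convention: the value itself carries the event time in epoch milliseconds
            return (Long) value;
        }
        // a negative timestamp causes the record to be silently skipped, as documented above
        return -1L;
    }
}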