kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/3] kafka git commit: KAFKA-4144: Allow per stream/table timestamp extractor
Date Sat, 13 May 2017 04:39:08 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f0ff9..08839cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -176,13 +177,20 @@ public class TopologyBuilder {
         private final Pattern pattern;
         private final Deserializer<?> keyDeserializer;
         private final Deserializer<?> valDeserializer;
-
-        private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
+        private final TimestampExtractor timestampExtractor;
+
+        private SourceNodeFactory(final String name,
+                                  final String[] topics,
+                                  final Pattern pattern,
+                                  final TimestampExtractor timestampExtractor,
+                                  final Deserializer<?> keyDeserializer,
+                                  final Deserializer<?> valDeserializer) {
             super(name);
             this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
             this.pattern = pattern;
             this.keyDeserializer = keyDeserializer;
             this.valDeserializer = valDeserializer;
+            this.timestampExtractor = timestampExtractor;
         }
 
         List<String> getTopics(Collection<String> subscribedTopics) {
@@ -190,7 +198,7 @@ public class TopologyBuilder {
             // yet and hence the map from source node to topics is stale; in this case we put the pattern as a placeholder.
             // This should only happen for debugging, since at runtime this function should always be called after the metadata has been updated.
             if (subscribedTopics.isEmpty())
-                return Collections.singletonList("Pattern[" + pattern + "]");
+                return Collections.singletonList(String.valueOf(pattern));
 
             List<String> matchedTopics = new ArrayList<>();
             for (String update : subscribedTopics) {
@@ -218,9 +226,9 @@ public class TopologyBuilder {
             // yet and hence the map from source node to topics is stale; in this case we put the pattern as a placeholder.
             // This should only happen for debugging, since at runtime this function should always be called after the metadata has been updated.
             if (sourceTopics == null)
-                return new SourceNode<>(name, Collections.singletonList("Pattern[" + pattern + "]"), keyDeserializer, valDeserializer);
+                return new SourceNode<>(name, Collections.singletonList(String.valueOf(pattern)), timestampExtractor, keyDeserializer, valDeserializer);
             else
-                return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), keyDeserializer, valDeserializer);
+                return new SourceNode<>(name, maybeDecorateInternalSourceTopics(sourceTopics), timestampExtractor, keyDeserializer, valDeserializer);
         }
 
         private boolean isMatch(String topic) {
@@ -325,9 +333,10 @@ public class TopologyBuilder {
 
     /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@@ -335,14 +344,15 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      */
     public synchronized final TopologyBuilder addSource(final String name, final String... topics) {
-        return addSource(null, name, null, null, topics);
+        return addSource(null, name, null, null, null, topics);
     }
 
     /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
      * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
      * @param name the unique name of the source used to reference this node when
@@ -350,33 +360,72 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name,  final String... topics) {
-        return addSource(offsetReset, name, null, null, topics);
+
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) {
+        return addSource(offsetReset, name, null, null, null, topics);
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source;
+     *                           if not specified, the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final String... topics) {
+        return addSource(null, name, timestampExtractor, null, null, topics);
     }
 
+    /**
+     * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this source if no committed offsets are found;
+     *                           acceptable values are earliest or latest
+     * @param timestampExtractor the stateless timestamp extractor used for this source;
+     *                           if not specified, the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topics             the name of one or more Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics) {
+        return addSource(offsetReset, name, timestampExtractor, null, null, topics);
+    }
 
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
      * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
+
     public synchronized final TopologyBuilder addSource(final String name, final Pattern topicPattern) {
-        return addSource(null, name, null, null, topicPattern);
+        return addSource(null, name, null, null, null, topicPattern);
     }
 
     /**
      * Add a new source that consumes from topics matching the given pattern
      * and forwards the records to child processor and/or sink nodes.
-     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
      * @param offsetReset the auto offset reset policy value for this source if no committed offsets are found; acceptable values are earliest or latest.
      * @param name the unique name of the source used to reference this node when
@@ -384,50 +433,97 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name,  final Pattern topicPattern) {
-        return addSource(offsetReset, name, null, null, topicPattern);
+
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) {
+        return addSource(offsetReset, name, null, null, null, topicPattern);
+    }
+
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     *
+     * @param timestampExtractor the stateless timestamp extractor used for this source;
+     *                           if not specified, the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public synchronized final TopologyBuilder addSource(final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) {
+        return addSource(null, name, timestampExtractor, null, null, topicPattern);
+    }
+
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     *
+     * @param offsetReset        the auto offset reset policy value for this source if no committed offsets are found;
+     *                           acceptable values are earliest or latest.
+     * @param timestampExtractor the stateless timestamp extractor used for this source;
+     *                           if not specified, the default extractor defined in the configs will be used
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) {
+        return addSource(offsetReset, name, timestampExtractor, null, null, topicPattern);
     }
 
 
     /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source; if not specified, the default
+     *                           key deserializer defined in the configs will be used
+     * @param valDeserializer    value deserializer used to read this source;
+     *                           if not specified, the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
-    public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
-        return addSource(null, name, keyDeserializer, valDeserializer, topics);
 
+    public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
+        return addSource(null, name, null, keyDeserializer, valDeserializer, topics);
     }
 
     /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
      *
-     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest.
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param topics the name of one or more Kafka topics that this source is to consume
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets are found;
+     *                           acceptable values are earliest or latest.
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for this source;
+     *                           if not specified, the default extractor defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source; if not specified, the default
+     *                           key deserializer defined in the configs will be used
+     * @param valDeserializer    value deserializer used to read this source;
+     *                           if not specified, the default value deserializer defined in the configs will be used
+     * @param topics             the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
+
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+                                                        final String name,
+                                                        final TimestampExtractor timestampExtractor,
+                                                        final Deserializer keyDeserializer,
+                                                        final Deserializer valDeserializer,
+                                                        final String... topics) {
         if (topics.length == 0) {
             throw new TopologyBuilderException("You must provide at least one topic");
         }
@@ -442,7 +538,7 @@ public class TopologyBuilder {
             sourceTopicNames.add(topic);
         }
 
-        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
+        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
         nodeToSourceTopics.put(name, Arrays.asList(topics));
         nodeGrouper.add(name);
 
@@ -460,9 +556,45 @@ public class TopologyBuilder {
      * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will
      * receive all records forwarded from the {@link SourceNode}. This
      * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     *
+     * @param storeSupplier         user defined state store supplier
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                                       final String sourceName,
+                                                       final Deserializer keyDeserializer,
+                                                       final Deserializer valueDeserializer,
+                                                       final String topic,
+                                                       final String processorName,
+                                                       final ProcessorSupplier stateUpdateSupplier) {
+        return addGlobalStore(storeSupplier, sourceName, null, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
+    }
+
+
+
+    /**
+     * Adds a global {@link StateStore} to the topology. The {@link StateStore} sources its data
+     * from all partitions of the provided input topic. There will be exactly one instance of this
+     * {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving
+     * from the partitions of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an {@link ProcessorNode} that will
+     * receive all records forwarded from the {@link SourceNode}. This
+     * {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
      *
      * @param storeSupplier         user defined state store supplier
      * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source;
+     *                              if not specified, the default extractor defined in the configs will be used
      * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
      * @param valueDeserializer     the {@link Deserializer} to deserialize values with
      * @param topic                 the topic to source the data from
@@ -472,6 +604,7 @@ public class TopologyBuilder {
      */
     public synchronized TopologyBuilder addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
                                                        final String sourceName,
+                                                       final TimestampExtractor timestampExtractor,
                                                        final Deserializer keyDeserializer,
                                                        final Deserializer valueDeserializer,
                                                        final String topic,
@@ -502,7 +635,7 @@ public class TopologyBuilder {
 
         globalTopics.add(topic);
         final String[] topics = {topic};
-        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, keyDeserializer, valueDeserializer));
+        nodeFactories.put(sourceName, new SourceNodeFactory(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
         nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
         nodeGrouper.add(sourceName);
 
@@ -537,23 +670,21 @@ public class TopologyBuilder {
      * The source will use the specified key and value deserializers. The provided
      * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
      * topics that share the same key-value data format.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
      *
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source; if not specified, the default
+     *                           key deserializer defined in the configs will be used
+     * @param valDeserializer    value deserializer used to read this source;
+     *                           if not specified, the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
 
     public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
-        return addSource(null, name,  keyDeserializer, valDeserializer, topicPattern);
-
+        return addSource(null, name, null, keyDeserializer, valDeserializer, topicPattern);
     }
 
     /**
@@ -563,21 +694,27 @@ public class TopologyBuilder {
      * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
      * topics that share the same key-value data format.
      *
-     * @param offsetReset  the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest
-     * @param name the unique name of the source used to reference this node when
-     * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
-     * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
-     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
-     * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets are found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
+     * @param timestampExtractor the stateless timestamp extractor used for this source;
+     *                           if not specified, the default extractor defined in the configs will be used
+     * @param keyDeserializer    key deserializer used to read this source; if not specified, the default
+     *                           key deserializer defined in the configs will be used
+     * @param valDeserializer    value deserializer used to read this source;
+     *                           if not specified, the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
      */
 
-    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name,  final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) {
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+                                                        final String name,
+                                                        final TimestampExtractor timestampExtractor,
+                                                        final Deserializer keyDeserializer,
+                                                        final Deserializer valDeserializer,
+                                                        final Pattern topicPattern) {
         Objects.requireNonNull(topicPattern, "topicPattern can't be null");
         Objects.requireNonNull(name, "name can't be null");
 
@@ -593,17 +730,47 @@ public class TopologyBuilder {
 
         maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);
 
-        nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer));
+        nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
         nodeToSourcePatterns.put(name, topicPattern);
         nodeGrouper.add(name);
 
         return this;
     }
 
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers. The provided
+     * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets are found;
+     *                           acceptable values are earliest or latest
+     * @param name               the unique name of the source used to reference this node when
+     *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+     * @param keyDeserializer    key deserializer used to read this source; if not specified, the default
+     *                           key deserializer defined in the configs will be used
+     * @param valDeserializer    value deserializer used to read this source;
+     *                           if not specified, the default value deserializer defined in the configs will be used
+     * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
+     * @return this builder instance so methods can be chained together; never null
+     * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
+     */
+
+    public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
+                                                        final String name,
+                                                        final Deserializer keyDeserializer,
+                                                        final Deserializer valDeserializer,
+                                                        final Pattern topicPattern) {
+        return addSource(offsetReset, name, null, keyDeserializer, valDeserializer, topicPattern);
+    }
+
+
     /**
      * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      *
      * @param name the unique name of the sink
@@ -622,8 +789,8 @@ public class TopologyBuilder {
     /**
      * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic, using
      * the supplied partitioner.
-     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} and
-     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * The sink will use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
      * <p>
      * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among
@@ -653,10 +820,10 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its records
      * @param keySerializer the {@link Serializer key serializer} used when producing records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when producing records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
      * and write to its topic
@@ -676,10 +843,10 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its records
      * @param keySerializer the {@link Serializer key serializer} used when producing records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param valSerializer the {@link Serializer value serializer} used when producing records; may be null if the sink
-     * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
+     * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
      * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
      * @param partitioner the function that should be used to determine the partition for each record processed by the sink
      * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
@@ -1236,7 +1403,7 @@ public class TopologyBuilder {
         final WindowStoreSupplier windowStoreSupplier = (WindowStoreSupplier) supplier;
         final InternalTopicConfig config = new InternalTopicConfig(name,
                                                                    Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
-                                                                               InternalTopicConfig.CleanupPolicy.delete),
+                                                                           InternalTopicConfig.CleanupPolicy.delete),
                                                                    supplier.logConfig());
         config.setRetentionMs(windowStoreSupplier.retentionPeriod());
         return config;

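For context, the new overloads let each source own its timestamp extractor while all other sources keep the config-wide default. A minimal sketch of the new API surface (topic and node names are illustrative, not taken from this patch):

    import org.apache.kafka.streams.processor.TimestampExtractor;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class PerSourceExtractorSketch {
        public static void main(final String[] args) {
            final TopologyBuilder builder = new TopologyBuilder();

            // "events" gets its own extractor via the new overload ...
            final TimestampExtractor wallclock = new WallclockTimestampExtractor();
            builder.addSource(wallclock, "events-source", "events");

            // ... while "commands" keeps the default extractor from StreamsConfig,
            // since SourceNodeFactory stores null for it.
            builder.addSource("commands-source", "commands");
        }
    }
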
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 4a1cffb..bae6f22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.Collections;
 import java.util.Comparator;
@@ -36,8 +35,6 @@ public class PartitionGroup {
 
     private final PriorityQueue<RecordQueue> queuesByTime;
 
-    private final TimestampExtractor timestampExtractor;
-
     public static class RecordInfo {
         public RecordQueue queue;
 
@@ -57,7 +54,7 @@ public class PartitionGroup {
     // since task is thread-safe, we do not need to synchronize on local variables
     private int totalBuffered;
 
-    public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues, TimestampExtractor timestampExtractor) {
+    public PartitionGroup(Map<TopicPartition, RecordQueue> partitionQueues) {
         this.queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() {
 
             @Override
@@ -73,8 +70,6 @@ public class PartitionGroup {
 
         this.partitionQueues = partitionQueues;
 
-        this.timestampExtractor = timestampExtractor;
-
         this.totalBuffered = 0;
     }
 
@@ -176,4 +171,4 @@ public class PartitionGroup {
             queue.clear();
         }
     }
-}
+}
\ No newline at end of file

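With extraction moved into each RecordQueue, PartitionGroup no longer needs an extractor of its own, so its constructor shrinks. A hedged sketch of the narrowed contract (same package as the internal classes above; imports for Map and TopicPartition assumed):

    // Sketch: each RecordQueue arrives with its extractor already resolved,
    // so the group itself is constructed without one.
    PartitionGroup buildGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
        // Before this patch the call was: new PartitionGroup(partitionQueues, timestampExtractor)
        return new PartitionGroup(partitionQueues);
    }
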
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 9f46a2c..486a2ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.List;
 
@@ -29,14 +30,20 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
     private ProcessorContext context;
     private Deserializer<K> keyDeserializer;
     private Deserializer<V> valDeserializer;
+    private final TimestampExtractor timestampExtractor;
 
-    public SourceNode(String name, List<String> topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
+    public SourceNode(String name, List<String> topics, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
         super(name);
         this.topics = topics;
+        this.timestampExtractor = timestampExtractor;
         this.keyDeserializer = keyDeserializer;
         this.valDeserializer = valDeserializer;
     }
 
+    public SourceNode(String name, List<String> topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
+        this(name, topics, null, keyDeserializer, valDeserializer);
+    }
+
     K deserializeKey(String topic, byte[] data) {
         return keyDeserializer.deserialize(topic, data);
     }
@@ -93,4 +100,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         return sb.toString();
     }
 
-}
+    public TimestampExtractor getTimestampExtractor() {
+        return timestampExtractor;
+    }
+}
\ No newline at end of file

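For reference, a per-source extractor is just any TimestampExtractor implementation; a null stored on the SourceNode defers to the config default, which StreamTask resolves below. A sketch of a payload-based extractor, assuming the two-argument extract() signature from KIP-93 that trunk had at the time (the Long-valued payload is illustrative):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class LongPayloadTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            final Object value = record.value();
            // Use the event time embedded in the (already deserialized) Long payload;
            // fall back to the previous timestamp for anything unexpected.
            return value instanceof Long ? (Long) value : previousTimestamp;
        }
    }
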
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4a49f9d..568a5b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -115,15 +115,15 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // to corresponding source nodes in the processor topology
         final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
-        final TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
-
+        final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         for (final TopicPartition partition : partitions) {
             final SourceNode source = topology.source(partition.topic());
-            final RecordQueue queue = new RecordQueue(partition, source, timestampExtractor);
+            final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
+            final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor);
             partitionQueues.put(partition, queue);
         }
 
-        partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
+        partitionGroup = new PartitionGroup(partitionQueues);
 
         // initialize the consumed offset cache
         consumedOffsets = new HashMap<>();

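The ternary above is the heart of the feature: a source-level extractor, when present, shadows the config default, and the choice is made once per partition when its RecordQueue is built. Restated as a standalone helper (a sketch only; StreamTask keeps it inline):

    import org.apache.kafka.streams.processor.TimestampExtractor;
    import org.apache.kafka.streams.processor.internals.SourceNode;

    final class ExtractorResolution {
        // Per-source extractor wins; otherwise use config.defaultTimestampExtractor().
        static TimestampExtractor resolve(final SourceNode<?, ?> source,
                                          final TimestampExtractor configDefault) {
            final TimestampExtractor perSource = source.getTimestampExtractor();
            return perSource != null ? perSource : configDefault;
        }
    }
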
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index b229af9..cfd702e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -50,8 +50,8 @@ public class StreamsConfigTest {
     public void setUp() {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put("DUMMY", "dummy");
         props.put("key.deserializer.encoding", "UTF8");
         props.put("value.deserializer.encoding", "UTF-16");
@@ -95,11 +95,11 @@ public class StreamsConfigTest {
 
         serializer.configure(serializerConfigs, true);
         assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
-                str, streamsConfig.keySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
+                str, streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
 
         serializer.configure(serializerConfigs, false);
         assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
-                str, streamsConfig.valueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
+                str, streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
     }
 
     @Test
@@ -204,16 +204,16 @@ public class StreamsConfigTest {
 
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.keySerde();
+        streamsConfig.defaultKeySerde();
     }
 
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.valueSerde();
+        streamsConfig.defaultValueSerde();
     }
 
     @Test

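The test changes track the KIP-123 renames: the serde keys gain a "default." prefix and keySerde()/valueSerde() become defaultKeySerde()/defaultValueSerde(), mirroring the defaultTimestampExtractor() accessor used by StreamTask above. A sketch of client configuration with the renamed keys (application id and bootstrap address are illustrative):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;

    public class DefaultSerdeConfigSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timestamp-extractor-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            final StreamsConfig config = new StreamsConfig(props);
            config.defaultKeySerde();   // replaces keySerde()
            config.defaultValueSerde(); // replaces valueSerde()
        }
    }
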
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 25dea92..8593317 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -102,7 +102,7 @@ public class FanoutIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 09eceab..e5ed3d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -100,7 +100,7 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), globalOne, globalStore);
+        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
         stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
         table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
         foreachAction = new ForeachAction<String, String>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index a6bb044..29cdc1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -86,8 +86,8 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
@@ -136,8 +136,8 @@ public class InternalTopicIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         final KStreamBuilder builder = new KStreamBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 8047611..040c784 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -126,8 +126,8 @@ public class JoinIntegrationTest {
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 1806321..9b5b428 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -86,8 +86,8 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 5fa7e49..c77cf3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -78,8 +78,8 @@ public class KTableKTableJoinIntegrationTest {
 
         streamsConfig = new Properties();
         streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index ab23af0..4b5ae17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -137,8 +137,8 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         // override this to make the rebalances happen quickly
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 775ac8d..626f38d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -265,8 +265,8 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index e7cb669..82e5b23 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -25,10 +25,12 @@ import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,6 +46,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 public class KStreamBuilderTest {
 
@@ -396,4 +400,46 @@ public class KStreamBuilderTest {
         assertTrue(builder.latestResetTopicsPattern().matcher(topicTwo).matches());
         assertFalse(builder.earliestResetTopicsPattern().matcher(topicTwo).matches());
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void kStreamTimestampExtractorShouldBeNull() throws Exception {
+        builder.stream("topic");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertNull(processorTopology.source("topic").getTimestampExtractor());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception {
+        builder.stream(new MockTimestampExtractor(), null, null, "topic");
+        final ProcessorTopology processorTopology = builder.build(null);
+        for (final SourceNode sourceNode: processorTopology.sources()) {
+            assertThat(sourceNode.getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception {
+        builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void kTableTimestampExtractorShouldBeNull() throws Exception {
+        builder.table("topic", "store");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertNull(processorTopology.source("topic").getTimestampExtractor());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception {
+        builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
 }
\ No newline at end of file

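The new tests above pin down how the per-source extractor composes with the existing stream() overloads: it can be combined with per-source serdes, with an auto-offset-reset policy, or both, and a source built without one reports a null extractor, meaning the default applies. A sketch of the fullest variant the tests exercise, with illustrative names:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<String, Long> stream = builder.stream(
            TopologyBuilder.AutoOffsetReset.EARLIEST,  // per-source reset policy
            new WallclockTimestampExtractor(),         // per-source timestamp extractor
            Serdes.String(),                           // key serde
            Serdes.Long(),                             // value serde
            "input-topic");                            // illustrative topic name
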
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 2526154..69b328d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -52,7 +52,7 @@ public class GlobalKTableJoinsTest {
     @Before
     public void setUp() throws Exception {
         stateDir = TestUtils.tempDirectory();
-        global = builder.globalTable(Serdes.String(), Serdes.String(), globalTopic, "global-store");
+        global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
         stream = builder.stream(Serdes.String(), Serdes.String(), streamTopic);
         keyValueMapper = new KeyValueMapper<String, String, String>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 23b0389..d214fb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -279,14 +279,14 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnJoinWithGlobalTable() throws Exception {
-        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() throws Exception {
-        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);
     }
@@ -300,14 +300,14 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() throws Exception {
-        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() throws Exception {
-        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), "global", "global"),
+        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 9947870..6d6ec1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -214,8 +214,8 @@ public class SimpleBenchmark {
         // the socket buffer needs to be large, especially when running in AWS with
         // high latency. if running locally the default is fine.
         props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
         props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
         return props;

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 41ab803..0b60b00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
 import org.junit.Test;
 
@@ -53,6 +54,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 public class TopologyBuilderTest {
 
@@ -118,7 +121,7 @@ public class TopologyBuilderTest {
         final Serde<String> stringSerde = Serdes.String();
 
         try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), new String[]{});
+            builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), new String[]{});
             fail("Should throw TopologyBuilderException with no topics");
         } catch (TopologyBuilderException tpe) {
             //no-op
@@ -130,9 +133,9 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
         final Serde<String> stringSerde = Serdes.String();
 
-        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
+        builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
         try {
-            builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
+            builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
             fail("Should throw TopologyBuilderException for duplicate source name");
         } catch (TopologyBuilderException tpe) {
             //no-op
@@ -701,6 +704,63 @@ public class TopologyBuilderTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void shouldAddTimestampExtractorPerSource() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(new MockTimestampExtractor(), "source", "topic");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(null, new MockTimestampExtractor(), "source", "topic");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        final Pattern pattern = Pattern.compile("t.*");
+        builder.addSource(new MockTimestampExtractor(), "source", pattern);
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        final Pattern pattern = Pattern.compile("t.*");
+        builder.addSource(null, new MockTimestampExtractor(), "source", pattern);
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        final Pattern pattern = Pattern.compile("t.*");
+        builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
+        final ProcessorTopology processorTopology = builder.build(null);
+        assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
 
         final TopologyBuilder topologyBuilder = new TopologyBuilder()

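Taken together, the new cases cover each addSource() shape that accepts an extractor: extractor-first, reset-policy plus extractor, and the full form with explicit deserializers, each against either named topics or a subscription Pattern. A sketch of two of those shapes, mirroring the calls in the tests (source names, topic, and pattern are illustrative):

    import java.util.regex.Pattern;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    final TopologyBuilder builder = new TopologyBuilder();
    // extractor-only form, as in shouldAddTimestampExtractorPerSource
    builder.addSource(new WallclockTimestampExtractor(), "source-1", "topic-1");
    // reset policy + name + extractor + key/value deserializers + pattern, matching
    // the fullest overload exercised above (null deserializers fall back to defaults)
    builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source-2",
            new WallclockTimestampExtractor(), null, null, Pattern.compile("t.*"));
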
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 031c732..bf3259e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -87,7 +87,7 @@ public class InternalTopicManagerTest {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "Internal-Topic-ManagerTest");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         };
     }

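The same default-versus-override renaming applies to the extractor itself: TIMESTAMP_EXTRACTOR_CLASS_CONFIG becomes DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, because a globally configured extractor is now just the fallback for sources that do not supply their own. A minimal sketch:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    final Properties props = new Properties();
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
              WallclockTimestampExtractor.class.getName());
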
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 447843f..9f54b4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -51,7 +51,7 @@ public class PartitionGroupTest {
             put(partition1, queue1);
             put(partition2, queue2);
         }
-    }, timestampExtractor);
+    });
 
     @Test
     public void testTimeTracking() {

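The constructor change above is the flip side of per-source extractors: PartitionGroup previously applied a single extractor to every queue, whereas each source's RecordQueue now appears to carry its own, so the group takes only the partition-to-queue map. A hedged sketch of the new construction, assuming the internals stay as this test shows:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.internals.PartitionGroup;
    import org.apache.kafka.streams.processor.internals.RecordQueue;

    final Map<TopicPartition, RecordQueue> queues = new HashMap<>();
    // ... one RecordQueue per input partition; timestamp extraction now happens
    // inside each queue rather than group-wide ...
    final PartitionGroup group = new PartitionGroup(queues);
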
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index d19c91a..369c47f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -78,9 +78,9 @@ public class ProcessorTopologyTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
-        props.setProperty(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
         this.config = new StreamsConfig(props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 3d4411c..a358be5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -119,7 +119,7 @@ public class StandbyTaskTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 4cc7b92..9937ad4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -111,7 +111,7 @@ public class StreamPartitionAssignorTest {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         };
     }

