kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: MINOR: add suppress warnings annotations in Streams API
Date Wed, 04 Oct 2017 21:42:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 51c652c40 -> 713a67fdd


http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ae3808e..cbbe848 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -183,23 +183,27 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired);
     }
-    
+
+    @SuppressWarnings("deprecation")
     @Override
     public void print() {
         print(defaultKeyValueMapper, null, null, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final String label) {
         print(defaultKeyValueMapper, null, null, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final Serde<K> keySerde,
                       final Serde<V> valSerde) {
         print(defaultKeyValueMapper, keySerde, valSerde, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final Serde<K> keySerde,
                       final Serde<V> valSerde,
@@ -207,17 +211,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         print(defaultKeyValueMapper, keySerde, valSerde, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper) {
         print(mapper, null, null, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
                       final String label) {
         print(mapper, null, null, label);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
                       final Serde<K> keySerde,
@@ -225,6 +232,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         print(mapper, keySerde, valSerde, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void print(final KeyValueMapper<? super K, ? super V, String> mapper,
                       final Serde<K> keySerde,
@@ -243,17 +251,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         builder.internalTopologyBuilder.addProcessor(name, printedInternal.build(this.name), this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath) {
         writeAsText(filePath, this.name, null, null, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label) {
         writeAsText(filePath, label, null, null, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final Serde<K> keySerde,
@@ -261,6 +272,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, this.name, keySerde, valSerde, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -269,12 +281,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, label, keySerde, valSerde, defaultKeyValueMapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final KeyValueMapper<? super K, ? super V, String> mapper) {
         writeAsText(filePath, this.name, null, null, mapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -282,6 +296,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, label, null, null, mapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final Serde<K> keySerde,
@@ -290,6 +305,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         writeAsText(filePath, this.name, keySerde, valSerde, mapper);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -368,6 +384,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KStream<K, V> through(final Serde<K> keySerde,
                                  final Serde<V> valSerde,
@@ -404,6 +421,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KStream<K, V> through(final Serde<K> keySerde,
                                  final Serde<V> valSerde,
@@ -411,6 +429,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return through(topic, Produced.with(keySerde, valSerde));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KStream<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                  final String topic) {
@@ -427,12 +446,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topic, Produced.<K, V>with(null, null, null));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final StreamPartitioner<? super K, ? super V> partitioner,
                    final String topic) {
         to(topic, Produced.streamPartitioner(partitioner));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -440,7 +461,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         to(topic, Produced.with(keySerde, valSerde));
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -459,6 +480,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     }
 
+    @SuppressWarnings("unchecked")
     private void to(final String topic, final ProducedInternal<K, V> produced) {
         final String name = builder.newName(SINK_NAME);
         final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer();
@@ -513,6 +535,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -540,6 +563,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                              new KStreamImplJoin(false, false));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
                                            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -641,6 +665,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return sourceName;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -696,6 +721,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -732,6 +758,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         return new KStreamImpl<>(builder, name, sourceNodes, false);
     }
 
+    @SuppressWarnings("unchecked")
     private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other,
                                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                                     final boolean leftJoin) {
@@ -768,6 +795,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         }
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
     public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                           final Serde<K> keySerde,
@@ -795,6 +824,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                         true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector,
                                               final Serde<K1> keySerde,
@@ -820,6 +850,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KGroupedStream<K, V> groupByKey(final Serde<K> keySerde,
                                            final Serde<V> valSerde) {
@@ -849,6 +880,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             this.rightOuter = rightOuter;
         }
 
+        @SuppressWarnings("unchecked")
         public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs,
                                                    final KStream<K1, V2> other,
                                                    final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index b322415..4f26767 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -51,6 +51,7 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
         private WindowStore<K, V2> otherWindow;
 
+        @SuppressWarnings("unchecked")
         @Override
         public void init(ProcessorContext context) {
             super.init(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index a42db0b..db8de1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -33,7 +33,6 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -142,8 +141,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return this.queryableStoreName;
     }
 
+    @SuppressWarnings("deprecation")
     private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
-                                  final StateStoreSupplier<KeyValueStore> storeSupplier,
+                                  final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
                                   final boolean isFilterNot) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         String name = builder.newName(FILTER_NAME);
@@ -196,19 +196,21 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doFilter(predicate, new MaterializedInternal<>(materialized), false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
                                final String queryableStoreName) {
-        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
         }
         return doFilter(predicate, storeSupplier, false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                               final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doFilter(predicate, storeSupplier, false);
     }
@@ -226,26 +228,29 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doFilter(predicate, new MaterializedInternal<>(materialized), true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
                                   final String queryableStoreName) {
-        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, this.valSerde, queryableStoreName);
         }
         return doFilter(predicate, storeSupplier, true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
-                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                  final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doFilter(predicate, storeSupplier, true);
     }
 
+    @SuppressWarnings("deprecation")
     private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                            final Serde<V1> valueSerde,
-                                           final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                           final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(mapper);
         String name = builder.newName(MAPVALUES_NAME);
         String internalStoreName = null;
@@ -284,21 +289,23 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                         final Serde<V1> valueSerde,
                                         final String queryableStoreName) {
-        StateStoreSupplier<KeyValueStore> storeSupplier = null;
+        org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier = null;
         if (queryableStoreName != null) {
             storeSupplier = keyValueStore(this.keySerde, valueSerde, queryableStoreName);
         }
         return doMapValues(mapper, valueSerde, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public  <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                          final Serde<V1> valueSerde,
-                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doMapValues(mapper, valueSerde, storeSupplier);
     }
@@ -322,7 +329,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         print(keySerde, valSerde, this.name);
     }
 
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public void print(final Serde<K> keySerde,
                       final Serde<V> valSerde,
@@ -356,7 +363,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
     /**
      * @throws TopologyException if file is not found
      */
-    @SuppressWarnings("deprecation")
+    @SuppressWarnings({"unchecked", "deprecation"})
     @Override
     public void writeAsText(final String filePath,
                             final String label,
@@ -390,6 +397,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -408,12 +416,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                      queryableStoreName != null));
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
                                 final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         to(keySerde, valSerde, partitioner, topic);
 
@@ -421,6 +430,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return builder.table(topic, consumed, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -428,6 +438,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                                 final String topic) {
         return through(keySerde, valSerde, partitioner, topic, (String) null);
     }
+
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -436,15 +448,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return through(keySerde, valSerde, null, topic, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
                                 final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return through(keySerde, valSerde, null, topic, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final Serde<K> keySerde,
                                 final Serde<V> valSerde,
@@ -452,6 +466,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return through(keySerde, valSerde, null, topic, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
@@ -459,49 +474,57 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return through(null, null, partitioner, topic, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return through(null, null, partitioner, topic, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
                                 final String topic) {
         return through(null, null, partitioner, topic, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final String topic,
                                 final String queryableStoreName) {
         return through(null, null, null, topic, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final String topic,
-                                final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return through(null, null, null, topic, storeSupplier);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public KTable<K, V> through(final String topic) {
         return through(null, null, null, topic, (String) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final String topic) {
         to(null, null, null, topic);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final StreamPartitioner<? super K, ? super V> partitioner,
                    final String topic) {
         to(null, null, partitioner, topic);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -509,6 +532,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         this.toStream().to(keySerde, valSerde, null, topic);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public void to(final Serde<K> keySerde,
                    final Serde<V> valSerde,
@@ -552,6 +576,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -560,10 +585,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, false, false, joinSerde, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                     final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doJoin(other, joiner, false, false, storeSupplier);
     }
@@ -581,6 +607,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -589,10 +616,11 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, true, true, joinSerde, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                           final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                          final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                          final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doJoin(other, joiner, true, true, storeSupplier);
     }
@@ -614,6 +642,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
                       false);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
@@ -622,15 +651,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return doJoin(other, joiner, true, false, joinSerde, queryableStoreName);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                          final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
-                                         final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                         final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
         return doJoin(other, joiner, true, false, storeSupplier);
     }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({"unchecked", "deprecation"})
     private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                         final boolean leftOuter,
@@ -640,16 +670,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
 
-        final StateStoreSupplier storeSupplier = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
+        final org.apache.kafka.streams.processor.StateStoreSupplier storeSupplier
+            = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);
 
         return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier);
     }
 
+    @SuppressWarnings({"unchecked", "deprecation"})
     private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
                                         final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                         final boolean leftOuter,
                                         final boolean rightOuter,
-                                        final StateStoreSupplier<KeyValueStore> storeSupplier) {
+                                        final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         final String joinMergeName = builder.newName(MERGE_NAME);
@@ -668,6 +700,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return result;
     }
 
+    @SuppressWarnings("unchecked")
     private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
                                           final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                           final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
@@ -692,6 +725,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return result;
     }
 
+    @SuppressWarnings("unchecked")
     private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other,
                                            final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                            final boolean leftOuter,
@@ -740,6 +774,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null);
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector,
                                                   final Serde<K1> keySerde,

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
index ffb3697..b4e5d44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedSerializer.java
@@ -43,6 +43,7 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
     // Default constructor needed by Kafka
     public WindowedSerializer() {}
 
+    @SuppressWarnings("unchecked")
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         if (inner == null) {
@@ -76,12 +77,12 @@ public class WindowedSerializer<T> implements Serializer<Windowed<T>> {
         inner.close();
     }
 
-    public byte[] serializeBaseKey(String topic, Windowed<T> data) {
+    byte[] serializeBaseKey(String topic, Windowed<T> data) {
         return inner.serialize(topic, data.key());
     }
 
     // Only for testing
-    public Serializer<T> innerSerializer() {
+    Serializer<T> innerSerializer() {
         return inner;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index ad0f236..66dfa27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -50,6 +50,7 @@ import java.util.regex.Pattern;
  *
  * @deprecated use {@link Topology} instead
  */
+@SuppressWarnings("unchecked")
 @Deprecated
 public class TopologyBuilder {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index c24686e..52465ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -46,7 +46,7 @@ public abstract class AbstractTask implements Task {
     final ProcessorTopology topology;
     final ProcessorStateManager stateMgr;
     final Set<TopicPartition> partitions;
-    final Consumer consumer;
+    final Consumer<byte[], byte[]> consumer;
     final String logPrefix;
     final boolean eosEnabled;
     final Logger log;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 06405ef..f2cbf51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
@@ -179,10 +178,10 @@ public class InternalTopologyBuilder {
 
     private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
         @SuppressWarnings("deprecation")
-        private final StateStoreSupplier supplier;
+        private final org.apache.kafka.streams.processor.StateStoreSupplier supplier;
 
         @SuppressWarnings("deprecation")
-        StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
+        StateStoreSupplierFactory(final org.apache.kafka.streams.processor.StateStoreSupplier<?> supplier) {
             super(supplier.name(),
                   supplier.loggingEnabled(),
                   supplier instanceof WindowStoreSupplier,
@@ -196,6 +195,7 @@ public class InternalTopologyBuilder {
             return supplier.get();
         }
 
+        @SuppressWarnings("deprecation")
         @Override
         public long retentionPeriod() {
             if (!isWindowStore()) {
@@ -498,7 +498,7 @@ public class InternalTopologyBuilder {
     }
 
     @SuppressWarnings("deprecation")
-    public final void addStateStore(final StateStoreSupplier supplier,
+    public final void addStateStore(final org.apache.kafka.streams.processor.StateStoreSupplier supplier,
                                     final String... processorNames) {
         Objects.requireNonNull(supplier, "supplier can't be null");
         if (stateFactories.containsKey(supplier.name())) {
@@ -531,7 +531,7 @@ public class InternalTopologyBuilder {
     }
 
     @SuppressWarnings("deprecation")
-    public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
+    public final void addGlobalStore(final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
                                      final String sourceName,
                                      final TimestampExtractor timestampExtractor,
                                      final Deserializer keyDeserializer,
@@ -925,6 +925,7 @@ public class InternalTopologyBuilder {
         return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
     }
 
+    @SuppressWarnings("unchecked")
     private void buildSinkNode(final Map<String, ProcessorNode> processorMap,
                                final Map<String, SinkNode> topicSinkMap,
                                final SinkNodeFactory sinkNodeFactory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
index b395d42..47cd61d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/QuickUnion.java
@@ -52,6 +52,7 @@ public class QuickUnion<T> {
         return current;
     }
 
+    @SuppressWarnings("unchecked")
     public void unite(T id1, T... idList) {
         for (T id2 : idList) {
             unitePair(id1, id2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index b254bb8..03bbceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -112,7 +112,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
 
     private Map<String, String> constructTags(final String scopeName, final String entityName, final String... tags) {
-        List<String> updatedTagList = new ArrayList(Arrays.asList(tags));
+        List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
         updatedTagList.add(scopeName + "-id");
         updatedTagList.add(entityName);
         return tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 0173c1d..c9c44af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
@@ -271,7 +270,7 @@ public class Stores {
                                     }
 
                                     @Override
-                                    public StateStoreSupplier build() {
+                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
                                         log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged);
                                         if (capacity < Integer.MAX_VALUE) {
                                             return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
@@ -335,7 +334,7 @@ public class Stores {
                                     }
 
                                     @Override
-                                    public StateStoreSupplier build() {
+                                    public org.apache.kafka.streams.processor.StateStoreSupplier build() {
                                         log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
                                         if (sessionWindows) {
                                             return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
@@ -535,6 +534,7 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
+    @Deprecated
     public interface InMemoryKeyValueFactory<K, V> {
         /**
          * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
@@ -567,7 +567,7 @@ public class Stores {
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the state store supplier; never null
          */
-        StateStoreSupplier build();
+        org.apache.kafka.streams.processor.StateStoreSupplier build();
     }
 
     /**
@@ -576,6 +576,7 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
+    @Deprecated
     public interface PersistentKeyValueFactory<K, V> {
 
         /**
@@ -614,11 +615,12 @@ public class Stores {
          * @return the factory to create a persistent key-value store
          */
         PersistentKeyValueFactory<K, V> enableCaching();
+
         /**
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the key-value store; never null
          */
-        StateStoreSupplier build();
+        org.apache.kafka.streams.processor.StateStoreSupplier build();
 
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
index 92e8ce0..10d0fe2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreSupplier.java
@@ -19,12 +19,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.Map;
 
-
-abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements StateStoreSupplier<T> {
+@Deprecated
+abstract class AbstractStoreSupplier<K, V, T extends StateStore> implements org.apache.kafka.streams.processor.StateStoreSupplier<T> {
     protected final String name;
     protected final Serde<K> keySerde;
     protected final Serde<V> valueSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 6d1e6dd..f955421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -34,6 +34,7 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
+@Deprecated
 public class InMemoryKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     public InMemoryKeyValueStoreSupplier(String name, Serde<K> keySerde, Serde<V> valueSerde, boolean logged, Map<String, String> logConfig) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index c93bacb..0f897ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -29,6 +29,7 @@ import java.util.Map;
  * @param <V> The value type
  *
  */
+@Deprecated
 public class InMemoryLRUCacheStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     private final int capacity;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
index 7ac8bab..a6ff8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStore.java
@@ -95,6 +95,7 @@ public class MeteredKeyValueBytesStore<K, V> extends WrappedStateStore.AbstractS
         }, time);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(ProcessorContext context, StateStore root) {
         this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 8c10987..8d9065c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -57,6 +57,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         this.time = time;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         //noinspection unchecked

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 49d8050..20c7c43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -57,7 +57,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         this.valueSerde = valueSerde;
     }
 
-
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = context;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index d629c1c..4b233f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -29,7 +29,7 @@ import java.util.Map;
  * @param <V> the type of values
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-
+@Deprecated
 public class RocksDBKeyValueStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, KeyValueStore> {
 
     private final KeyValueStoreBuilder<K, V> builder;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
index f5432dc..1552f7d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreSupplier.java
@@ -30,6 +30,7 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
+@Deprecated
 public class RocksDBSessionStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, SessionStore> implements WindowStoreSupplier<SessionStore> {
 
     static final int NUM_SEGMENTS = 3;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index b899f5e..2a82f79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -30,7 +30,7 @@ import java.util.Map;
  *
  * @see org.apache.kafka.streams.state.Stores#create(String)
  */
-
+@Deprecated
 public class RocksDBWindowStoreSupplier<K, V> extends AbstractStoreSupplier<K, V, WindowStore> implements WindowStoreSupplier<WindowStore> {
     public static final int MIN_SEGMENTS = 2;
     private final long retentionPeriod;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index ffeb7d8..b9b7181 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 
-import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS;
-
 public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSupplier {
     private final String name;
     private final long retentionPeriod;
@@ -38,12 +36,14 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         return name;
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public SessionStore<Bytes, byte[]> get() {
-        final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(name,
-                                                                                    retentionPeriod,
-                                                                                    NUM_SEGMENTS,
-                                                                                    new SessionKeySchema());
+        final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
+            name,
+            retentionPeriod,
+            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS,
+            new SessionKeySchema());
         return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
     }
 
@@ -52,8 +52,11 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         return "rocksdb-session";
     }
 
+    @SuppressWarnings("deprecation")
     @Override
     public long segmentIntervalMs() {
-        return Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
+        return Segments.segmentInterval(
+            retentionPeriod,
+            org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier.NUM_SEGMENTS);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index a0500b6..e873435 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -20,8 +20,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
-import static org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS;
-
 public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
     private final String name;
     private final long retentionPeriod;
@@ -29,13 +27,14 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
     private final long windowSize;
     private final boolean retainDuplicates;
 
+    @SuppressWarnings("deprecation")
     public RocksDbWindowBytesStoreSupplier(final String name,
                                            final long retentionPeriod,
                                            final int segments,
                                            final long windowSize,
                                            final boolean retainDuplicates) {
-        if (segments < MIN_SEGMENTS) {
-            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
+        if (segments < org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS) {
+            throw new IllegalArgumentException("numSegments must be >= " + org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier.MIN_SEGMENTS);
         }
         this.name = name;
         this.retentionPeriod = retentionPeriod;

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
index ad24c25..3495352 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreSupplier.java
@@ -17,14 +17,14 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 /**
- * A windowed state store supplier that extends the {@link StateStoreSupplier} interface.
+ * A windowed state store supplier that extends the {@link org.apache.kafka.streams.processor.StateStoreSupplier} interface.
  *
  * @param <T> State store type
  */
-public interface WindowStoreSupplier<T extends StateStore> extends StateStoreSupplier<T> {
+@Deprecated
+public interface WindowStoreSupplier<T extends StateStore> extends org.apache.kafka.streams.processor.StateStoreSupplier<T> {
 
     // window retention period in milli-second
     long retentionPeriod();

http://git-wip-us.apache.org/repos/asf/kafka/blob/713a67fd/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index a4b3118..100a11c 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -27,7 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 
-@SuppressWarnings("deprecation")
+@Deprecated
 public class MockStateStoreSupplier implements StateStoreSupplier {
     private String name;
     private boolean persistent;


Mime
View raw message