kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (#6227)
Date Tue, 26 Feb 2019 15:46:23 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc91ce5  [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (#6227)
dc91ce5 is described below

commit dc91ce58af941b822fbad3b560c35774130848f0
Author: Lee Dongjin <dongjin@apache.org>
AuthorDate: Wed Feb 27 00:22:25 2019 +0900

    [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (#6227)
    
    I found this defect while inspecting [KAFKA-7293: Merge followed by groupByKey/join might
violate co-partitioning](https://issues.apache.org/jira/browse/KAFKA-7293). This flag is no longer
used anywhere; `KStreamImpl#repartitionRequired` now covers its functionality.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
---
 .../internals/GroupedStreamAggregateBuilder.java   |  3 +-
 .../kstream/internals/InternalStreamsBuilder.java  |  2 +-
 .../kstream/internals/KGroupedTableImpl.java       |  3 +-
 .../streams/kstream/internals/KStreamImpl.java     | 41 +++++++++-------------
 .../streams/kstream/internals/KTableImpl.java      |  8 ++---
 .../internals/graph/BaseJoinProcessorNode.java     |  3 +-
 .../internals/graph/BaseRepartitionNode.java       |  2 +-
 .../internals/graph/ProcessorGraphNode.java        | 14 ++------
 .../kstream/internals/graph/StateStoreNode.java    |  2 +-
 .../internals/graph/StatefulProcessorNode.java     | 14 +++-----
 .../kstream/internals/graph/StreamSinkNode.java    |  3 +-
 .../kstream/internals/graph/StreamSourceNode.java  |  4 +--
 .../internals/graph/StreamTableJoinNode.java       |  3 +-
 .../kstream/internals/graph/StreamsGraphNode.java  | 10 +-----
 .../internals/graph/TableProcessorNode.java        |  2 +-
 .../kstream/internals/AbstractStreamTest.java      |  3 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  | 24 +++++--------
 17 files changed, 46 insertions(+), 95 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index 46546f4..cd5155f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -102,8 +102,7 @@ class GroupedStreamAggregateBuilder<K, V> {
             new StatefulProcessorNode<>(
                 aggFunctionName,
                 new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
-                storeBuilder,
-                repartitionRequired
+                storeBuilder
             );
 
         builder.addGraphNode(parentNode, statefulProcessorNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index e0983eb..c06b988 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -65,7 +65,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
 
-    protected final StreamsGraphNode root = new StreamsGraphNode(TOPOLOGY_ROOT, false) {
+    protected final StreamsGraphNode root = new StreamsGraphNode(TOPOLOGY_ROOT) {
         @Override
         public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
             // no-op for root node
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 4675f56..56be0f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -88,8 +88,7 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K,
V> implements KGr
         final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
             funcName,
             new ProcessorParameters<>(aggregateSupplier, funcName),
-            new KeyValueStoreMaterializer<>(materialized).materialize(),
-            false
+            new KeyValueStoreMaterializer<>(materialized).materialize()
         );
 
         // now the repartition node must be the parent of the StateProcessorNode
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ba08b89..0eda64f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -133,7 +133,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
 
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new
KStreamFilter<>(predicate, false), name);
-        final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name,
processorParameters, repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name,
processorParameters);
         builder.addGraphNode(this.streamsGraphNode, filterProcessorNode);
 
         return new KStreamImpl<>(name,
@@ -151,7 +151,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final String name = builder.newProcessorName(FILTER_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new
KStreamFilter<>(predicate, true), name);
-        final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new
ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new
ProcessorGraphNode<>(name, processorParameters);
 
         builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode);
 
@@ -185,7 +185,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
 
         final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap,
name);
 
-        return new ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
+        return new ProcessorGraphNode<>(name, processorParameters);
     }
 
     @Override
@@ -195,7 +195,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new
KStreamMap<>(mapper), name);
 
-        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name,
processorParameters, true);
+        final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name,
processorParameters);
 
         mapProcessorNode.keyChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapProcessorNode);
@@ -222,7 +222,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final String name = builder.newProcessorName(MAPVALUES_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new
KStreamMapValues<>(mapper), name);
-        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new
ProcessorGraphNode<>(name, processorParameters, repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new
ProcessorGraphNode<>(name, processorParameters);
 
         mapValuesProcessorNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
@@ -244,7 +244,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final String name = builder.newProcessorName(PRINTING_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name),
name);
-        final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name,
processorParameters, false);
+        final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name,
processorParameters);
 
         builder.addGraphNode(this.streamsGraphNode, printNode);
     }
@@ -255,7 +255,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final String name = builder.newProcessorName(FLATMAP_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new
KStreamFlatMap<>(mapper), name);
-        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name,
processorParameters, true);
+        final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name,
processorParameters);
         flatMapNode.keyChangingOperation(true);
 
         builder.addGraphNode(this.streamsGraphNode, flatMapNode);
@@ -281,7 +281,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final String name = builder.newProcessorName(FLATMAPVALUES_NAME);
 
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new
KStreamFlatMapValues<>(mapper), name);
-        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name,
processorParameters, repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name,
processorParameters);
 
         flatMapValuesNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
@@ -308,14 +308,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         }
 
         final ProcessorParameters processorParameters = new ProcessorParameters<>(new
KStreamBranch(predicates.clone(), childNames), branchName);
-        final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName,
processorParameters, false);
+        final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName,
processorParameters);
         builder.addGraphNode(this.streamsGraphNode, branchNode);
 
         final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class,
predicates.length);
 
         for (int i = 0; i < predicates.length; i++) {
             final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new
KStreamPassThrough<K, V>(), childNames[i]);
-            final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i],
innerProcessorParameters, repartitionRequired);
+            final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i],
innerProcessorParameters);
 
             builder.addGraphNode(branchNode, branchChildNode);
             branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde,
sourceNodes, repartitionRequired, branchChildNode, builder);
@@ -343,9 +343,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new
KStreamPassThrough<>(), name);
 
 
-        final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name,
-                                                                                        
   processorParameters,
-                                                                                        
   requireRepartitioning);
+        final ProcessorGraphNode<? super K, ? super V> mergeNode = new ProcessorGraphNode<>(name,
processorParameters);
 
         mergeNode.setMergeNode(true);
         builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode),
mergeNode);
@@ -364,9 +362,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
             name
         );
 
-        final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name,
-                                                                                        
     processorParameters,
-                                                                                        
     repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name,
processorParameters);
         builder.addGraphNode(this.streamsGraphNode, foreachNode);
     }
 
@@ -380,9 +376,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
             name
         );
 
-        final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name,
-                                                                                        
  processorParameters,
-                                                                                        
  repartitionRequired);
+        final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name,
processorParameters);
 
         builder.addGraphNode(this.streamsGraphNode, peekNode);
 
@@ -460,8 +454,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(new KStreamFlatTransform<>(transformerSupplier),
name),
-            stateStoreNames,
-            true
+            stateStoreNames
         );
 
         transformNode.keyChangingOperation(true);
@@ -494,8 +487,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier),
name),
-            stateStoreNames,
-            repartitionRequired
+            stateStoreNames
         );
 
         transformNode.setValueChangingOperation(true);
@@ -515,8 +507,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames,
-            repartitionRequired
+            stateStoreNames
         );
 
         builder.addGraphNode(this.streamsGraphNode, processNode);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 6e65b89..68f940c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -328,8 +328,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V>
implements KTable<
 
         final ProcessorGraphNode<K, V> toStreamNode = new ProcessorGraphNode<>(
             name,
-            processorParameters,
-            false
+            processorParameters
         );
 
         builder.addGraphNode(this.streamsGraphNode, toStreamNode);
@@ -370,8 +369,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V>
implements KTable<
         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
-            false
+            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
         );
 
         builder.addGraphNode(streamsGraphNode, node);
@@ -579,7 +577,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V>
implements KTable<
         final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier,
selectName);
 
         // select the aggregate key and values (old and new), it would require parent to
send old values
-        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(selectName,
processorParameters, false);
+        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(selectName,
processorParameters);
 
         builder.addGraphNode(this.streamsGraphNode, groupByMapNode);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
index fd1fcc9..ce410c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseJoinProcessorNode.java
@@ -41,8 +41,7 @@ abstract class BaseJoinProcessorNode<K, V1, V2, VR> extends StreamsGraphNode
{
                           final String thisJoinSideNodeName,
                           final String otherJoinSideNodeName) {
 
-        super(nodeName,
-              false);
+        super(nodeName);
 
         this.valueJoiner = valueJoiner;
         this.joinThisProcessorParameters = joinThisProcessorParameters;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
index e7f8e56..460f640 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/BaseRepartitionNode.java
@@ -39,7 +39,7 @@ public abstract class BaseRepartitionNode<K, V> extends StreamsGraphNode
{
                         final String sinkName,
                         final String repartitionTopic) {
 
-        super(nodeName, false);
+        super(nodeName);
 
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
index 658f55e..2cfe3cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorGraphNode.java
@@ -29,23 +29,13 @@ public class ProcessorGraphNode<K, V> extends StreamsGraphNode {
     private final ProcessorParameters<K, V> processorParameters;
 
     public ProcessorGraphNode(final String nodeName,
-                              final ProcessorParameters<K, V> processorParameters,
-                              final boolean repartitionRequired) {
+                              final ProcessorParameters<K, V> processorParameters)
{
 
-        super(nodeName, repartitionRequired);
+        super(nodeName);
 
         this.processorParameters = processorParameters;
     }
 
-    public ProcessorGraphNode(final String nodeName,
-                              final ProcessorParameters<K, V> processorParameters)
{
-        this(
-            nodeName,
-            processorParameters,
-            false
-        );
-    }
-
     public ProcessorParameters processorParameters() {
         return processorParameters;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
index 6d3b8ba..ea42cec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StateStoreNode.java
@@ -25,7 +25,7 @@ public class StateStoreNode extends StreamsGraphNode {
     protected final StoreBuilder storeBuilder;
 
     public StateStoreNode(final StoreBuilder storeBuilder) {
-        super(storeBuilder.name(), false);
+        super(storeBuilder.name());
 
         this.storeBuilder = storeBuilder;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
index df2a52e..1e910ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java
@@ -36,11 +36,8 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K,
V> {
      */
     public StatefulProcessorNode(final String nodeName,
                                  final ProcessorParameters<K, V> processorParameters,
-                                 final String[] storeNames,
-                                 final boolean repartitionRequired) {
-        super(nodeName,
-              processorParameters,
-              repartitionRequired);
+                                 final String[] storeNames) {
+        super(nodeName, processorParameters);
 
         this.storeNames = storeNames;
         this.storeBuilder = null;
@@ -53,11 +50,8 @@ public class StatefulProcessorNode<K, V> extends ProcessorGraphNode<K,
V> {
      */
     public StatefulProcessorNode(final String nodeName,
                                  final ProcessorParameters<K, V> processorParameters,
-                                 final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder,
-                                 final boolean repartitionRequired) {
-        super(nodeName,
-              processorParameters,
-              repartitionRequired);
+                                 final StoreBuilder<? extends StateStore> materializedKTableStoreBuilder)
{
+        super(nodeName, processorParameters);
 
         this.storeNames = null;
         this.storeBuilder = materializedKTableStoreBuilder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
index 95076c8..dfe7f9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java
@@ -34,8 +34,7 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
                           final TopicNameExtractor<K, V> topicNameExtractor,
                           final ProducedInternal<K, V> producedInternal) {
 
-        super(nodeName,
-              false);
+        super(nodeName);
 
         this.topicNameExtractor = topicNameExtractor;
         this.producedInternal = producedInternal;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
index 7d50c2a..317a95f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java
@@ -35,7 +35,7 @@ public class StreamSourceNode<K, V> extends StreamsGraphNode {
     public StreamSourceNode(final String nodeName,
                             final Collection<String> topicNames,
                             final ConsumedInternal<K, V> consumedInternal) {
-        super(nodeName, false);
+        super(nodeName);
 
         this.topicNames = topicNames;
         this.consumedInternal = consumedInternal;
@@ -45,7 +45,7 @@ public class StreamSourceNode<K, V> extends StreamsGraphNode {
                             final Pattern topicPattern,
                             final ConsumedInternal<K, V> consumedInternal) {
 
-        super(nodeName, false);
+        super(nodeName);
 
         this.topicPattern = topicPattern;
         this.consumedInternal = consumedInternal;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
index 1389156..f12eec7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamTableJoinNode.java
@@ -36,8 +36,7 @@ public class StreamTableJoinNode<K, V> extends StreamsGraphNode {
                                final ProcessorParameters<K, V> processorParameters,
                                final String[] storeNames,
                                final String otherJoinSideNodeName) {
-        super(nodeName,
-              false);
+        super(nodeName);
 
         // in the case of Stream-Table join the state stores associated with the KTable
         this.storeNames = storeNames;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
index 5bb3649..6ee8efd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
@@ -29,17 +29,14 @@ public abstract class StreamsGraphNode {
     private final Collection<StreamsGraphNode> childNodes = new LinkedHashSet<>();
     private final Collection<StreamsGraphNode> parentNodes = new LinkedHashSet<>();
     private final String nodeName;
-    private final boolean repartitionRequired;
     private boolean keyChangingOperation;
     private boolean valueChangingOperation;
     private boolean mergeNode;
     private Integer buildPriority;
     private boolean hasWrittenToTopology = false;
 
-    public StreamsGraphNode(final String nodeName,
-                            final boolean repartitionRequired) {
+    public StreamsGraphNode(final String nodeName) {
         this.nodeName = nodeName;
-        this.repartitionRequired = repartitionRequired;
     }
 
     public Collection<StreamsGraphNode> parentNodes() {
@@ -88,10 +85,6 @@ public abstract class StreamsGraphNode {
         return nodeName;
     }
 
-    public boolean repartitionRequired() {
-        return repartitionRequired;
-    }
-
     public boolean isKeyChangingOperation() {
         return keyChangingOperation;
     }
@@ -140,7 +133,6 @@ public abstract class StreamsGraphNode {
         return "StreamsGraphNode{" +
                "nodeName='" + nodeName + '\'' +
                ", buildPriority=" + buildPriority +
-               ", repartitionRequired=" + repartitionRequired +
                ", hasWrittenToTopology=" + hasWrittenToTopology +
                ", keyChangingOperation=" + keyChangingOperation +
                ", valueChangingOperation=" + valueChangingOperation +
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index 0409c62..f335843 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -37,7 +37,7 @@ public class TableProcessorNode<K, V, S extends StateStore> extends
StreamsGraph
                               final MaterializedInternal<K, V, S> materializedInternal,
                               final String[] storeNames) {
 
-        super(nodeName, false);
+        super(nodeName);
         this.processorParameters = processorParameters;
         this.materializedInternal = materializedInternal;
         this.storeNames = storeNames != null ? storeNames : new String[]{};
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 425f837..89242eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -107,8 +107,7 @@ public class AbstractStreamTest {
             final String name = builder.newProcessorName("RANDOM-FILTER-");
             final ProcessorGraphNode<K, V> processorNode = new ProcessorGraphNode<>(
                 name,
-                new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name),
-                false);
+                new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name));
             builder.addGraphNode(this.streamsGraphNode, processorNode);
             return new KStreamImpl<>(name, null, null, sourceNodes, false, processorNode,
builder);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 1612cb9..8c61c73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -63,8 +63,7 @@ public class GraphGraceSearchUtilTest {
                 },
                 "dummy"
             ),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
 
         final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless",
null);
@@ -92,8 +91,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -116,8 +114,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
@@ -132,8 +129,7 @@ public class GraphGraceSearchUtilTest {
             new ProcessorParameters<>(new KStreamSessionWindowAggregate<String,
Long, Integer>(
                 windows, "asdf", null, null, null
             ), "asdf"),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
 
         final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
@@ -151,8 +147,7 @@ public class GraphGraceSearchUtilTest {
                 },
                 "dummy"
             ),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
         graceGrandparent.addChild(statefulParent);
 
@@ -178,8 +173,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
 
         final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless",
null);
@@ -206,8 +200,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
 
         final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>(
@@ -221,8 +214,7 @@ public class GraphGraceSearchUtilTest {
                 ),
                 "asdf"
             ),
-            (StoreBuilder<? extends StateStore>) null,
-            false
+            (StoreBuilder<? extends StateStore>) null
         );
 
         final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless",
null);


Mime
View raw message