kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Rename stream partition assignor to streams partition assignor (#4621)
Date Mon, 26 Feb 2018 22:39:59 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f26fbb9  MINOR: Rename stream partition assignor to streams partition assignor (#4621)
f26fbb9 is described below

commit f26fbb9adcd66a31740da5c99f14a108bbc24304
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Feb 26 14:39:47 2018 -0800

    MINOR: Rename stream partition assignor to streams partition assignor (#4621)
    
    This is a straightforward change that aligns the name of the partition assignor with Streams.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>
---
 checkstyle/suppressions.xml                        | 10 +++---
 .../org/apache/kafka/streams/StreamsConfig.java    |  4 +--
 .../streams/processor/DefaultPartitionGrouper.java |  4 +--
 .../kafka/streams/processor/TopologyBuilder.java   |  2 +-
 ...Assignor.java => StreamsPartitionAssignor.java} |  2 +-
 .../org/apache/kafka/streams/state/HostInfo.java   |  4 +--
 .../apache/kafka/streams/StreamsConfigTest.java    |  4 +--
 .../streams/processor/TopologyBuilderTest.java     |  6 ++--
 .../CopartitionedTopicsValidatorTest.java          | 36 +++++++++++-----------
 ...Test.java => StreamsPartitionAssignorTest.java} |  6 ++--
 10 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f23805e..45ee4e6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -134,7 +134,7 @@
               files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
 
     <suppress checks="MethodLength"
-              files="StreamPartitionAssignor.java"/>
+              files="StreamsPartitionAssignor.java"/>
 
     <suppress checks="ParameterNumber"
               files="StreamTask.java"/>
@@ -142,22 +142,22 @@
               files="RocksDBWindowStoreSupplier.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(TopologyBuilder|KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
+              files="(TopologyBuilder|KStreamImpl|StreamsPartitionAssignor|KafkaStreams|KTableImpl).java"/>
 
     <suppress checks="CyclomaticComplexity"
               files="TopologyBuilder.java"/>
     <suppress checks="CyclomaticComplexity"
-              files="StreamPartitionAssignor.java"/>
+              files="StreamsPartitionAssignor.java"/>
     <suppress checks="CyclomaticComplexity"
               files="StreamThread.java"/>
 
     <suppress checks="JavaNCSS"
-              files="StreamPartitionAssignor.java"/>
+              files="StreamsPartitionAssignor.java"/>
 
     <suppress checks="NPathComplexity"
               files="ProcessorStateManager.java"/>
     <suppress checks="NPathComplexity"
-              files="StreamPartitionAssignor.java"/>
+              files="StreamsPartitionAssignor.java"/>
     <suppress checks="NPathComplexity"
               files="StreamThread.java"/>
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ec0a1a8..6b36261 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -39,7 +39,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -782,7 +782,7 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
-        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
         consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
 
         // add admin retries configs for creating topics
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 19e4809..c86171c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +83,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
 
             if (partitions.isEmpty()) {
                 log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
-                return StreamPartitionAssignor.NOT_AVAILABLE;
+                return StreamsPartitionAssignor.NOT_AVAILABLE;
             } else {
                 int numPartitions = partitions.size();
                 if (numPartitions > maxNumPartitions)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 6f34e25..dab7bd7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.SubscriptionUpdates;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Collection;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
similarity index 99%
rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 2a26272..9aa0e94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -54,7 +54,7 @@ import java.util.UUID;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 
-public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
+public class StreamsPartitionAssignor implements PartitionAssignor, Configurable {
 
     private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index c1b1021..58cdba6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 
 /**
  * Represents a user defined endpoint in a {@link org.apache.kafka.streams.KafkaStreams} application.
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
  *  {@link KafkaStreams#metadataForKey(String, Object, Serializer)}
  *
  *  The HostInfo is constructed during Partition Assignment
- *  see {@link StreamPartitionAssignor}
+ *  see {@link StreamsPartitionAssignor}
  *  It is extracted from the config {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG}
  *
  *  If developers wish to expose an endpoint in their KafkaStreams applications they should provide the above
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index cc072d5..0309659 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
@@ -119,7 +119,7 @@ public class StreamsConfigTest {
 
         assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
         assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
-        assertEquals(StreamPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
+        assertEquals(StreamsPartitionAssignor.class.getName(), returnedProps.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG));
         assertEquals(7L, returnedProps.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
         assertEquals("dummy:host", returnedProps.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
         assertEquals(null, returnedProps.get(StreamsConfig.RETRIES_CONFIG));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index ac3cd49..7a81594 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
@@ -661,7 +661,7 @@ public class TopologyBuilderTest {
         builder.addSource("source-2", Pattern.compile("topic-[A-C]"));
         builder.addSource("source-3", Pattern.compile("topic-\\d"));
 
-        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
         Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 
@@ -741,7 +741,7 @@ public class TopologyBuilderTest {
                 .addProcessor("my-processor", new MockProcessorSupplier(), "ingest")
                 .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor");
 
-        final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        final StreamsPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamsPartitionAssignor.SubscriptionUpdates();
         final Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
         updatedTopicsField.setAccessible(true);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index 19277e9..bbc59fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -33,8 +33,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public class CopartitionedTopicsValidatorTest {
 
-    private final StreamPartitionAssignor.CopartitionedTopicsValidator validator
-            = new StreamPartitionAssignor.CopartitionedTopicsValidator("thread");
+    private final StreamsPartitionAssignor.CopartitionedTopicsValidator validator
+            = new StreamsPartitionAssignor.CopartitionedTopicsValidator("thread");
     private final Map<TopicPartition, PartitionInfo> partitions = new HashMap<>();
     private final Cluster cluster = Cluster.empty();
 
@@ -49,7 +49,7 @@ public class CopartitionedTopicsValidatorTest {
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic()
{
         validator.validate(Collections.singleton("topic"),
-                           Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
+                           Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
                            cluster);
     }
 
@@ -57,14 +57,14 @@ public class CopartitionedTopicsValidatorTest {
     public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch()
{
         partitions.remove(new TopicPartition("second", 0));
         validator.validate(Utils.mkSet("first", "second"),
-                           Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
+                           Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
                            cluster.withPartitions(partitions));
     }
 
 
     @Test
     public void shouldEnforceCopartitioningOnRepartitionTopics() {
-        final StreamPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned",
10);
+        final StreamsPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned",
10);
 
         validator.validate(Utils.mkSet("first", "second", metadata.config.name()),
                            Collections.singletonMap(metadata.config.name(),
@@ -77,10 +77,10 @@ public class CopartitionedTopicsValidatorTest {
 
     @Test
     public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics()
{
-        final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one",
1);
-        final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two",
15);
-        final StreamPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three",
5);
-        final Map<String, StreamPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig
= new HashMap<>();
+        final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one",
1);
+        final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two",
15);
+        final StreamsPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three",
5);
+        final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig
= new HashMap<>();
 
         repartitionTopicConfig.put(one.config.name(), one);
         repartitionTopicConfig.put(two.config.name(), two);
@@ -100,9 +100,9 @@ public class CopartitionedTopicsValidatorTest {
 
     @Test
     public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable()
{
-        final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one",
1);
-        final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two",
StreamPartitionAssignor.NOT_AVAILABLE);
-        final Map<String, StreamPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig
= new HashMap<>();
+        final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one",
1);
+        final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two",
StreamsPartitionAssignor.NOT_AVAILABLE);
+        final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig
= new HashMap<>();
 
         repartitionTopicConfig.put(one.config.name(), one);
         repartitionTopicConfig.put(two.config.name(), two);
@@ -114,18 +114,18 @@ public class CopartitionedTopicsValidatorTest {
                            repartitionTopicConfig,
                            cluster.withPartitions(partitions));
 
-        assertThat(one.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE));
-        assertThat(two.numPartitions, equalTo(StreamPartitionAssignor.NOT_AVAILABLE));
+        assertThat(one.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
+        assertThat(two.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE));
 
     }
 
-    private StreamPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
-                                                                              final int partitions) {
+    private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
+                                                                               final int partitions) {
         final InternalTopicConfig repartitionTopicConfig
                 = new RepartitionTopicConfig(repartitionTopic, Collections.<String, String>emptyMap());
 
-        final StreamPartitionAssignor.InternalTopicMetadata metadata
-                = new StreamPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
+        final StreamsPartitionAssignor.InternalTopicMetadata metadata
+                = new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
         metadata.numPartitions = partitions;
         return metadata;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
similarity index 99%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 02ab803..bf3f1d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -65,7 +65,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
 
-public class StreamPartitionAssignorTest {
+public class StreamsPartitionAssignorTest {
 
     private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
     private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -105,7 +105,7 @@ public class StreamPartitionAssignorTest {
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
     private final TaskId task3 = new TaskId(0, 3);
-    private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
@@ -762,7 +762,7 @@ public class StreamPartitionAssignorTest {
             .count();
 
         // joining the stream and the table
-        // this triggers the enforceCopartitioning() routine in the StreamPartitionAssignor,
+        // this triggers the enforceCopartitioning() routine in the StreamsPartitionAssignor,
         // forcing the stream.map to get repartitioned to a topic with four partitions.
         stream1.join(
             table1,

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message