kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5979; Use single AtomicCounter to generate internal names
Date Thu, 28 Sep 2017 10:07:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e5f2471c5 -> e846daa89


KAFKA-5979; Use single AtomicCounter to generate internal names

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3979 from mjsax/kafka-5979-kip-120-regression


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e846daa8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e846daa8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e846daa8

Branch: refs/heads/trunk
Commit: e846daa89b1cbdf7c08f1b719fcf1ffc3885614f
Parents: e5f2471
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Sep 28 11:07:54 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Sep 28 11:07:54 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStreamBuilder.java    |  7 ++-----
 .../kstream/internals/InternalStreamsBuilder.java       |  2 +-
 .../kafka/streams/kstream/KStreamBuilderTest.java       | 12 ++++++------
 3 files changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e846daa8/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index e7bcc95..ab666ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 import java.util.Collections;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 /**
@@ -52,8 +51,6 @@ import java.util.regex.Pattern;
 @Deprecated
 public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyBuilder {
 
-    private final AtomicInteger index = new AtomicInteger(0);
-
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(super.internalTopologyBuilder);
 
     private Topology.AutoOffsetReset translateAutoOffsetReset(final org.apache.kafka.streams.processor.TopologyBuilder.AutoOffsetReset resetPolicy) {
@@ -1249,7 +1246,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @return a new unique name
      */
     public String newName(final String prefix) {
-        return prefix + String.format("%010d", index.getAndIncrement());
+        return internalStreamsBuilder.newName(prefix);
     }
 
     /**
@@ -1261,7 +1258,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
      * @return a new unique name
      */
     public String newStoreName(final String prefix) {
-        return prefix + String.format(KTableImpl.STATE_STORE_NAME + "%010d", index.getAndIncrement());
+        return internalStreamsBuilder.newStoreName(prefix);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e846daa8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index fa696fe..357a70c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -159,7 +159,7 @@ public class InternalStreamsBuilder {
         return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
     }
 
-    String newName(final String prefix) {
+    public String newName(final String prefix) {
         return prefix + String.format("%010d", index.getAndIncrement());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e846daa8/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index c0bfa99..5ffedb8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
@@ -29,25 +29,25 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.MockValueJoiner;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("deprecation")
 public class KStreamBuilderTest {
@@ -303,7 +303,7 @@ public class KStreamBuilderTest {
         final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
         mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
         assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store"));
-        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000000-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
+        assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count"));
     }
 
     @Test


Mime
View raw message