kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3855: Guard race conditions in TopologyBuilder
Date Tue, 19 Jul 2016 15:44:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0744449ea -> 114945cb6


KAFKA-3855: Guard race conditions in TopologyBuilder

Mark all public `TopologyBuilder` methods as synchronized as they can modify data-structures
and these methods could be called from multiple threads

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1633 from dguy/kafka-3855


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/114945cb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/114945cb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/114945cb

Branch: refs/heads/trunk
Commit: 114945cb6297f909a7b1272bd61e17b662d42d24
Parents: 0744449
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jul 19 08:44:48 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 19 08:44:48 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      | 46 ++++++++++----------
 1 file changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/114945cb/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 19440e4..2c02b0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -239,7 +239,7 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addSource(String name, String... topics) {
+    public synchronized final TopologyBuilder addSource(String name, String... topics) {
         return addSource(name, (Deserializer) null, (Deserializer) null, topics);
     }
 
@@ -256,7 +256,7 @@ public class TopologyBuilder {
      * @param topicPattern regular expression pattern to match Kafka topics that this source
is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addSource(String name, Pattern topicPattern) {
+    public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern)
{
         return addSource(name, (Deserializer) null, (Deserializer) null, topicPattern);
     }
 
@@ -276,7 +276,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already
been registered by another source
      */
-    public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer
valDeserializer, String... topics) {
+    public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer,
Deserializer valDeserializer, String... topics) {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -320,7 +320,7 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if processor is already added or if topics have already
been registered by name
      */
 
-    public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer
valDeserializer, Pattern topicPattern) {
+    public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer,
Deserializer valDeserializer, Pattern topicPattern) {
 
         if (topicPattern == null) {
             throw new TopologyBuilderException("Pattern can't be null");
@@ -358,7 +358,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, String... parentNames)
{
+    public synchronized final TopologyBuilder addSink(String name, String topic, String...
parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
     }
 
@@ -385,7 +385,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner,
String... parentNames) {
+    public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner
partitioner, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
     }
 
@@ -408,7 +408,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer,
Serializer valSerializer, String... parentNames) {
+    public synchronized final TopologyBuilder addSink(String name, String topic, Serializer
keySerializer, Serializer valSerializer, String... parentNames) {
         return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null,
parentNames);
     }
 
@@ -433,7 +433,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @throws TopologyBuilderException if parent processor is not added yet, or if this
processor's name is equal to the parent's name
      */
-    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K>
keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner,
String... parentNames) {
+    public synchronized final <K, V> TopologyBuilder addSink(String name, String topic,
Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K,
V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -465,7 +465,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if parent processor is not added yet, or if this
processor's name is equal to the parent's name
      */
-    public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String...
parentNames) {
+    public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier
supplier, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -493,7 +493,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if state store supplier is already added
      */
-    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal,
String... processorNames) {
+    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier,
boolean isInternal, String... processorNames) {
         if (stateFactories.containsKey(supplier.name())) {
             throw new TopologyBuilderException("StateStore " + supplier.name() + " is already
added.");
         }
@@ -515,7 +515,7 @@ public class TopologyBuilder {
      * @param supplier the supplier used to obtain this state store {@link StateStore} instance
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames)
{
+    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier,
String... processorNames) {
         return this.addStateStore(supplier, true, processorNames);
     }
 
@@ -526,7 +526,7 @@ public class TopologyBuilder {
      * @param stateStoreNames the names of state stores that the processor uses
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder connectProcessorAndStateStores(String processorName, String...
stateStoreNames) {
+    public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName,
String... stateStoreNames) {
         if (stateStoreNames != null) {
             for (String stateStoreName : stateStoreNames) {
                 connectProcessorAndStateStore(processorName, stateStoreName);
@@ -546,7 +546,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if less than two processors are specified, or if
one of the processors is not added yet
      */
-    public final TopologyBuilder connectProcessors(String... processorNames) {
+    public synchronized final TopologyBuilder connectProcessors(String... processorNames)
{
         if (processorNames.length < 2)
             throw new TopologyBuilderException("At least two processors need to participate
in the connection.");
 
@@ -569,7 +569,7 @@ public class TopologyBuilder {
      * @param topicName the name of the topic
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addInternalTopic(String topicName) {
+    public synchronized final TopologyBuilder addInternalTopic(String topicName) {
         this.internalTopicNames.add(topicName);
 
         return this;
@@ -603,7 +603,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups() {
+    public synchronized Map<Integer, TopicsInfo> topicGroups() {
         Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
 
@@ -681,7 +681,7 @@ public class TopologyBuilder {
      *
      * @return groups of node names
      */
-    public Map<Integer, Set<String>> nodeGroups() {
+    public synchronized Map<Integer, Set<String>> nodeGroups() {
         if (nodeGroups == null)
             nodeGroups = makeNodeGroups();
 
@@ -729,7 +729,7 @@ public class TopologyBuilder {
      * @param sourceNodes a set of source node names
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder copartitionSources(Collection<String> sourceNodes)
{
+    public synchronized final TopologyBuilder copartitionSources(Collection<String>
sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
         return this;
     }
@@ -740,7 +740,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Collection<Set<String>> copartitionGroups() {
+    public synchronized Collection<Set<String>> copartitionGroups() {
         List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
         for (Set<String> nodeNames : copartitionSourceGroups) {
             Set<String> copartitionGroup = new HashSet<>();
@@ -777,7 +777,7 @@ public class TopologyBuilder {
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public ProcessorTopology build(String applicationId, Integer topicGroupId) {
+    public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId)
{
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -839,7 +839,7 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this
builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as
new sources are added; never null
      */
-    public Set<String> sourceTopics() {
+    public synchronized Set<String> sourceTopics() {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {
@@ -856,7 +856,7 @@ public class TopologyBuilder {
         return Collections.unmodifiableSet(topics);
     }
 
-    public Pattern sourceTopicPattern() {
+    public synchronized Pattern sourceTopicPattern() {
         if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) {
             StringBuilder builder = new StringBuilder();
             for (Pattern pattern : nodeToSourcePatterns.values()) {
@@ -876,7 +876,7 @@ public class TopologyBuilder {
         return this.topicPattern;
     }
 
-    public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
+    public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates)
{
         this.subscriptionUpdates = subscriptionUpdates;
     }
 
@@ -886,7 +886,7 @@ public class TopologyBuilder {
      * @param applicationId   the streams applicationId. Should be the same as set by
      * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
      */
-    public void setApplicationId(String applicationId) {
+    public synchronized void setApplicationId(String applicationId) {
         this.applicationId = applicationId;
     }
 }


Mime
View raw message