kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: resolve conflicts
Date Tue, 19 Jul 2016 15:52:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 e0fa9c2e5 -> 6ec48c1a8


resolve conflicts


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ec48c1a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ec48c1a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ec48c1a

Branch: refs/heads/0.10.0
Commit: 6ec48c1a87d898168a5ec605bd77e76ce1369f88
Parents: e0fa9c2
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jul 19 08:44:48 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 19 08:47:37 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/TopologyBuilder.java      | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ec48c1a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 5425149..7161a80 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -203,11 +203,11 @@ public class TopologyBuilder {
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addSource(String name, String... topics) {
+    public synchronized final TopologyBuilder addSource(String name, String... topics) {
         return addSource(name, (Deserializer) null, (Deserializer) null, topics);
     }
 
-    /**
+   /**
      * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
      *
@@ -223,7 +223,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
      */
-    public final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+    public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -256,7 +256,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
+    public synchronized final TopologyBuilder addSink(String name, String topic, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
     }
 
@@ -283,7 +283,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+    public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
     }
 
@@ -306,7 +306,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
-    public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
+    public synchronized final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
         return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
     }
 
@@ -331,7 +331,7 @@ public class TopologyBuilder {
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
-    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
+    public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -363,7 +363,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      */
-    public final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
+    public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyBuilderException("Processor " + name + " is already added.");
 
@@ -391,7 +391,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if state store supplier is already added
      */
-    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
+    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
         if (stateFactories.containsKey(supplier.name())) {
             throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
         }
@@ -413,7 +413,7 @@ public class TopologyBuilder {
      * @param supplier the supplier used to obtain this state store {@link StateStore} instance
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+    public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
         return this.addStateStore(supplier, true, processorNames);
     }
 
@@ -424,7 +424,7 @@ public class TopologyBuilder {
      * @param stateStoreNames the names of state stores that the processor uses
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
+    public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) {
         if (stateStoreNames != null) {
             for (String stateStoreName : stateStoreNames) {
                 connectProcessorAndStateStore(processorName, stateStoreName);
@@ -444,7 +444,7 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never null
      * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet
      */
-    public final TopologyBuilder connectProcessors(String... processorNames) {
+    public synchronized final TopologyBuilder connectProcessors(String... processorNames) {
         if (processorNames.length < 2)
             throw new TopologyBuilderException("At least two processors need to participate in the connection.");
 
@@ -467,7 +467,7 @@ public class TopologyBuilder {
      * @param topicName the name of the topic
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addInternalTopic(String topicName) {
+    public synchronized final TopologyBuilder addInternalTopic(String topicName) {
         this.internalTopicNames.add(topicName);
 
         return this;
@@ -501,7 +501,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
+    public synchronized Map<Integer, TopicsInfo> topicGroups(String applicationId) {
         Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
         if (nodeGroups == null)
@@ -563,7 +563,7 @@ public class TopologyBuilder {
      *
      * @return groups of node names
      */
-    public Map<Integer, Set<String>> nodeGroups() {
+    public synchronized Map<Integer, Set<String>> nodeGroups() {
         if (nodeGroups == null)
             nodeGroups = makeNodeGroups();
 
@@ -611,7 +611,7 @@ public class TopologyBuilder {
      * @param sourceNodes a set of source node names
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
+    public synchronized final TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
         return this;
     }
@@ -622,7 +622,7 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Collection<Set<String>> copartitionGroups() {
+    public synchronized Collection<Set<String>> copartitionGroups() {
         List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size());
         for (Set<String> nodeNames : copartitionSourceGroups) {
             Set<String> copartitionGroup = new HashSet<>();
@@ -642,7 +642,7 @@ public class TopologyBuilder {
      *
      * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
      */
-    public ProcessorTopology build(String applicationId, Integer topicGroupId) {
+    public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId) {
         Set<String> nodeGroup;
         if (topicGroupId != null) {
             nodeGroup = nodeGroups().get(topicGroupId);
@@ -702,7 +702,7 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
-    public Set<String> sourceTopics(String applicationId) {
+    public synchronized Set<String> sourceTopics(String applicationId) {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {


Mime
View raw message