kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2902: streaming config use get base consumer configs.
Date Thu, 03 Dec 2015 02:40:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9fb1e2573 -> a5382a333


KAFKA-2902: streaming config use get base consumer configs.

Changes made for using getBaseConsumerConfigs from StreamingConfig.getConsumerConfigs.

Author: bbejeck <bbejeck@gmail.com>
Author: Bill Bejeck <bbejeck@gmail.com>

Reviewers: Guozhang Wang

Closes #596 from bbejeck/KAFKA-2902-StreamingConfig-use-getBaseConsumerConfigs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5382a33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5382a33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5382a33

Branch: refs/heads/trunk
Commit: a5382a333ebb51a10c1a1cab46d66f10abff128a
Parents: 9fb1e25
Author: bbejeck <bbejeck@gmail.com>
Authored: Wed Dec 2 18:40:07 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Dec 2 18:40:07 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamingConfig.java   |  2 +-
 .../kafka/streams/StreamingConfigTest.java      | 70 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5382a33/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index eb4b83f..437afd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -222,7 +222,7 @@ public class StreamingConfig extends AbstractConfig {
     }
 
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
-        Map<String, Object> props = getRestoreConsumerConfigs();
+        Map<String, Object> props = getBaseConsumerConfigs();
         props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
         props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5382a33/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
new file mode 100644
index 0000000..a491e4a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Properties;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+
+public class StreamingConfigTest {
+
+    private Properties props = new Properties();
+    private StreamingConfig streamingConfig;
+    private StreamThread streamThreadPlaceHolder = null;
+
+
+    @Before
+    public void setUp() {
+        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
+        props.put("group.id", "test-consumer-group");
+        props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+        streamingConfig = new StreamingConfig(props);
+    }
+
+
+
+    @Test
+    public void testGetConsumerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder);
+        assertEquals(returnedProps.get("group.id"), "test-consumer-group");
+
+    }
+
+    @Test
+    public void testGetRestoreConsumerConfigs() throws Exception {
+        Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs();
+        assertNull(returnedProps.get("group.id"));
+    }
+}


Mime
View raw message