kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: need to be backwards compatible with deprecated default configs until removed
Date Tue, 11 Jul 2017 00:59:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 6dc04fe8b -> dbf83c28e


MINOR: need to be backwards compatible with deprecated default configs until removed

…s until removed

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3461 from bbejeck/MINOR_make_sure_deprecated_streams_configs_still_usable

(cherry picked from commit 2628695356de87aba7d046855fbe1f1963d3f911)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dbf83c28
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dbf83c28
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dbf83c28

Branch: refs/heads/0.11.0
Commit: dbf83c28e92e660c72268cf28bc8c24d67065e5a
Parents: 6dc04fe
Author: Bill Bejeck <bill@confluent.io>
Authored: Mon Jul 10 17:59:43 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jul 10 17:59:54 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 35 +++++++--------
 .../apache/kafka/streams/StreamsConfigTest.java | 46 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dbf83c28/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 3783612..cf103e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -714,14 +714,7 @@ public class StreamsConfig extends AbstractConfig {
     @Deprecated
     public Serde keySerde() {
         try {
-            Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
-            // the default value of deprecated key serde is null
-            if (serde == null) {
-                serde = defaultKeySerde();
-            } else {
-                serde.configure(originals(), true);
-            }
-            return serde;
+            return defaultKeySerde();
         } catch (final Exception e) {
             throw new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e);
         }
@@ -735,11 +728,15 @@ public class StreamsConfig extends AbstractConfig {
      */
     public Serde defaultKeySerde() {
         try {
-            Serde<?> serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
+            Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
+            if (serde == null) {
+                serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
+            }
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e);
+            throw new StreamsException(
+                String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e);
         }
     }
 
@@ -752,14 +749,7 @@ public class StreamsConfig extends AbstractConfig {
     @Deprecated
     public Serde valueSerde() {
         try {
-            Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
-            // the default value of deprecated value serde is null
-            if (serde == null) {
-                serde = defaultValueSerde();
-            } else {
-                serde.configure(originals(), false);
-            }
-            return serde;
+            return defaultValueSerde();
         } catch (final Exception e) {
             throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e);
         }
@@ -773,11 +763,16 @@ public class StreamsConfig extends AbstractConfig {
      */
     public Serde defaultValueSerde() {
         try {
-            Serde<?> serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            if (serde == null) {
+                serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            }
             serde.configure(originals(), false);
+
             return serde;
         } catch (final Exception e) {
-            throw new StreamsException(String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
+            throw new StreamsException(
+                String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dbf83c28/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 2dd4553..9f0f67a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -25,6 +26,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.hamcrest.CoreMatchers;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,6 +44,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
+import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -390,6 +394,40 @@ public class StreamsConfigTest {
         assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs));
     }
 
+    @Test
+    public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
+        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+
+        final StreamsConfig config = new StreamsConfig(props);
+        assertTrue(config.defaultKeySerde() instanceof Serdes.DoubleSerde);
+        assertTrue(config.defaultValueSerde() instanceof Serdes.DoubleSerde);
+        assertTrue(config.defaultTimestampExtractor() instanceof MockTimestampExtractor);
+    }
+
+    @Test
+    public void shouldUseNewConfigsWhenPresent() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+
+        final StreamsConfig config = new StreamsConfig(props);
+        assertTrue(config.defaultKeySerde() instanceof Serdes.LongSerde);
+        assertTrue(config.defaultValueSerde() instanceof Serdes.LongSerde);
+        assertTrue(config.defaultTimestampExtractor() instanceof MockTimestampExtractor);
+    }
+
+    @Test
+    public void shouldUseCorrectDefaultsWhenNoneSpecified() {
+        final StreamsConfig config = new StreamsConfig(minimalStreamsConfig());
+        assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
+        assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde);
+        assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {
@@ -411,4 +449,12 @@ public class StreamsConfigTest {
             return null;
         }
     }
+
+    public static class MockTimestampExtractor implements TimestampExtractor {
+
+        @Override
+        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+            return 0;
+        }
+    }
 }


Mime
View raw message