kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6180; Add a Validator for NonNull configurations and remove redundant null checks on lists (#4188)
Date Thu, 25 Jan 2018 05:06:54 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aa42a11  KAFKA-6180; Add a Validator for NonNull configurations and remove redundant null checks on lists (#4188)
aa42a11 is described below

commit aa42a11dfd99ee9ab24d2e9a7521ef1c97ae1ff4
Author: Charly Molter <charly.molter@gmail.com>
AuthorDate: Thu Jan 25 05:06:44 2018 +0000

    KAFKA-6180; Add a Validator for NonNull configurations and remove redundant null checks on lists (#4188)
---
 .../kafka/clients/consumer/ConsumerConfig.java     |  9 +++++--
 .../kafka/clients/consumer/KafkaConsumer.java      | 10 +++----
 .../kafka/clients/producer/KafkaProducer.java      | 31 +++++++---------------
 .../kafka/clients/producer/ProducerConfig.java     |  9 ++++---
 .../apache/kafka/common/config/AbstractConfig.java |  4 +--
 .../org/apache/kafka/common/config/ConfigDef.java  | 29 ++++++++++++++++++++
 .../kafka/common/security/ssl/SslFactory.java      |  4 +--
 .../java/org/apache/kafka/common/utils/Utils.java  |  2 ++
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +-
 .../kafka/common/config/AbstractConfigTest.java    | 16 +++++++++++
 .../apache/kafka/common/config/ConfigDefTest.java  |  2 ++
 .../kafka/connect/runtime/ConnectorConfig.java     |  8 ++----
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +--
 13 files changed, 84 insertions(+), 46 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 3fe58d7..72e496c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -257,6 +257,8 @@ public class ConsumerConfig extends AbstractConfig {
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.HIGH,
                                         CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
@@ -273,6 +275,7 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.LIST,
                                         Collections.singletonList(RangeAssignor.class),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.MEDIUM,
                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG,
@@ -382,7 +385,8 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
                                         Type.LIST,
-                                        "",
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(KEY_DESERIALIZER_CLASS_CONFIG,
@@ -407,7 +411,8 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,
-                                        null,
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         INTERCEPTOR_CLASSES_DOC)
                                 .define(MAX_POLL_RECORDS_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 08877c9..0bbbcf1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -63,6 +63,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -683,7 +684,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
             List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                     ConsumerInterceptor.class);
-            this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
+            this.interceptors = new ConsumerInterceptors<>(interceptorList);
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -815,7 +816,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.fetcher = fetcher;
-        this.interceptors = interceptors;
+        this.interceptors = Objects.requireNonNull(interceptors);
         this.time = time;
         this.client = client;
         this.metrics = metrics;
@@ -1122,10 +1123,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                         client.pollNoWakeup();
 
-                    if (this.interceptors == null)
-                        return new ConsumerRecords<>(records);
-                    else
-                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
+                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }
 
                 long elapsed = time.milliseconds() - start;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4e67fe8..5fc9a1b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -378,7 +378,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
             List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                     ProducerInterceptor.class);
-            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
+            this.interceptors = new ProducerInterceptors<>(interceptorList);
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
@@ -780,7 +780,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         // intercept the record, which can be potentially modified; this method does not throw exceptions
-        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
+        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
         return doSend(interceptedRecord, callback);
     }
 
@@ -822,7 +822,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             // producer callback will make sure to call both 'callback' and interceptor callback
-            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
+            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
             if (transactionManager != null && transactionManager.isTransactional())
                 transactionManager.maybeAddPartitionToTransaction(tp);
@@ -842,29 +842,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             if (callback != null)
                 callback.onCompletion(null, e);
             this.errors.record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw new InterruptException(e);
         } catch (BufferExhaustedException e) {
             this.errors.record();
             this.metrics.sensor("buffer-exhausted-records").record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (KafkaException e) {
             this.errors.record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (Exception e) {
             // we notify interceptor about all exceptions, since onSend is called before anything else in this method
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw e;
         }
     }
@@ -1198,14 +1193,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            if (this.interceptors != null) {
-                if (metadata == null) {
-                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP,
-                                    Long.valueOf(-1L), -1, -1), exception);
-                } else {
-                    this.interceptors.onAcknowledgement(metadata, exception);
-                }
-            }
+            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
+            this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 0631814..6428dc4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -221,7 +222,7 @@ public class ProducerConfig extends AbstractConfig {
             "Note that transactions requires a cluster of at least three brokers by default
what is the recommended setting for production; for development you can change this, by adjusting
broker setting `transaction.state.log.replication.factor`.";
 
     static {
-        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                 .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
@@ -273,7 +274,8 @@ public class ProducerConfig extends AbstractConfig {
                                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
                                         Type.LIST,
-                                        "",
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
@@ -302,7 +304,8 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.MEDIUM, PARTITIONER_CLASS_DOC)
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,
-                                        null,
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         INTERCEPTOR_CLASSES_DOC)
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 61a5798..9e32074 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -289,9 +289,7 @@ public class AbstractConfig {
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides) {
         List<String> klasses = getList(key);
-        List<T> objects = new ArrayList<T>();
-        if (klasses == null)
-            return objects;
+        List<T> objects = new ArrayList<>();
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
         for (Object klass : klasses) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 3340ab3..3080298 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -937,6 +937,35 @@ public class ConfigDef {
         }
     }
 
+    public static class NonNullValidator implements Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                // Pass in the string null to avoid the findbugs warning
+                throw new ConfigException(name, "null", "entry must be non null");
+            }
+        }
+    }
+
+    public static class CompositeValidator implements Validator {
+        private final List<Validator> validators;
+
+        private CompositeValidator(List<Validator> validators) {
+            this.validators = Collections.unmodifiableList(validators);
+        }
+
+        public static CompositeValidator of(Validator... validators) {
+            return new CompositeValidator(Arrays.asList(validators));
+        }
+
+        @Override
+        public void ensureValid(String name, Object value) {
+            for (Validator validator: validators) {
+                validator.ensureValid(name, value);
+            }
+        }
+    }
+
     public static class NonEmptyString implements Validator {
 
         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 285582c..0d1fbf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -89,12 +89,12 @@ public class SslFactory implements Reconfigurable {
 
         @SuppressWarnings("unchecked")
         List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
-        if (cipherSuitesList != null)
+        if (cipherSuitesList != null && !cipherSuitesList.isEmpty())
             this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
 
         @SuppressWarnings("unchecked")
         List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
-        if (enabledProtocolsList != null)
+        if (enabledProtocolsList != null && !enabledProtocolsList.isEmpty())
             this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
 
         String endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8d8f118..9da3822 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -54,6 +54,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -464,6 +465,7 @@ public final class Utils {
      * @return The string representation.
      */
     public static <T> String join(Collection<T> list, String separator) {
+        Objects.requireNonNull(list);
         StringBuilder sb = new StringBuilder();
         Iterator<T> iter = list.iterator();
         while (iter.hasNext()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ab682d6..a827168 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1615,7 +1615,7 @@ public class KafkaConsumerTest {
 
         OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST;
         List<PartitionAssignor> assignors = Arrays.asList(assignor);
-        ConsumerInterceptors<String, String> interceptors = null;
+        ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.<ConsumerInterceptor<String, String>>emptyList());
 
         Metrics metrics = new Metrics();
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 9e21179..2e15715 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.security.TestSecurityConfig;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +52,21 @@ public class AbstractConfigTest {
     }
 
     @Test
+    public void testEmptyList() {
+        AbstractConfig conf;
+        ConfigDef configDef = new ConfigDef().define("a", Type.LIST, "", new ConfigDef.NonNullValidator(), Importance.HIGH, "doc");
+
+        conf = new AbstractConfig(configDef, Collections.emptyMap());
+        assertEquals(Collections.emptyList(), conf.getList("a"));
+
+        conf = new AbstractConfig(configDef, Collections.singletonMap("a", ""));
+        assertEquals(Collections.emptyList(), conf.getList("a"));
+
+        conf = new AbstractConfig(configDef, Collections.singletonMap("a", "b,c,d"));
+        assertEquals(Arrays.asList("b", "c", "d"), conf.getList("a"));
+    }
+
+    @Test
     public void testOriginalsWithPrefix() {
         Properties props = new Properties();
         props.put("foo.bar", "abc");
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index ed4997d..602147b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -158,6 +158,8 @@ public class ConfigDefTest {
         testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
                 new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs", null});
         testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
+        testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null});
+        testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"});
     }
 
     @Test
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0f8c390..e63d100 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -101,16 +101,15 @@ public class ConnectorConfig extends AbstractConfig {
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
                 .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
-                .define(TRANSFORMS_CONFIG, Type.LIST, null, new ConfigDef.Validator() {
+                .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
                     @Override
                     public void ensureValid(String name, Object value) {
-                        if (value == null) return;
                         final List<String> transformAliases = (List<String>) value;
                         if (transformAliases.size() > new HashSet<>(transformAliases).size()) {
                             throw new ConfigException(name, value, "Duplicate alias provided.");
                         }
                     }
-                }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
+                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
     }
 
     public ConnectorConfig(Plugins plugins) {
@@ -139,9 +138,6 @@ public class ConnectorConfig extends AbstractConfig {
      */
     public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
-        if (transformAliases == null || transformAliases.isEmpty()) {
-            return Collections.emptyList();
-        }
 
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 47f13f6..fba186c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
 import kafka.cluster.EndPoint
@@ -894,7 +894,7 @@ object KafkaConfig {
       .define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc)
       .define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
       .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
-      .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc)
+      .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
 
       /** ********* Sasl Configuration ****************/
       .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message