kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: internal config objects should not be logged (#5389)
Date Wed, 25 Jul 2018 02:34:01 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 487b954  MINOR: internal config objects should not be logged (#5389)
487b954 is described below

commit 487b95454233e981a65c184c7f3b86adf34058ef
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Tue Jul 24 19:33:55 2018 -0700

    MINOR: internal config objects should not be logged (#5389)
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../kafka/clients/admin/AdminClientConfig.java     |  6 ++++-
 .../org/apache/kafka/streams/StreamsConfig.java    |  7 +++++-
 .../processor/internals/InternalTopicManager.java  | 18 +++++++-------
 .../internals/StreamsPartitionAssignor.java        | 28 ++++++++++++----------
 4 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index b5ca15a..058c491 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -174,7 +174,11 @@ public class AdminClientConfig extends AbstractConfig {
     }
 
     public AdminClientConfig(Map<?, ?> props) {
-        super(CONFIG, props);
+        this(props, false);
+    }
+
+    protected AdminClientConfig(Map<?, ?> props, boolean doLog) {
+        super(CONFIG, props, doLog);
     }
 
     public static Set<String> configNames() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d1f7d93..54fcbc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -830,7 +830,12 @@ public class StreamsConfig extends AbstractConfig {
      * @param props properties that specify Kafka Streams and internal consumer/producer
configuration
      */
     public StreamsConfig(final Map<?, ?> props) {
-        super(CONFIG, props);
+        this(props, true);
+    }
+
+    protected StreamsConfig(final Map<?, ?> props,
+                            final boolean doLog) {
+        super(CONFIG, props, doLog);
         eosEnabled = EXACTLY_ONCE.equals(getString(PROCESSING_GUARANTEE_CONFIG));
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 2c2df04..6159ee2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -35,7 +35,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -44,6 +43,12 @@ public class InternalTopicManager {
     private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This
indicates a bug. " +
         "Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list
(https://kafka.apache.org/contact).";
 
+    private static final class InternalAdminClientConfig extends AdminClientConfig {
+        private InternalAdminClientConfig(final Map<?, ?> props) {
+            super(props, false);
+        }
+    }
+
     private final Logger log;
     private final long windowChangeLogAdditionalRetention;
     private final Map<String, String> defaultTopicConfigs = new HashMap<>();
@@ -57,12 +62,12 @@ public class InternalTopicManager {
                                 final StreamsConfig streamsConfig) {
         this.adminClient = adminClient;
 
-        LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
+        final LogContext logContext = new LogContext(String.format("stream-thread [%s] ",
Thread.currentThread().getName()));
         log = logContext.logger(getClass());
 
         replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
         windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
-        retries = new AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
+        retries = new InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
 
         log.debug("Configs:" + Utils.NL,
             "\t{} = {}" + Utils.NL,
@@ -146,12 +151,7 @@ public class InternalTopicManager {
                 }
 
                 if (retry) {
-                    final Iterator<NewTopic> it = newTopics.iterator();
-                    while (it.hasNext()) {
-                        if (createTopicNames.contains(it.next().name())) {
-                            it.remove();
-                        }
-                    }
+                    newTopics.removeIf(newTopic -> createTopicNames.contains(newTopic.name()));
 
                     continue;
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index d81d4f1..a56fb28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -64,7 +64,6 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
     private final static int VERSION_THREE = 3;
     private final static int VERSION_FOUR = 4;
     private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
-    private int minReceivedMetadataVersion = UNKNOWN;
     protected Set<Integer> supportedVersions = new HashSet<>();
 
     private Logger log;
@@ -194,17 +193,19 @@ public class StreamsPartitionAssignor implements PartitionAssignor,
Configurable
         }
     }
 
-    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>()
{
-        @Override
-        public int compare(final TopicPartition p1,
-                           final TopicPartition p2) {
-            final int result = p1.topic().compareTo(p2.topic());
+    private static final class InternalStreamsConfig extends StreamsConfig {
+        private InternalStreamsConfig(final Map<?, ?> props) {
+            super(props, false);
+        }
+    }
 
-            if (result != 0) {
-                return result;
-            } else {
-                return Integer.compare(p1.partition(), p2.partition());
-            }
+    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = (p1, p2)
-> {
+        final int result = p1.topic().compareTo(p2.topic());
+
+        if (result != 0) {
+            return result;
+        } else {
+            return Integer.compare(p1.partition(), p2.partition());
         }
     };
 
@@ -236,7 +237,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
      */
     @Override
     public void configure(final Map<String, ?> configs) {
-        final StreamsConfig streamsConfig = new StreamsConfig(configs);
+        final StreamsConfig streamsConfig = new InternalStreamsConfig(configs);
 
         // Setting the logger with the passed in client thread name
         logPrefix = String.format("stream-thread [%s] ", streamsConfig.getString(CommonClientConfigs.CLIENT_ID_CONFIG));
@@ -394,7 +395,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
         final Set<String> futureConsumers = new HashSet<>();
 
-        minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+        int minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+
         supportedVersions.clear();
         int futureMetadataVersion = UNKNOWN;
         for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet())
{


Mime
View raw message