kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3579; Reference both old and new consumer properties in `TopicCommand`
Date Sun, 08 May 2016 22:05:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 51f7a35c9 -> 62b9fa225


KAFKA-3579; Reference both old and new consumer properties in `TopicCommand`

Add references to the new consumer property 'max.partition.fetch.bytes' along with the old
consumer property 'fetch.message.max.bytes' in the corresponding warning messages of TopicCommand.
Also, create and leverage a static variable for the default value of the new consumer property.
Also, use 'DEFAULT_...' for default property constant names in the code instead of '..._DEFAULT'.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Manikumar reddy O <manikumar.reddy@gmail.com>, Ashish Singh <asingh@cloudera.com>,
Grant Henke <granthenke@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1239 from vahidhashemian/KAFKA-3579


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62b9fa22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62b9fa22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62b9fa22

Branch: refs/heads/trunk
Commit: 62b9fa22545a8e254b4ffd07ddc5bd3315542548
Parents: 51f7a35
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Sun May 8 22:27:58 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sun May 8 22:27:58 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  7 ++--
 .../internals/ConsumerCoordinatorTest.java      | 14 ++++----
 .../main/scala/kafka/admin/TopicCommand.scala   | 35 +++++++++++---------
 3 files changed, 31 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 69c4a36..6523d18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -115,6 +115,7 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
     private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data
per-partition the server will return. The maximum total memory used for a request will be
<code>#partitions * max.partition.fetch.bytes</code>. This size must be at least
as large as the maximum message size the server allows or else it is possible for the producer
to send messages larger than the consumer can fetch. If that happens, the consumer can get
stuck trying to fetch a large message on a certain partition.";
+    public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
 
     /** <code>send.buffer.bytes</code> */
     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
@@ -184,7 +185,7 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
     private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal
topics (such as offsets) should be exposed to the consumer. "
                                                             + "If set to <code>true</code>
the only way to receive records from an internal topic is subscribing to it.";
-    public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true;
+    public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
     
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -231,7 +232,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
                                         Type.INT,
-                                        1 * 1024 * 1024,
+                                        DEFAULT_MAX_PARTITION_FETCH_BYTES,
                                         atLeast(0),
                                         Importance.HIGH,
                                         MAX_PARTITION_FETCH_BYTES_DOC)
@@ -332,7 +333,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         MAX_POLL_RECORDS_DOC)
                                 .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
                                         Type.BOOLEAN,
-                                        EXCLUDE_INTERNAL_TOPICS_DEFAULT,
+                                        DEFAULT_EXCLUDE_INTERNAL_TOPICS,
                                         Importance.MEDIUM,
                                         EXCLUDE_INTERNAL_TOPICS_DOC)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 82a854a..fc5c929 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -114,7 +114,7 @@ public class ConsumerCoordinatorTest {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT,
autoCommitEnabled);
+        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS,
autoCommitEnabled);
     }
 
     @After
@@ -735,7 +735,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
@@ -761,7 +761,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
@@ -789,7 +789,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignment() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 100);
@@ -807,7 +807,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 100);
@@ -1096,7 +1096,7 @@ public class ConsumerCoordinatorTest {
 
         try (Metrics metrics = new Metrics(time)) {
             ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin,
range),
-                    ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
+                    ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -1105,7 +1105,7 @@ public class ConsumerCoordinatorTest {
 
         try (Metrics metrics = new Metrics(time)) {
             ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range,
roundRobin),
-                    ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
+                    ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(range.name(), metadata.get(0).name());

http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 029adea..e6ebb96 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import java.util.Properties
 import joptsimple._
 import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
-import kafka.consumer.{ConsumerConfig, Whitelist}
+import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist}
 import kafka.coordinator.GroupCoordinator
 import kafka.log.{Defaults, LogConfig}
 import kafka.server.ConfigType
@@ -31,6 +31,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConversions._
 import scala.collection._
+import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
 import org.apache.kafka.common.internals.TopicConstants
 
 
@@ -383,30 +384,34 @@ object TopicCommand extends Logging {
   def shortMessageSizeWarning(maxMessageBytes: Int): String = {
     "\n\n" +
       "*****************************************************************************************************\n"
+
-      "*** WARNING: you are creating a topic where the max.message.bytes is greater than
the consumer ***\n" +
-      "*** default. This operation is potentially dangerous. Consumers will get failures
if their        ***\n" +
-      "*** fetch.message.max.bytes < the value you are using.                        
                   ***\n" +
+      "*** WARNING: you are creating a topic where the max.message.bytes is greater than
the broker's    ***\n" +
+      "*** default max.message.bytes. This operation is potentially dangerous. Consumers
will get        ***\n" +
+      s"*** failures if their fetch.message.max.bytes (old consumer) or ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}
        ***\n"+ 
+      "*** (new consumer) < the value you are using.                                 
                   ***\n" +
       "*****************************************************************************************************\n"
+
       s"- value set here: $maxMessageBytes\n" +
-      s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n" +
+      s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n"
+
+      s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n"
+
       s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n\n"
   }
 
   def longMessageSizeWarning(maxMessageBytes: Int): String = {
     "\n\n" +
-      "****************************************************************************************************\n"
+
-      "*** WARNING: you are creating a topic where the max.message.bytes is greater than
the broker      ***\n" +
-      "*** default. This operation is dangerous. There are two potential side effects:  
               ***\n" +
-      "*** - Consumers will get failures if their fetch.message.max.bytes < the value
you are using     ***\n" +
-      "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and
hence have    ***\n" +
-      "***   a higher risk of data loss                                                 
               ***\n" +
-      "*** You should ensure both of these settings are greater than the value set here before
using    ***\n" +
-      "*** this topic.                                                                  
               ***\n" +
-      "****************************************************************************************************\n"
+
+      "*****************************************************************************************************\n"
+
+      "*** WARNING: you are creating a topic where the max.message.bytes is greater than
the broker's    ***\n" +
+      "*** default max.message.bytes. This operation is dangerous. There are two potential
side effects: ***\n" +
+      "*** - Consumers will get failures if their fetch.message.max.bytes (old consumer)
or              ***\n" +
+      s"***   ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} (new consumer) < the
value you are using                          ***\n" +
+      "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and
hence have     ***\n" +
+      "***   a higher risk of data loss                                                 
                ***\n" +
+      "*** You should ensure both of these settings are greater than the value set here before
using     ***\n" +
+      "*** this topic.                                                                  
                ***\n" +
+      "*****************************************************************************************************\n"
+
       s"- value set here: $maxMessageBytes\n" +
       s"- Default Broker replica.fetch.max.bytes: ${kafka.server.Defaults.ReplicaFetchMaxBytes}\n"
+
       s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n" +
-      s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n"
+      s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n"
+
+      s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n\n"
   }
 }
 


Mime
View raw message