kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-3959: enforce offsets.topic.replication.factor upon __consumer_offsets auto topic creation (KIP-115)
Date Thu, 02 Feb 2017 03:55:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3d9f34dd8 -> 063d534c5


KAFKA-3959: enforce offsets.topic.replication.factor upon __consumer_offsets auto topic creation (KIP-115)

Kafka brokers have a config called "offsets.topic.replication.factor" that specifies the replication
factor for the "__consumer_offsets" topic. The problem is that this config isn't being enforced.
If an attempt to create the internal topic is made when there are fewer brokers than "offsets.topic.replication.factor",
the topic ends up getting created anyway with the current number of live brokers. The current
behavior is pretty surprising when you have clients or tooling running as the cluster is getting
set up. Even if your cluster ends up being huge, you'll find out much later that __consumer_offsets
was set up with no replication.

The cluster not meeting the "offsets.topic.replication.factor" requirement on the internal
topic is another way of saying the cluster isn't fully set up yet.

The right behavior is for "offsets.topic.replication.factor" to be enforced: creation of the
internal topic should fail with GROUP_COORDINATOR_NOT_AVAILABLE until the
"offsets.topic.replication.factor" requirement is met. This closely resembles the behavior
of regular topic creation when the requested replication factor exceeds the current size of
the cluster, where the request fails with INVALID_REPLICATION_FACTOR.
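
To make the change concrete, here is a minimal Scala sketch of the old and new decision
logic (the names and values are simplified for illustration; this is not the actual
KafkaApis code):

    object OffsetsTopicRfSketch extends App {
      val configuredRf = 3  // offsets.topic.replication.factor
      val aliveBrokers = 1  // cluster still being set up

      // Old behavior: silently cap the replication factor at the live broker
      // count, so __consumer_offsets could be created with a factor of 1.
      val oldEffectiveRf = math.min(configuredRf, aliveBrokers)

      // New behavior: refuse creation until enough brokers are alive, then
      // use the configured value as-is.
      val newOutcome =
        if (aliveBrokers < configuredRf) "GROUP_COORDINATOR_NOT_AVAILABLE (client retries)"
        else s"create __consumer_offsets with replication factor $configuredRf"

      println(s"old effective RF: $oldEffectiveRf; new outcome: $newOutcome")
    }

With one live broker and a configured factor of 3, the old code created the topic with
replication factor min(3, 1) = 1; the new code keeps failing until three brokers are alive.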

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2177 from onurkaraman/KAFKA-3959


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/063d534c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/063d534c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/063d534c

Branch: refs/heads/trunk
Commit: 063d534c5160316cdf22e476d128e872a1412783
Parents: 3d9f34d
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Wed Feb 1 19:55:06 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Feb 1 19:55:06 2017 -0800

----------------------------------------------------------------------
 config/server.properties                        |  5 +++++
 .../src/main/scala/kafka/server/KafkaApis.scala | 20 ++++++++++++--------
 .../main/scala/kafka/server/KafkaConfig.scala   |  5 +----
 .../test/scala/unit/kafka/utils/TestUtils.scala |  1 +
 docs/upgrade.html                               |  1 +
 .../integration/utils/EmbeddedKafkaCluster.java |  1 +
 .../services/kafka/templates/kafka.properties   |  1 +
 7 files changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 506d0e7..37b5bb3 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -71,6 +71,11 @@ num.partitions=1
 # This value is recommended to be increased for installations with data dirs located in RAID array.
 num.recovery.threads.per.data.dir=1
 
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topic "__consumer_offsets".
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+
 ############################# Log Flush Policy #############################
 
 # Messages are immediately written to the filesystem but by default we only fsync() to sync

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7bb3ed5..785c9ae 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -778,13 +778,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def createGroupMetadataTopic(): MetadataResponse.TopicMetadata = {
     val aliveBrokers = metadataCache.getAliveBrokers
-    val offsetsTopicReplicationFactor =
-      if (aliveBrokers.nonEmpty)
-        Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
-      else
-        config.offsetsTopicReplicationFactor.toInt
-    createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
-      offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
+    if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
+      new MetadataResponse.TopicMetadata(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Topic.GroupMetadataTopicName, true,
+        java.util.Collections.emptyList())
+    } else {
+      createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
+        config.offsetsTopicReplicationFactor.toInt, coordinator.offsetsTopicConfigs)
+    }
   }
 
   private def getOrCreateGroupMetadataTopic(listenerName: ListenerName): MetadataResponse.TopicMetadata = {
@@ -800,7 +800,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
         if (topic == Topic.GroupMetadataTopicName) {
-          createGroupMetadataTopic()
+          val topicMetadata = createGroupMetadataTopic()
+          if (topicMetadata.error() == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) {
+            new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, Topic.isInternal(topic),
+              java.util.Collections.emptyList())
+          } else topicMetadata
         } else if (config.autoCreateTopicsEnable) {
           createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
         } else {
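
From a client's perspective, a hedged reproduction sketch (the bootstrap address, group id,
and topic name below are made up for illustration): a consumer started against a cluster with
fewer live brokers than offsets.topic.replication.factor now keeps retrying its coordinator
lookup instead of triggering creation of an under-replicated __consumer_offsets topic.

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer

    object RfEnforcementDemo extends App {
      // Assumes a single live broker on localhost:9092 configured with
      // offsets.topic.replication.factor=3.
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("group.id", "rf-check")
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Collections.singletonList("some-topic"))
      // poll() retries internally on GROUP_COORDINATOR_NOT_AVAILABLE;
      // __consumer_offsets is only created once at least 3 brokers are alive.
      consumer.poll(1000)
      consumer.close()
    }

Plain topic metadata requests for __consumer_offsets instead see INVALID_REPLICATION_FACTOR,
mirroring regular topic auto-creation, as the second hunk above shows.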

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0f0b291..7946475 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -541,10 +541,7 @@ object KafkaConfig {
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
   val OffsetsTopicReplicationFactorDoc = "The replication factor for the offsets topic (set higher to ensure availability). " +
-  "To ensure that the effective replication factor of the offsets topic is the configured value, " +
-  "the number of alive brokers has to be at least the replication factor at the time of the " +
-  "first request for the offsets topic. If not, either the offsets topic creation will fail or " +
-  "it will get a replication factor of min(alive brokers, configured replication factor)"
+  "Internal topic creation will fail until the cluster size meets this replication factor requirement."
   val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)"
   val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
   val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits"

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 743756e..98daec1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -217,6 +217,7 @@ object TestUtils extends Logging {
     props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
     props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
     props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
+    props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     rack.foreach(props.put(KafkaConfig.RackProp, _))
 
     if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ebc61db..da5c983 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -72,6 +72,7 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
         should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li>
     <li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to StreamsConfig class. User should pay attention to the default values and set these if needed. For more details please refer to <a id="streamsconfigs" href="#streamsconfigs">3.5 Kafka Streams Configs</a>.</li>
+    <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
 </ul>
 
 <h5><a id="upgrade_1020_new_protocols" href="#upgrade_1020_new_protocols">New Protocol Versions</a></h5>

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index fe7bebc..656959a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -68,6 +68,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
 
         for (int i = 0; i < brokers.length; i++) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/063d534c/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 7f3a920..c971715 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -63,3 +63,4 @@ replica.lag.time.max.ms={{replica_lag}}
 {% if auto_create_topics_enable is defined and auto_create_topics_enable is not none %}
 auto.create.topics.enable={{ auto_create_topics_enable }}
 {% endif %}
+offsets.topic.replication.factor={{ 3 if num_nodes > 3 else num_nodes }}
\ No newline at end of file

