kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: kafka-1864; Revisit defaults for the internal offsets topic; patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede, and Gwen Shapira
Date Sat, 17 Jan 2015 02:56:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1769642bb -> 5174df537


kafka-1864; Revisit defaults for the internal offsets topic; patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede, and Gwen Shapira


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5174df53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5174df53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5174df53

Branch: refs/heads/trunk
Commit: 5174df53778cb5cb2d6d86e4cec9f3185a2c85db
Parents: 1769642
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Jan 16 18:56:32 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Jan 16 18:56:32 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala         | 11 +++++++++--
 core/src/main/scala/kafka/server/KafkaConfig.scala       |  6 +++++-
 core/src/main/scala/kafka/server/OffsetManager.scala     |  4 ++--
 .../kafka/api/ProducerFailureHandlingTest.scala          |  7 +++++++
 .../test/scala/unit/kafka/server/OffsetCommitTest.scala  |  7 +++++++
 5 files changed, 30 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
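
In short, this patch raises the default number of partitions for the internal offsets topic from 1 to 50 and its default replication factor from 1 to 3, and caps the replication factor used at auto-creation time at the number of alive brokers. A minimal sketch of the corresponding operator-facing settings; offsets.topic.replication.factor appears in the diff below, while offsets.topic.num.partitions and the surrounding object are assumptions added only for illustration:

    import java.util.Properties

    object OffsetsTopicDefaultsSketch {
      def main(args: Array[String]): Unit = {
        // After this commit these values match the built-in defaults; they are
        // listed explicitly only to make the new defaults visible.
        val brokerProps = new Properties()
        brokerProps.put("offsets.topic.num.partitions", "50")    // previously 1
        brokerProps.put("offsets.topic.replication.factor", "3") // previously 1
        println(brokerProps)
      }
    }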


http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c011a1b..ec8d9f7 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -351,10 +351,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
           try {
             if (topic == OffsetManager.OffsetsTopicName) {
-              AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
+              val aliveBrokers = metadataCache.getAliveBrokers
+              val offsetsTopicReplicationFactor =
+                if (aliveBrokers.length > 0)
+                  Math.min(config.offsetsTopicReplicationFactor, aliveBrokers.length)
+                else
+                  config.offsetsTopicReplicationFactor
+              AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
+                                     offsetsTopicReplicationFactor,
                                      offsetManager.offsetsTopicConfig)
               info("Auto creation of topic %s with %d partitions and replication factor %d
is successful!"
-                .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
+                .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
             }
             else {
               AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)

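The hunk above caps the factor only when the metadata cache reports at least one alive broker; otherwise the configured value is passed through unchanged and topic creation is allowed to fail. A standalone sketch of that decision, with names invented for illustration rather than taken from the patch:

    // Illustrative helper, not part of the commit: the replication factor
    // actually used when the offsets topic is auto-created.
    object EffectiveOffsetsTopicRf {
      def effectiveReplicationFactor(configured: Short, aliveBrokers: Int): Int =
        if (aliveBrokers > 0) math.min(configured.toInt, aliveBrokers)
        else configured.toInt // no broker metadata yet; creation may fail (see the test TODOs below)

      def main(args: Array[String]): Unit = {
        println(effectiveReplicationFactor(3, 1)) // 1 on a single-broker cluster
        println(effectiveReplicationFactor(3, 5)) // 3 once enough brokers are alive
      }
    }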
http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d3d8ac4..88689df 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -312,7 +312,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val offsetsLoadBufferSize = props.getIntInRange("offsets.load.buffer.size",
     OffsetManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE))
 
-  /** The replication factor for the offset commit topic (set higher to ensure availability). */
+  /** The replication factor for the offsets topic (set higher to ensure availability). To
+    * ensure that the effective replication factor of the offsets topic is the configured value,
+    * the number of alive brokers has to be at least the replication factor at the time of the
+    * first request for the offsets topic. If not, either the offsets topic creation will fail or
+    * it will get a replication factor of min(alive brokers, configured replication factor) */
   val offsetsTopicReplicationFactor: Short = props.getShortInRange("offsets.topic.replication.factor",
     OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor, (1, Short.MaxValue))
 

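For clusters smaller than the new default factor of 3 (a single-broker development setup, say), the factor can still be lowered explicitly, which is exactly what the two tests below do. A hedged sketch of such an override; the broker.id, zookeeper.connect and log.dir values are placeholders, not taken from this commit:

    import java.util.Properties
    import kafka.server.KafkaConfig

    object SingleBrokerOffsetsOverride {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("broker.id", "0")                        // placeholder
        props.put("zookeeper.connect", "localhost:2181")   // placeholder
        props.put("log.dir", "/tmp/kafka-logs")            // placeholder
        props.put("offsets.topic.replication.factor", "1") // lowered for a one-broker cluster
        val config = new KafkaConfig(props)
        println(config.offsetsTopicReplicationFactor)      // prints 1
      }
    }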
http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 3c79428..0bdd42f 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -75,9 +75,9 @@ object OffsetManagerConfig {
   val DefaultMaxMetadataSize = 4096
   val DefaultLoadBufferSize = 5*1024*1024
   val DefaultOffsetsRetentionCheckIntervalMs = 600000L
-  val DefaultOffsetsTopicNumPartitions = 1
+  val DefaultOffsetsTopicNumPartitions = 50
   val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
-  val DefaultOffsetsTopicReplicationFactor = 1.toShort
+  val DefaultOffsetsTopicReplicationFactor = 3.toShort
   val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
   val DefaultOffsetCommitTimeoutMs = 5000
   val DefaultOffsetCommitRequiredAcks = (-1).toShort

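The jump from 1 to 50 partitions matters because each consumer group's commits land in a single partition of the offsets topic, chosen by hashing the group id, so more partitions spread commit traffic and offset loading across more brokers. A sketch of that mapping, modeled on OffsetManager.partitionFor; the exact hashing shown here is an assumption, not quoted from the source:

    object OffsetsPartitionSketch {
      // Map a consumer group id to a partition of the offsets topic.
      // The bit mask keeps the hash non-negative, mirroring kafka's Utils.abs.
      def partitionFor(group: String, offsetsTopicNumPartitions: Int): Int =
        (group.hashCode & 0x7fffffff) % offsetsTopicNumPartitions

      def main(args: Array[String]): Unit = {
        println(partitionFor("my-consumer-group", 50)) // some value in [0, 50)
      }
    }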
http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 5ec613c..420a1dd 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -46,6 +46,13 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
       override val zkConnect = TestZKUtils.zookeeperConnect
       override val autoCreateTopicsEnable = false
       override val messageMaxBytes = serverMessageMaxBytes
+      // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
+      // the broker. As a result, the live broker list in metadataCache is empty. If the number of live brokers is 0, we
+      // try to create the offset topic with the default offsets.topic.replication.factor of 3. The creation will fail
+      // since there is not enough live brokers. This causes testCannotSendToInternalTopic() to fail. Temporarily fixing
+      // the issue by overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we need to
+      // remove the following config override.
+      override val offsetsTopicReplicationFactor = 1.asInstanceOf[Short]
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5174df53/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 4a3a5b2..5b93239 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -46,6 +46,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
+    // TODO: Currently, when there is no topic in a cluster, the controller doesn't send any UpdateMetadataRequest to
+    // the broker. As a result, the live broker list in metadataCache is empty. This causes the ConsumerMetadataRequest
+    // to fail since if the number of live brokers is 0, we try to create the offset topic with the default
+    // offsets.topic.replication.factor of 3. The creation will fail since there is not enough live brokers. In order
+    // for the unit test to pass, overriding offsets.topic.replication.factor to 1 for now. When we fix KAFKA-1867, we
+    // need to remove the following config override.
+    config.put("offsets.topic.replication.factor", "1")
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()

