kafka-commits mailing list archives

From jun...@apache.org
Subject [1/2] kafka git commit: KAFKA-1215; Rack-Aware replica assignment option
Date Tue, 15 Mar 2016 17:03:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk deb2b004c -> 951e30adc
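
For readers skimming the patch: KAFKA-1215 makes replica assignment rack-aware. Brokers may advertise a rack (the tests below set it through the new broker.rack property), the assignment API now takes BrokerMetadata (a broker id plus an optional rack) instead of bare broker ids, and the replicas of each partition are spread over as many racks as possible. A minimal sketch of the new entry point, with the argument order taken from the tests in this patch (the exact signature in AdminUtils, including any optional start-index parameters, may differ):

    import kafka.admin.{AdminUtils, BrokerMetadata}

    // Five brokers spread over three racks; the rack is an Option, so
    // rack-less brokers are represented as BrokerMetadata(id, None).
    val brokers = (0 to 4).map(id => BrokerMetadata(id, Some(s"rack${id % 3}")))

    // Assign 10 partitions at replication factor 3. The result maps each
    // partition id to an ordered replica list whose head is the preferred
    // leader; with rack information present, the three replicas of a
    // partition land on three different racks whenever enough racks exist.
    val assignment = AdminUtils.assignReplicasToBrokers(brokers, 10, 3)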


http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7c2577c..8910e09 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -33,20 +33,20 @@ import TestUtils._
 
 import scala.collection.{Map, immutable}
 
-class AdminTest extends ZooKeeperTestHarness with Logging {
+class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
   @Test
   def testReplicaAssignment() {
-    val brokerList = List(0, 1, 2, 3, 4)
+    val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
 
     // test 0 replication factor
     intercept[AdminOperationException] {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
+      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0)
     }
 
     // test wrong replication factor
     intercept[AdminOperationException] {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
+      AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6)
     }
 
     // correct assignment
@@ -62,9 +62,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
         8 -> List(3, 0, 1),
         9 -> List(4, 1, 2))
 
-    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
-    val e = (expectedAssignment.toList == actualAssignment.toList)
-    assertTrue(expectedAssignment.toList == actualAssignment.toList)
+    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
+    assertEquals(expectedAssignment, actualAssignment)
   }
 
   @Test
@@ -314,7 +313,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     val partition = 1
     val preferredReplica = 0
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
+    val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
+    val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
@@ -452,4 +452,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
       server.config.logDirs.foreach(CoreUtils.rm(_))
     }
   }
+
+  @Test
+  def testGetBrokerMetadatas() {
+    // broker 4 has no rack information
+    val brokerList = 0 to 5
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
+    val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
+    TestUtils.createBrokersInZk(brokerMetadatas, zkUtils)
+
+    val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled)
+    assertEquals(brokerList, processedMetadatas1.map(_.id))
+    assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
+
+    val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe)
+    assertEquals(brokerList, processedMetadatas2.map(_.id))
+    assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
+
+    intercept[AdminOperationException] {
+      AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+    }
+
+    val partialList = List(0, 1, 2, 3, 5)
+    val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, Some(partialList))
+    assertEquals(partialList, processedMetadatas3.map(_.id))
+    assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
+
+    val numPartitions = 3
+    AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
+    val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo"))
+    assertEquals(numPartitions, assignment.size)
+  }
 }
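
The three RackAwareMode values exercised in testGetBrokerMetadatas differ in how partial rack information is treated: Disabled drops rack data entirely, Safe keeps it only when it is usable (falling back to rack-less metadata here, since broker 4 has no rack), and Enforced rejects the operation with an AdminOperationException. A short sketch of what that means for topic creation, inferred from the assertions above (the createTopic call mirrors the one in the test):

    // Safe: broker 4 lacks a rack, so rack information is ignored and the
    // topic is still created.
    AdminUtils.createTopic(zkUtils, "bar", 3, 2, rackAwareMode = RackAwareMode.Safe)

    // Enforced: with the same cluster state, creation is expected to fail.
    intercept[AdminOperationException] {
      AdminUtils.createTopic(zkUtils, "baz", 3, 2, rackAwareMode = RackAwareMode.Enforced)
    }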

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
new file mode 100644
index 0000000..facc745
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import scala.collection.{Map, Seq, mutable}
+import org.junit.Assert._
+
+trait RackAwareTest {
+
+  def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
+                               brokerRackMapping: Map[Int, String],
+                               numBrokers: Int,
+                               numPartitions: Int,
+                               replicationFactor: Int,
+                               verifyRackAware: Boolean = true,
+                               verifyLeaderDistribution: Boolean = true,
+                               verifyReplicasDistribution: Boolean = true) {
+    // always verify that no broker will be assigned for more than one replica
+    for ((_, brokerList) <- assignment) {
+      assertEquals("More than one replica is assigned to same broker for the same partition",
brokerList.toSet.size, brokerList.size)
+    }
+    val distribution = getReplicaDistribution(assignment, brokerRackMapping)
+
+    if (verifyRackAware) {
+      val partitionRackMap = distribution.partitionRacks
+      assertEquals("More than one replica of the same partition is assigned to the same rack",
+        List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size))
+    }
+
+    if (verifyLeaderDistribution) {
+      val leaderCount = distribution.brokerLeaderCount
+      val leaderCountPerBroker = numPartitions / numBrokers
+      assertEquals("Preferred leader count is not even for brokers", List.fill(numBrokers)(leaderCountPerBroker),
leaderCount.values.toList)
+    }
+
+    if (verifyReplicasDistribution) {
+      val replicasCount = distribution.brokerReplicasCount
+      val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers
+      assertEquals("Replica count is not even for broker", List.fill(numBrokers)(numReplicasPerBroker),
replicasCount.values.toList)
+    }
+  }
+
+  def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = {
+    val leaderCount = mutable.Map[Int, Int]()
+    val partitionCount = mutable.Map[Int, Int]()
+    val partitionRackMap = mutable.Map[Int, List[String]]()
+    assignment.foreach { case (partitionId, replicaList) =>
+      val leader = replicaList.head
+      leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1
+      for (brokerId <- replicaList) {
+        partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1
+        val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`"))
+        partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List())
+      }
+    }
+    ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
+  }
+
+  def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): Seq[BrokerMetadata] =
+    rackMap.toSeq.map { case (brokerId, rack) =>
+      BrokerMetadata(brokerId, Some(rack))
+    } ++ brokersWithoutRack.map { brokerId =>
+      BrokerMetadata(brokerId, None)
+    }.sortBy(_.id)
+
+}
+
+case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])
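
For illustration, this is how a test mixing in the trait might drive the helpers above (hypothetical values; the positional arguments of checkReplicaDistribution follow its definition):

    // Four brokers on two racks, plus a rack-less broker 4 for ZK registration.
    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1")
    val metadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = Seq(4))

    // Given an assignment (partition id -> replica broker ids) produced over
    // the four rack-mapped brokers, verify that no partition doubles up on a
    // broker or a rack and that leaders and replicas are evenly spread:
    //   checkReplicaDistribution(assignment, rackInfo, 4, numPartitions, replicationFactor)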

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
new file mode 100644
index 0000000..0f71a19
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+
+  @Test
+  def testRackAwareReassign() {
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
+    TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
+
+    val numPartitions = 18
+    val replicationFactor = 3
+
+    // create a non rack aware assignment topic first
+    val createOpts = new kafka.admin.TopicCommand.TopicCommandOptions(Array(
+      "--partitions", numPartitions.toString,
+      "--replication-factor", replicationFactor.toString,
+      "--disable-rack-aware",
+      "--topic", "foo"))
+    kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+
+    val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
+    val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
+      rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
+
+    val assignment = proposedAssignment map { case (topicPartition, replicas) =>
+      (topicPartition.partition, replicas)
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
+  }
+
+}
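
generateAssignment takes the candidate broker list and a small JSON document naming the topics to move, and returns both the proposed and the current assignment so callers can compare them before executing. Roughly, as used in the test above (return and key types inferred from that usage):

    val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
    // With disableRackAware = false the proposal honours broker racks.
    val (proposed, current) = ReassignPartitionsCommand.generateAssignment(
      zkUtils, Seq(0, 1, 2, 3, 4, 5), topicJson, disableRackAware = false)
    // proposed and current map each topic-partition to its replica list.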

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index d554b02..b42aaf4 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -27,7 +27,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
 import kafka.coordinator.GroupCoordinator
 
-class TopicCommandTest extends ZooKeeperTestHarness with Logging {
+class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
   @Test
   def testConfigPreservationAcrossPartitionAlteration() {
@@ -157,4 +157,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
       Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic",
topic, "--if-not-exists"))
     TopicCommand.createTopic(zkUtils, createNotExistsOpts)
   }
+
+  @Test
+  def testCreateAlterTopicWithRackAware() {
+    val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
+    TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
+
+    val numPartitions = 18
+    val replicationFactor = 3
+    val createOpts = new TopicCommandOptions(Array(
+      "--partitions", numPartitions.toString,
+      "--replication-factor", replicationFactor.toString,
+      "--topic", "foo"))
+    TopicCommand.createTopic(zkUtils, createOpts)
+
+    var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
+      tp.partition -> replicas
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
+
+    val alteredNumPartitions = 36
+    // verify that adding partitions will also be rack aware
+    val alterOpts = new TopicCommandOptions(Array(
+      "--partitions", alteredNumPartitions.toString,
+      "--topic", "foo"))
+    TopicCommand.alterTopic(zkUtils, alterOpts)
+    assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
+      tp.partition -> replicas
+    }
+    checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 905612c..400d6d6 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -28,20 +28,6 @@ import scala.collection.mutable
 class BrokerEndPointTest extends Logging {
 
   @Test
-  def testSerDe() {
-
-    val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
-    val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
-    val origBroker = new Broker(1, listEndPoints)
-    val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes)
-
-    origBroker.writeTo(brokerBytes)
-
-    val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer])
-    assert(origBroker == newBroker)
-  }
-
-  @Test
   def testHashAndEquals() {
     val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
     val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7524e6a..fa240d2 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -530,7 +530,7 @@ class KafkaConfigTest {
        case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
        case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
         case KafkaConfig.MetricReporterClassesProp => // ignore string
-
+        case KafkaConfig.RackProp => // ignore string
         //SSL Configs
         case KafkaConfig.PrincipalBuilderClassProp =>
         case KafkaConfig.SslProtocolProp => // ignore string

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2523083..49fb85f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -27,29 +27,27 @@ import java.security.cert.X509Certificate
 import javax.net.ssl.X509TrustManager
 import charset.Charset
 
-import kafka.security.auth.{Resource, Authorizer, Acl}
+import kafka.security.auth.{Acl, Authorizer, Resource}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.TestSslUtils
 
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-
 import kafka.server._
 import kafka.producer._
 import kafka.message._
 import kafka.api._
-import kafka.cluster.Broker
-import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig}
-import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
+import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
 import kafka.producer.ProducerConfig
 import kafka.log._
 import kafka.utils.ZkUtils._
-
 import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.network.Mode
 
@@ -154,11 +152,12 @@ object TestUtils extends Logging {
     enablePlaintext: Boolean = true,
     enableSsl: Boolean = false,
     enableSaslPlaintext: Boolean = false,
-    enableSaslSsl: Boolean = false): Seq[Properties] = {
+    enableSaslSsl: Boolean = false,
+    rackInfo: Map[Int, String] = Map()): Seq[Properties] = {
     (0 until numConfigs).map { node =>
       createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
        interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
-        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl)
+        enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node))
     }
   }
 
@@ -180,7 +179,7 @@ object TestUtils extends Logging {
     enablePlaintext: Boolean = true,
     enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
     enableSsl: Boolean = false, sslPort: Int = RandomPort,
-    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort)
+    enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None)
   : Properties = {
 
    def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
@@ -210,6 +209,7 @@ object TestUtils extends Logging {
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props.put("controlled.shutdown.retry.backoff.ms", "100")
     props.put("log.cleaner.dedupe.buffer.size", "2097152")
+    rack.foreach(props.put("broker.rack", _))
 
    if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
       props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
@@ -591,9 +591,16 @@ object TestUtils extends Logging {
     }
   }
 
-  def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
-    brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
+  def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] =
+    createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkUtils)
+
+  def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], zkUtils: ZkUtils): Seq[Broker] = {
+    val brokers = brokerMetadatas.map { b =>
+      val protocol = SecurityProtocol.PLAINTEXT
+      Broker(b.id, Map(protocol -> EndPoint("localhost", 6667, protocol)).toMap, b.rack)
+    }
+    brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1,
+      rack = b.rack, ApiVersion.latestVersion))
     brokers
   }
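
With these extensions, tests can stand up rack-annotated clusters in two ways: via real broker configs (rackInfo ends up as the broker.rack property of the matching node) or via bare ZooKeeper registrations built from BrokerMetadata. A small sketch, assuming the helpers exactly as extended above:

    // Broker configs: brokers 0 and 2 get a rack, broker 1 stays rack-less,
    // so only configs 0 and 2 carry a broker.rack entry.
    val configs = TestUtils.createBrokerConfigs(3, zkConnect,
      enableControlledShutdown = false, rackInfo = Map(0 -> "rack-a", 2 -> "rack-b"))

    // ZooKeeper-only registration, for tests that never start real brokers.
    TestUtils.createBrokersInZk(
      Seq(kafka.admin.BrokerMetadata(0, Some("rack-a")),
          kafka.admin.BrokerMetadata(1, None)), zkUtils)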
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/951e30ad/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 15ea3ae..ba3d024 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -21,6 +21,11 @@
0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and
there may be a <a href="#upgrade_10_performance_impact">performance impact during the upgrade</a>. Because new protocols
are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
+<p/>
+<b>Notes to clients with version 0.9.0.0: </b>Due to a bug introduced in 0.9.0.0,
+clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with
the old consumer) will not
+work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 <b>before</b>
brokers are upgraded to
+0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
 
 <p><b>For a rolling upgrade:</b></p>
 

