kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1667; topic-level configuration not validated; patched by Dmytro Kostiuchenko; reviewed by Jun Rao
Date Tue, 25 Nov 2014 22:36:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9f8b8dad2 -> 834b64198


kafka-1667; topic-level configuration not validated; patched by Dmytro Kostiuchenko; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/834b6419
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/834b6419
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/834b6419

Branch: refs/heads/trunk
Commit: 834b6419806750ff555e8ffd104caa420004e84e
Parents: 9f8b8da
Author: Dmytro Kostiuchenko <edio@archlinux.us>
Authored: Tue Nov 25 14:36:31 2014 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Nov 25 14:36:31 2014 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/config/ConfigDef.java   |   9 ++
 .../main/scala/kafka/admin/TopicCommand.scala   |   2 +-
 core/src/main/scala/kafka/log/LogConfig.scala   | 144 +++++++++++--------
 core/src/main/scala/kafka/utils/Utils.scala     |  26 ++++
 .../test/scala/kafka/log/LogConfigTest.scala    |  93 ++++++++++++
 .../integration/UncleanLeaderElectionTest.scala |   4 +-
 6 files changed, 218 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index c4cea2c..347e252 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -19,6 +19,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * This class is used for specifying the set of expected configurations, their type, their defaults, their
@@ -49,6 +50,14 @@ public class ConfigDef {
     private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
 
     /**
+     * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
+     * @return new unmodifiable {@link Set} instance containing the keys
+     */
+    public Set<String> names() {
+        return Collections.unmodifiableSet(configKeys.keySet());
+    }
+
+    /**
      * Define a new configuration
      * @param name The name of the config parameter
      * @param type The type of the config

http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 0b2735e..285c033 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -256,7 +256,7 @@ object TopicCommand {
                          .ofType(classOf[String])
     val nl = System.getProperty("line.separator")
     val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered."  + 
-                                                         "The following is a list of valid configurations: " + nl + LogConfig.ConfigNames.map("\t" + _).mkString(nl) + nl +  
+                                                         "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
                                                          "See the Kafka documentation for full details on the topic configs.")
                           .withRequiredArg
                           .describedAs("name=value")

http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index e48922a..f2fbc55 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection._
-import kafka.common._
+import org.apache.kafka.common.config.ConfigDef
 
 object Defaults {
   val SegmentSize = 1024 * 1024
@@ -44,21 +44,20 @@ object Defaults {
 
 /**
  * Configuration settings for a log
- * @param segmentSize The soft maximum for the size of a segment file in the log
+ * @param segmentSize The hard maximum for the size of a segment file in the log
  * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
  * @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling
  * @param flushInterval The number of messages that can be written to the log before a flush is forced
  * @param flushMs The amount of time the log can have dirty data before a flush is forced
  * @param retentionSize The approximate total number of bytes this log can use
- * @param retentionMs The age approximate maximum age of the last segment that is retained
+ * @param retentionMs The approximate maximum age of the last segment that is retained
  * @param maxIndexSize The maximum size of an index file
  * @param indexInterval The approximate number of bytes between index entries
  * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
  * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
  * @param compact Should old segments in this log be deleted or deduplicated?
- * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
- *                                    but included here for topic-specific configuration validation purposes
+ * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled
  * @param minInSyncReplicas If number of insync replicas drops below this number, we stop accepting writes with -1 (or all) required acks
  *
  */
@@ -106,6 +105,10 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
 }
 
 object LogConfig {
+
+  val Delete = "delete"
+  val Compact = "compact"
+
   val SegmentBytesProp = "segment.bytes"
   val SegmentMsProp = "segment.ms"
   val SegmentJitterMsProp = "segment.jitter.ms"
@@ -123,46 +126,84 @@ object LogConfig {
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
 
-  val ConfigNames = Set(SegmentBytesProp,
-                        SegmentMsProp,
-                        SegmentJitterMsProp,
-                        SegmentIndexBytesProp,
-                        FlushMessagesProp,
-                        FlushMsProp,
-                        RetentionBytesProp,
-                        RententionMsProp,
-                        MaxMessageBytesProp,
-                        IndexIntervalBytesProp,
-                        FileDeleteDelayMsProp,
-                        DeleteRetentionMsProp,
-                        MinCleanableDirtyRatioProp,
-                        CleanupPolicyProp,
-                        UncleanLeaderElectionEnableProp,
-                        MinInSyncReplicasProp)
+  val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
+  val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
+  val SegmentJitterMsDoc = "The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment" +
+    " rolling"
+  val FlushIntervalDoc = "The number of messages that can be written to the log before a flush is forced"
+  val FlushMsDoc = "The amount of time the log can have dirty data before a flush is forced"
+  val RetentionSizeDoc = "The approximate total number of bytes this log can use"
+  val RetentionMsDoc = "The approximate maximum age of the last segment that is retained"
+  val MaxIndexSizeDoc = "The maximum size of an index file"
+  val MaxMessageSizeDoc = "The maximum size of a message"
+  val IndexIntervalDoc = "The approximate number of bytes between index entries"
+  val FileDeleteDelayMsDoc = "The time to wait before deleting a file from the filesystem"
+  val DeleteRetentionMsDoc = "The time to retain delete markers in the log. Only applicable for logs that are being" +
+    " compacted."
+  val MinCleanableRatioDoc = "The ratio of bytes that are available for cleaning to the bytes already cleaned"
+  val CompactDoc = "Should old segments in this log be deleted or deduplicated?"
+  val UncleanLeaderElectionEnableDoc = "Indicates whether unclean leader election is enabled"
+  val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" +
+    " -1 (or all) required acks"
+
+  private val configDef = {
+    import ConfigDef.Range._
+    import ConfigDef.ValidString._
+    import ConfigDef.Type._
+    import ConfigDef.Importance._
+    import java.util.Arrays.asList
+
+    new ConfigDef()
+      .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(0), MEDIUM, SegmentSizeDoc)
+      .define(SegmentMsProp, LONG, Defaults.SegmentMs, atLeast(0), MEDIUM, SegmentMsDoc)
+      .define(SegmentJitterMsProp, LONG, Defaults.SegmentJitterMs, atLeast(0), MEDIUM, SegmentJitterMsDoc)
+      .define(SegmentIndexBytesProp, INT, Defaults.MaxIndexSize, atLeast(0), MEDIUM, MaxIndexSizeDoc)
+      .define(FlushMessagesProp, LONG, Defaults.FlushInterval, atLeast(0), MEDIUM, FlushIntervalDoc)
+      .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc)
+      // can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
+      .define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc)
+      .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc)
+      .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc)
+      .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM,  IndexIntervalDoc)
+      .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc)
+      .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc)
+      .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
+        MinCleanableRatioDoc)
+      .define(CleanupPolicyProp, STRING, if (Defaults.Compact) Compact else Delete, in(asList(Compact, Delete)), MEDIUM,
+        CompactDoc)
+      // we validate true/false explicitly to fail in case of typo
+      .define(UncleanLeaderElectionEnableProp, STRING, Defaults.UncleanLeaderElectionEnable.toString,
+        in(asList(true.toString, false.toString)), MEDIUM, UncleanLeaderElectionEnableDoc)
+      .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
+  }
+
+  def configNames() = {
+    import JavaConversions._
+    configDef.names().toList.sorted
+  }
 
   /**
    * Parse the given properties instance into a LogConfig object
    */
   def fromProps(props: Properties): LogConfig = {
-    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
-                  segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
-                  segmentJitterMs = props.getProperty(SegmentJitterMsProp, Defaults.SegmentJitterMs.toString).toLong,
-                  maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
-                  flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
-                  flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,
-                  retentionSize = props.getProperty(RetentionBytesProp, Defaults.RetentionSize.toString).toLong,
-                  retentionMs = props.getProperty(RententionMsProp, Defaults.RetentionMs.toString).toLong,
-                  maxMessageSize = props.getProperty(MaxMessageBytesProp, Defaults.MaxMessageSize.toString).toInt,
-                  indexInterval = props.getProperty(IndexIntervalBytesProp, Defaults.IndexInterval.toString).toInt,
-                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString).toInt,
-                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp, Defaults.DeleteRetentionMs.toString).toLong,
-                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp,
-                    Defaults.MinCleanableDirtyRatio.toString).toDouble,
-                  compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
-                    .trim.toLowerCase != "delete",
-                  uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
-                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean,
-                  minInSyncReplicas = props.getProperty(MinInSyncReplicasProp,Defaults.MinInSyncReplicas.toString).toInt)
+    import kafka.utils.Utils.evaluateDefaults
+    val parsed = configDef.parse(evaluateDefaults(props))
+    new LogConfig(segmentSize = parsed.get(SegmentBytesProp).asInstanceOf[Int],
+                  segmentMs = parsed.get(SegmentMsProp).asInstanceOf[Long],
+                  segmentJitterMs = parsed.get(SegmentJitterMsProp).asInstanceOf[Long],
+                  maxIndexSize = parsed.get(SegmentIndexBytesProp).asInstanceOf[Int],
+                  flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
+                  flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
+                  retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
+                  retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long],
+                  maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
+                  indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
+                  fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
+                  deleteRetentionMs = parsed.get(DeleteRetentionMsProp).asInstanceOf[Long],
+                  minCleanableRatio = parsed.get(MinCleanableDirtyRatioProp).asInstanceOf[Double],
+                  compact = parsed.get(CleanupPolicyProp).asInstanceOf[String].toLowerCase != Delete,
+                  uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[String].toBoolean,
+                  minInSyncReplicas = parsed.get(MinInSyncReplicasProp).asInstanceOf[Int])
   }
 
   /**
@@ -179,30 +220,17 @@ object LogConfig {
    */
   def validateNames(props: Properties) {
     import JavaConversions._
+    val names = configDef.names()
     for(name <- props.keys)
-      require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name))
+      require(names.contains(name), "Unknown configuration \"%s\".".format(name))
   }
 
   /**
-   * Check that the given properties contain only valid log config names, and that all values can be parsed.
+   * Check that the given properties contain only valid log config names and that all values can be parsed and are valid
    */
   def validate(props: Properties) {
     validateNames(props)
-    validateMinInSyncReplicas(props)
-    LogConfig.fromProps(LogConfig().toProps, props) // check that we can parse the values
-  }
-
-  /**
-   * Check that MinInSyncReplicas is reasonable
-   * Unfortunately, we can't validate its smaller than number of replicas
-   * since we don't have this information here
-   */
-  private def validateMinInSyncReplicas(props: Properties) {
-    val minIsr = props.getProperty(MinInSyncReplicasProp)
-    if (minIsr != null && minIsr.toInt < 1) {
-      throw new InvalidConfigException("Wrong value " + minIsr + " of min.insync.replicas in topic configuration; " +
-        " Valid values are at least 1")
-    }
+    configDef.parse(props)
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 23aefb4..58685cc 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -505,6 +505,32 @@ object Utils extends Logging {
     props.store(writer, "")
     writer.toString
   }
+
+  /**
+   * Turn {@linkplain java.util.Properties} with default values into a {@linkplain java.util.Map}. Following example
+   * illustrates difference from the cast
+   * <pre>
+   * val defaults = new Properties()
+   * defaults.put("foo", "bar")
+   * val props = new Properties(defaults)
+   *
+   * props.getProperty("foo") // "bar"
+   * props.get("foo") // null
+   * evaluateDefaults(props).get("foo") // "bar"
+   * </pre>
+   *
+   * @param props properties to evaluate
+   * @return new java.util.Map instance
+   */
+  def evaluateDefaults(props: Properties): java.util.Map[String, String] = {
+    import java.util._
+    import JavaConversions.asScalaSet
+    val evaluated = new HashMap[String, String]()
+    for (name <- props.stringPropertyNames()) {
+      evaluated.put(name, props.getProperty(name))
+    }
+    evaluated
+  }
   
   /**
    * Read some properties with the given default values

http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/test/scala/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
new file mode 100644
index 0000000..99b0df7
--- /dev/null
+++ b/core/src/test/scala/kafka/log/LogConfigTest.scala
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import org.apache.kafka.common.config.ConfigException
+import org.scalatest.junit.JUnit3Suite
+import org.junit.{Assert, Test}
+import java.util.Properties
+
+class LogConfigTest extends JUnit3Suite {
+
+  @Test
+  def testFromPropsDefaults() {
+    val defaults = new Properties()
+    defaults.put(LogConfig.SegmentBytesProp, "4242")
+    val props = new Properties(defaults)
+
+    val config = LogConfig.fromProps(props)
+
+    Assert.assertEquals(4242, config.segmentSize)
+    Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
+  }
+
+  @Test
+  def testFromPropsEmpty() {
+    val p = new Properties()
+    val config = LogConfig.fromProps(p)
+    Assert.assertEquals(LogConfig(), config)
+  }
+
+  @Test
+  def testFromPropsToProps() {
+    import scala.util.Random._
+    val expected = new Properties()
+    LogConfig.configNames().foreach((name) => {
+      name match {
+        case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete))
+        case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
+        case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
+        case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
+        case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
+      }
+    })
+
+    val actual = LogConfig.fromProps(expected).toProps
+    Assert.assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFromPropsInvalid() {
+    LogConfig.configNames().foreach((name) => {
+      name match {
+        case LogConfig.UncleanLeaderElectionEnableProp  => return
+        case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
+        case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
+        case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
+        case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
+        case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1")
+      }
+    })
+   }
+
+  private def assertPropertyInvalid(name: String, values: AnyRef*) {
+    values.foreach((value) => {
+      val props = new Properties
+      props.setProperty(name, value.toString)
+      intercept[ConfigException] {
+        LogConfig.fromProps(props)
+      }
+    })
+  }
+
+  private def randFrom[T](choices: T*): T = {
+    import scala.util.Random
+    choices(Random.nextInt(choices.size))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/834b6419/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index f44568c..ba3bcdc 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.integration
 
+import org.apache.kafka.common.config.ConfigException
+
 import scala.collection.mutable.MutableList
 import scala.util.Random
 import org.apache.log4j.{Level, Logger}
@@ -155,7 +157,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", "invalid")
 
-    intercept[IllegalArgumentException] {
+    intercept[ConfigException] {
       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
     }
   }


Mime
View raw message