kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [10/11] git commit: Use uniform convention for naming properties keys; kafka-648; patched by Sriram Subramanian; reviewed by Jun Rao
Date Sat, 12 Jan 2013 02:26:37 GMT
Use uniform convention for naming properties keys; kafka-648; patched by Sriram Subramanian; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4095319
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4095319
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4095319

Branch: refs/heads/trunk
Commit: a40953196e1ef558eb61b78219a20c20a4bd63df
Parents: dbe87f6
Author: Jun Rao <junrao@gmail.com>
Authored: Fri Jan 11 16:12:57 2013 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri Jan 11 16:12:57 2013 -0800

----------------------------------------------------------------------
 config/consumer.properties                         |    2 +-
 config/producer.properties                         |   25 +----
 config/server.properties                           |   34 +++----
 .../main/java/kafka/etl/impl/DataGenerator.java    |    2 +-
 .../kafka/bridge/hadoop/KafkaOutputFormat.java     |    3 +-
 core/src/main/scala/kafka/client/ClientUtils.scala |    2 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |   10 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |   16 ++--
 .../main/scala/kafka/consumer/ConsumerConfig.scala |   32 +++---
 .../kafka/consumer/ConsumerFetcherThread.scala     |    8 +-
 .../consumer/ZookeeperConsumerConnector.scala      |   20 ++--
 core/src/main/scala/kafka/log/LogManager.scala     |   34 ++++----
 .../scala/kafka/producer/ConsoleProducer.scala     |   10 +-
 .../scala/kafka/producer/KafkaLog4jAppender.scala  |    4 +-
 core/src/main/scala/kafka/producer/Producer.scala  |   12 +-
 .../main/scala/kafka/producer/ProducerConfig.scala |   25 +++---
 .../main/scala/kafka/producer/SyncProducer.scala   |    2 +-
 .../scala/kafka/producer/SyncProducerConfig.scala  |   10 +-
 .../kafka/producer/async/AsyncProducerConfig.scala |    8 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    9 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |    6 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   72 ++++++++-------
 core/src/main/scala/kafka/server/KafkaServer.scala |    4 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    6 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |    4 +-
 .../main/scala/kafka/tools/ReplayLogProducer.scala |   14 ++--
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    6 +-
 .../scala/other/kafka/TestEndToEndLatency.scala    |    4 +-
 .../scala/other/kafka/TestLogPerformance.scala     |    2 +-
 .../scala/other/kafka/TestZKConsumerOffsets.scala  |    2 +-
 .../kafka/integration/AutoOffsetResetTest.scala    |    4 +-
 .../integration/ProducerConsumerTestHarness.scala  |    8 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   22 ++--
 .../test/scala/unit/kafka/log/LogOffsetTest.scala  |    6 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |   36 ++++----
 .../unit/kafka/producer/AsyncProducerTest.scala    |    8 +-
 .../scala/unit/kafka/producer/ProducerTest.scala   |   16 ++--
 .../unit/kafka/producer/SyncProducerTest.scala     |   17 ++--
 .../unit/kafka/server/ISRExpirationTest.scala      |   10 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |    8 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   24 +++---
 .../src/main/java/kafka/examples/Consumer.java     |    8 +-
 .../scala/kafka/perf/ConsumerPerformance.scala     |   12 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |   16 ++--
 .../config/mirror_producer.properties              |    4 +-
 .../config/mirror_producer1.properties             |    4 +-
 .../config/mirror_producer2.properties             |    4 +-
 .../config/mirror_producer3.properties             |    4 +-
 .../config/server_source1.properties               |   20 ++--
 .../config/server_source2.properties               |   20 ++--
 .../config/server_source3.properties               |   20 ++--
 .../config/server_source4.properties               |   20 ++--
 .../config/server_target1.properties               |   20 ++--
 .../config/server_target2.properties               |   20 ++--
 .../config/server_target3.properties               |   20 ++--
 .../config/whitelisttest.consumer.properties       |    6 +-
 system_test/common/util.sh                         |    8 +-
 .../config/migration_producer.properties           |   29 +-----
 .../config/server.properties                       |   36 ++++----
 .../config/blacklisttest.consumer.properties       |    6 +-
 .../mirror_maker/config/mirror_producer.properties |    4 +-
 .../config/server_source_1_1.properties            |   20 ++--
 .../config/server_source_1_2.properties            |   20 ++--
 .../config/server_source_2_1.properties            |   20 ++--
 .../config/server_source_2_2.properties            |   20 ++--
 .../config/server_target_1_1.properties            |   20 ++--
 .../config/server_target_1_2.properties            |   20 ++--
 .../config/whitelisttest_1.consumer.properties     |    6 +-
 .../config/whitelisttest_2.consumer.properties     |    6 +-
 .../config/mirror_consumer.properties              |   20 ++--
 .../config/mirror_producer.properties              |    2 +-
 .../config/server.properties                       |   56 ++++++------
 system_test/producer_perf/config/server.properties |   20 ++--
 .../replication_testsuite/config/server.properties |   56 ++++++------
 76 files changed, 541 insertions(+), 585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/config/consumer.properties
----------------------------------------------------------------------
diff --git a/config/consumer.properties b/config/consumer.properties
index a067ac0..1c43bf9 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -23,7 +23,7 @@ zk.connect=127.0.0.1:2181
 zk.connectiontimeout.ms=1000000
 
 #consumer group id
-groupid=test-consumer-group
+group.id=test-consumer-group
 
 #consumer timeout
 #consumer.timeout.ms=5000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index eb36691..a1c8cb2 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -36,35 +36,18 @@ serializer.class=kafka.serializer.StringEncoder
 # allow topic level compression
 #compressed.topics=
 
-# max message size; messages larger than that size are discarded; default is 1000000
-#max.message.size=
-
-
 ############################# Async Producer #############################
 # maximum time, in milliseconds, for buffering data on the producer queue 
-#queue.time=
+#queue.buffering.max.ms=
 
 # the maximum size of the blocking queue for buffering on the producer 
-#queue.size=
+#queue.buffering.max.messages=
 
 # Timeout for event enqueue:
 # 0: events will be enqueued immediately or dropped if the queue is full
 # -ve: enqueue will block indefinitely if the queue is full
 # +ve: enqueue will block up to this many milliseconds if the queue is full
-#queue.enqueueTimeout.ms=
+#queue.enqueue.timeout.ms=
 
 # the number of messages batched at the producer 
-#batch.size=
-
-# the callback handler for one or multiple events 
-#callback.handler=
-
-# properties required to initialize the callback handler 
-#callback.handler.props=
-
-# the handler for events 
-#event.handler=
-
-# properties required to initialize the event handler 
-#event.handler.props=
-
+#batch.num.messages=

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index f4521fb..9a9cd06 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -17,7 +17,7 @@
 ############################# Server Basics #############################
 
 # The id of the broker. This must be set to a unique integer for each broker.
-brokerid=0
+broker.id=0
 
 ############################# Socket Server Settings #############################
 
@@ -27,22 +27,22 @@ port=9092
 # Hostname the broker will bind to and advertise to producers and consumers.
 # If not set, the server will bind to all interfaces and advertise the value returned from
 # from java.net.InetAddress.getCanonicalHostName().
-#hostname=localhost
+#host.name=localhost
 
 # The number of threads handling network requests
-network.threads=2
+num.network.threads=2
  
 # The number of threads doing disk I/O
-io.threads=2
+num.io.threads=2
 
 # The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer=1048576
+socket.send.buffer.bytes=1048576
 
 # The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer=1048576
+socket.receive.buffer.bytes=1048576
 
 # The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
+socket.request.max.bytes=104857600
 
 
 ############################# Log Basics #############################
@@ -54,9 +54,6 @@ log.dir=/tmp/kafka-logs
 # for consumption, but also mean more files.
 num.partitions=1
 
-# Overrides for for the default given by num.partitions on a per-topic basis
-#topic.partition.count.map=topic1:3, topic2:4
-
 ############################# Log Flush Policy #############################
 
 # The following configurations control the flush of data to disk. This is the most
@@ -69,16 +66,13 @@ num.partitions=1
 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
 # The number of messages to accept before forcing a flush of data to disk
-log.flush.interval=10000
+log.flush.interval.messages=10000
 
 # The maximum amount of time a message can sit in a log before we force a flush
-log.default.flush.interval.ms=1000
-
-# Per-topic overrides for log.default.flush.interval.ms
-#topic.flush.intervals.ms=topic1:1000, topic2:3000
+log.flush.interval.ms=1000
 
-# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
-log.default.flush.scheduler.interval.ms=1000
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
 
 ############################# Log Retention Policy #############################
 
@@ -91,11 +85,11 @@ log.default.flush.scheduler.interval.ms=1000
 log.retention.hours=168
 
 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.size.
-#log.retention.size=1073741824
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+log.segment.bytes=536870912
 
 # The interval at which log segments are checked to see if they can be deleted according 
 # to the retention policies

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index 7f70f9e..df17978 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -71,7 +71,7 @@ public class DataGenerator {
 		System.out.println("server uri:" + _uri.toString());
         Properties producerProps = new Properties();
         producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
-        producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
+        producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
         producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
         producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
         

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
index 9a1c359..2fd2035 100644
--- a/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
+++ b/contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java
@@ -119,10 +119,9 @@ public class KafkaOutputFormat<W extends BytesWritable> extends OutputFormat<Nul
     job.setInt("kafka.output.compression_codec", compressionCodec);
 
     props.setProperty("producer.type", producerType);
-    props.setProperty("buffer.size", Integer.toString(bufSize));
+    props.setProperty("send.buffer.bytes", Integer.toString(bufSize));
     props.setProperty("connect.timeout.ms", Integer.toString(timeout));
     props.setProperty("reconnect.interval", Integer.toString(interval));
-    props.setProperty("max.message.size", Integer.toString(maxSize));
     props.setProperty("compression.codec", Integer.toString(compressionCodec));
 
     if (uri.getScheme().equals("kafka")) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 968a91f..af5d231 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -61,7 +61,7 @@ object ClientUtils extends Logging{
   def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = {
     val props = new Properties()
     props.put("broker.list", brokers.map(_.getConnectionString()).mkString(","))
-    props.put("clientid", clientId)
+    props.put("client.id", clientId)
     val producerConfig = new ProducerConfig(props)
     fetchTopicMetadata(topics, brokers, producerConfig, 0)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ca542f..ea5b5a0 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -261,11 +261,11 @@ class Partition(val topic: String,
         .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
   }
 
-  def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagBytes: Long) {
+  def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagMessages: Long) {
     leaderIsrUpdateLock synchronized {
       leaderReplicaIfLocal() match {
         case Some(leaderReplica) =>
-          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagBytes)
+          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
           if(outOfSyncReplicas.size > 0) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.size > 0)
@@ -281,12 +281,12 @@ class Partition(val topic: String,
     }
   }
 
-  def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
+  def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
     /**
      * there are two cases that need to be handled here -
      * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
      *                     for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
-     * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
+     * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
      *                     follower is not catching up and should be removed from the ISR
      **/
     val leaderLogEndOffset = leaderReplica.logEndOffset
@@ -298,7 +298,7 @@ class Partition(val topic: String,
     val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
     debug("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
-    val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncBytes)
+    val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && (leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
     debug("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
     stuckReplicas ++ slowReplicas
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index b857d14..5dffa7e 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -144,14 +144,14 @@ object ConsoleConsumer extends Logging {
     }
 
     val props = new Properties()
-    props.put("groupid", options.valueOf(groupIdOpt))
-    props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString)
-    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
-    props.put("min.fetch.bytes", options.valueOf(minFetchBytesOpt).toString)
-    props.put("max.fetch.wait.ms", options.valueOf(maxWaitMsOpt).toString)
-    props.put("autocommit.enable", "true")
-    props.put("autocommit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
-    props.put("autooffset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
+    props.put("group.id", options.valueOf(groupIdOpt))
+    props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
+    props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
+    props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
+    props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)
+    props.put("auto.commit.enable", "true")
+    props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
+    props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
     props.put("zk.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
     val config = new ConsumerConfig(props)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index b379c9d..45db07b 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -52,11 +52,11 @@ object ConsumerConfig extends Config {
   }
 
   def validateClientId(clientId: String) {
-    validateChars("clientid", clientId)
+    validateChars("client.id", clientId)
   }
 
   def validateGroupId(groupId: String) {
-    validateChars("groupid", groupId)
+    validateChars("group.id", groupId)
   }
 
   def validateAutoOffsetReset(autoOffsetReset: String) {
@@ -77,38 +77,38 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   }
 
   /** a string that uniquely identifies a set of consumers within the same consumer group */
-  val groupId = props.getString("groupid")
+  val groupId = props.getString("group.id")
 
   /** consumer id: generated automatically if not set.
    *  Set this explicitly for only testing purpose. */
-  val consumerId: Option[String] = Option(props.getString("consumerid", null))
+  val consumerId: Option[String] = Option(props.getString("consumer.id", null))
 
   /** the socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms. */
   val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
   
   /** the socket receive buffer for network requests */
-  val socketBufferSize = props.getInt("socket.buffersize", SocketBufferSize)
+  val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
   
   /** the number of byes of messages to attempt to fetch */
-  val fetchSize = props.getInt("fetch.size", FetchSize)
+  val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
   
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
-  val autoCommit = props.getBoolean("autocommit.enable", AutoCommit)
+  val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
   
   /** the frequency in ms that the consumer offsets are committed to zookeeper */
-  val autoCommitIntervalMs = props.getInt("autocommit.interval.ms", AutoCommitInterval)
+  val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
 
   /** max number of messages buffered for consumption */
-  val maxQueuedChunks = props.getInt("queuedchunks.max", MaxQueuedChunks)
+  val queuedMaxMessages = props.getInt("queued.max.messages", MaxQueuedChunks)
 
   /** max number of retries during rebalance */
-  val maxRebalanceRetries = props.getInt("rebalance.retries.max", MaxRebalanceRetries)
+  val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
   
   /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
-  val minFetchBytes = props.getInt("min.fetch.bytes", MinFetchBytes)
+  val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
   
-  /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediate satisfy min.fetch.bytes */
-  val maxFetchWaitMs = props.getInt("max.fetch.wait.ms", MaxFetchWaitMs)
+  /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
+  val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
   
   /** backoff time between retries during rebalance */
   val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
@@ -120,7 +120,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset
      anything else: throw exception to the consumer */
-  val autoOffsetReset = props.getString("autooffset.reset", AutoOffsetReset)
+  val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)
@@ -129,12 +129,12 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
    *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
    *  overhead of decompression.
    *  */
-  val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
+  val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false)
 
   /**
    * Client id is specified by the kafka consumer client, used to distinguish different clients
    */
-  val clientId = props.getString("clientid", groupId)
+  val clientId = props.getString("client.id", groupId)
 
   validate(this)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 2ce024c..713c7c9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -33,11 +33,11 @@ class ConsumerFetcherThread(name: String,
                                       clientId = config.clientId + "-" + name,
                                       sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs,
-                                      socketBufferSize = config.socketBufferSize, 
-                                      fetchSize = config.fetchSize,
+                                      socketBufferSize = config.socketReceiveBufferBytes,
+                                      fetchSize = config.fetchMessageMaxBytes,
                                       fetcherBrokerId = Request.OrdinaryConsumerId,
-                                      maxWait = config.maxFetchWaitMs,
-                                      minBytes = config.minFetchBytes) {
+                                      maxWait = config.fetchWaitMaxMs,
+                                      minBytes = config.fetchMinBytes) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index aee9293..42a9628 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -112,7 +112,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   connectZk()
   createFetcher()
-  if (config.autoCommit) {
+  if (config.autoCommitEnable) {
     scheduler.startup
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
     scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
@@ -160,14 +160,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       if (wildcardTopicWatcher != null)
         wildcardTopicWatcher.shutdown()
       try {
-        if (config.autoCommit)
+        if (config.autoCommitEnable)
           scheduler.shutdownNow()
         fetcher match {
           case Some(f) => f.shutdown
           case None =>
         }
         sendShutdownToAllQueues()
-        if (config.autoCommit)
+        if (config.autoCommitEnable)
           commitOffsets()
         if (zkClient != null) {
           zkClient.close()
@@ -194,9 +194,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     // make a list of (queue,stream) pairs, one pair for each threadId
     val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
       threadIdSet.map(_ => {
-        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator, config.clientId)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId)
         (queue, stream)
       })
     ).flatten.toList
@@ -365,7 +365,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     def syncedRebalance() {
       rebalanceLock synchronized {
-        for (i <- 0 until config.maxRebalanceRetries) {
+        for (i <- 0 until config.rebalanceMaxRetries) {
           info("begin rebalancing consumer " + consumerIdString + " try #" + i)
           var done = false
           val cluster = getCluster(zkClient)
@@ -393,7 +393,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         }
       }
 
-      throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
+      throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
     }
 
     private def rebalance(cluster: Cluster): Boolean = {
@@ -610,7 +610,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  queue,
                                                  consumedOffset,
                                                  fetchedOffset,
-                                                 new AtomicInteger(config.fetchSize),
+                                                 new AtomicInteger(config.fetchMessageMaxBytes),
                                                  config.clientId)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
@@ -709,12 +709,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     private val wildcardQueuesAndStreams = (1 to numStreams)
       .map(e => {
-        val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+        val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](queue, 
                                           config.consumerTimeoutMs, 
                                           keyDecoder, 
                                           valueDecoder, 
-                                          config.enableShallowIterator,
+                                          config.shallowIteratorEnable,
                                           config.clientId)
         (queue, stream)
     }).toList

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 5f0148c..497cfdd 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -43,15 +43,15 @@ private[kafka] class LogManager(val config: KafkaConfig,
   val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
   val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
-  private val logFileSizeMap = config.logFileSizeMap
-  private val logFlushInterval = config.flushInterval
-  private val logFlushIntervals = config.flushIntervalMap
+  private val logFileSizeMap = config.logSegmentBytesPerTopicMap
+  private val logFlushInterval = config.logFlushIntervalMessages
+  private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
   private val logCreationLock = new Object
-  private val logRetentionSizeMap = config.logRetentionSizeMap
-  private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
-  private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
+  private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap
+  private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
+  private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
   private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
-  private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
+  private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
   private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
 
   this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
@@ -111,14 +111,14 @@ private[kafka] class LogManager(val config: KafkaConfig,
             info("Loading log '" + dir.getName + "'")
             val topicPartition = parseTopicPartitionName(dir.getName)
             val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-            val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
+            val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes)
             val log = new Log(dir, 
                               maxLogFileSize, 
-                              config.maxMessageSize, 
+                              config.messageMaxBytes,
                               logFlushInterval, 
                               rollIntervalMs, 
                               needsRecovery, 
-                              config.logIndexMaxSizeBytes,
+                              config.logIndexSizeMaxBytes,
                               config.logIndexIntervalBytes,
                               time, 
                               config.brokerId)
@@ -139,10 +139,10 @@ private[kafka] class LogManager(val config: KafkaConfig,
     if(scheduler != null) {
       info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
       scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
-      info("Starting log flusher every " + config.flushSchedulerThreadRate +
+      info("Starting log flusher every " + config.logFlushSchedulerIntervalMs +
                    " ms with the following overrides " + logFlushIntervals)
       scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
-                                 config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
+                                 config.logFlushSchedulerIntervalMs, config.logFlushSchedulerIntervalMs, false)
     }
   }
   
@@ -186,14 +186,14 @@ private[kafka] class LogManager(val config: KafkaConfig,
       val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
       dir.mkdirs()
       val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
+      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
       log = new Log(dir, 
                     maxLogFileSize, 
-                    config.maxMessageSize, 
+                    config.messageMaxBytes,
                     logFlushInterval, 
                     rollIntervalMs, 
                     needsRecovery = false, 
-                    config.logIndexMaxSizeBytes, 
+                    config.logIndexSizeMaxBytes,
                     config.logIndexIntervalBytes, 
                     time, 
                     config.brokerId)
@@ -249,7 +249,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
    */
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
     val topic = parseTopicPartitionName(log.dir.getName).topic
-    val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
+    val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes)
     if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
     var diff = log.size - maxLogRetentionSize
     def shouldDelete(segment: LogSegment) = {
@@ -310,7 +310,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
     for (log <- allLogs) {
       try {
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
-        var logFlushInterval = config.defaultFlushIntervalMs
+        var logFlushInterval = config.logFlushIntervalMs
         if(logFlushIntervals.contains(log.topicName))
           logFlushInterval = logFlushIntervals(log.topicName)
         debug(log.topicName + " flush interval  " + logFlushInterval +

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 4e2f2af..1a98174 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -125,12 +125,12 @@ object ConsoleProducer {
     props.put("compression.codec", codec.toString)
     props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
-      props.put("batch.size", batchSize.toString)
-    props.put("queue.time", sendTimeout.toString)
-    props.put("queue.size", queueSize.toString)
+      props.put("batch.num.messages", batchSize.toString)
+    props.put("queue.buffering.max.ms", sendTimeout.toString)
+    props.put("queue.buffering.max.messages", queueSize.toString)
     props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
-    props.put("producer.request.required.acks", requestRequiredAcks.toString)
-    props.put("producer.request.timeout.ms", requestTimeoutMs.toString)
+    props.put("request.required.acks", requestRequiredAcks.toString)
+    props.put("request.timeout.ms", requestTimeoutMs.toString)
     props.put("key.serializer.class", keyEncoderClass)
     props.put("serializer.class", valueEncoderClass)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index a7c101a..af077e0 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -73,8 +73,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
     //These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
     if(producerType != null) props.put("producer.type", producerType)
     if(compressionCodec != null) props.put("compression.codec", compressionCodec)
-    if(enqueueTimeout != null) props.put("queue.enqueueTimeout.ms", enqueueTimeout)
-    if(queueSize != null) props.put("queue.size", queueSize)
+    if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout)
+    if(queueSize != null) props.put("queue.buffering.max.messages", queueSize)
     val config : ProducerConfig = new ProducerConfig(props)
     producer = new Producer[String, String](config)
     LogLog.debug("Kafka producer connected to " +  config.brokerList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/Producer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala
index a183525..66638f2 100644
--- a/core/src/main/scala/kafka/producer/Producer.scala
+++ b/core/src/main/scala/kafka/producer/Producer.scala
@@ -31,7 +31,7 @@ class Producer[K,V](config: ProducerConfig,
   extends Logging {
 
   private val hasShutdown = new AtomicBoolean(false)
-  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueSize)
+  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
 
   private val random = new Random
   private var sync: Boolean = true
@@ -44,8 +44,8 @@ class Producer[K,V](config: ProducerConfig,
       producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, 
                                                        queue,
                                                        eventHandler, 
-                                                       config.queueTime, 
-                                                       config.batchSize,
+                                                       config.queueBufferingMaxMs,
+                                                       config.batchNumMessages,
                                                        config.clientId)
       producerSendThread.start()
   }
@@ -87,17 +87,17 @@ class Producer[K,V](config: ProducerConfig,
 
   private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
     for (message <- messages) {
-      val added = config.enqueueTimeoutMs match {
+      val added = config.queueEnqueueTimeoutMs match {
         case 0  =>
           queue.offer(message)
         case _  =>
           try {
-            config.enqueueTimeoutMs < 0 match {
+            config.queueEnqueueTimeoutMs < 0 match {
             case true =>
               queue.put(message)
               true
             case _ =>
-              queue.offer(message, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS)
+              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
             }
           }
           catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala
index 235b228..e27ec44 100644
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@ -26,12 +26,12 @@ import kafka.common.{InvalidConfigException, Config}
 object ProducerConfig extends Config {
   def validate(config: ProducerConfig) {
     validateClientId(config.clientId)
-    validateBatchSize(config.batchSize, config.queueSize)
+    validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages)
     validateProducerType(config.producerType)
   }
 
   def validateClientId(clientId: String) {
-    validateChars("clientid", clientId)
+    validateChars("client.id", clientId)
   }
 
   def validateBatchSize(batchSize: Int, queueSize: Int) {
@@ -101,17 +101,16 @@ class ProducerConfig private (val props: VerifiableProperties)
    */
   val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))
 
-  /**
-   * The producer using the zookeeper software load balancer maintains a ZK cache that gets
-   * updated by the zookeeper watcher listeners. During some events like a broker bounce, the
-   * producer ZK cache can get into an inconsistent state, for a small time period. In this time
-   * period, it could end up picking a broker partition that is unavailable. When this happens, the
-   * ZK cache needs to be updated.
-   * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
-   */
-  val producerRetries = props.getInt("producer.num.retries", 3)
+  /** The leader may be unavailable transiently, which can fail the sending of a message.
+    *  This property specifies the number of retries when such failures occur.
+    */
+  val messageSendMaxRetries = props.getInt("message.send.max.retries", 3)
 
-  val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+  /** Before each retry, the producer refreshes the metadata of relevant topics. Since leader
+    * election takes a bit of time, this property specifies the amount of time that the producer
+    * waits before refreshing the metadata.
+    */
+  val retryBackoffMs = props.getInt("retry.backoff.ms", 100)
 
   /**
    * The producer generally refreshes the topic metadata from brokers when there is a failure
@@ -121,7 +120,7 @@ class ProducerConfig private (val props: VerifiableProperties)
    * Important note: the refresh happen only AFTER the message is sent, so if the producer never sends
    * a message the metadata is never refreshed
    */
-  val topicMetadataRefreshIntervalMs = props.getInt("producer.metadata.refresh.interval.ms", 600000)
+  val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)
 
   validate(this)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 0ef320b..0469a39 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -36,7 +36,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
-    config.bufferSize, config.requestTimeoutMs)
+    config.sendBufferBytes, config.requestTimeoutMs)
   val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
   val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 5ebd29a..ef32620 100644
--- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -36,24 +36,22 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP
 trait SyncProducerConfigShared {
   val props: VerifiableProperties
   
-  val bufferSize = props.getInt("buffer.size", 100*1024)
-
-  val maxMessageSize = props.getInt("max.message.size", 1000000)
+  val sendBufferBytes = props.getInt("send.buffer.bytes", 100*1024)
 
   /* the client application sending the producer requests */
-  val clientId = props.getString("clientid", SyncProducerConfig.DefaultClientId)
+  val clientId = props.getString("client.id", SyncProducerConfig.DefaultClientId)
 
   /*
    * The required acks of the producer requests - negative value means ack
    * after the replicas in ISR have caught up to the leader's offset
    * corresponding to this produce request.
    */
-  val requiredAcks = props.getShort("producer.request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
+  val requestRequiredAcks = props.getShort("request.required.acks", SyncProducerConfig.DefaultRequiredAcks)
 
   /*
    * The ack timeout of the producer requests. Value must be non-negative and non-zero
    */
-  val requestTimeoutMs = props.getIntInRange("producer.request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
+  val requestTimeoutMs = props.getIntInRange("request.timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs,
                                              (1, Integer.MAX_VALUE))
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
index 07935d7..973fa08 100644
--- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
@@ -22,10 +22,10 @@ trait AsyncProducerConfig {
   val props: VerifiableProperties
 
   /* maximum time, in milliseconds, for buffering data on the producer queue */
-  val queueTime = props.getInt("queue.time", 5000)
+  val queueBufferingMaxMs = props.getInt("queue.buffering.max.ms", 5000)
 
   /** the maximum size of the blocking queue for buffering on the producer */
-  val queueSize = props.getInt("queue.size", 10000)
+  val queueBufferingMaxMessages = props.getInt("queue.buffering.max.messages", 10000)
 
   /**
    * Timeout for event enqueue:
@@ -33,10 +33,10 @@ trait AsyncProducerConfig {
    * -ve: enqueue will block indefinitely if the queue is full
    * +ve: enqueue will block up to this many milliseconds if the queue is full
    */
-  val enqueueTimeoutMs = props.getInt("queue.enqueueTimeout.ms", 0)
+  val queueEnqueueTimeoutMs = props.getInt("queue.enqueue.timeout.ms", 0)
 
   /** the number of messages batched at the producer */
-  val batchSize = props.getInt("batch.size", 200)
+  val batchNumMessages = props.getInt("batch.num.messages", 200)
 
   /** the serializer class for values */
   val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 58f582f..9a4e4bc 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -59,7 +59,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           producerTopicStats.getProducerAllTopicStats.byteRate.mark(dataSize)
       }
       var outstandingProduceRequests = serializedData
-      var remainingRetries = config.producerRetries + 1
+      var remainingRetries = config.messageSendMaxRetries + 1
       val correlationIdStart = correlationId.get()
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
@@ -72,7 +72,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
         if (outstandingProduceRequests.size > 0)  {
           // back off and update the topic metadata cache before attempting another send operation
-          Thread.sleep(config.producerRetryBackoffMs)
+          Thread.sleep(config.retryBackoffMs)
           // get topics of the outstanding produce requests and refresh metadata for those
           Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
           remainingRetries -= 1
@@ -81,9 +81,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       }
       if(outstandingProduceRequests.size > 0) {
         producerStats.failedSendRate.mark()
+
         val correlationIdEnd = correlationId.get()
         error("Failed to send the following requests with correlation ids in [%d,%d]: %s".format(correlationIdStart, correlationIdEnd-1, outstandingProduceRequests))
-        throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
+        throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
       }
     }
   }
@@ -231,7 +232,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
       val currentCorrelationId = correlationId.getAndIncrement
-      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requiredAcks,
+      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
         config.requestTimeoutMs, messagesPerTopic)
       var failedTopicPartitions = Seq.empty[TopicAndPartition]
       try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e2dfb3e..60752fb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -41,9 +41,9 @@ class KafkaApis(val requestChannel: RequestChannel,
                 brokerId: Int) extends Logging {
 
   private val producerRequestPurgatory =
-    new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
+    new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
   private val fetchRequestPurgatory =
-    new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
+    new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
@@ -442,7 +442,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           case ErrorMapping.UnknownTopicOrPartitionCode =>
             try {
               /* check if auto creation of topics is turned on */
-              if (config.autoCreateTopics) {
+              if (config.autoCreateTopicsEnable) {
                 try {
                   CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
                   info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 962b65f..f65db33 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -36,37 +36,37 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /*********** General Configuration ***********/
   
   /* the broker id for this server */
-  val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
+  val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
 
   /* the maximum size of message that the server can receive */
-  val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
+  val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
   
   /* the number of network threads that the server uses for handling network requests */
-  val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
+  val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
 
   /* the number of io threads that the server uses for carrying out network requests */
-  val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
+  val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
   
   /* the number of queued requests allowed before blocking the network threads */
-  val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
+  val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
   
   /*********** Socket Server Configuration ***********/
   
   /* the port to listen and accept connections on */
   val port: Int = props.getInt("port", 6667)
 
-  /* hostname of broker. If this is set, it will only bind to this address.  If this is not set,
+  /* hostname of broker. If this is set, it will only bind to this address. If this is not set,
    * it will bind to all interfaces, and publish one to ZK */
-  val hostName: String = props.getString("hostname", null)
+  val hostName: String = props.getString("host.name", null)
 
   /* the SO_SNDBUFF buffer of the socket sever sockets */
-  val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024)
+  val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)
   
   /* the SO_RCVBUFF buffer of the socket sever sockets */
-  val socketReceiveBuffer: Int = props.getInt("socket.receive.buffer", 100*1024)
+  val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024)
   
   /* the maximum number of bytes in a socket request */
-  val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+  val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
   
   /*********** Log Configuration ***********/
 
@@ -74,56 +74,56 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
   
   /* the directories in which the log data is kept */
-  val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", "")))
+  val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs")))
   require(logDirs.size > 0)
   
   /* the maximum size of a single log file */
-  val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
+  val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
 
   /* the maximum size of a single log file for some specific topic */
-  val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt)
+  val logSegmentBytesPerTopicMap = props.getMap("log.segment.bytes.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum time before a new log segment is rolled out */
   val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours before rolling out a new log segment for some specific topic */
-  val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt)  
+  val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logRetentionHoursMap = props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt)
+  val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the maximum size of the log before deleting it */
-  val logRetentionSize = props.getLong("log.retention.size", -1)
+  val logRetentionBytes = props.getLong("log.retention.bytes", -1)
 
   /* the maximum size of the log for some specific topic before deleting it */
-  val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong)
+  val logRetentionBytesPerTopicMap = props.getMap("log.retention.bytes.per.topic", _.toLong > 0).mapValues(_.toLong)
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
-  val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
+  val logCleanupIntervalMins = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue))
   
   /* the maximum size in bytes of the offset index */
-  val logIndexMaxSizeBytes = props.getIntInRange("log.index.max.size", 10*1024*1024, (4, Int.MaxValue))
+  val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
   
   /* the interval with which we add an entry to the offset index */
   val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
 
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
-  val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
+  val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
 
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
-  val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
+  val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
-  val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms",  3000)
+  val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms",  3000)
 
   /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
-  val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
+  val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs)
 
   /* enable auto creation of topic on the server */
-  val autoCreateTopics = props.getBoolean("auto.create.topics", true)
+  val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
 
   /*********** Replication configuration ***********/
 
@@ -136,36 +136,38 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
 
-  val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
+  /* If a follower hasn't sent any fetch requests during this time, the leader will remove the follower from isr */
+  val replicaLagTimeMaxMs = props.getLong("replica.lag.time.max.ms", 10000)
 
-  val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)
+  /* If the lag in messages between a leader and a follower exceeds this number, the leader will remove the follower from isr */
+  val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000)
 
   /* the socket timeout for network requests */
   val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
 
   /* the socket receive buffer for network requests */
-  val replicaSocketBufferSize = props.getInt("replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
+  val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)
 
   /* the number of byes of messages to attempt to fetch */
-  val replicaFetchSize = props.getInt("replica.fetch.size", ConsumerConfig.FetchSize)
+  val replicaFetchMaxBytes = props.getInt("replica.fetch.max.bytes", ConsumerConfig.FetchSize)
 
   /* max wait time for each fetcher request issued by follower replicas*/
-  val replicaMaxWaitTimeMs = props.getInt("replica.fetch.wait.time.ms", 500)
+  val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500)
 
   /* minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
-  val replicaMinBytes = props.getInt("replica.fetch.min.bytes", 1)
+  val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1)
 
   /* number of fetcher threads used to replicate messages from a source broker.
    * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
-  val numReplicaFetchers = props.getInt("replica.fetchers", 1)
+  val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
   
-  /* the frequency with which the highwater mark is saved out to disk */
-  val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+  /* the frequency with which the high watermark is saved out to disk */
+  val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
 
   /* the purge interval (in number of requests) of the fetch request purgatory */
-  val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+  val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
 
   /* the purge interval (in number of requests) of the producer request purgatory */
-  val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+  val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
  }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index ae35e4f..1fe1ca9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -65,8 +65,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.hostName,
                                     config.port,
                                     config.numNetworkThreads,
-                                    config.numQueuedRequests,
-                                    config.maxSocketRequestSize)
+                                    config.queuedMaxRequests,
+                                    config.socketRequestMaxBytes)
 
     socketServer.startup
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c1d3235..6ae601e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -31,11 +31,11 @@ class ReplicaFetcherThread(name:String,
                                 clientId = FetchRequest.ReplicaFetcherClientId,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
-                                socketBufferSize = brokerConfig.replicaSocketBufferSize,
-                                fetchSize = brokerConfig.replicaFetchSize,
+                                socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,
+                                fetchSize = brokerConfig.replicaFetchMaxBytes,
                                 fetcherBrokerId = brokerConfig.brokerId,
-                                maxWait = brokerConfig.replicaMaxWaitTimeMs,
-                                minBytes = brokerConfig.replicaMinBytes) {
+                                maxWait = brokerConfig.replicaFetchWaitMaxMs,
+                                minBytes = brokerConfig.replicaFetchMinBytes) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 42068ca..064af6b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
+      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.replicaHighWatermarkCheckpointIntervalMs)
   }
 
   /**
@@ -91,7 +91,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startup() {
     // start ISR expiration thread
-    kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
+    kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
@@ -244,7 +244,7 @@ class ReplicaManager(val config: KafkaConfig,
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
     leaderPartitionsLock synchronized {
-      leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes))
+      leaderPartitions.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 36a119b..1f5c7ba 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -182,9 +182,9 @@ public class KafkaMigrationTool
       Properties kafkaConsumerProperties_07 = new Properties();
       kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
       /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
-      if(kafkaConsumerProperties_07.getProperty("shallowiterator.enable", "").equals("true")){
+      if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")){
         logger.warn("Shallow iterator should not be used in the migration tool");
-        kafkaConsumerProperties_07.setProperty("shallowiterator.enable", "false");
+        kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
       }
       Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index db14c82..d744a78 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -42,12 +42,12 @@ object ReplayLogProducer extends Logging {
 
     // consumer properties
     val consumerProps = new Properties
-    consumerProps.put("groupid", GroupId)
+    consumerProps.put("group.id", GroupId)
     consumerProps.put("zk.connect", config.zkConnect)
     consumerProps.put("consumer.timeout.ms", "10000")
-    consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString)
-    consumerProps.put("fetch.size", (1024*1024).toString)
-    consumerProps.put("socket.buffer.size", (2 * 1024 * 1024).toString)
+    consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
+    consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
+    consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
     val consumerConfig = new ConsumerConfig(consumerProps)
     val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
     val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
@@ -141,10 +141,10 @@ object ReplayLogProducer extends Logging {
     val props = new Properties()
     props.put("broker.list", config.brokerList)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
-    props.put("buffer.size", (64*1024).toString)
+    props.put("send.buffer.bytes", (64*1024).toString)
     props.put("compression.codec", config.compressionCodec.codec.toString)
-    props.put("batch.size", config.batchSize.toString)
-    props.put("queue.enqueueTimeout.ms", "-1")
+    props.put("batch.num.messages", config.batchSize.toString)
+    props.put("queue.enqueue.timeout.ms", "-1")
     
     if(config.isAsync)
       props.put("producer.type", "async")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 5ba5938..f594404 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -785,11 +785,11 @@ class ZKConfig(props: VerifiableProperties) {
   val zkConnect = props.getString("zk.connect", null)
 
   /** zookeeper session timeout */
-  val zkSessionTimeoutMs = props.getInt("zk.sessiontimeout.ms", 6000)
+  val zkSessionTimeoutMs = props.getInt("zk.session.timeout.ms", 6000)
 
   /** the max time that the client waits to establish a connection to zookeeper */
-  val zkConnectionTimeoutMs = props.getInt("zk.connectiontimeout.ms",zkSessionTimeoutMs)
+  val zkConnectionTimeoutMs = props.getInt("zk.connection.timeout.ms",zkSessionTimeoutMs)
 
   /** how far a ZK follower can be behind a ZK leader */
-  val zkSyncTimeMs = props.getInt("zk.synctime.ms", 2000)
+  val zkSyncTimeMs = props.getInt("zk.sync.time.ms", 2000)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
index 5be4f4e..98c12b7 100644
--- a/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
+++ b/core/src/test/scala/other/kafka/TestEndToEndLatency.scala
@@ -35,9 +35,9 @@ object TestEndToEndLatency {
     val topic = "test"
     
     val consumerProps = new Properties()
-    consumerProps.put("groupid", topic)
+    consumerProps.put("group.id", topic)
     consumerProps.put("auto.commit", "true")
-    consumerProps.put("autooffset.reset", "largest")
+    consumerProps.put("auto.offset.reset", "largest")
     consumerProps.put("zk.connect", zkConnect)
     consumerProps.put("socket.timeout.ms", 1201000.toString)
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/other/kafka/TestLogPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala
index 75c33e0..9f3bb40 100644
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@ -33,7 +33,7 @@ object TestLogPerformance {
     val props = TestUtils.createBrokerConfig(0, -1)
     val config = new KafkaConfig(props)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
+    val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 5b72eed..31534ca 100644
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -31,7 +31,7 @@ object TestZKConsumerOffsets {
     val topic = args(1)
     val autoOffsetReset = args(2)    
     val props = Utils.loadProps(args(0))
-    props.put("autooffset.reset", "largest")
+    props.put("auto.offset.reset", "largest")
     
     val config = new ConsumerConfig(props)
     val consumerConnector: ConsumerConnector = Consumer.create(config)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index d7945a5..4c646f0 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -78,9 +78,9 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
     var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
-    consumerProps.put("autooffset.reset", resetTo)
+    consumerProps.put("auto.offset.reset", resetTo)
     consumerProps.put("consumer.timeout.ms", "2000")
-    consumerProps.put("max.fetch.wait.ms", "0")
+    consumerProps.put("fetch.wait.max.ms", "0")
     val consumerConfig = new ConsumerConfig(consumerProps)
 
     TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0", offset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index caea858..0fde254 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -35,12 +35,12 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
       val props = new Properties()
       props.put("partitioner.class", "kafka.utils.StaticPartitioner")
       props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-      props.put("buffer.size", "65536")
+      props.put("send.buffer.bytes", "65536")
       props.put("connect.timeout.ms", "100000")
       props.put("reconnect.interval", "10000")
-      props.put("producer.retry.backoff.ms", "1000")
-      props.put("producer.num.retries", "3")
-      props.put("producer.request.required.acks", "-1")
+      props.put("retry.backoff.ms", "1000")
+      props.put("message.send.max.retries", "3")
+      props.put("request.required.acks", "-1")
       props.put("serializer.class", classOf[StringEncoder].getName.toString)
       producer = new Producer(new ProducerConfig(props))
       consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b06d812..ce893bf 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -40,8 +40,8 @@ class LogManagerTest extends JUnit3Suite {
   override def setUp() {
     super.setUp()
     config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
-                   override val logFileSize = 1024
-                   override val flushInterval = 10000
+                   override val logSegmentBytes = 1024
+                   override val logFlushIntervalMessages = 10000
                    override val logRetentionHours = maxLogAgeHours
                  }
     scheduler.startup
@@ -114,10 +114,10 @@ class LogManagerTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-      override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
-      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
+      override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
+      override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
-      override val flushInterval = 100
+      override val logFlushIntervalMessages = 100
       override val logRollHours = maxRollInterval
     }
     logManager = new LogManager(config, scheduler, time)
@@ -158,11 +158,11 @@ class LogManagerTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-                   override val logFileSize = 1024 *1024 *1024
-                   override val flushSchedulerThreadRate = 50
-                   override val flushInterval = Int.MaxValue
+                   override val logSegmentBytes = 1024 *1024 *1024
+                   override val logFlushSchedulerIntervalMs = 50
+                   override val logFlushIntervalMessages = Int.MaxValue
                    override val logRollHours = maxRollInterval
-                   override val flushIntervalMap = Map("timebasedflush" -> 100)
+                   override val logFlushIntervalMsPerTopicMap = Map("timebasedflush" -> 100)
                  }
     logManager = new LogManager(config, scheduler, time)
     logManager.startup
@@ -173,7 +173,7 @@ class LogManagerTest extends JUnit3Suite {
     }
     val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
     assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
-                     ellapsed < 2*config.flushSchedulerThreadRate)
+                     ellapsed < 2*config.logFlushSchedulerIntervalMs)
   }
   
   @Test
@@ -183,7 +183,7 @@ class LogManagerTest extends JUnit3Suite {
     val dirs = Seq(TestUtils.tempDir().getAbsolutePath, 
                    TestUtils.tempDir().getAbsolutePath, 
                    TestUtils.tempDir().getAbsolutePath)
-    props.put("log.directories", dirs.mkString(","))
+    props.put("log.dirs", dirs.mkString(","))
     logManager.shutdown()
     logManager = new LogManager(new KafkaConfig(props), scheduler, time)
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4095319/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index c6ea3b6..b343d98 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -198,15 +198,15 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
     val props = new Properties
-    props.put("brokerid", nodeId.toString)
+    props.put("broker.id", nodeId.toString)
     props.put("port", port.toString)
     props.put("log.dir", getLogDir.getAbsolutePath)
-    props.put("log.flush.interval", "1")
+    props.put("log.flush.interval.messages", "1")
     props.put("enable.zookeeper", "false")
     props.put("num.partitions", "20")
     props.put("log.retention.hours", "10")
     props.put("log.cleanup.interval.mins", "5")
-    props.put("log.file.size", logSize.toString)
+    props.put("log.segment.bytes", logSize.toString)
     props.put("zk.connect", zkConnect.toString)
     props
   }


Mime
View raw message