kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3264; Deprecate the old Scala consumer (KIP-109)
Date Fri, 02 Jun 2017 11:30:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 99a3f8165 -> 0d71cf108


KAFKA-3264; Deprecate the old Scala consumer (KIP-109)

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

This patch had conflicts when merged, resolved by
Committer: Ismael Juma <ismael@juma.me.uk>

Closes #2328 from vahidhashemian/KAFKA-3264

(cherry picked from commit f85c18032be5d583c2ca8f9fa1eb1b5ec55e59fa)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d71cf10
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d71cf10
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d71cf10

Branch: refs/heads/0.11.0
Commit: 0d71cf108e11c49b3399065071a6a43b3e20cd21
Parents: 99a3f81
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Fri Jun 2 11:28:13 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jun 2 12:31:06 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  4 +++
 .../kafka/admin/ConsumerGroupCommand.scala      |  1 +
 .../src/main/scala/kafka/api/FetchRequest.scala |  3 ++
 .../main/scala/kafka/client/ClientUtils.scala   |  3 +-
 .../scala/kafka/consumer/BaseConsumer.scala     |  8 +++++
 .../scala/kafka/consumer/ConsumerConfig.scala   | 22 +++++++-----
 .../kafka/consumer/ConsumerConnector.scala      | 14 +++++---
 .../kafka/consumer/ConsumerFetcherManager.scala |  1 +
 .../kafka/consumer/ConsumerFetcherThread.scala  |  4 +++
 .../scala/kafka/consumer/ConsumerIterator.scala |  3 ++
 .../kafka/consumer/ConsumerTopicStats.scala     |  3 ++
 .../consumer/FetchRequestAndResponseStats.scala |  3 ++
 .../scala/kafka/consumer/FetchedDataChunk.scala |  3 +-
 .../main/scala/kafka/consumer/KafkaStream.scala |  2 ++
 .../kafka/consumer/PartitionAssignor.scala      |  9 +++++
 .../kafka/consumer/PartitionTopicInfo.scala     |  6 ++--
 .../scala/kafka/consumer/SimpleConsumer.scala   |  2 ++
 .../main/scala/kafka/consumer/TopicCount.scala  |  7 +++-
 .../kafka/consumer/TopicEventHandler.scala      |  1 +
 .../main/scala/kafka/consumer/TopicFilter.scala |  5 +--
 .../consumer/ZookeeperConsumerConnector.scala   |  2 ++
 .../consumer/ZookeeperTopicEventWatcher.scala   |  1 +
 .../javaapi/consumer/ConsumerConnector.java     |  4 +++
 .../consumer/ConsumerRebalanceListener.java     |  4 +++
 .../kafka/javaapi/consumer/SimpleConsumer.scala |  6 ++--
 .../consumer/ZookeeperConsumerConnector.scala   |  1 +
 .../scala/kafka/metrics/KafkaMetricsGroup.scala |  1 +
 .../scala/kafka/network/BlockingChannel.scala   |  2 ++
 .../kafka/server/AbstractFetcherThread.scala    |  2 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |  1 +
 .../scala/kafka/tools/ExportZkOffsets.scala     | 36 +++++++++++---------
 .../kafka/tools/SimpleConsumerPerformance.scala |  3 ++
 .../scala/kafka/tools/SimpleConsumerShell.scala |  6 ++--
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   | 10 +++---
 .../kafka/tools/VerifyConsumerRebalance.scala   |  2 ++
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  9 +++++
 .../scala/kafka/tools/TestLogCleaning.scala     | 35 ++++++++++---------
 .../kafka/admin/DeleteConsumerGroupTest.scala   |  1 +
 .../kafka/admin/DescribeConsumerGroupTest.scala |  4 +++
 .../kafka/consumer/ConsumerIteratorTest.scala   |  5 +--
 .../kafka/consumer/PartitionAssignorTest.scala  |  3 +-
 .../unit/kafka/consumer/TopicFilterTest.scala   |  5 +--
 .../unit/kafka/integration/FetcherTest.scala    |  1 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |  5 ++-
 .../java/kafka/examples/SimpleConsumerDemo.java |  4 +++
 45 files changed, 187 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index bd8771b..e252b01 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -352,6 +352,7 @@ object AdminUtils extends Logging with AdminUtilities {
       }
     }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def isConsumerGroupActive(zkUtils: ZkUtils, group: String) = {
     zkUtils.getConsumersInGroup(group).nonEmpty
   }
@@ -363,6 +364,7 @@ object AdminUtils extends Logging with AdminUtilities {
    * @param group Consumer group
    * @return whether or not we deleted the consumer group information
    */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def deleteConsumerGroupInZK(zkUtils: ZkUtils, group: String) = {
     if (!isConsumerGroupActive(zkUtils, group)) {
       val dir = new ZKGroupDirs(group)
@@ -381,6 +383,7 @@ object AdminUtils extends Logging with AdminUtilities {
    * @param topic Topic of the consumer group information we wish to delete
    * @return whether or not we deleted the consumer group information for the given topic
    */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def deleteConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, group: String, topic: String) = {
     val topics = zkUtils.getTopicsByConsumerGroup(group)
     if (topics == Seq(topic)) {
@@ -401,6 +404,7 @@ object AdminUtils extends Logging with AdminUtilities {
    * @param zkUtils Zookeeper utilities
    * @param topic Topic of the consumer group information we wish to delete
    */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
     val groups = zkUtils.getAllConsumerGroupsForTopic(topic)
     groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkUtils, group, topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 2f26f57..4c1f593 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -231,6 +231,7 @@ object ConsumerGroupCommand extends Logging {
     def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = throw new UnsupportedOperationException
   }
 
+  @deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
   class ZkConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {
 
     private val zkUtils = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 60284f7..ceed815 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -37,6 +37,7 @@ import scala.util.Random
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
 object FetchRequest {
 
   private val random = new Random
@@ -90,6 +91,7 @@ object FetchRequest {
 
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         correlationId: Int = FetchRequest.DefaultCorrelationId,
                         clientId: String = ConsumerConfig.DefaultClientId,
@@ -227,6 +229,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 @nonthreadsafe
 class FetchRequestBuilder() {
   private val correlationId = new AtomicInteger(0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 5508549..3a2806f 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -34,7 +34,8 @@ import java.io.IOException
  /**
  * Helper functions common to clients (producer, consumer, or admin)
  */
-object ClientUtils extends Logging{
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
+object ClientUtils extends Logging {
 
   /**
    * Used by the producer to send a metadata request since it has access to the ProducerConfig

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index b1a203f..cec74d0 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -34,6 +34,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders
  * this class should be removed (along with BaseProducer)
  * once we deprecate old consumer
  */
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
 trait BaseConsumer {
   def receive(): BaseConsumerRecord
   def stop()
@@ -41,6 +43,8 @@ trait BaseConsumer {
   def commit()
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.ConsumerRecord instead.", "0.11.0.0")
 case class BaseConsumerRecord(topic: String,
                               partition: Int,
                               offset: Long,
@@ -50,6 +54,8 @@ case class BaseConsumerRecord(topic: String,
                               value: Array[Byte],
                               headers: Headers = new RecordHeaders())
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
 class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String], consumerProps: Properties, val timeoutMs: Long = Long.MaxValue) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
 
@@ -117,6 +123,8 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
 class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
   import kafka.serializer.DefaultDecoder
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9eff3ed..bea0307 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,6 +23,8 @@ import kafka.utils._
 import kafka.common.{InvalidConfigException, Config}
 import java.util.Locale
 
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.ConsumerConfig instead.", "0.11.0.0")
 object ConsumerConfig extends Config {
   val RefreshMetadataBackoffMs = 200
   val SocketTimeout = 30 * 1000
@@ -99,6 +101,8 @@ object ConsumerConfig extends Config {
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.ConsumerConfig instead.", "0.11.0.0")
 class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
   import ConsumerConfig._
 
@@ -116,19 +120,19 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
 
   /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
   val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
-  
+
   /** the socket receive buffer for network requests */
   val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
-  
+
   /** the number of bytes of messages to attempt to fetch from each partition */
   val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
 
   /** the number threads used to fetch data */
   val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
-  
+
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
-  
+
   /** the frequency in ms that the consumer offsets are committed to zookeeper */
   val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
 
@@ -137,10 +141,10 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
 
   /** max number of retries during rebalance */
   val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
-  
+
   /** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
   val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
-  
+
   /** the maximum amount of data the server should return for a fetch request */
   val fetchMaxBytes = props.getInt("fetch.max.bytes", MaxFetchBytes)
 
@@ -148,7 +152,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
   require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
     " to prevent unnecessary socket timeouts")
-  
+
   /** backoff time between retries during rebalance */
   val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
 
@@ -195,7 +199,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
 
   /** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
   val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
-  
+
   validate(this)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 5a5190d..46fbab7 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package kafka.consumer
 
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
@@ -26,8 +27,9 @@ import kafka.serializer._
 /**
  *  Main interface for consumer
  */
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0")
 trait ConsumerConnector {
-  
+
   /**
    *  Create a list of MessageStreams for each topic.
    *
@@ -37,7 +39,7 @@ trait ConsumerConnector {
    *          an iterator over message/metadata pairs.
    */
   def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
-  
+
   /**
    *  Create a list of MessageStreams for each topic.
    *
@@ -52,7 +54,7 @@ trait ConsumerConnector {
                                 keyDecoder: Decoder[K],
                                 valueDecoder: Decoder[V])
     : Map[String,List[KafkaStream[K,V]]]
-  
+
   /**
    *  Create a list of message streams for all topics that match a given filter.
    *
@@ -73,7 +75,7 @@ trait ConsumerConnector {
    *  Commit the offsets of all broker partitions connected by this connector.
    */
   def commitOffsets(retryOnFailure: Boolean)
-  
+
   /**
    * KAFKA-1743: This method added for backward compatibility.
    */
@@ -90,13 +92,15 @@ trait ConsumerConnector {
    * @param listener The consumer rebalance listener to wire in
    */
   def setConsumerRebalanceListener(listener: ConsumerRebalanceListener)
-  
+
   /**
    *  Shut down the connector
    */
   def shutdown()
 }
 
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.Consumer instead.", "0.11.0.0")
 object Consumer extends Logging {
   /**
    *  Create a ConsumerConnector

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 57a97ef..51a7a04 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger
  *  Once ConsumerFetcherManager is created, startConnections() and stopAllConnections() can be called repeatedly
  *  until shutdown() is called.
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkUtils : ZkUtils)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 394132b..4f14570 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -30,6 +30,8 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: BrokerEndPoint,
@@ -120,6 +122,8 @@ class ConsumerFetcherThread(name: String,
   override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
 }
 
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0")
 object ConsumerFetcherThread {
 
   class FetchRequest(val underlying: kafka.api.FetchRequest) extends AbstractFetcherThread.FetchRequest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index df98db7..9ca2253 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -30,6 +30,7 @@ import kafka.common.{KafkaException, MessageSizeTooLargeException}
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
@@ -116,5 +117,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.common.errors.TimeoutException instead.", "0.11.0.0")
 class ConsumerTimeoutException() extends RuntimeException()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
index 01797ff..d13b327 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala
@@ -23,6 +23,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
 
 @threadsafe
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
   val tags = metricId match {
     case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
@@ -37,6 +38,7 @@ class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
  * Tracks metrics for each topic the given consumer client has consumed data from.
  * @param clientId The clientId of the given consumer client.
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ConsumerTopicStats(clientId: String) extends Logging {
   private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
   private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
@@ -52,6 +54,7 @@ class ConsumerTopicStats(clientId: String) extends Logging {
 /**
  * Stores the topic stats information of each consumer client in a (clientId -> ConsumerTopicStats) map.
  */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
 object ConsumerTopicStatsRegistry {
   private val valueFactory = (k: String) => new ConsumerTopicStats(k)
   private val globalStats = new Pool[String, ConsumerTopicStats](Some(valueFactory))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 05ea9ac..462a85b 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -23,6 +23,7 @@ import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.Pool
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
   val tags = metricId match {
     case ClientIdAndBroker(clientId, brokerHost, brokerPort) =>
@@ -41,6 +42,7 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetr
  * Tracks metrics of the requests made by a given consumer client to all brokers, and the responses obtained from the brokers.
  * @param clientId ClientId of the given consumer
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class FetchRequestAndResponseStats(clientId: String) {
   private val valueFactory = (k: ClientIdBroker) => new FetchRequestAndResponseMetrics(k)
   private val stats = new Pool[ClientIdBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
@@ -56,6 +58,7 @@ class FetchRequestAndResponseStats(clientId: String) {
 /**
  * Stores the fetch request and response stats information of each consumer client in a (clientId -> FetchRequestAndResponseStats) map.
  */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
 object FetchRequestAndResponseStatsRegistry {
   private val valueFactory = (k: String) => new FetchRequestAndResponseStats(k)
   private val globalStats = new Pool[String, FetchRequestAndResponseStats](Some(valueFactory))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala b/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
index 4845fcd..91eb874 100644
--- a/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
+++ b/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,6 +19,7 @@ package kafka.consumer
 
 import kafka.message.ByteBufferMessageSet
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 case class FetchedDataChunk(messages: ByteBufferMessageSet,
                             topicInfo: PartitionTopicInfo,
                             fetchOffset: Long)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index aebf3ea..faba42f 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.BlockingQueue
 import kafka.serializer.Decoder
 import kafka.message.MessageAndMetadata
 
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.streams.KafkaStreams instead.", "0.11.0.0")
 class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index f02df35..52c3d8b 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -23,6 +23,8 @@ import kafka.utils.{Pool, CoreUtils, ZkUtils, Logging}
 
 import scala.collection.mutable
 
+@deprecated("This trait has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.internals.PartitionAssignor instead.", "0.11.0.0")
 trait PartitionAssignor {
 
   /**
@@ -34,6 +36,8 @@ trait PartitionAssignor {
 
 }
 
+@deprecated("This object has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.internals.PartitionAssignor instead.", "0.11.0.0")
 object PartitionAssignor {
   def createInstance(assignmentStrategy: String) = assignmentStrategy match {
     case "roundrobin" => new RoundRobinAssignor()
@@ -41,6 +45,7 @@ object PartitionAssignor {
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkUtils: ZkUtils) {
   val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] = {
     val myTopicCount = TopicCount.constructTopicCount(group, consumerId, zkUtils, excludeInternalTopics)
@@ -63,6 +68,8 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo
  * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
  * will be within a delta of exactly one across all consumer threads.)
  */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.RoundRobinAssignor instead.", "0.11.0.0")
 class RoundRobinAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
@@ -117,6 +124,8 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
  * will get at least one partition and the first consumer thread will get one extra partition. So the assignment will be:
  * p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
  */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.RangeAssignor instead.", "0.11.0.0")
 class RangeAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index c7c7836..9a0879a 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic._
 import kafka.message._
 import kafka.utils.Logging
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class PartitionTopicInfo(val topic: String,
                          val partitionId: Int,
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
@@ -66,11 +67,12 @@ class PartitionTopicInfo(val topic: String,
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
     }
   }
-  
+
   override def toString: String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }
 
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
 object PartitionTopicInfo {
   val InvalidOffset = -1L
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index e93f08c..b30c9ce 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -32,6 +32,8 @@ import org.apache.kafka.common.utils.Utils._
 /**
  * A consumer of kafka messages
  */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
 @threadsafe
 class SimpleConsumer(val host: String,
                      val port: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index f423f8c..68beaed 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -21,6 +21,7 @@ import scala.collection._
 import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, CoreUtils}
 import kafka.common.KafkaException
 
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0")
 private[kafka] trait TopicCount {
 
   def getConsumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]]
@@ -29,12 +30,14 @@ private[kafka] trait TopicCount {
 
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 case class ConsumerThreadId(consumer: String, threadId: Int) extends Ordered[ConsumerThreadId] {
   override def toString = "%s-%d".format(consumer, threadId)
 
   def compare(that: ConsumerThreadId) = toString.compare(that.toString)
 }
 
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
 private[kafka] object TopicCount extends Logging {
   val whiteListPattern = "white_list"
   val blackListPattern = "black_list"
@@ -105,6 +108,7 @@ private[kafka] object TopicCount extends Logging {
 
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 private[kafka] class StaticTopicCount(val consumerIdString: String,
                                 val topicCountMap: Map[String, Int])
                                 extends TopicCount {
@@ -116,6 +120,7 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
   def pattern = TopicCount.staticPattern
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
                                         consumerIdString: String,
                                         topicFilter: TopicFilter,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicEventHandler.scala b/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
index 2423f0a..2852e9b 100644
--- a/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
+++ b/core/src/main/scala/kafka/consumer/TopicEventHandler.scala
@@ -17,6 +17,7 @@
 
 package kafka.consumer
 
+@deprecated("This trait has been deprecated and will be removed in a future release.", "0.11.0.0")
 trait TopicEventHandler[T] {
 
   def handleTopicEvent(allTopics: Seq[T])

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 69d7455..b71b01a 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -22,6 +22,7 @@ import java.util.regex.{Pattern, PatternSyntaxException}
 
 import org.apache.kafka.common.internals.Topic
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
   val regex = rawRegex
@@ -44,6 +45,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
   def isTopicAllowed(topic: String, excludeInternalTopics: Boolean): Boolean
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
     val allowed = topic.matches(regex) && !(Topic.isInternal(topic) && excludeInternalTopics)
@@ -53,10 +55,9 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 
     allowed
   }
-
-
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
     val allowed = (!topic.matches(regex)) && !(Topic.isInternal(topic) && excludeInternalTopics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index acc3cdf..ba2fce1 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -80,10 +80,12 @@ import scala.collection.JavaConverters._
  * Each consumer tracks the offset of the latest message consumed for each partition.
  *
  */
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
 private[kafka] object ZookeeperConsumerConnector {
   val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
         extends ConsumerConnector with Logging with KafkaMetricsGroup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
index d00f465..1a86227 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
@@ -22,6 +22,7 @@ import kafka.utils.{ZkUtils, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils,
     val eventHandler: TopicEventHandler[String]) extends Logging {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index 21101aa..def977c 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -25,6 +25,10 @@ import kafka.serializer.Decoder;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * @deprecated since 0.11.0.0, this interface will be removed in a future release.
+ */
+@Deprecated
 public interface ConsumerConnector {
     /**
      *  Create a list of MessageStreams of type T for each topic.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
index 9c899ed..ff23760 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
@@ -25,6 +25,10 @@ import java.util.Set;
  * This listener is used for execution of tasks defined by user when a consumer rebalance
  * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}
  */
+/**
+ * @deprecated since 0.11.0.0, this interface will be removed in a future release.
+ */
+@Deprecated
 public interface ConsumerRebalanceListener {
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index abf6069..188babb 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -24,6 +24,8 @@ import kafka.javaapi.OffsetRequest
 /**
  * A consumer of kafka messages
  */
+@deprecated("This class has been deprecated and will be removed in a future release. " +
+            "Please use org.apache.kafka.clients.consumer.KafkaConsumer instead.", "0.11.0.0")
 @threadsafe
 class SimpleConsumer(val host: String,
                      val port: Int,
@@ -46,7 +48,7 @@ class SimpleConsumer(val host: String,
     import kafka.javaapi.Implicits._
     underlying.fetch(request)
   }
-  
+
   /**
    *  Fetch a set of messages from a topic.
    *
@@ -59,7 +61,7 @@ class SimpleConsumer(val host: String,
 
   /**
    *  Fetch metadata for a sequence of topics.
-   *  
+   *
    *  @param request specifies the versionId, clientId, sequence of topics.
    *  @return metadata for each topic in the request.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index e145075..e3b2ec1 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -59,6 +59,7 @@ import scala.collection.JavaConverters._
  *
 */
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
     extends ConsumerConnector {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
index ede49c4..0847625 100644
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
@@ -181,6 +181,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
     else None
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def removeAllConsumerMetrics(clientId: String) {
     FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
     ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 0f10577..69fd054 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -25,6 +25,7 @@ import kafka.utils.{Logging, nonthreadsafe}
 import org.apache.kafka.common.network.NetworkReceive
 
 
+@deprecated("This object has been deprecated and will be removed in a future release.", "0.11.0.0")
 object BlockingChannel{
   val UseDefaultBufferSize = -1
 }
@@ -34,6 +35,7 @@ object BlockingChannel{
  *
  */
 @nonthreadsafe
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class BlockingChannel( val host: String, 
                        val port: Int, 
                        val readBufferSize: Int, 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index cb0680c..b17d255 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -241,7 +241,7 @@ abstract class AbstractFetcherThread(name: String,
         !partitionStates.contains(tp)
       }.map { case (tp, offset) =>
         val fetchState =
-          if (PartitionTopicInfo.isOffsetInvalid(offset))
+          if (offset < 0)
             new PartitionFetchState(handleOffsetOutOfRange(tp), includeLogTruncation)
           else
             new PartitionFetchState(offset, includeLogTruncation)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 35d7ba4..d5e29ac 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -34,6 +34,7 @@ import kafka.api.PartitionOffsetRequestInfo
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import org.apache.kafka.common.network.ListenerName
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 object ConsumerOffsetChecker extends Logging {
 
   private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 3de530f..49593c2 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -30,23 +30,25 @@ import scala.collection.JavaConverters._
 /**
  *  A utility that retrieves the offset of broker partitions in ZK and
  *  prints to an output file in the following format:
- *  
+ *
  *  /consumers/group1/offsets/topic1/1-0:286894308
  *  /consumers/group1/offsets/topic1/2-0:284803985
- *  
+ *
  *  This utility expects 3 arguments:
  *  1. Zk host:port string
  *  2. group name (all groups implied if omitted)
  *  3. output filename
- *     
+ *
  *  To print debug message, add the following line to log4j.properties:
  *  log4j.logger.kafka.tools.ExportZkOffsets$=DEBUG
  *  (for eclipse debugging, copy log4j.properties to the binary directory in "core" such as core/bin)
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 object ExportZkOffsets extends Logging {
 
   def main(args: Array[String]) {
     val parser = new OptionParser(false)
+    warn("WARNING: ExportZkOffsets is deprecated and will be dropped in a future release following 0.11.0.0.")
 
     val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
                             .withRequiredArg()
@@ -59,19 +61,19 @@ object ExportZkOffsets extends Logging {
                             .withRequiredArg()
                             .ofType(classOf[String])
     parser.accepts("help", "Print this message.")
-    
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Export consumer offsets to an output file.")
-            
+
     val options = parser.parse(args : _*)
-    
+
     if (options.has("help")) {
        parser.printHelpOn(System.out)
        Exit.exit(0)
     }
-    
+
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt)
-    
+
     val zkConnect  = options.valueOf(zkConnectOpt)
     val groups     = options.valuesOf(groupOpt)
     val outfile    = options.valueOf(outFileOpt)
@@ -79,13 +81,13 @@ object ExportZkOffsets extends Logging {
     var zkUtils   : ZkUtils    = null
     val fileWriter : OutputStreamWriter =
         new OutputStreamWriter(new FileOutputStream(outfile), StandardCharsets.UTF_8)
-    
+
     try {
       zkUtils = ZkUtils(zkConnect,
                         30000,
                         30000,
                         JaasUtils.isZkSecurityEnabled())
-      
+
       var consumerGroups: Seq[String] = null
 
       if (groups.size == 0) {
@@ -94,13 +96,13 @@ object ExportZkOffsets extends Logging {
       else {
         consumerGroups = groups.asScala
       }
-      
+
       for (consumerGrp <- consumerGroups) {
         val topicsList = getTopicsList(zkUtils, consumerGrp)
-        
+
         for (topic <- topicsList) {
           val bidPidList = getBrokeridPartition(zkUtils, consumerGrp, topic)
-          
+
           for (bidPid <- bidPidList) {
             val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
             val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
@@ -113,9 +115,9 @@ object ExportZkOffsets extends Logging {
             }
           }
         }
-      }      
+      }
     }
-    finally {      
+    finally {
       fileWriter.flush()
       fileWriter.close()
     }
@@ -123,7 +125,7 @@ object ExportZkOffsets extends Logging {
 
   private def getBrokeridPartition(zkUtils: ZkUtils, consumerGroup: String, topic: String): List[String] =
     zkUtils.getChildrenParentMayNotExist("/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
-  
+
   private def getTopicsList(zkUtils: ZkUtils, consumerGroup: String): List[String] =
     zkUtils.getChildren("/consumers/%s/offsets".format(consumerGroup)).toList
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index a925ea8..f5d64b4 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -31,11 +31,14 @@ import org.apache.kafka.common.utils.Time
 /**
  * Performance test for the simple consumer
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 object SimpleConsumerPerformance {
 
   private val logger = Logger.getLogger(getClass())
 
   def main(args: Array[String]) {
+    logger.warn("WARNING: SimpleConsumerPerformance is deprecated and will be dropped in a future release following 0.11.0.0.")
+
     val config = new ConsumerPerfConfig(args)
     logger.info("Starting SimpleConsumer...")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 098826c..b3643a3 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -32,11 +32,13 @@ import org.apache.kafka.common.utils.Utils
 /**
  * Command line program to dump out messages to standard out using the simple consumer
  */
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 object SimpleConsumerShell extends Logging {
 
   def UseLeaderReplica = -1
 
   def main(args: Array[String]): Unit = {
+    warn("WARNING: SimpleConsumerShell is deprecated and will be dropped in a future release following 0.11.0.0.")
 
     val parser = new OptionParser(false)
     val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
@@ -96,7 +98,7 @@ object SimpleConsumerShell extends Logging {
         "skip it instead of halt.")
     val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
         "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
-        
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index c599d30..0261254 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,11 +17,10 @@
 
 package kafka.tools
 
-import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.utils.{CoreUtils, Exit, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
@@ -30,11 +29,14 @@ import org.apache.kafka.common.utils.Utils
 /**
  *  A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
  */
-object UpdateOffsetsInZK {
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
+object UpdateOffsetsInZK extends Logging {
   val Earliest = "earliest"
   val Latest = "latest"
 
   def main(args: Array[String]) {
+    warn("WARNING: UpdateOffsetsInZK is deprecated and will be dropped in releases following 0.11.0.0.")
+
     if(args.length < 3)
       usage
     val config = new ConsumerConfig(Utils.loadProps(args(1)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index b80b7da..1645957 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -21,9 +21,11 @@ import joptsimple.OptionParser
 import org.apache.kafka.common.security._
 import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 object VerifyConsumerRebalance extends Logging {
   def main(args: Array[String]) {
     val parser = new OptionParser(false)
+    warn("WARNING: VerifyConsumerRebalance is deprecated and will be dropped in a future release following 0.11.0.0.")
 
     val zkConnectOpt = parser.accepts("zookeeper.connect", "ZooKeeper connect string.").
       withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ac497c4..899b7c3 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -421,6 +421,7 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition
@@ -823,11 +824,13 @@ class ZkUtils(val zkClient: ZkClient,
     zkClient.delete(brokerPartTopicPath)
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumersInGroup(group: String): Seq[String] = {
     val dirs = new ZKGroupDirs(group)
     getChildren(dirs.consumerRegistryDir)
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = {
     val dirs = new ZKGroupDirs(group)
     val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
@@ -847,6 +850,7 @@ class ZkUtils(val zkClient: ZkClient,
     consumersPerTopicMap
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getTopicsPerMemberId(group: String, excludeInternalTopics: Boolean = true): Map[String, List[String]] = {
     val dirs = new ZKGroupDirs(group)
     val memberIds = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
@@ -916,14 +920,17 @@ class ZkUtils(val zkClient: ZkClient,
     }
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getConsumerGroups() = {
     getChildren(ConsumersPath)
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getTopicsByConsumerGroup(consumerGroup:String) = {
     getChildrenParentMayNotExist(new ZKGroupDirs(consumerGroup).consumerGroupOwnersDir)
   }
 
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getAllConsumerGroupsForTopic(topic: String): Set[String] = {
     val groups = getChildrenParentMayNotExist(ConsumersPath)
     if (groups == null) Set.empty
@@ -957,6 +964,7 @@ private object ZKStringSerializer extends ZkSerializer {
   }
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ZKGroupDirs(val group: String) {
   def consumerDir = ConsumersPath
   def consumerGroupDir = consumerDir + "/" + group
@@ -965,6 +973,7 @@ class ZKGroupDirs(val group: String) {
   def consumerGroupOwnersDir = consumerGroupDir + "/owners"
 }
 
+@deprecated("This class has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ZKGroupTopicDirs(group: String, topic: String) extends ZKGroupDirs(group) {
   def consumerOffsetDir = consumerGroupOffsetsDir + "/" + topic
   def consumerOwnerDir = consumerGroupOwnersDir + "/" + topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index 0725601..4ad6629 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -33,16 +33,16 @@ import scala.collection.JavaConverters._
 
 /**
  * This is a torture test that runs against an existing broker. Here is how it works:
- * 
+ *
  * It produces a series of specially formatted messages to one or more partitions. Each message it produces
  * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
- * 
+ *
  * The broker will clean its log as the test runs.
- * 
+ *
  * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
  * and write that out to another text file.
- * 
- * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key. 
+ *
+ * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key.
  * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
  * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
  */
@@ -129,13 +129,13 @@ object TestLogCleaning {
     val consumedLines = lineCount(consumedDataFile)
     val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
     println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
-    
+
     println("De-duplicating and validating output files...")
     validateOutput(producedDataFile, consumedDataFile)
     producedDataFile.delete()
     consumedDataFile.delete()
   }
-  
+
   def dumpLog(dir: File) {
     require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
     for (file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
@@ -151,9 +151,9 @@ object TestLogCleaning {
       }
     }
   }
-  
+
   def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size
-  
+
   def validateOutput(producedDataFile: File, consumedDataFile: File) {
     val producedReader = externalSort(producedDataFile)
     val consumedReader = externalSort(consumedDataFile)
@@ -186,7 +186,7 @@ object TestLogCleaning {
     producedDedupedFile.delete()
     consumedDedupedFile.delete()
   }
-  
+
   def valuesIterator(reader: BufferedReader) = {
     new IteratorTemplate[TestRecord] {
       def makeNext(): TestRecord = {
@@ -200,7 +200,7 @@ object TestLogCleaning {
       }
     }
   }
-  
+
   def readNext(reader: BufferedReader): TestRecord = {
     var line = reader.readLine()
     if(line == null)
@@ -218,14 +218,14 @@ object TestLogCleaning {
     }
     null
   }
-  
+
   def peekLine(reader: BufferedReader) = {
     reader.mark(4096)
     val line = reader.readLine
     reader.reset()
     line
   }
-  
+
   def externalSort(file: File): BufferedReader = {
     val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath)
     val process = builder.start()
@@ -265,7 +265,7 @@ object TestLogCleaning {
       val topic = topics((i % topics.length).toInt)
       val key = rand.nextInt(keyCount)
       val delete = i % 100 < percentDeletes
-      val msg = 
+      val msg =
         if(delete)
           new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
         else
@@ -278,7 +278,8 @@ object TestLogCleaning {
     producer.close()
     producedFile
   }
-  
+
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
     val consumerProps = new Properties
     consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
@@ -287,7 +288,7 @@ object TestLogCleaning {
     consumerProps.setProperty("auto.offset.reset", "smallest")
     new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
   }
-  
+
   def consumeMessages(zkUrl: String, topics: Array[String]): File = {
     val connector = makeConsumer(zkUrl, topics)
     val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
@@ -311,7 +312,7 @@ object TestLogCleaning {
     connector.shutdown()
     consumedFile
   }
-  
+
 }
 
 case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index 9a20a1d..affdf01 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import kafka.integration.KafkaServerTestHarness
 
 
+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class DeleteConsumerGroupTest extends KafkaServerTestHarness {
   def generateConfigs() = TestUtils.createBrokerConfigs(3, zkConnect, false, true).map(KafkaConfig.fromProps)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 2bceeaa..6a68e52 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -61,6 +61,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeNonExistingGroup() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)
@@ -82,6 +83,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeExistingGroup() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)
@@ -108,6 +110,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeExistingGroupWithNoMembers() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)
@@ -141,6 +144,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   @Test
+  @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
   def testDescribeConsumersWithNoAssignedPartitions() {
     // mocks
     props.setProperty("zookeeper.connect", zkConnect)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index b7fc657..0d38e10 100755
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -33,6 +33,7 @@ import org.junit.{Before, Test}
 import kafka.serializer._
 import kafka.integration.KafkaServerTestHarness
 
+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class ConsumerIteratorTest extends KafkaServerTestHarness {
 
   val numNodes = 1
@@ -72,9 +73,9 @@ class ConsumerIteratorTest extends KafkaServerTestHarness {
     assertEquals(1, queue.size)
     queue.put(ZookeeperConsumerConnector.shutdownCommand)
 
-    val iter = new ConsumerIterator[String, String](queue, 
+    val iter = new ConsumerIterator[String, String](queue,
                                                     consumerConfig.consumerTimeoutMs,
-                                                    new StringDecoder(), 
+                                                    new StringDecoder(),
                                                     new StringDecoder(),
                                                     clientId = "")
     val receivedMessages = (0 until 5).map(_ => iter.next.message)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 1e45bfb..3012112 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -28,6 +28,7 @@ import kafka.consumer.PartitionAssignorTest.Scenario
 import kafka.consumer.PartitionAssignorTest.WildcardSubscriptionInfo
 import org.junit.Test
 
+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class PartitionAssignorTest extends Logging {
 
   @Test
@@ -42,7 +43,7 @@ class PartitionAssignorTest extends Logging {
       val topicPartitionCounts = Map((1 to topicCount).map(topic => {
         ("topic-" + topic, PartitionAssignorTest.MinPartitionCount.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxPartitionCount)))
       }):_*)
-      
+
       val subscriptions = Map((1 to consumerCount).map { consumer =>
         val streamCount = 1.max(TestUtils.random.nextInt(PartitionAssignorTest.MaxStreamCount + 1))
         ("g1c" + consumer, WildcardSubscriptionInfo(streamCount, ".*", isWhitelist = true))

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 8de4a89..028201f 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -23,6 +23,7 @@ import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 
+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class TopicFilterTest extends JUnitSuite {
 
   @Test
@@ -45,7 +46,7 @@ class TopicFilterTest extends JUnitSuite {
 
     val topicFilter4 = Whitelist("test-(?!bad\\b)[\\w]+")
     assertTrue(topicFilter4.isTopicAllowed("test-good", excludeInternalTopics = true))
-    assertFalse(topicFilter4.isTopicAllowed("test-bad", excludeInternalTopics = true))    
+    assertFalse(topicFilter4.isTopicAllowed("test-bad", excludeInternalTopics = true))
   }
 
   @Test
@@ -80,5 +81,5 @@ class TopicFilterTest extends JUnitSuite {
     assertEquals("-\\\\u001f-", getTopicCountMapKey("-\\u001f-"))
     assertEquals("-\\\\u007f-", getTopicCountMapKey("-\\u007f-"))
     assertEquals("-\\\\u009f-", getTopicCountMapKey("-\\u009f-"))
-  }    
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 3f59302..6076089 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -29,6 +29,7 @@ import kafka.server._
 import kafka.consumer._
 import kafka.utils.TestUtils
 
+@deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0")
 class FetcherTest extends KafkaServerTestHarness {
   val numNodes = 1
   def generateConfigs() = TestUtils.createBrokerConfigs(numNodes, zkConnect).map(KafkaConfig.fromProps)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 054a4ff..572de9b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -653,6 +653,8 @@ object TestUtils extends Logging {
     props
   }
 
+
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
     val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     zkUtils.updatePersistentPath(path, offset.toString)
@@ -1051,7 +1053,7 @@ object TestUtils extends Logging {
       case -1 => s"test-$x".getBytes
       case _ => new Array[Byte](valueBytes)
     })
-    
+
     val futures = values.map { value =>
       producer.send(new ProducerRecord(topic, value))
     }
@@ -1082,6 +1084,7 @@ object TestUtils extends Logging {
    *                           If not specified, then all available messages will be consumed, and no exception is thrown.
    * @return the list of messages consumed.
    */
+  @deprecated("This method has been deprecated and will be removed in a future release.", "0.11.0.0")
   def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]],
                      nMessagesPerThread: Int = -1): List[String] = {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d71cf10/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index 28bdb16..6ebd791 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -30,6 +30,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * @deprecated since 0.11.0.0. This class will be removed in a future release.
+ */
+@Deprecated
 public class SimpleConsumerDemo {
 
     private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {


Mime
View raw message