kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1398893 - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/server/ core/src/main/scala/kafka/tools/ core/src/tes...
Date Tue, 16 Oct 2012 17:30:47 GMT
Author: nehanarkhede
Date: Tue Oct 16 17:30:46 2012
New Revision: 1398893

URL: http://svn.apache.org/viewvc?rev=1398893&view=rev
Log:
KAFKA-537 Expose clientId in ConsumerConfig and fix correlation id; patched by Yang Ye; reviewed
by Neha Narkhede

Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
(original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
Tue Oct 16 17:30:46 2012
@@ -60,7 +60,6 @@ public class KafkaETLContext {
     protected long _offset = Long.MAX_VALUE; /*current offset*/
     protected long _count; /*current count*/
 
-    protected int requestId = 0; /* the id of the next fetch request */
     protected FetchResponse _response = null;  /*fetch response*/
     protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
     protected Iterator<ByteBufferMessageSet> _respIterator = null;
@@ -74,6 +73,7 @@ public class KafkaETLContext {
     
     protected MultipleOutputs _mos;
     protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
+    protected FetchRequestBuilder builder = new FetchRequestBuilder();
     
     public long getTotalBytes() {
         return (_offsetRange[1] > _offsetRange[0])? _offsetRange[1] - _offsetRange[0]
: 0;
@@ -150,8 +150,7 @@ public class KafkaETLContext {
     public boolean fetchMore () throws IOException {
         if (!hasMore()) return false;
 
-        FetchRequest fetchRequest = new FetchRequestBuilder()
-                .correlationId(requestId)
+        FetchRequest fetchRequest = builder
                 .clientId(_request.clientId())
                 .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize)
                 .build();

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Tue Oct
16 17:30:46 2012
@@ -23,6 +23,7 @@ import kafka.api.ApiUtils._
 import scala.collection.immutable.Map
 import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
+import java.util.concurrent.atomic.AtomicInteger
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -30,8 +31,10 @@ case class PartitionFetchInfo(offset: Lo
 
 object FetchRequest {
   val CurrentVersion = 1.shortValue()
-  val DefaultCorrelationId = -1
-  val DefaultClientId = ""
+  val DefaultMaxWait = 0
+  val DefaultMinBytes = 0
+  val ReplicaFetcherClientId = "replica fetcher"
+  val DefaultCorrelationId = 0
 
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
@@ -57,10 +60,10 @@ object FetchRequest {
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         correlationId: Int = FetchRequest.DefaultCorrelationId,
-                        clientId: String = FetchRequest.DefaultClientId,
+                        clientId: String = ConsumerConfig.DefaultClientId,
                         replicaId: Int = Request.OrdinaryConsumerId,
-                        maxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-                        minBytes: Int = ConsumerConfig.MinFetchBytes,
+                        maxWait: Int = FetchRequest.DefaultMaxWait,
+                        minBytes: Int = FetchRequest.DefaultMinBytes,
                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
         extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
 
@@ -123,12 +126,12 @@ case class FetchRequest(versionId: Short
 
 @nonthreadsafe
 class FetchRequestBuilder() {
-  private var correlationId = FetchRequest.DefaultCorrelationId
+  private val correlationId = new AtomicInteger(0)
   private val versionId = FetchRequest.CurrentVersion
-  private var clientId = FetchRequest.DefaultClientId
+  private var clientId = ConsumerConfig.DefaultClientId
   private var replicaId = Request.OrdinaryConsumerId
-  private var maxWait = ConsumerConfig.MaxFetchWaitMs
-  private var minBytes = ConsumerConfig.MinFetchBytes
+  private var maxWait = FetchRequest.DefaultMaxWait
+  private var minBytes = FetchRequest.DefaultMinBytes
   private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
@@ -136,11 +139,6 @@ class FetchRequestBuilder() {
     this
   }
 
-  def correlationId(correlationId: Int): FetchRequestBuilder = {
-    this.correlationId = correlationId
-    this
-  }
-
   def clientId(clientId: String): FetchRequestBuilder = {
     this.clientId = clientId
     this
@@ -161,5 +159,5 @@ class FetchRequestBuilder() {
     this
   }
 
-  def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes,
requestMap.toMap)
+  def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId,
maxWait, minBytes, requestMap.toMap)
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Tue
Oct 16 17:30:46 2012
@@ -42,6 +42,7 @@ object ConsumerConfig {
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
+  val DefaultClientId = ""
 }
 
 class ConsumerConfig private (props: VerifiableProperties) extends ZKConfig(props) {
@@ -106,5 +107,10 @@ class ConsumerConfig private (props: Ver
    *  overhead of decompression.
    *  */
   val enableShallowIterator = props.getBoolean("shallowiterator.enable", false)
+
+  /**
+   * Client id is specified by the kafka consumer client, used to distinguish different
clients
+   */
+  val clientId = props.getString("clientid", groupId)
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
Tue Oct 16 17:30:46 2012
@@ -28,10 +28,15 @@ class ConsumerFetcherThread(name: String
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout
= config.socketTimeoutMs,
-          socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
-          fetcherBrokerId = Request.OrdinaryConsumerId, maxWait = config.maxFetchWaitMs,
-          minBytes = config.minFetchBytes) {
+        extends AbstractFetcherThread(name = name, 
+                                      clientId = config.clientId,
+                                      sourceBroker = sourceBroker,
+                                      socketTimeout = config.socketTimeoutMs,
+                                      socketBufferSize = config.socketBufferSize, 
+                                      fetchSize = config.fetchSize,
+                                      fetcherBrokerId = Request.OrdinaryConsumerId,
+                                      maxWait = config.maxFetchWaitMs,
+                                      minBytes = config.minFetchBytes) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData:
FetchResponsePartitionData) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Tue Oct 16 17:30:46 2012
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class  AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout:
Int, socketBufferSize: Int,
+abstract class  AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker,
socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait:
Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
 
@@ -42,7 +42,13 @@ abstract class  AbstractFetcherThread(na
   private val fetchMapLock = new Object
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout,
socketBufferSize)
   val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
-  
+
+  val fetchRequestuilder = new FetchRequestBuilder().
+          clientId(clientId).
+          replicaId(fetcherBrokerId).
+          maxWait(maxWait).
+          minBytes(minBytes)
+
   /* callbacks to be defined in subclass */
 
   // process fetched data
@@ -61,21 +67,15 @@ abstract class  AbstractFetcherThread(na
   }
 
   override def doWork() {
-    val builder = new FetchRequestBuilder().
-            clientId(name).
-            replicaId(fetcherBrokerId).
-            maxWait(maxWait).
-            minBytes(minBytes)
-
     fetchMapLock synchronized {
       fetchMap.foreach {
         case((topicAndPartition, offset)) =>
-          builder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+          fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
     }
 
-    val fetchRequest = builder.build()
+    val fetchRequest = fetchRequestuilder.build()
     val partitionsWithError = new mutable.HashSet[TopicAndPartition]
     var response: FetchResponse = null
     try {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
Tue Oct 16 17:30:46 2012
@@ -17,17 +17,25 @@
 
 package kafka.server
 
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 import kafka.common.TopicAndPartition
+import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 
 
-class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager)
-  extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout =
brokerConfig.replicaSocketTimeoutMs,
-    socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
-    fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
-    minBytes = brokerConfig.replicaMinBytes) {
+class ReplicaFetcherThread(name:String,
+                           sourceBroker: Broker,
+                           brokerConfig: KafkaConfig,
+                           replicaMgr: ReplicaManager)
+  extends AbstractFetcherThread(name = name,
+                                clientId = FetchRequest.ReplicaFetcherClientId + "- %s:%d".format(sourceBroker.host,
sourceBroker.port) ,
+                                sourceBroker = sourceBroker,
+                                socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+                                socketBufferSize = brokerConfig.replicaSocketBufferSize,
+                                fetchSize = brokerConfig.replicaFetchSize,
+                                fetcherBrokerId = brokerConfig.brokerId,
+                                maxWait = brokerConfig.replicaMaxWaitTimeMs,
+                                minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData:
FetchResponsePartitionData) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
Tue Oct 16 17:30:46 2012
@@ -114,6 +114,12 @@ object SimpleConsumerShell extends Loggi
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
 
+    val fetchRequestBuilder = new FetchRequestBuilder()
+                       .clientId(clientId)
+                       .replicaId(Request.DebuggingConsumerId)
+                       .maxWait(maxWaitMs)
+                       .minBytes(ConsumerConfig.MinFetchBytes)
+
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
@@ -168,17 +174,11 @@ object SimpleConsumerShell extends Loggi
     val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port,
10000, 64*1024)
     val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
       def run() {
-        var reqId = 0
         var offset = startingOffset
         try {
           while(true) {
-            val fetchRequest = new FetchRequestBuilder()
-                    .correlationId(reqId)
-                    .clientId(clientId)
-                    .replicaId(Request.DebuggingConsumerId)
+            val fetchRequest = fetchRequestBuilder
                     .addFetch(topic, partitionId, offset, fetchSize)
-                    .maxWait(maxWaitMs)
-                    .minBytes(ConsumerConfig.MinFetchBytes)
                     .build()
             val fetchResponse = simpleConsumer.fetch(fetchRequest)
             val messageSet = fetchResponse.messageSet(topic, partitionId)
@@ -206,11 +206,10 @@ object SimpleConsumerShell extends Loggi
               }
               consumed += 1
             }
-            reqId += 1
           }
         } catch {
           case e: Throwable =>
-            error("Error consuming topic, partition, replica (%s, %d, %d) with request id
[%d] and offset [%d]".format(topic, partitionId, replicaId, reqId, offset), e)
+            error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic,
partitionId, replicaId, offset), e)
         }
       }
     }, false)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
Tue Oct 16 17:30:46 2012
@@ -62,7 +62,6 @@ class PrimitiveApiTest extends JUnit3Sui
 
   def testFetchRequestCanProperlySerialize() {
     val request = new FetchRequestBuilder()
-      .correlationId(100)
       .clientId("test-client")
       .maxWait(10001)
       .minBytes(4444)
@@ -99,12 +98,11 @@ class PrimitiveApiTest extends JUnit3Sui
                replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)
 
     val request = new FetchRequestBuilder()
-      .correlationId(100)
       .clientId("test-client")
       .addFetch(topic, 0, 0, 10000)
       .build()
     val fetched = consumer.fetch(request)
-    assertEquals("Returned correlationId doesn't match that in request.", 100, fetched.correlationId)
+    assertEquals("Returned correlationId doesn't match that in request.", 0, fetched.correlationId)
 
     val messageSet = fetched.messageSet(topic, 0)
     assertTrue(messageSet.iterator.hasNext)

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
(original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
Tue Oct 16 17:30:46 2012
@@ -57,7 +57,6 @@ public class SimpleConsumerDemo {
 
     System.out.println("Testing single fetch");
     FetchRequest req = new FetchRequestBuilder()
-            .correlationId(0)
             .clientId(KafkaProperties.clientId)
             .addFetch(KafkaProperties.topic2, 0, 0L, 100)
             .build();
@@ -70,7 +69,6 @@ public class SimpleConsumerDemo {
         put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }});
     }};
     req = new FetchRequestBuilder()
-            .correlationId(0)
             .clientId(KafkaProperties.clientId)
             .addFetch(KafkaProperties.topic2, 0, 0L, 100)
             .addFetch(KafkaProperties.topic3, 0, 0L, 100)

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1398893&r1=1398892&r2=1398893&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
(original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
Tue Oct 16 17:30:46 2012
@@ -59,11 +59,9 @@ object SimpleConsumerPerformance {
     var lastReportTime: Long = startMs
     var lastBytesRead = 0L
     var lastMessagesRead = 0L
-    var reqId = 0
     while(!done) {
       // TODO: add in the maxWait and minBytes for performance
       val request = new FetchRequestBuilder()
-        .correlationId(reqId)
         .clientId(config.clientId)
         .addFetch(config.topic, config.partition, offset, config.fetchSize)
         .build()
@@ -101,7 +99,6 @@ object SimpleConsumerPerformance {
         lastMessagesRead = totalMessagesRead
         consumedInterval = 0
       }
-      reqId += 1
     }
     val reportTime = System.currentTimeMillis
     val elapsed = (reportTime - startMs) / 1000.0



Mime
View raw message