kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [23/30] git commit: KAFKA-640 kafka.common.InvalidClientIdException in broker log4j messages; patched by Swapnil; reviewed by Neha Narkhede
Date Tue, 18 Dec 2012 17:44:12 GMT
KAFKA-640 kafka.common.InvalidClientIdException in broker log4j messages; patched by Swapnil; reviewed by Neha Narkhede

git-svn-id: https://svn.apache.org/repos/asf/kafka/branches/0.8@1415765 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6486fd1a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6486fd1a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6486fd1a

Branch: refs/heads/trunk
Commit: 6486fd1a9f503af24c83d16a014a6405ae07cf9b
Parents: d7c71c0
Author: Neha Narkhede <nehanarkhede@apache.org>
Authored: Fri Nov 30 17:49:49 2012 +0000
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Fri Nov 30 17:49:49 2012 +0000

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/SimpleConsumer.scala |    8 ++++----
 .../main/scala/kafka/producer/SyncProducer.scala   |    8 ++++----
 .../scala/kafka/server/AbstractFetcherThread.scala |    7 ++++---
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    2 +-
 .../test/scala/unit/kafka/utils/ClientIdTest.scala |    1 -
 5 files changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6486fd1a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index e42923a..7ecd11f 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -81,7 +81,7 @@ class SimpleConsumer(val host: String,
   ClientId.validate(clientId)
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
-  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId, "host_" + host + "-port_" + port)
+  private val fetchRequestAndResponseStats = new FetchRequestAndResponseStats(clientId + "-host_%s-port_%s".format(host, port))
 
   private def connect(): BlockingChannel = {
     close
@@ -169,7 +169,7 @@ class SimpleConsumer(val host: String,
   }
 }
 
-class FetchRequestAndResponseStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val respondSizeHist = newHistogram(clientId + "-" + brokerInfo + "-FetchResponseSize")
+class FetchRequestAndResponseStats(clientId: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val respondSizeHist = newHistogram(clientId + "-FetchResponseSize")
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6486fd1a/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index ea03d51..3183ceb 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -39,7 +39,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   @volatile private var shutdown: Boolean = false
   private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
     config.bufferSize, config.requestTimeoutMs)
-  val producerRequestStats = new ProducerRequestStats(config.clientId, "host_" + config.host + "-port_" + config.port)
+  val producerRequestStats = new ProducerRequestStats(config.clientId + "-host_%s-port_%s".format(config.host, config.port))
 
   trace("Instantiating Scala Sync Producer")
 
@@ -150,7 +150,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   }
 }
 
-class ProducerRequestStats(clientId: String, brokerInfo: String) extends KafkaMetricsGroup {
-  val requestTimer = new KafkaTimer(newTimer(clientId + "-" + brokerInfo + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
-  val requestSizeHist = newHistogram(clientId + "-" + brokerInfo + "-ProducerRequestSize")
+class ProducerRequestStats(clientId: String) extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer(clientId + "-ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram(clientId + "-ProducerRequestSize")
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6486fd1a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e4520a4..6d73c82 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -42,9 +42,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
-  val fetcherStats = new FetcherStats(clientId)
+  private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port)
+  val fetcherStats = new FetcherStats(clientId + "-" + brokerInfo)
   val fetcherMetrics = fetcherStats.getFetcherStats(name + "-" + sourceBroker.id)
-  val fetcherLagStats = new FetcherLagStats(clientId)
+  val fetcherLagStats = new FetcherLagStats(clientId + "-" + brokerInfo)
 
   /* callbacks to be defined in subclass */
 
@@ -65,7 +66,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 
   override def doWork() {
     val fetchRequestuilder = new FetchRequestBuilder().
-            clientId(clientId).
+            clientId(clientId + "-" + brokerInfo).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
             minBytes(minBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6486fd1a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 34166e4..c1d3235 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -28,7 +28,7 @@ class ReplicaFetcherThread(name:String,
                            brokerConfig: KafkaConfig,
                            replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name,
-                                clientId = FetchRequest.ReplicaFetcherClientId + "-host_%s-port_%d".format(sourceBroker.host, sourceBroker.port),
+                                clientId = FetchRequest.ReplicaFetcherClientId,
                                 sourceBroker = sourceBroker,
                                 socketTimeout = brokerConfig.replicaSocketTimeoutMs,
                                 socketBufferSize = brokerConfig.replicaSocketBufferSize,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6486fd1a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
index 6b9315e..794dcdc 100644
--- a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
@@ -27,7 +27,6 @@ class ClientIdTest {
   @Test
   def testInvalidClientIds() {
     val invalidclientIds = new ArrayBuffer[String]()
-    invalidclientIds += (".", "..")
     var longName = "ATCG"
     for (i <- 1 to 6)
       longName += longName


Mime
View raw message