kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1160953 - in /incubator/kafka/trunk/core/src/main/scala/kafka/consumer: FetcherRunnable.scala ZookeeperConsumerConnector.scala
Date Wed, 24 Aug 2011 01:33:59 GMT
Author: nehanarkhede
Date: Wed Aug 24 01:33:59 2011
New Revision: 1160953

URL: http://svn.apache.org/viewvc?rev=1160953&view=rev
Log:
Code clean up in FetcherRunnable and ZookeeperConsumerConnector; KAFKA-120; patched by junrao; reviewed by nehanarkhede

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1160953&r1=1160952&r2=1160953&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Wed Aug 24 01:33:59 2011
@@ -69,7 +69,7 @@ class FetcherRunnable(val name: String,
           try {
             var done = false
             if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
-              logger.info("offset " + info.getFetchOffset + " out of range")
+              logger.info("offset for " + info + " out of range")
               // see if we can fix this error
               val resetOffset = resetConsumerOffsets(info.topic, info.partition)
               if(resetOffset >= 0) {
@@ -136,7 +136,8 @@ class FetcherRunnable(val name: String,
     val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
     // reset manually in zookeeper
-    logger.info("updating partition " + partition.name + " with " + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
+    logger.info("updating partition " + partition.name + " for topic " + topic + " with " +
+            (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
     ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString)
 
     offsets(0)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1160953&r1=1160952&r2=1160953&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Aug 24 01:33:59 2011
@@ -137,8 +137,7 @@ private[kafka] class ZookeeperConsumerCo
       }
       catch {
         case e =>
-          logger.fatal(e)
-          logger.fatal(Utils.stackTrace(e))
+          logger.fatal("error during consumer connector shutdown", e)
       }
       logger.info("ZKConsumerConnector shut down completed")
     }
@@ -240,7 +239,7 @@ private[kafka] class ZookeeperConsumerCo
         catch {
           case t: Throwable =>
           // log it and let it go
-            logger.warn("exception during commitOffsets: " + t + Utils.stackTrace(t))
+            logger.warn("exception during commitOffsets",  t)
         }
         if(logger.isDebugEnabled)
           logger.debug("Committed offset " + newOffset + " for topic " + info)
@@ -434,7 +433,7 @@ private[kafka] class ZookeeperConsumerCo
               // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
               // For example, a ZK node can disappear between the time we get all children and the time we try to get
               // the value of a child. Just let this go since another rebalance will be triggered.
-              logger.info("exception during rebalance " + e)
+              logger.info("exception during rebalance ", e)
           }
           logger.info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done)
@@ -450,12 +449,6 @@ private[kafka] class ZookeeperConsumerCo
     }
 
     private def rebalance(): Boolean = {
-      // testing code
-      //if ("group1_consumer1" == consumerIdString) {
-      //  logger.info("sleeping " + consumerIdString)
-      //  Thread.sleep(20)
-      //}
-
       val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic
       val cluster = ZkUtils.getCluster(zkClient)
       val consumersPerTopicMap = getConsumersPerTopic(group)



Mime
View raw message