kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1431 ConsoleConsumer - Option to clean zk consumer path; reviewed by Neha Narkhede and Jun Rao
Date Thu, 15 May 2014 22:48:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c179c45f2 -> b866c5506


KAFKA-1431 ConsoleConsumer - Option to clean zk consumer path; reviewed by Neha Narkhede and
Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b866c550
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b866c550
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b866c550

Branch: refs/heads/trunk
Commit: b866c5506a936572c9a948c4bae68da0d045fc53
Parents: c179c45
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Thu May 15 15:48:16 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu May 15 15:48:16 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsoleConsumer.scala  | 31 ++++++++++++--------
 1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b866c550/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 0f62819..24c9287 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -104,6 +104,7 @@ object ConsoleConsumer extends Logging {
             .withRequiredArg
             .describedAs("prop")
             .ofType(classOf[String])
+    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified,
the consumer path in zookeeper is deleted when starting up");
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already
have an established offset to consume from, " +
             "start with the earliest message present in the log rather than the latest message.")
     val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval
at which to save the current offset in ms")
@@ -159,6 +160,16 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
+    if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt)
&&
+       checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + options.valueOf(groupIdOpt)+
"/offsets")) {
+      System.err.println("Found previous offset information for this group "+options.valueOf(groupIdOpt)
+        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
+      System.exit(1)
+    }
+
+    if(options.has(deleteConsumerOffsetsOpt))
+      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
+
     val offsetsStorage = options.valueOf(offsetsStorageOpt)
     val props = new Properties()
     props.put("group.id", options.valueOf(groupIdOpt))
@@ -191,14 +202,12 @@ object ConsoleConsumer extends Logging {
 
     val connector = Consumer.create(config)
 
-    if(options.has(resetBeginningOpt))
-      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
         connector.shutdown()
         // if there is no group specified then avoid polluting zookeeper with persistent
group data, this is a hack
-        if(!options.has(groupIdOpt))  
+        if(!options.has(groupIdOpt))
           ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
       }
     })
@@ -253,20 +262,16 @@ object ConsoleConsumer extends Logging {
     }
   }
 
-  def tryCleanupZookeeper(zkUrl: String, groupId: String) {
+  def checkZkPathExists(zkUrl: String, path: String): Boolean = {
     try {
-      val dir = "/consumers/" + groupId
-      info("Cleaning up temporary zookeeper data under " + dir + ".")
-      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-      zk.deleteRecursive(dir)
-      zk.close()
+      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer);
+      zk.exists(path)
     } catch {
-      case _: Throwable => // swallow
+      case _: Throwable => false
     }
   }
 }
 
-
 object MessageFormatter {
   def tryParseFormatterArgs(args: Iterable[String]): Properties = {
     val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
@@ -291,7 +296,7 @@ class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var keySeparator = "\t".getBytes
   var lineSeparator = "\n".getBytes
-  
+
   override def init(props: Properties) {
     if(props.containsKey("print.key"))
       printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
@@ -300,7 +305,7 @@ class DefaultMessageFormatter extends MessageFormatter {
     if(props.containsKey("line.separator"))
       lineSeparator = props.getProperty("line.separator").getBytes
   }
-  
+
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
     if(printKey) {
       output.write(if (key == null) "null".getBytes() else key)


Mime
View raw message