kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1743; ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible; patched by Manikumar Reddy; reviewed by Jun Rao
Date Wed, 19 Nov 2014 03:01:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 00dfa8926 -> fbecd489c


kafka-1743; ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible; patched by
Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fbecd489
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fbecd489
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fbecd489

Branch: refs/heads/0.8.2
Commit: fbecd489cd469c102aa42cc12fef58bb8596c92a
Parents: 00dfa89
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Tue Nov 18 19:00:54 2014 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Nov 18 19:00:54 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/ConsumerConnector.scala    |  7 ++++++-
 .../kafka/consumer/ZookeeperConsumerConnector.scala      | 11 ++++++++---
 .../javaapi/consumer/ZookeeperConsumerConnector.scala    |  2 +-
 .../kafka/consumer/ZookeeperConsumerConnectorTest.scala  |  4 ++--
 4 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fbecd489/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 07677c1..62c0686 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -70,7 +70,12 @@ trait ConsumerConnector {
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
    */
-  def commitOffsets(retryOnFailure: Boolean = true)
+  def commitOffsets(retryOnFailure: Boolean)
+  
+  /**
+   * KAFKA-1743: This method added for backward compatibility.
+   */
+  def commitOffsets
   
   /**
    *  Shut down the connector

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbecd489/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fe9d8e0..3e1718b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -199,7 +199,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           }
           sendShutdownToAllQueues()
           if (config.autoCommitEnable)
-            commitOffsets()
+            commitOffsets(true)
           if (zkClient != null) {
             zkClient.close()
             zkClient = null
@@ -286,7 +286,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  def commitOffsets(isAutoCommit: Boolean = true) {
+  def commitOffsets(isAutoCommit: Boolean) {
     var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit
     var done = false
 
@@ -374,6 +374,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
+  /**
+   * KAFKA-1743: This method added for backward compatibility.
+   */
+  def commitOffsets { commitOffsets(true) }
+
   private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
     val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
     val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
@@ -716,7 +721,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           * successfully and the fetchers restart to fetch more data chunks
           **/
         if (config.autoCommitEnable)
-          commitOffsets()
+          commitOffsets(true)
         case None =>
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbecd489/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 1f98db5..9d5a47f 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -108,7 +108,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
 
   def commitOffsets() {
-    underlying.commitOffsets()
+    underlying.commitOffsets(true)
   }
 
   def commitOffsets(retryOnFailure: Boolean) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbecd489/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index bad099a..8c4687b 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -113,7 +113,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
-    zkConsumerConnector1.commitOffsets()
+    zkConsumerConnector1.commitOffsets(true)
 
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
@@ -201,7 +201,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
-    zkConsumerConnector1.commitOffsets()
+    zkConsumerConnector1.commitOffsets(true)
 
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {


Mime
View raw message