kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1309; Fix cross-compilation issue (due to use of deprecated JavaConversions API in javaapi.OffsetCommitRequest); reviewed by Neha Narkhede
Date Tue, 18 Mar 2014 17:38:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 53e4061b2 -> 423d9d5af


KAFKA-1309; Fix cross-compilation issue (due to use of deprecated JavaConversions API in javaapi.OffsetCommitRequest);
reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/423d9d5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/423d9d5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/423d9d5a

Branch: refs/heads/trunk
Commit: 423d9d5af9481b782c7655dc2410e61e1b48a0bb
Parents: 53e4061
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Tue Mar 18 10:38:03 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Tue Mar 18 10:38:03 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/OffsetCommitRequest.scala   |  4 ++--
 .../scala/kafka/consumer/ZookeeperConsumerConnector.scala |  2 +-
 .../main/scala/kafka/javaapi/OffsetCommitRequest.scala    |  4 ++--
 core/src/test/scala/other/kafka/TestOffsetManager.scala   |  2 +-
 .../unit/kafka/api/RequestResponseSerializationTest.scala |  2 +-
 .../test/scala/unit/kafka/server/OffsetCommitTest.scala   | 10 +++++-----
 6 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/423d9d5a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 9f6956e..630768a 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -54,12 +54,12 @@ object OffsetCommitRequest extends Logging {
         (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
       })
     })
-    OffsetCommitRequest(consumerGroupId, mutable.Map(pairs:_*), versionId, correlationId, clientId)
+    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId)
   }
 }
 
 case class OffsetCommitRequest(groupId: String,
-                               requestInfo: mutable.Map[TopicAndPartition, OffsetAndMetadata],
+                               requestInfo: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                                versionId: Short = OffsetCommitRequest.CurrentVersion,
                                override val correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/423d9d5a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 9a3db90..ff5e819 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -286,7 +286,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     while (!done) {
       val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
-        val offsetsToCommit = mutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
+        val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
           partitionTopicInfos.filterNot { case (partition, info) =>
             val newOffset = info.getConsumeOffset()
             newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/423d9d5a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 6de320d..08dcc55 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -25,10 +25,10 @@ class OffsetCommitRequest(groupId: String,
                           correlationId: Int,
                           clientId: String) {
   val underlying = {
-    val scalaMap: collection.mutable.Map[TopicAndPartition, OffsetAndMetadata] = {
+    val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = {
       import collection.JavaConversions._
 
-      collection.JavaConversions.asMap(requestInfo)
+      requestInfo.toMap
     }
     kafka.api.OffsetCommitRequest(
       groupId = groupId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/423d9d5a/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 83317f0..c468419 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -66,7 +66,7 @@ object TestOffsetManager {
     }
 
     override def doWork() {
-      val commitRequest = OffsetCommitRequest(group, mutable.Map((1 to partitionCount).map(TopicAndPartition("topic-" + id, _) -> OffsetAndMetadata(offset, metadata)):_*))
+      val commitRequest = OffsetCommitRequest(group, immutable.Map((1 to partitionCount).map(TopicAndPartition("topic-" + id, _) -> OffsetAndMetadata(offset, metadata)):_*))
       try {
         ensureConnected()
         offsetsChannel.send(commitRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/423d9d5a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 5378446..d39a9a4 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -147,7 +147,7 @@ object SerializationTestUtils {
   }
 
   def createTestOffsetCommitRequest: OffsetCommitRequest = {
-    new OffsetCommitRequest("group 1", collection.mutable.Map(
+    new OffsetCommitRequest("group 1", collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
       TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
     ))

http://git-wip-us.apache.org/repos/asf/kafka/blob/423d9d5a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index e632997..ae9bb3a 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -81,7 +81,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
+    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
@@ -95,7 +95,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
 
     // Commit a new offset
-    val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
+    val commitRequest1 = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=100L,
       metadata="some metadata"
     )))
@@ -120,7 +120,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic3 = "topic-3"
     val topic4 = "topic-4"
 
-    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(
+    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"),
       TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=43L, metadata="metadata two"),
       TopicAndPartition(topic3, 0) -> OffsetAndMetadata(offset=44L, metadata="metadata three"),
@@ -172,7 +172,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0, 1000)
     assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
 
-    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(
+    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize)
     )))
@@ -180,7 +180,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
 
-    val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
+    val commitRequest1 = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize + 1)
     )))


Mime
View raw message