kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-3682; ArrayIndexOutOfBoundsException thrown by
Date Fri, 27 May 2016 22:39:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 55225bd55 -> 756ec494d


KAFKA-3682; ArrayIndexOutOfBoundsException thrown by SkimpyOffsetMap.get() when full

Limited the number of probe attempts to the number of map slots once the
internal positionOf() goes into linear search mode.
Added a unit test.

Co-developed with mimaison

Author: edoardo <ecomar@uk.ibm.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1352 from edoardocomar/KAFKA-3682


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/756ec494
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/756ec494
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/756ec494

Branch: refs/heads/trunk
Commit: 756ec494da75c49e6b537bd094b941c0492ec46e
Parents: 55225bd
Author: edoardo <ecomar@uk.ibm.com>
Authored: Fri May 27 15:39:07 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Fri May 27 15:39:07 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/OffsetMap.scala         |  5 +++++
 .../src/test/scala/unit/kafka/log/OffsetMapTest.scala | 14 +++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/756ec494/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 3893b2c..f453030 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -118,7 +118,12 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
     // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot
     var attempt = 0
     var pos = 0
+    //we need to guard against attempt integer overflow if the map is full
+    //limit attempt to number of slots once positionOf(..) enters linear search mode
+    val maxAttempts = slots + hashSize - 4
     do {
+     if(attempt >= maxAttempts)
+        return -1L
       pos = positionOf(hash1, attempt)
       bytes.position(pos)
       if(isEmpty(pos))

http://git-wip-us.apache.org/repos/asf/kafka/blob/756ec494/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index f50daa4..a5bec17 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -44,7 +44,19 @@ class OffsetMapTest extends JUnitSuite {
       assertEquals(map.get(key(i)), -1L)
   }
   
-  def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes)
+  @Test
+  def testGetWhenFull() {
+    val map = new SkimpyOffsetMap(4096)
+    var i = 37L  //any value would do
+    while (map.size < map.slots) {
+      map.put(key(i), i)
+      i = i + 1L
+    }
+    assertEquals(map.get(key(i)), -1L)
+    assertEquals(map.get(key(i-1L)), i-1L)
+  }
+
+  def key(key: Long) = ByteBuffer.wrap(key.toString.getBytes)
   
   def validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = {
     val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
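
The guard added above can be illustrated with a small standalone model. This is a minimal sketch, not Kafka's SkimpyOffsetMap: the class name BoundedProbeMap, the String keys, and the slotOf() probing scheme are assumptions made for illustration. It assumes the probe reads an int at a sliding offset of the MD5 hash for the first hashSize - 4 attempts and then steps linearly, which is what the "linear search mode" wording in the commit message suggests. Only the maxAttempts bound and the -1L "not found" result are taken from the diff.

```scala
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.Arrays

// Toy model only: names echo the diff, but this is NOT Kafka's SkimpyOffsetMap.
final class BoundedProbeMap(val slots: Int) {
  private val hashSize = 16                            // MD5 digest length in bytes
  private val hashes = new Array[Array[Byte]](slots)   // null marks an empty slot
  private val offsets = new Array[Long](slots)
  private var entries = 0

  def size: Int = entries

  private def hashOf(key: String): Array[Byte] =
    MessageDigest.getInstance("MD5").digest(key.getBytes("UTF-8"))

  // Assumed probing scheme: hash-derived positions first, then linear steps.
  // A negative attempt (after Int overflow) would make the read index negative
  // and throw, which is the failure the bound in get() prevents.
  private def slotOf(hash: Array[Byte], attempt: Int): Int = {
    val base = ByteBuffer.wrap(hash).getInt(math.min(attempt, hashSize - 4))
    val probe = base.toLong + math.max(0, attempt - (hashSize - 4))
    java.lang.Math.floorMod(probe, slots.toLong).toInt
  }

  def put(key: String, offset: Long): Unit = {
    require(entries < slots, "map is full")
    val hash = hashOf(key)
    var attempt = 0
    var slot = slotOf(hash, attempt)
    // Terminates because at least one slot is empty and the linear phase visits every slot.
    while (hashes(slot) != null && !Arrays.equals(hashes(slot), hash)) {
      attempt += 1
      slot = slotOf(hash, attempt)
    }
    if (hashes(slot) == null) entries += 1
    hashes(slot) = hash
    offsets(slot) = offset
  }

  def get(key: String): Long = {
    val hash = hashOf(key)
    // Same bound as the diff: hash-derived attempts plus one full linear sweep.
    val maxAttempts = slots + hashSize - 4
    var attempt = 0
    while (attempt < maxAttempts) {
      val slot = slotOf(hash, attempt)
      if (hashes(slot) == null) return -1L                        // empty slot: key absent
      if (Arrays.equals(hashes(slot), hash)) return offsets(slot) // hash match: key found
      attempt += 1
    }
    -1L // every slot was examined without a match, so the map is full and the key is absent
  }
}
```

Without the bound, a lookup for a missing key in a full map never reaches an empty slot, so attempt keeps growing until it overflows. For the map built in testGetWhenFull, and assuming 24 bytes per entry as the `* 24` in validateMap suggests, 4096 bytes give 170 slots, so the bound would be 170 + 16 - 4 = 182 attempts.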

