kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4205; KafkaApis: fix NPE caused by conversion to array
Date Sun, 04 Dec 2016 18:59:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 5908a2523 -> c4766089f


KAFKA-4205; KafkaApis: fix NPE caused by conversion to array

NPE was caused by `log.logSegments.toArray` resulting in array containing `null` values. The
exact reason still remains somewhat a mystery to me, but it seems that the culprit is `JavaConverters`
in combination with concurrent data structure access.

Here's a simple code example to prove that:
```scala
import java.util.concurrent.ConcurrentSkipListMap
// Same as `JavaConversions`, but allows explicit conversions via `asScala`/`asJava` methods.
import scala.collection.JavaConverters._

case object Value
val m = new ConcurrentSkipListMap[Int, Value.type]
new Thread { override def run() = { while (true) m.put(9000, Value) } }.start()
new Thread { override def run() = { while (true) m.remove(9000) } }.start()
new Thread { override def run() = { while (true) { println(m.values.asScala.toArray.headOption)
} } }.start()
```

Running the example will occasionally print `Some(null)` indicating that there's something
shady going on during `toArray` conversion.

`null`s magically disappear by making the following change:
```diff
- println(m.values.asScala.toArray.headOption)
+ println(m.values.asScala.toSeq.headOption)
```

Author: Anton Karamanov <ataraxer@yandex-team.ru>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #2204 from ataraxer/KAFKA-4205


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4766089
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4766089
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4766089

Branch: refs/heads/0.10.1
Commit: c4766089f75c0e3627dbd3d596256622d0c1beb7
Parents: 5908a25
Author: Anton Karamanov <ataraxer@yandex-team.ru>
Authored: Sun Dec 4 10:51:16 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Dec 4 10:59:02 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 12 +++++-----
 .../scala/kafka/log/LogCleanerManager.scala     |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 23 +++++++++++---------
 .../scala/unit/kafka/server/LogOffsetTest.scala | 19 ++++++++++++++++
 4 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f29cde7..aea1a08 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -598,19 +598,21 @@ class Log(val dir: File,
           s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier
than the minimum " +
           s"required version $KAFKA_0_10_0_IV0")
 
+    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+    // constant time access while being safe to use with concurrent collections unlike `toArray`.
+    val segmentsCopy = logSegments.toBuffer
     // For the earliest and latest, we do not need to return the timestamp.
-    val segsArray = logSegments.toArray
     if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(Message.NoTimestamp, segsArray(0).baseOffset))
+        return Some(TimestampOffset(Message.NoTimestamp, segmentsCopy.head.baseOffset))
     else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
         return Some(TimestampOffset(Message.NoTimestamp, logEndOffset))
 
     val targetSeg = {
       // Get all the segments whose largest timestamp is smaller than target timestamp
-      val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
+      val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
       // We need to search the first segment whose largest timestamp is greater than the
target timestamp if there is one.
-      if (earlierSegs.length < segsArray.length)
-        Some(segsArray(earlierSegs.length))
+      if (earlierSegs.length < segmentsCopy.length)
+        Some(segmentsCopy(earlierSegs.length))
       else
         None
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b3e6e72..0cfe6c3 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -290,7 +290,7 @@ private[log] object LogCleanerManager extends Logging {
     }
 
     // dirty log segments
-    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray
+    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
 
     val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b664ea7..197ddb5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -714,18 +714,21 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int):
Seq[Long] = {
-    val segsArray = log.logSegments.toArray
-    var offsetTimeArray: Array[(Long, Long)] = null
-    val lastSegmentHasSize = segsArray.last.size > 0
-    if (lastSegmentHasSize)
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
-    else
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
+    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+    // constant time access while being safe to use with concurrent collections unlike `toArray`.
+    val segments = log.logSegments.toBuffer
+    val lastSegmentHasSize = segments.last.size > 0
+
+    val offsetTimeArray =
+      if (lastSegmentHasSize)
+        new Array[(Long, Long)](segments.length + 1)
+      else
+        new Array[(Long, Long)](segments.length)
 
-    for (i <- 0 until segsArray.length)
-      offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
+    for (i <- segments.indices)
+      offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
     if (lastSegmentHasSize)
-      offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
+      offsetTimeArray(segments.length) = (log.logEndOffset, SystemTime.milliseconds)
 
     var startIndex = -1
     timestamp match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 0885709..d98b82c 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -214,6 +214,25 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
   }
 
+  /* We test that `fetchOffsetsBefore` works correctly if `Log.logSegments` content and size
are
+   * different (simulating a race condition) */
+  @Test
+  def testFetchOffsetsBeforeWithChangingSegments() {
+    val log = EasyMock.niceMock(classOf[Log])
+    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    EasyMock.expect(log.logSegments).andStubAnswer {
+      new IAnswer[Iterable[LogSegment]] {
+        def answer = new Iterable[LogSegment] {
+          override def size = 2
+          def iterator = Seq(logSegment).iterator
+        }
+      }
+    }
+    EasyMock.replay(logSegment)
+    EasyMock.replay(log)
+    server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
+  }
+
   private def createBrokerConfig(nodeId: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)


Mime
View raw message