kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2236; Offset request reply racing with segment rolling
Date Wed, 04 May 2016 21:26:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7a0821d65 -> c46cc4802


KAFKA-2236; Offset request reply racing with segment rolling

Author: William Thurston <wthurston@linkedin.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1318 from ijuma/KAFKA-2236-offset-request-reply-segment-rolling-race
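
For context (an illustration, not part of the commit itself): the race is between an offset request handler and a log roll or append that gives the initially empty last segment its first bytes. Below is a minimal, self-contained Scala sketch of the failure shape and the fix, using a hypothetical GrowingSegment in place of kafka.log.LogSegment:

    import java.util.concurrent.atomic.AtomicLong

    object OffsetRaceSketch {
      // Hypothetical stand-in for kafka.log.LogSegment: its size can grow
      // concurrently while fetchOffsetsBefore is running.
      final class GrowingSegment {
        private val bytes = new AtomicLong(0)
        def append(messageBytes: Long): Unit = bytes.addAndGet(messageBytes)
        def size: Long = bytes.get
      }

      // Pre-patch shape: `segments.last.size > 0` is evaluated twice. If the
      // empty last segment receives its first message between the two reads,
      // the array is allocated without the extra slot, but the second check
      // still writes to index segments.length, which is out of bounds.
      def racy(segments: Array[GrowingSegment], logEndOffset: Long): Array[Long] = {
        val out =
          if (segments.last.size > 0) new Array[Long](segments.length + 1)
          else new Array[Long](segments.length)
        // ... a concurrent append to segments.last can land here ...
        if (segments.last.size > 0)
          out(segments.length) = logEndOffset // ArrayIndexOutOfBoundsException when the race hits
        out
      }

      // Patched shape: read the size once and reuse the answer, so the
      // allocation and the write can never disagree.
      def fixed(segments: Array[GrowingSegment], logEndOffset: Long): Array[Long] = {
        val lastSegmentHasSize = segments.last.size > 0
        val out =
          if (lastSegmentHasSize) new Array[Long](segments.length + 1)
          else new Array[Long](segments.length)
        if (lastSegmentHasSize)
          out(segments.length) = logEndOffset
        out
      }
    }
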


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c46cc480
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c46cc480
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c46cc480

Branch: refs/heads/trunk
Commit: c46cc480214080844ef0ca04d96f1db61b1f2ea3
Parents: 7a0821d
Author: William Thurston <wthurston@linkedin.com>
Authored: Wed May 4 14:26:30 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 4 14:26:30 2016 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  9 ++++----
 .../scala/unit/kafka/server/LogOffsetTest.scala | 22 +++++++++++++++++++-
 2 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c46cc480/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cf7814e..eb6358d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -579,17 +579,18 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+  private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
-    if (segsArray.last.size > 0)
+    val lastSegmentHasSize = segsArray.last.size > 0
+    if (lastSegmentHasSize)
       offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
     else
       offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 
     for (i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
-    if (segsArray.last.size > 0)
+    if (lastSegmentHasSize)
       offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
 
     var startIndex = -1
@@ -1048,4 +1049,4 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c46cc480/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index d5c696e..463cd8a 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -18,12 +18,14 @@
 package kafka.server
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicLong
 import java.util.{Properties, Random}
 
 import kafka.admin.AdminUtils
 import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.TopicAndPartition
 import kafka.consumer.SimpleConsumer
+import kafka.log.{Log, LogSegment}
 import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
 import kafka.utils.TestUtils._
 import kafka.utils._
@@ -31,11 +33,12 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Utils
+import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 class LogOffsetTest extends ZooKeeperTestHarness {
-  val random = new Random() 
+  val random = new Random()
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
@@ -194,6 +197,23 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     assertEquals(Seq(0L), consumerOffsets)
   }
 
+  /* We test that `fetchOffsetsBefore` works correctly if `LogSegment.size` changes after each invocation (simulating
+   * a race condition) */
+  @Test
+  def testFetchOffsetsBeforeWithChangingSegmentSize() {
+    val log = EasyMock.niceMock(classOf[Log])
+    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Long] {
+      private val value = new AtomicLong(0)
+      def answer: Long = value.getAndIncrement()
+    })
+    EasyMock.replay(logSegment)
+    val logSegments = Seq(logSegment)
+    EasyMock.expect(log.logSegments).andStubReturn(logSegments)
+    EasyMock.replay(log)
+    server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
+  }
+
   private def createBrokerConfig(nodeId: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)

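A note on the test's design (my reading of the diff, not part of the commit message): the EasyMock stub backs LogSegment.size with an AtomicLong, so successive calls return 0, 1, 2, and so on. Within a single fetchOffsetsBefore invocation, the allocation check therefore sees size 0 (no extra array slot) while the later check sees size 1 (write the extra slot), reproducing the race deterministically without any threads. With lastSegmentHasSize cached, both decisions use the same value and the call completes.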
