kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5584; Fix integer overflow in Log.size
Date Thu, 13 Jul 2017 00:38:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0547a0825 -> 2d2e9adb5


KAFKA-5584; Fix integer overflow in Log.size

It may lead to wrong metrics and it may break
size-based retention.

Author: Gregor Uhlenheuer <kongo2002@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3521 from kongo2002/KAFKA-5584


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d2e9adb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d2e9adb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d2e9adb

Branch: refs/heads/trunk
Commit: 2d2e9adb5d8d3805f082208ae9dd241f87566b27
Parents: 0547a08
Author: Gregor Uhlenheuer <kongo2002@googlemail.com>
Authored: Wed Jul 12 16:56:09 2017 -0700
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Jul 12 17:20:19 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala          | 11 ++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2d2e9adb/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 176a268..824d302 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1144,7 +1144,7 @@ class Log(@volatile var dir: File,
   /**
    * The size of the log in bytes
    */
-  def size: Long = logSegments.map(_.size).sum
+  def size: Long = Log.sizeInBytes(logSegments)
 
   /**
    * The offset metadata of the next message that will be appended to the log
@@ -1648,6 +1648,15 @@ object Log {
     filename.substring(0, filename.indexOf('.')).toLong
 
   /**
+    * Calculate a log's size (in bytes) based on its log segments
+    *
+    * @param segments The log segments to calculate the size of
+    * @return Sum of the log segments' sizes (in bytes)
+    */
+  def sizeInBytes(segments: Iterable[LogSegment]): Long =
+    segments.map(_.size.toLong).sum
+
+  /**
    * Parse the topic and partition out of the directory name of a log
    */
   def parseTopicPartitionName(dir: File): TopicPartition = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d2e9adb/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 65a4eeb..008cd27 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -199,6 +199,19 @@ class LogTest {
   }
 
   @Test
+  def testSizeForLargeLogs(): Unit = {
+    val largeSize = Int.MaxValue.toLong * 2
+    val logSegment = EasyMock.createMock(classOf[LogSegment])
+
+    EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes
+    EasyMock.replay(logSegment)
+
+    assertEquals(Int.MaxValue, Log.sizeInBytes(Seq(logSegment)))
+    assertEquals(largeSize, Log.sizeInBytes(Seq(logSegment, logSegment)))
+    assertTrue(Log.sizeInBytes(Seq(logSegment, logSegment)) > Int.MaxValue)
+  }
+
+  @Test
   def testPidMapOffsetUpdatedForNonIdempotentData() {
     val log = createLog(2048)
     val records = TestUtils.records(List(new SimpleRecord(time.milliseconds, "key".getBytes,
"value".getBytes)))


Mime
View raw message