kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 1.0 updated: MINOR: Fix record conversion time in metrics (#4671)
Date Fri, 09 Mar 2018 17:40:05 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new dd14bac  MINOR: Fix record conversion time in metrics (#4671)
dd14bac is described below

commit dd14bac7cfc21a9408b00cf051330eed185b1c84
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri Mar 9 16:50:34 2018 +0000

    MINOR: Fix record conversion time in metrics (#4671)
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 core/src/main/scala/kafka/log/LogValidator.scala   |  3 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala    | 33 ++++++++++++++++------
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 15750e9..61349cc 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -118,6 +118,7 @@ private[kafka] object LogValidator extends Logging {
                                                    toMagicValue: Byte,
                                                    partitionLeaderEpoch: Int,
                                                    isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+    val startNanos = time.nanoseconds
     val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
       CompressionType.NONE, records.records)
 
@@ -143,7 +144,7 @@ private[kafka] object LogValidator extends Logging {
 
     val info = builder.info
     val recordsProcessingStats = new RecordsProcessingStats(builder.uncompressedBytesWritten,
-      builder.numRecords, time.nanoseconds - now)
+      builder.numRecords, time.nanoseconds - startNanos)
     ValidationAndOffsetAssignResult(
       validatedRecords = convertedRecords,
       maxTimestamp = info.maxTimestamp,
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 131152a..467b031 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -17,6 +17,7 @@
 package kafka.log
 
 import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
 
 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
@@ -638,7 +639,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
     checkOffsets(records, 0)
     val offset = 1234567
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -649,7 +650,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = false)
   }
 
   @Test
@@ -657,7 +661,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.NONE)
     checkOffsets(records, 0)
     val offset = 1234567
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -668,7 +672,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = false)
   }
 
   @Test
@@ -676,7 +683,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val offset = 1234567
     checkOffsets(records, 0)
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -687,7 +694,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -695,7 +705,7 @@ class LogValidatorTest {
     val records = createRecords(magicValue = RecordBatch.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
     val offset = 1234567
     checkOffsets(records, 0)
-    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
       offsetCounter = new LongRef(offset),
       time = time,
       now = System.currentTimeMillis(),
@@ -706,7 +716,10 @@ class LogValidatorTest {
       timestampType = TimestampType.LOG_APPEND_TIME,
       timestampDiffMaxMs = 1000L,
       partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
-      isFromClient = true).validatedRecords, offset)
+      isFromClient = true)
+    checkOffsets(validatedResults.validatedRecords, offset)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test(expected = classOf[InvalidRecordException])
@@ -1077,8 +1090,10 @@ class LogValidatorTest {
                                    compressed: Boolean): Unit = {
     assertNotNull("Records processing info is null", stats)
     assertEquals(numConvertedRecords, stats.numRecordsConverted)
-    if (numConvertedRecords > 0)
+    if (numConvertedRecords > 0) {
       assertTrue(s"Conversion time not recorded $stats", stats.conversionTimeNanos >= 0)
+      assertTrue(s"Conversion time not valid $stats", stats.conversionTimeNanos <= TimeUnit.MINUTES.toNanos(1))
+    }
     val originalSize = records.sizeInBytes
     val tempBytes = stats.temporaryMemoryBytes
     if (numConvertedRecords > 0 && compressed)

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.

Mime
View raw message