kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: MINOR: Ensure timestamp type is provided when up-converting messages
Date Thu, 02 Feb 2017 23:03:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cb674e548 -> ea70a9bcf


MINOR: Ensure timestamp type is provided when up-converting messages

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2483 from hachikuji/minor-upconvert-timestamp-safety


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ea70a9bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ea70a9bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ea70a9bc

Branch: refs/heads/trunk
Commit: ea70a9bcfc8a9ce317e8fa9b8cc4f7cb1aba26a9
Parents: cb674e5
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Feb 2 22:34:53 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Feb 2 22:34:53 2017 +0000

----------------------------------------------------------------------
 .../kafka/common/record/AbstractRecords.java    |  4 +-
 .../common/record/MemoryRecordsBuilder.java     |  3 ++
 .../org/apache/kafka/common/record/Record.java  | 24 +++++++---
 .../org/apache/kafka/common/record/Records.java |  3 +-
 .../kafka/common/record/TimestampType.java      |  3 ++
 .../kafka/common/record/FileRecordsTest.java    | 10 ++--
 .../kafka/common/record/SimpleRecordTest.java   | 12 ++++-
 .../kafka/common/record/TimestampTypeTest.java  |  5 ++
 core/src/main/scala/kafka/log/LogSegment.scala  | 10 ++--
 .../src/main/scala/kafka/log/LogValidator.scala | 20 ++++----
 .../src/main/scala/kafka/server/KafkaApis.scala | 17 +++----
 .../scala/unit/kafka/log/LogConfigTest.scala    |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  4 +-
 .../scala/unit/kafka/log/LogValidatorTest.scala | 48 ++++++++++++++------
 14 files changed, 110 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
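
For context before the per-file hunks: magic v0 messages carry no timestamp, so when they are up-converted to v1 the caller must now state which timestamp type the converted messages should advertise, and NO_TIMESTAMP_TYPE is rejected. A minimal standalone sketch of that rule (the enum and names mirror the patched classes; nothing else from Kafka is assumed):

public class UpconvertRule {
    enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

    static final byte MAGIC_VALUE_V0 = 0;

    // Mirrors the decision in Record.convert: up-conversion from v0 takes the
    // supplied type, any other conversion keeps the record's own type.
    static TimestampType timestampTypeForConversion(byte fromMagic,
                                                    TimestampType current,
                                                    TimestampType upconvertTimestampType) {
        if (fromMagic == MAGIC_VALUE_V0) {
            if (upconvertTimestampType == TimestampType.NO_TIMESTAMP_TYPE)
                throw new IllegalArgumentException(
                        "Cannot up-convert using timestamp type " + upconvertTimestampType);
            return upconvertTimestampType;
        }
        return current;
    }

    public static void main(String[] args) {
        // Up-converting from v0: the supplied type wins.
        System.out.println(timestampTypeForConversion(
                MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE, TimestampType.CREATE_TIME));
        // Up-converting without a real timestamp type now fails fast.
        try {
            timestampTypeForConversion(
                    MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE, TimestampType.NO_TIMESTAMP_TYPE);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}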


http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 3a96d88..47b96e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -55,10 +55,10 @@ public abstract class AbstractRecords implements Records {
      * Convert this message set to use the specified message format.
      */
     @Override
-    public Records toMessageFormat(byte toMagic) {
+    public Records toMessageFormat(byte toMagic, TimestampType upconvertTimestampType) {
         List<LogEntry> converted = new ArrayList<>();
         for (LogEntry entry : deepEntries())
-            converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
+            converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic, upconvertTimestampType)));
 
         if (converted.isEmpty()) {
             // This indicates that the message is too large, which indicates that the buffer is not large

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 69e9003..0260f17 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -112,6 +112,9 @@ public class MemoryRecordsBuilder {
                                 long baseOffset,
                                 long logAppendTime,
                                 int writeLimit) {
+        if (magic > Record.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
+            throw new IllegalArgumentException("TimestampType must be set for magic >=
0");
+
         this.magic = magic;
         this.timestampType = timestampType;
         this.compressionType = compressionType;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 0c0fa3c..9dca544 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -242,9 +242,9 @@ public final class Record {
     }
 
     /**
-     * The timestamp of the message.
-     * @return the timstamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0 or the message has
-     *   been up-converted.
+     * Get the timestamp type of the record.
+     *
+     * @return The timestamp type or {@link TimestampType#NO_TIMESTAMP_TYPE} if the magic is 0.
      */
     public TimestampType timestampType() {
         if (magic() == 0)
@@ -338,15 +338,25 @@ public final class Record {
      * Convert this record to another message format.
      *
      * @param toMagic The target magic version to convert to
+     * @param upconvertTimestampType The timestamp type to use if up-converting from magic 0, ignored if
+     *                               down-converting or if no conversion is needed
      * @return A new record instance with a freshly allocated ByteBuffer.
      */
-    public Record convert(byte toMagic) {
-        if (toMagic == magic())
+    public Record convert(byte toMagic, TimestampType upconvertTimestampType) {
+        byte magic = magic();
+        if (toMagic == magic)
             return this;
 
+        final TimestampType timestampType;
+        if (magic == Record.MAGIC_VALUE_V0) {
+            if (upconvertTimestampType == TimestampType.NO_TIMESTAMP_TYPE)
+                throw new IllegalArgumentException("Cannot up-convert using timestamp type
" + upconvertTimestampType);
+            timestampType = upconvertTimestampType;
+        } else {
+            timestampType = timestampType();
+        }
+
         ByteBuffer buffer = ByteBuffer.allocate(convertedSize(toMagic));
-        TimestampType timestampType = wrapperRecordTimestampType != null ?
-                wrapperRecordTimestampType : TimestampType.forAttributes(attributes());
         convertTo(buffer, toMagic, timestamp(), timestampType);
         buffer.rewind();
         return new Record(buffer);
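
Taken together, the new two-argument convert gives client code the following behaviour. This sketch only uses calls that appear in the test changes below and assumes the kafka-clients jar built from this commit is on the classpath:

import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;

public class ConvertExample {
    public static void main(String[] args) {
        // Up-converting from magic 0: the timestamp type must be stated
        // explicitly, since a v0 record carries no timestamp of its own.
        Record v0 = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP,
                "key".getBytes(), "value".getBytes());
        Record v1 = v0.convert(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
        System.out.println(v1.timestampType()); // CREATE_TIME
        System.out.println(v1.timestamp());     // Record.NO_TIMESTAMP

        // Down-converting ignores the parameter, so NO_TIMESTAMP_TYPE is acceptable here.
        Record back = v1.convert(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
        System.out.println(back.magic());       // 0

        // Up-converting with NO_TIMESTAMP_TYPE now fails fast.
        try {
            v0.convert(Record.MAGIC_VALUE_V1, TimestampType.NO_TIMESTAMP_TYPE);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}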

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 9235f92..bdc9655 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -83,8 +83,9 @@ public interface Records {
      * Convert all entries in this buffer to the format passed as a parameter. Note that this requires
      * deep iteration since all of the deep records must also be converted to the desired format.
      * @param toMagic The magic value to convert to
+     * @param upconvertTimestampType The timestamp type to use if up-converting from magic 0
      * @return A Records (which may or may not be the same instance)
      */
-    Records toMessageFormat(byte toMagic);
+    Records toMessageFormat(byte toMagic, TimestampType upconvertTimestampType);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
index 55c966a..182cbd1 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/TimestampType.java
@@ -34,6 +34,9 @@ public enum TimestampType {
     }
 
     public byte updateAttributes(byte attributes) {
+        if (this == NO_TIMESTAMP_TYPE)
+            throw new IllegalArgumentException("Cannot use NO_TIMESTAMP_TYPE in attributes");
+
         return this == CREATE_TIME ?
            (byte) (attributes & ~Record.TIMESTAMP_TYPE_MASK) : (byte) (attributes | Record.TIMESTAMP_TYPE_MASK);
     }
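
A short sketch of the attributes round trip; it is declared in the record package (like TimestampTypeTest) so no assumption about member visibility is needed:

package org.apache.kafka.common.record;

public class TimestampAttributesExample {
    public static void main(String[] args) {
        // LOG_APPEND_TIME sets the timestamp-type bit in the attributes byte...
        byte attrs = TimestampType.LOG_APPEND_TIME.updateAttributes((byte) 0);
        System.out.println(TimestampType.forAttributes(attrs)); // LOG_APPEND_TIME

        // ...and CREATE_TIME clears it again.
        attrs = TimestampType.CREATE_TIME.updateAttributes(attrs);
        System.out.println(TimestampType.forAttributes(attrs)); // CREATE_TIME

        // After this patch, NO_TIMESTAMP_TYPE can no longer be encoded into attributes.
        try {
            TimestampType.NO_TIMESTAMP_TYPE.updateAttributes((byte) 0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}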

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index dcd3bef..d0ab427 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -299,7 +299,7 @@ public class FileRecordsTest {
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
         int size = entry.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size - 1);
-        Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0);
+        Records messageV0 = slice.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
         assertTrue("No message should be there", shallowEntries(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
     }
@@ -316,7 +316,7 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(records);
             fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
             verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
         }
     }
@@ -332,7 +332,7 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(records);
             fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1);
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
             verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V1);
         }
     }
@@ -348,7 +348,7 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(records);
             fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
             verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
         }
     }
@@ -364,7 +364,7 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(records);
             fileRecords.flush();
-            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0);
+            Records convertedRecords = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
             verifyConvertedMessageSet(entries, convertedRecords, Record.MAGIC_VALUE_V0);
         }
     }
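
The same contract shows up at the Records level. A minimal sketch, assuming FileRecords.open accepts a java.io.File (the tests above pass a temp file); the record-creation calls are the ones used in this patch's tests:

import java.io.File;
import org.apache.kafka.common.record.*;

public class FileConvertExample {
    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("records", ".log");
        file.deleteOnExit();
        try (FileRecords fileRecords = FileRecords.open(file)) {
            MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE,
                    Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "hello".getBytes()));
            fileRecords.append(records);
            fileRecords.flush();

            // Up-conversion to v1 must name a timestamp type...
            Records v1 = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
            // ...while down-conversion to v0 takes NO_TIMESTAMP_TYPE, since v0 has no timestamps.
            Records v0 = fileRecords.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
            System.out.println(v1.sizeInBytes() + " / " + v0.sizeInBytes());
        }
    }
}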

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index 427c743..e4c4a67 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -57,6 +57,12 @@ public class SimpleRecordTest {
         record.ensureValid();
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void cannotUpconvertWithNoTimestampType() {
+        Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "foo".getBytes(), "bar".getBytes());
+        record.convert(Record.MAGIC_VALUE_V1, TimestampType.NO_TIMESTAMP_TYPE);
+    }
+
     @Test
     public void testConvertFromV0ToV1() {
         byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
@@ -64,10 +70,11 @@ public class SimpleRecordTest {
 
         for (int i = 0; i < keys.length; i++) {
            Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, keys[i], values[i]);
-            Record converted = record.convert(Record.MAGIC_VALUE_V1);
+            Record converted = record.convert(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
 
             assertEquals(Record.MAGIC_VALUE_V1, converted.magic());
             assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+            assertEquals(TimestampType.CREATE_TIME, converted.timestampType());
             assertEquals(record.key(), converted.key());
             assertEquals(record.value(), converted.value());
             assertTrue(record.isValid());
@@ -82,10 +89,11 @@ public class SimpleRecordTest {
 
         for (int i = 0; i < keys.length; i++) {
            Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]);
-            Record converted = record.convert(Record.MAGIC_VALUE_V0);
+            Record converted = record.convert(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
 
             assertEquals(Record.MAGIC_VALUE_V0, converted.magic());
             assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+            assertEquals(TimestampType.NO_TIMESTAMP_TYPE, converted.timestampType());
             assertEquals(record.key(), converted.key());
             assertEquals(record.value(), converted.value());
             assertTrue(record.isValid());

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
index 4759715..e7e2a3b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
@@ -34,4 +34,9 @@ public class TimestampTypeTest {
         assertEquals(TimestampType.LOG_APPEND_TIME, TimestampType.forAttributes(attributes));
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void updateAttributesNotAllowedForNoTimestampType() {
+        TimestampType.NO_TIMESTAMP_TYPE.updateAttributes((byte) 0);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 8854c3a..15fa29a 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -98,7 +98,11 @@ class LogSegment(val log: FileRecords,
    * @param records The log entries to append.
    */
   @nonthreadsafe
-  def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
+  def append(firstOffset: Long,
+             largestOffset: Long,
+             largestTimestamp: Long,
+             shallowOffsetOfMaxTimestamp: Long,
+             records: MemoryRecords) {
     if (records.sizeInBytes > 0) {
       trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at
shallow offset %d"
           .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp,
shallowOffsetOfMaxTimestamp))
@@ -419,9 +423,9 @@ class LogSegment(val log: FileRecords,
    */
   def close() {
    CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true))
-    CoreUtils.swallow(index.close)
+    CoreUtils.swallow(index.close())
     CoreUtils.swallow(timeIndex.close())
-    CoreUtils.swallow(log.close)
+    CoreUtils.swallow(log.close())
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 224a792..45e364c 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -148,15 +148,15 @@ private[kafka] object LogValidator {
   * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
    * 4. Message format conversion is needed.
    */
-  def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
-                                                 offsetCounter: LongRef,
-                                                 now: Long,
-                                                 sourceCodec: CompressionCodec,
-                                                 targetCodec: CompressionCodec,
-                                                 compactedTopic: Boolean = false,
-                                                 messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
-                                                 messageTimestampType: TimestampType,
-                                                 messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+  private def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
+                                                         offsetCounter: LongRef,
+                                                         now: Long,
+                                                         sourceCodec: CompressionCodec,
+                                                         targetCodec: CompressionCodec,
+                                                         compactedTopic: Boolean = false,
+                                                         messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
+                                                         messageTimestampType: TimestampType,
+                                                         messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
     // No in place assignment situation 1 and 2
    var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0
 
@@ -186,7 +186,7 @@ private[kafka] object LogValidator {
       if (record.magic != messageFormatVersion)
         inPlaceAssignment = false
 
-      validatedRecords += record.convert(messageFormatVersion)
+      validatedRecords += record.convert(messageFormatVersion, messageTimestampType)
     }
 
     if (!inPlaceAssignment) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8a2672f..fad75ec 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -80,7 +80,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       ApiKeys.forId(request.requestId) match {
         case ApiKeys.PRODUCE => handleProducerRequest(request)
         case ApiKeys.FETCH => handleFetchRequest(request)
-        case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
+        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
         case ApiKeys.METADATA => handleTopicMetadataRequest(request)
         case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
         case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
@@ -481,7 +481,8 @@ class KafkaApis(val requestChannel: RequestChannel,
          val convertedData = if (versionId <= 1 && replicaManager.getMagicAndTimestampType(tp).exists(_._1 > Record.MAGIC_VALUE_V0) &&
             !data.records.hasMatchingShallowMagic(Record.MAGIC_VALUE_V0)) {
             trace(s"Down converting message to V0 for fetch request from $clientId")
-            FetchPartitionData(data.error, data.hw, data.records.toMessageFormat(Record.MAGIC_VALUE_V0))
+            val downConvertedRecords = data.records.toMessageFormat(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE)
+            FetchPartitionData(data.error, data.hw, downConvertedRecords)
           } else data
 
          tp -> new FetchResponse.PartitionData(convertedData.error.code, convertedData.hw, convertedData.records)
@@ -560,20 +561,20 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset request
    */
-  def handleOffsetRequest(request: RequestChannel.Request) {
+  def handleListOffsetRequest(request: RequestChannel.Request) {
     val version = request.header.apiVersion()
 
     val mergedResponseMap =
       if (version == 0)
-        handleOffsetRequestV0(request)
+        handleListOffsetRequestV0(request)
       else
-        handleOffsetRequestV1(request)
+        handleListOffsetRequestV1(request)
 
     val response = new ListOffsetResponse(mergedResponseMap.asJava, version)
     requestChannel.sendResponse(new RequestChannel.Response(request, response))
   }
 
-  private def handleOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
@@ -624,7 +625,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     responseMap ++ unauthorizedResponseStatus
   }
 
-  private def handleOffsetRequestV1(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV1(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 862083d..5df76bc 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -117,7 +117,7 @@ class LogConfigTest {
     p.setProperty(LogConfig.RetentionBytesProp, "100")
     LogConfig.validate(p)
     p.setProperty(LogConfig.RetentionBytesProp, "90")
-    val except = intercept[IllegalArgumentException] {
+    intercept[IllegalArgumentException] {
       LogConfig.validate(p)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 08cdac5..8fbeced 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -353,7 +353,7 @@ class LogTest extends JUnitSuite {
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(i.toString.getBytes))
     messageSets.foreach(log.append(_))
-    log.flush
+    log.flush()
 
     /* do successive reads to ensure all our messages are there */
     var offset = 0L
@@ -362,7 +362,7 @@ class LogTest extends JUnitSuite {
       val head = messages.iterator.next()
       assertEquals("Offsets not equal", offset, head.offset)
       assertEquals("Messages not equal at offset " + offset, messageSets(i).shallowEntries.iterator.next().record,
-        head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic))
+        head.record.convert(messageSets(i).shallowEntries.iterator.next().record.magic, TimestampType.NO_TIMESTAMP_TYPE))
       offset = head.offset + 1
     }
    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).records

http://git-wip-us.apache.org/repos/asf/kafka/blob/ea70a9bc/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index b201feb..bb50497 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -80,8 +80,7 @@ class LogValidatorTest extends JUnitSuite {
   def testLogAppendTimeWithoutRecompression() {
     val now = System.currentTimeMillis()
     // The timestamps should be overwritten
-    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1,
-      timestamp = 0L, codec = CompressionType.GZIP)
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V1, timestamp = 0L, codec = CompressionType.GZIP)
     val validatedResults = LogValidator.validateMessagesAndAssignOffsets(
       records,
       offsetCounter = new LongRef(0),
@@ -107,8 +106,7 @@ class LogValidatorTest extends JUnitSuite {
   def testCreateTimeNonCompressed() {
     val now = System.currentTimeMillis()
     val timestampSeq = Seq(now - 1, now + 1, now)
-    val records =
-      MemoryRecords.withRecords(CompressionType.NONE,
+    val records = MemoryRecords.withRecords(CompressionType.NONE,
         Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
         Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
         Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
@@ -125,9 +123,9 @@ class LogValidatorTest extends JUnitSuite {
 
     var i = 0
     for (logEntry <- validatedRecords.deepEntries.asScala) {
-      logEntry.record.ensureValid()
-      assertEquals(logEntry.record.timestamp, timestampSeq(i))
-      assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
+      assertTrue(logEntry.record.isValid)
+      assertEquals(timestampSeq(i), logEntry.record.timestamp)
+      assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
       i += 1
     }
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
@@ -136,17 +134,39 @@ class LogValidatorTest extends JUnitSuite {
   }
 
   @Test
+  def testCreateTimeUpConversion() {
+    val records = createRecords(magicValue = Record.MAGIC_VALUE_V0, codec = CompressionType.GZIP)
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
+        offsetCounter = new LongRef(0),
+        now = System.currentTimeMillis(),
+        sourceCodec = DefaultCompressionCodec,
+        targetCodec = DefaultCompressionCodec,
+        messageFormatVersion = Record.MAGIC_VALUE_V1,
+        messageTimestampType = TimestampType.CREATE_TIME,
+        messageTimestampDiffMaxMs = 1000L)
+    val validatedRecords = validatedResults.validatedRecords
+
+    for (logEntry <- validatedRecords.deepEntries.asScala) {
+      assertTrue(logEntry.record.isValid)
+      assertEquals(Record.NO_TIMESTAMP, logEntry.record.timestamp)
+      assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
+    }
+    assertEquals(s"Max timestamp should be ${Record.NO_TIMESTAMP}", Record.NO_TIMESTAMP,
validatedResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedRecords.deepEntries.asScala.size
- 1}",
+      validatedRecords.deepEntries.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
+    assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
+  }
+
+  @Test
   def testCreateTimeCompressed() {
     val now = System.currentTimeMillis()
     val timestampSeq = Seq(now - 1, now + 1, now)
-    val records =
-      MemoryRecords.withRecords(CompressionType.GZIP,
+    val records = MemoryRecords.withRecords(CompressionType.GZIP,
         Record.create(Record.MAGIC_VALUE_V1, timestampSeq(0), "hello".getBytes),
         Record.create(Record.MAGIC_VALUE_V1, timestampSeq(1), "there".getBytes),
         Record.create(Record.MAGIC_VALUE_V1, timestampSeq(2), "beautiful".getBytes))
 
-    val validatedResults =
-      LogValidator.validateMessagesAndAssignOffsets(records,
+    val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records,
         offsetCounter = new LongRef(0),
         now = System.currentTimeMillis(),
         sourceCodec = DefaultCompressionCodec,
@@ -158,9 +178,9 @@ class LogValidatorTest extends JUnitSuite {
 
     var i = 0
     for (logEntry <- validatedRecords.deepEntries.asScala) {
-      logEntry.record.ensureValid()
-      assertEquals(logEntry.record.timestamp, timestampSeq(i))
-      assertEquals(logEntry.record.timestampType, TimestampType.CREATE_TIME)
+      assertTrue(logEntry.record.isValid)
+      assertEquals(timestampSeq(i), logEntry.record.timestamp)
+      assertEquals(TimestampType.CREATE_TIME, logEntry.record.timestampType)
       i += 1
     }
     assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatedResults.maxTimestamp)

