kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3434; add old constructor to ConsumerRecord
Date Thu, 24 Mar 2016 05:36:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk de0624433 -> cb78223bf


KAFKA-3434; add old constructor to ConsumerRecord

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Grant Henke <granthenke@gmail.com>, Ismael Juma <ismael@juma.me.uk>,
Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1123 from hachikuji/KAFKA-3434


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb78223b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb78223b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb78223b

Branch: refs/heads/trunk
Commit: cb78223bf90aca4f75699f36c1a82db7661a62f3
Parents: de06244
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Mar 23 22:36:19 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Mar 23 22:36:19 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerRecord.java  | 29 ++++++++++++
 .../clients/consumer/internals/Fetcher.java     |  4 +-
 .../clients/consumer/ConsumerRecordTest.java    | 48 ++++++++++++++++++++
 3 files changed, 79 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78223b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 4165534..586156e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 
 /**
@@ -19,6 +20,10 @@ import org.apache.kafka.common.record.TimestampType;
  * record is being received and an offset that points to the record in a Kafka partition.
  */
 public final class ConsumerRecord<K, V> {
+    public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;
+    public static final int NULL_SIZE = -1;
+    public static final int NULL_CHECKSUM = -1;
+
     private final String topic;
     private final int partition;
     private final long offset;
@@ -31,6 +36,27 @@ public final class ConsumerRecord<K, V> {
     private final V value;
 
     /**
+     * Creates a record to be received from a specified topic and partition (provided for
+     * compatibility with Kafka 0.9 before the message format supported timestamps and before
+     * serialized metadata were exposed).
+     *
+     * @param topic The topic this record is received from
+     * @param partition The partition of the topic this record is received from
+     * @param offset The offset of this record in the corresponding Kafka partition
+     * @param key The key of the record, if one exists (null is allowed)
+     * @param value The record contents
+     */
+    public ConsumerRecord(String topic,
+                          int partition,
+                          long offset,
+                          K key,
+                          V value) {
+        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
+                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
+    }
+
+
+    /**
      * Creates a record to be received from a specified topic and partition
      *
      * @param topic The topic this record is received from
@@ -38,6 +64,9 @@ public final class ConsumerRecord<K, V> {
      * @param offset The offset of this record in the corresponding Kafka partition
      * @param timestamp The timestamp of the record.
      * @param timestampType The timestamp type
+     * @param checksum The checksum (CRC32) of the full record
+     * @param serializedKeySize The length of the serialized key
+     * @param serializedValueSize The length of the serialized value
      * @param key The key of the record, if one exists (null is allowed)
      * @param value The record contents
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78223b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 802a2f0..9a26551 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -653,8 +653,8 @@ public class Fetcher<K, V> {
 
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                         timestamp, timestampType, logEntry.record().checksum(),
-                                        keyByteArray == null ? -1 : keyByteArray.length,
-                                        valueByteArray == null ? -1 : valueByteArray.length,
+                                        keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                                        valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                         key, value);
         } catch (KafkaException e) {
             throw e;

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78223b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
new file mode 100644
index 0000000..d1d3b24
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConsumerRecordTest {
+
+    @Test
+    public void testOldConstructor() {
+        String topic = "topic";
+        int partition = 0;
+        long offset = 23;
+        String key = "key";
+        String value = "value";
+
+        ConsumerRecord record = new ConsumerRecord(topic, partition, offset, key, value);
+        assertEquals(topic, record.topic());
+        assertEquals(partition, record.partition());
+        assertEquals(offset, record.offset());
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+        assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+        assertEquals(ConsumerRecord.NO_TIMESTAMP, record.timestamp());
+        assertEquals(ConsumerRecord.NULL_CHECKSUM, record.checksum());
+        assertEquals(ConsumerRecord.NULL_SIZE, record.serializedKeySize());
+        assertEquals(ConsumerRecord.NULL_SIZE, record.serializedValueSize());
+    }
+
+
+}


Mime
View raw message