kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1797; (missed parametric in a few files) add the serializer/deserializer api to the new java client; patched by Jun Rao
Date Tue, 13 Jan 2015 05:45:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 8d5d45904 -> 828b808f9


kafka-1797; (missed parametric in a few files) add the serializer/deserializer api to the
new java client; patched by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/828b808f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/828b808f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/828b808f

Branch: refs/heads/0.8.2
Commit: 828b808f98afcd0c670d144af07dc14fcf743edc
Parents: 8d5d459
Author: Jun Rao <junrao@gmail.com>
Authored: Mon Jan 12 21:21:16 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Jan 12 21:37:00 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/MockProducer.java    |  6 ++---
 .../clients/tools/ProducerPerformance.java      |  4 +--
 .../clients/producer/MockProducerTest.java      |  9 +++----
 .../kafka/clients/producer/PartitionerTest.java | 27 +++++++++-----------
 .../kafka/api/ProducerFailureHandlingTest.scala |  2 +-
 5 files changed, 21 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/828b808f/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index c0f1d57..00d1300 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.TopicPartition;
  * By default this mock will synchronously complete each send call successfully. However
it can be configured to allow
  * the user to control the completion of the call and supply an optional error for the producer
to throw.
  */
-public class MockProducer implements Producer {
+public class MockProducer implements Producer<byte[],byte[]> {
 
     private final Cluster cluster;
     private final Partitioner partitioner = new Partitioner();
@@ -90,7 +90,7 @@ public class MockProducer implements Producer {
      * @see #history()
      */
     @Override
-    public synchronized Future<RecordMetadata> send(ProducerRecord record) {
+    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]>
record) {
         return send(record, null);
     }
 
@@ -100,7 +100,7 @@ public class MockProducer implements Producer {
      * @see #history()
      */
     @Override
-    public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback
callback) {
+    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[],byte[]>
record, Callback callback) {
         int partition = 0;
         if (this.cluster.partitionsForTopic(record.topic()) != null)
             partition = partitioner.partition(record, this.cluster);

http://git-wip-us.apache.org/repos/asf/kafka/blob/828b808f/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index ac86150..3a8d5f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -46,12 +46,12 @@ public class ProducerPerformance {
                 throw new IllegalArgumentException("Invalid property: " + args[i]);
             props.put(pieces[0], pieces[1]);
         }
-        KafkaProducer producer = new KafkaProducer(props);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
 
         /* setup perf test */
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
-        ProducerRecord record = new ProducerRecord(topicName, payload);
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName,
payload);
         long sleepTime = NS_PER_SEC / throughput;
         long sleepDeficitNs = 0;
         Stats stats = new Stats(numRecords, 5000);

http://git-wip-us.apache.org/repos/asf/kafka/blob/828b808f/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 9a9411f..3676b05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -25,9 +25,6 @@ import static org.junit.Assert.fail;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.junit.Test;
 
 public class MockProducerTest {
@@ -37,7 +34,7 @@ public class MockProducerTest {
     @Test
     public void testAutoCompleteMock() throws Exception {
         MockProducer producer = new MockProducer(true);
-        ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic,
"key".getBytes(), "value".getBytes());
         Future<RecordMetadata> metadata = producer.send(record);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
@@ -51,8 +48,8 @@ public class MockProducerTest {
     @Test
     public void testManualCompletion() throws Exception {
         MockProducer producer = new MockProducer(false);
-        ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
-        ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
+        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>("topic",
"key1".getBytes(), "value1".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>("topic",
"key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/828b808f/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index f06e28c..1002f05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -16,21 +16,18 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.junit.Test;
 
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class PartitionerTest {
 
     private byte[] key = "key".getBytes();
@@ -50,22 +47,22 @@ public class PartitionerTest {
     public void testUserSuppliedPartitioning() {
         assertEquals("If the user supplies a partition we should use it.",
                      0,
-                     partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
+                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test",
0, key, value), cluster));
     }
 
     @Test
     public void testKeyPartitionIsStable() {
-        int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
+        int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test",
key, value), cluster);
         assertEquals("Same key should yield same partition",
                      partition,
-                     partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()),
cluster));
+                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test",
key, "value2".getBytes()), cluster));
     }
 
     @Test
     public void testRoundRobinIsStable() {
-        int startPart = partitioner.partition(new ProducerRecord("test", value), cluster);
+        int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test",
value), cluster);
         for (int i = 1; i <= 100; i++) {
-            int partition = partitioner.partition(new ProducerRecord("test", value), cluster);
+            int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test",
value), cluster);
             assertEquals("Should yield a different partition each call with round-robin partitioner",
                 partition, (startPart + i) % 2);
       }
@@ -74,7 +71,7 @@ public class PartitionerTest {
     @Test
     public void testRoundRobinWithDownNode() {
         for (int i = 0; i < partitions.size(); i++) {
-            int part = partitioner.partition(new ProducerRecord("test", value), cluster);
+            int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test",
value), cluster);
             assertTrue("We should never choose a leader-less node in round robin", part >=
0 && part < 2);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/828b808f/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index a890316..07a7bee 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -299,7 +299,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
 
   @Test(expected = classOf[InvalidTopicException])
   def testCannotSendToInternalTopic() {
-    producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+    producer1.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head,
"test".getBytes, "test".getBytes)).get
   }
 
   @Test


Mime
View raw message