kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: kafka-1797; add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede
Date Thu, 18 Dec 2014 00:14:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 bd212b7a9 -> 03ad24822


kafka-1797; add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/03ad2482
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/03ad2482
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/03ad2482

Branch: refs/heads/0.8.2
Commit: 03ad248226a4ffc7236ac4ffeeb3a4d125d92efe
Parents: bd212b7
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Dec 17 16:14:35 2014 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Dec 17 16:14:35 2014 -0800

----------------------------------------------------------------------
 .../clients/consumer/ByteArrayDeserializer.java |  34 ++++++
 .../apache/kafka/clients/consumer/Consumer.java |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  14 ++-
 .../consumer/ConsumerRebalanceCallback.java     |   4 +-
 .../kafka/clients/consumer/ConsumerRecord.java  |  16 +--
 .../kafka/clients/consumer/ConsumerRecords.java |  14 +--
 .../kafka/clients/consumer/Deserializer.java    |  38 ++++++
 .../kafka/clients/consumer/KafkaConsumer.java   | 122 ++++++++++++-------
 .../kafka/clients/consumer/MockConsumer.java    |   6 +-
 .../clients/producer/ByteArraySerializer.java   |  34 ++++++
 .../kafka/clients/producer/KafkaProducer.java   |  68 +++++++++--
 .../apache/kafka/clients/producer/Producer.java |   6 +-
 .../kafka/clients/producer/ProducerConfig.java  |  15 ++-
 .../kafka/clients/producer/ProducerRecord.java  |  20 +--
 .../kafka/clients/producer/Serializer.java      |  38 ++++++
 .../clients/producer/internals/Partitioner.java |   2 +-
 .../common/errors/DeserializationException.java |  47 +++++++
 .../common/errors/SerializationException.java   |  46 +++++++
 .../scala/kafka/producer/BaseProducer.scala     |   4 +-
 .../kafka/producer/KafkaLog4jAppender.scala     |   6 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |  21 ++--
 .../scala/kafka/tools/ReplayLogProducer.scala   |   4 +-
 .../scala/kafka/tools/TestEndToEndLatency.scala |   4 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |   6 +-
 .../kafka/api/ProducerCompressionTest.scala     |   4 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  32 +++--
 .../kafka/api/ProducerSendTest.scala            |  16 +--
 .../test/scala/unit/kafka/utils/TestUtils.scala |   4 +-
 28 files changed, 484 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
new file mode 100644
index 0000000..514cbd2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import java.util.Map;
+
+public class ByteArrayDeserializer implements Deserializer<byte[]> {
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // nothing to do
+    }
+
+    @Override
+    public byte[] deserialize(String topic, byte[] data, boolean isKey) {
+        return data;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 227f564..1bce501 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
  * @see KafkaConsumer
  * @see MockConsumer
  */
-public interface Consumer extends Closeable {
+public interface Consumer<K,V> extends Closeable {
 
     /**
      * Incrementally subscribe to the given list of topics. This API is mutually exclusive to 
@@ -63,7 +63,7 @@ public interface Consumer extends Closeable {
      *         of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}.
      *         If no data is available for timeout ms, returns an empty list
      */
-    public Map<String, ConsumerRecords> poll(long timeout);
+    public Map<String, ConsumerRecords<K,V>> poll(long timeout);
 
     /**
      * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 46efc0c..1d64f08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -151,6 +151,14 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
+    /** <code>key.deserializer</code> */
+    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
+    private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
+
+    /** <code>value.deserializer</code> */
+    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
+    private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
+
     static {
         /* TODO: add config docs */
         config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
@@ -176,8 +184,10 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         METRICS_SAMPLE_WINDOW_MS_DOC)
                                 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
-                                
+                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+                                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
+                                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC);
+
     }
 
     ConsumerConfig(Map<? extends Object, ? extends Object> props) {
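
For illustration only (not part of this patch): with the two new settings above, a consumer can be wired up entirely through configuration. When the deserializer classes come from config, configure() is called on the instances, unlike the constructor path where instances are passed in directly. A minimal sketch using the default ByteArrayDeserializer:

    Properties props = new Properties();
    // bootstrap servers, group.id, etc. omitted for brevity
    props.put("key.deserializer", "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
    // Both entries above could be omitted, since ByteArrayDeserializer is the default.
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);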

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
index f026ae4..e4cf7d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -35,7 +35,7 @@ public interface ConsumerRebalanceCallback {
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}  
      * @param partitions The list of partitions that are assigned to the consumer after rebalance
      */
-    public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions);
+    public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
     
     /**
      * A callback method the user can implement to provide handling of offset commits to a customized store on the 
@@ -46,5 +46,5 @@ public interface ConsumerRebalanceCallback {
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}  
      * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
      */
-    public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions);
+    public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 436d8a4..16af70a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -18,10 +18,10 @@ import org.apache.kafka.common.TopicPartition;
  * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the 
  * record is being received and an offset that points to the record in a Kafka partition. 
  */
-public final class ConsumerRecord {
+public final class ConsumerRecord<K,V> {
     private final TopicPartition partition; 
-    private final byte[] key;
-    private final byte[] value;
+    private final K key;
+    private final V value;
     private final long offset;
     private volatile Exception error;
     
@@ -34,7 +34,7 @@ public final class ConsumerRecord {
      * @param value     The record contents
      * @param offset    The offset of this record in the corresponding Kafka partition
      */
-    public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) {
+    public ConsumerRecord(String topic, int partitionId, K key, V value, long offset) {
         this(topic, partitionId, key, value, offset, null);
     }
 
@@ -46,7 +46,7 @@ public final class ConsumerRecord {
      * @param value The record contents
      * @param offset The offset of this record in the corresponding Kafka partition
      */
-    public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) {
+    public ConsumerRecord(String topic, int partitionId, V value, long offset) {
         this(topic, partitionId, null, value, offset);
     }
 
@@ -60,7 +60,7 @@ public final class ConsumerRecord {
         this(topic, partitionId, null, null, -1L, error);
     }
     
-    private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) {
+    private ConsumerRecord(String topic, int partitionId, K key, V value, long offset, Exception error) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.partition = new TopicPartition(topic, partitionId);
@@ -95,7 +95,7 @@ public final class ConsumerRecord {
      * The key (or null if no key is specified)
      * @throws Exception The exception thrown while fetching this record.
      */
-    public byte[] key() throws Exception {
+    public K key() throws Exception {
         if (this.error != null)
             throw this.error;
         return key;
@@ -105,7 +105,7 @@ public final class ConsumerRecord {
      * The value
      * @throws Exception The exception thrown while fetching this record.
      */
-    public byte[] value() throws Exception {
+    public V value() throws Exception {
         if (this.error != null)
             throw this.error;
         return value;

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 2ecfc8a..bdf4b26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -21,12 +21,12 @@ import java.util.Map.Entry;
  * A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a 
  * {@link Consumer#poll(long)} operation. 
  */
-public class ConsumerRecords {
+public class ConsumerRecords<K,V> {
 
     private final String topic;
-    private final Map<Integer, List<ConsumerRecord>> recordsPerPartition;
+    private final Map<Integer, List<ConsumerRecord<K,V>>> recordsPerPartition;
     
-    public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord>> records) {
+    public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord<K,V>>> records) {
         this.topic = topic;
         this.recordsPerPartition = records;
     }
@@ -36,16 +36,16 @@ public class ConsumerRecords {
      * specified, returns records for all partitions
      * @return The list of {@link ConsumerRecord}s associated with the given partitions.
      */
-    public List<ConsumerRecord> records(int... partitions) {
-        List<ConsumerRecord> recordsToReturn = new ArrayList<ConsumerRecord>(); 
+    public List<ConsumerRecord<K,V>> records(int... partitions) {
+        List<ConsumerRecord<K,V>> recordsToReturn = new ArrayList<ConsumerRecord<K,V>>();
         if(partitions.length == 0) {
             // return records for all partitions
-            for(Entry<Integer, List<ConsumerRecord>> record : recordsPerPartition.entrySet()) {
+            for(Entry<Integer, List<ConsumerRecord<K,V>>> record : recordsPerPartition.entrySet()) {
                 recordsToReturn.addAll(record.getValue());
             }
         } else {
            for(int partition : partitions) {
-               List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(partition);
+               List<ConsumerRecord<K,V>> recordsForThisPartition = recordsPerPartition.get(partition);
                recordsToReturn.addAll(recordsForThisPartition);
            }
         }
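
As an aside (not part of the patch), the generified containers are consumed much as before, only with typed keys and values; key() and value() still declare throws Exception because any per-record fetch error is rethrown there. A rough sketch, assuming a records map as returned by poll() and a hypothetical handle() helper:

    for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : records.entrySet()) {
        for (ConsumerRecord<byte[], byte[]> record : entry.getValue().records()) {
            try {
                handle(record.key(), record.value(), record.offset());  // hypothetical helper
            } catch (Exception e) {
                // key()/value() rethrow any error attached to the record during the fetch
            }
        }
    }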

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
new file mode 100644
index 0000000..c774a19
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ *
+ * @param <T> Type to be deserialized into.
+ *
+ * A class that implements this interface is expected to have a constructor with no parameter.
+ */
+public interface Deserializer<T> extends Configurable {
+    /**
+     *
+     * @param topic Topic associated with the data
+     * @param data Serialized bytes
+     * @param isKey Is data for key or value
+     * @return deserialized typed data
+     */
+    public T deserialize(String topic, byte[] data, boolean isKey);
+
+    /**
+     * Close this deserializer
+     */
+    public void close();
+}
\ No newline at end of file
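
As an illustration of the contract (this class is not part of the patch), a hypothetical StringDeserializer could look like the sketch below; the no-argument-constructor requirement only matters when the class is loaded from the key.deserializer/value.deserializer config rather than passed in directly.

    package org.apache.kafka.clients.consumer;

    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class StringDeserializer implements Deserializer<String> {

        @Override
        public void configure(Map<String, ?> configs) {
            // nothing to configure in this sketch
        }

        @Override
        public String deserialize(String topic, byte[] data, boolean isKey) {
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            // nothing to release
        }
    }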

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fe93afa..fe90663 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -12,19 +12,6 @@
 */
 package org.apache.kafka.clients.consumer;
 
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Map.Entry;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -36,6 +23,9 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.util.*;
+
 /**
  * A Kafka client that consumes records from a Kafka cluster.
  * <P>
@@ -50,12 +40,12 @@ import org.slf4j.LoggerFactory;
  * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
  * <pre>
  * {@code
- * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
+ * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
  *     Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
- *     for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
- *          List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
+ *     for(Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
+ *          List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
  *          for(int i = 0;i < recordsPerTopic.size();i++) {
- *               ConsumerRecord record = recordsPerTopic.get(i);
+ *               ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
  *               // process record
  *               try {
  *               	processedOffsets.put(record.topicAndpartition(), record.offset());
@@ -80,11 +70,11 @@ import org.slf4j.LoggerFactory;
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * consumer.subscribe("foo", "bar");
  * boolean isRunning = true;
  * while(isRunning) {
- *   Map<String, ConsumerRecords> records = consumer.poll(100);
+ *   Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *   process(records);
  * }
  * consumer.close();
@@ -102,14 +92,14 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * consumer.subscribe("foo", "bar");
  * int commitInterval = 100;
  * int numRecords = 0;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     try {
  *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *         consumedOffsets.putAll(lastConsumedOffsets);
@@ -156,16 +146,17 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
  *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
  *                                                    if(rewindOffsets)
  *                                                        Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
  *                                                    consumer.seek(newOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
  *                                                    consumer.commit(true);
  *                                                }
  *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
@@ -183,7 +174,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -211,13 +202,14 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
  *                                                    consumer.seek(lastCommittedOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
  *                                                    commitOffsetsToCustomStore(offsets); 
  *                                                }
@@ -234,7 +226,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -259,7 +251,7 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -276,7 +268,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     for(TopicPartition partition : partitions) {
@@ -298,7 +290,7 @@ import org.slf4j.LoggerFactory;
  * {@code  
  * Properties props = new Properties();
  * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -314,7 +306,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     // commit offsets for partitions 0,1 for topic foo to custom store
@@ -331,7 +323,7 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  */
-public class KafkaConsumer implements Consumer {
+public class KafkaConsumer<K,V> implements Consumer<K,V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
 
@@ -340,7 +332,9 @@ public class KafkaConsumer implements Consumer {
     private final Metrics metrics;
     private final Set<String> subscribedTopics;
     private final Set<TopicPartition> subscribedPartitions;
-    
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+
     /**
      * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs">here</a>. Values can be
@@ -351,7 +345,7 @@ public class KafkaConsumer implements Consumer {
      * @param configs   The consumer configs
      */
     public KafkaConsumer(Map<String, Object> configs) {
-        this(new ConsumerConfig(configs), null);
+        this(new ConsumerConfig(configs), null, null, null);
     }
 
     /**
@@ -364,7 +358,24 @@ public class KafkaConsumer implements Consumer {
      *                  every rebalance operation.  
      */
     public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
-        this(new ConsumerConfig(configs), callback);
+        this(new ConsumerConfig(configs), callback, null, null);
+    }
+
+    /**
+     * A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback}
+     * implementation, a key and a value {@link Deserializer}.
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * @param configs   The consumer configs
+     * @param callback  A callback interface that the user can implement to manage customized offsets on the start and end of
+     *                  every rebalance operation.
+     * @param keyDeserializer  The deserializer for key that implements {@link Deserializer}. The configure() method won't
+     *                         be called when the deserializer is passed in directly.
+     * @param valueDeserializer  The deserializer for value that implements {@link Deserializer}. The configure() method
+     *                           won't be called when the deserializer is passed in directly.
+     */
+    public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer);
     }
 
     /**
@@ -372,7 +383,7 @@ public class KafkaConsumer implements Consumer {
      * Valid configuration strings are documented at {@link ConsumerConfig}
      */
     public KafkaConsumer(Properties properties) {
-        this(new ConsumerConfig(properties), null);
+        this(new ConsumerConfig(properties), null, null, null);
     }
 
     /**
@@ -385,14 +396,27 @@ public class KafkaConsumer implements Consumer {
      *                   every rebalance operation.  
      */
     public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
-        this(new ConsumerConfig(properties), callback);
+        this(new ConsumerConfig(properties), callback, null, null);
     }
 
-    private KafkaConsumer(ConsumerConfig config) {
-        this(config, null);
+    /**
+     * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
+     * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * @param properties The consumer configuration properties
+     * @param callback   A callback interface that the user can implement to manage customized offsets on the start and end of
+     *                   every rebalance operation.
+     * @param keyDeserializer  The deserializer for key that implements {@link Deserializer}. The configure() method won't
+     *                         be called when the deserializer is passed in directly.
+     * @param valueDeserializer  The deserializer for value that implements {@link Deserializer}. The configure() method
+     *                           won't be called when the deserializer is passed in directly.
+     */
+    public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer);
     }
 
-    private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) {
+    private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
         log.trace("Starting the Kafka consumer");
         subscribedTopics = new HashSet<String>();
         subscribedPartitions = new HashSet<TopicPartition>();
@@ -402,6 +426,18 @@ public class KafkaConsumer implements Consumer {
         this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
         this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
+        if (keyDeserializer == null)
+            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                                                Deserializer.class);
+        else
+            this.keyDeserializer = keyDeserializer;
+        if (valueDeserializer == null)
+            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                                                  Deserializer.class);
+        else
+            this.valueDeserializer = valueDeserializer;
+
         config.logUnused();
         log.debug("Kafka consumer started");
     }
@@ -488,7 +524,7 @@ public class KafkaConsumer implements Consumer {
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
      */
     @Override
-    public Map<String, ConsumerRecords> poll(long timeout) {
+    public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
         // TODO Auto-generated method stub
         return null;
     }
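
Purely to illustrate the new constructor overloads (not part of the patch), a consumer can now be created with explicit Deserializer instances; configure() is not invoked on instances passed this way. A minimal sketch using the built-in ByteArrayDeserializer and no rebalance callback:

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    props.put("group.id", "test");
    KafkaConsumer<byte[], byte[]> consumer =
        new KafkaConsumer<byte[], byte[]>(props, null, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    consumer.subscribe("foo", "bar");
    Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);

Note that poll() is still a stub in this patch, so the sketch only shows the intended types.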

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c3aad3b..8cab16c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition;
  * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it
  * needs to communicate with. Failure to close the consumer after use will leak these resources.
  */
-public class MockConsumer implements Consumer {
+public class MockConsumer implements Consumer<byte[], byte[]> {
 
     private final Set<TopicPartition> subscribedPartitions;
     private final Set<String> subscribedTopics;
@@ -90,10 +90,10 @@ public class MockConsumer implements Consumer {
     }
 
     @Override
-    public Map<String, ConsumerRecords> poll(long timeout) {
+    public Map<String, ConsumerRecords<byte[], byte[]>> poll(long timeout) {
         // hand out one dummy record, 1 per topic
         Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>();
-        Map<String, ConsumerRecords> recordMetadata = new HashMap<String, ConsumerRecords>();
+        Map<String, ConsumerRecords<byte[], byte[]>> recordMetadata = new HashMap<String, ConsumerRecords<byte[], byte[]>>();
         for(TopicPartition partition : subscribedPartitions) {
             // get the last consumed offset
             long messageSequence = consumedOffsets.get(partition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
new file mode 100644
index 0000000..9005b74
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer;
+
+import java.util.Map;
+
+public class ByteArraySerializer implements Serializer<byte[]> {
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // nothing to do
+    }
+
+    @Override
+    public byte[] serialize(String topic, byte[] data, boolean isKey) {
+        return data;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 32f444e..d859fc5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
  * needs to communicate with. Failure to close the producer after use will leak these resources.
  */
-public class KafkaProducer implements Producer {
+public class KafkaProducer<K,V> implements Producer<K,V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
 
@@ -75,26 +75,59 @@ public class KafkaProducer implements Producer {
     private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
      * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
      * string "42" or the integer 42).
+     * @param configs   The producer configs
+     *
      */
     public KafkaProducer(Map<String, Object> configs) {
-        this(new ProducerConfig(configs));
+        this(new ProducerConfig(configs), null, null);
+    }
+
+    /**
+     * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
+     * Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+     * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
+     * either the string "42" or the integer 42).
+     * @param configs   The producer configs
+     * @param keySerializer  The serializer for key that implements {@link Serializer}. The configure() method won't be
+     *                       called when the serializer is passed in directly.
+     * @param valueSerializer  The serializer for value that implements {@link Serializer}. The configure() method won't
+     *                         be called when the serializer is passed in directly.
+     */
+    public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this(new ProducerConfig(configs), keySerializer, valueSerializer);
     }
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+     * @param properties   The producer configs
      */
     public KafkaProducer(Properties properties) {
-        this(new ProducerConfig(properties));
+        this(new ProducerConfig(properties), null, null);
     }
 
-    private KafkaProducer(ProducerConfig config) {
+    /**
+     * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
+     * Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+     * @param properties   The producer configs
+     * @param keySerializer  The serializer for key that implements {@link Serializer}. The configure() method won't be
+     *                       called when the serializer is passed in directly.
+     * @param valueSerializer  The serializer for value that implements {@link Serializer}. The configure() method won't
+     *                         be called when the serializer is passed in directly.
+     */
+    public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this(new ProducerConfig(properties), keySerializer, valueSerializer);
+    }
+
+    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         log.trace("Starting the Kafka producer");
         this.time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
@@ -145,6 +178,17 @@ public class KafkaProducer implements Producer {
 
         this.errors = this.metrics.sensor("errors");
 
+        if (keySerializer == null)
+            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                                              Serializer.class);
+        else
+            this.keySerializer = keySerializer;
+        if (valueSerializer == null)
+            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                                                Serializer.class);
+        else
+            this.valueSerializer = valueSerializer;
+
         config.logUnused();
         log.debug("Kafka producer started");
     }
@@ -159,9 +203,10 @@ public class KafkaProducer implements Producer {
 
     /**
      * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
+     * @param record  The record to be sent
      */
     @Override
-    public Future<RecordMetadata> send(ProducerRecord record) {
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record) {
         return send(record, null);
     }
 
@@ -226,16 +271,19 @@ public class KafkaProducer implements Producer {
      *        indicates no callback)
      */
     @Override
-    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
             waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
-            int partition = partitioner.partition(record, metadata.fetch());
-            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
+            byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true);
+            byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false);
+            ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
+            int partition = partitioner.partition(serializedRecord, metadata.fetch());
+            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -324,6 +372,8 @@ public class KafkaProducer implements Producer {
             throw new KafkaException(e);
         }
         this.metrics.close();
+        this.keySerializer.close();
+        this.valueSerializer.close();
         log.debug("The Kafka producer has closed.");
     }
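
For illustration (not part of the patch), the producer-side picture: serializers can be passed directly to the new constructor, the record is serialized inside send() before partitioning and accumulation, and close() now also closes the serializers. A minimal sketch, assuming a bootstrap.servers entry pointing at a local broker:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address setting
    KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<byte[], byte[]>(props, new ByteArraySerializer(), new ByteArraySerializer());
    producer.send(new ProducerRecord<byte[], byte[]>("foo", "key".getBytes(), "value".getBytes()));
    producer.close();  // also closes both serializers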
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 36e8398..5baa606 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
  * @see KafkaProducer
  * @see MockProducer
  */
-public interface Producer extends Closeable {
+public interface Producer<K,V> extends Closeable {
 
     /**
      * Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -39,12 +39,12 @@ public interface Producer extends Closeable {
      * @param record The record to send
      * @return A future which will eventually contain the response information
      */
-    public Future<RecordMetadata> send(ProducerRecord record);
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
 
     /**
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
-    public Future<RecordMetadata> send(ProducerRecord record, Callback callback);
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
 
     /**
      * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 9095caf..9cdc13d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -17,7 +17,6 @@ import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.AbstractConfig;
@@ -173,6 +172,14 @@ public class ProducerConfig extends AbstractConfig {
     public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
     private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking.";
 
+    /** <code>key.serializer</code> */
+    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
+    private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";
+
+    /** <code>value.serializer</code> */
+    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+    private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
+
     static {
         config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -180,7 +187,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(ACKS_CONFIG,
                                         Type.STRING,
                                         "1",
-                                        in(Arrays.asList("all","-1", "0", "1")),
+                                        in(Arrays.asList("all", "-1", "0", "1")),
                                         Importance.HIGH,
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
@@ -219,7 +226,9 @@ public class ProducerConfig extends AbstractConfig {
                                         5,
                                         atLeast(1),
                                         Importance.LOW,
-                                        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
+                                        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
+                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {
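
Alternatively (illustration only, not part of the patch), the serializers can come from configuration; ByteArraySerializer is the default for both settings, so existing byte[]-based callers keep working unchanged:

    Properties props = new Properties();
    // bootstrap servers, acks, etc. omitted for brevity
    props.put("key.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.clients.producer.ByteArraySerializer");
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);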

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index c3181b3..065d4e6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -20,12 +20,12 @@ package org.apache.kafka.clients.producer;
  * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
  * present a partition will be assigned in a round-robin fashion.
  */
-public final class ProducerRecord {
+public final class ProducerRecord<K, V> {
 
     private final String topic;
     private final Integer partition;
-    private final byte[] key;
-    private final byte[] value;
+    private final K key;
+    private final V value;
 
     /**
      * Creates a record to be sent to a specified topic and partition
@@ -35,7 +35,7 @@ public final class ProducerRecord {
      * @param key The key that will be included in the record
      * @param value The record contents
      */
-    public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
+    public ProducerRecord(String topic, Integer partition, K key, V value) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.topic = topic;
@@ -51,7 +51,7 @@ public final class ProducerRecord {
      * @param key The key that will be included in the record
      * @param value The record contents
      */
-    public ProducerRecord(String topic, byte[] key, byte[] value) {
+    public ProducerRecord(String topic, K key, V value) {
         this(topic, null, key, value);
     }
 
@@ -61,7 +61,7 @@ public final class ProducerRecord {
      * @param topic The topic this record should be sent to
      * @param value The record contents
      */
-    public ProducerRecord(String topic, byte[] value) {
+    public ProducerRecord(String topic, V value) {
         this(topic, null, value);
     }
 
@@ -75,14 +75,14 @@ public final class ProducerRecord {
     /**
      * The key (or null if no key is specified)
      */
-    public byte[] key() {
+    public K key() {
         return key;
     }
 
     /**
      * @return The value
      */
-    public byte[] value() {
+    public V value() {
         return value;
     }
 
@@ -95,8 +95,8 @@ public final class ProducerRecord {
 
     @Override
     public String toString() {
-        String key = this.key == null ? "null" : ("byte[" + this.key.length + "]");
-        String value = this.value == null ? "null" : ("byte[" + this.value.length + "]");
+        String key = this.key == null ? "null" : this.key.toString();
+        String value = this.value == null ? "null" : this.value.toString();
         return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
new file mode 100644
index 0000000..de87f9c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ * A class that implements this interface is expected to have a constructor with no parameters,
+ * so that it can be instantiated from the producer configuration.
+ *
+ * @param <T> Type to be serialized from.
+ */
+public interface Serializer<T> extends Configurable {
+    /**
+     * Convert {@code data} into a byte array.
+     * @param topic Topic associated with data
+     * @param data Typed data
+     * @param isKey Is data for key or value
+     * @return serialized bytes
+     */
+    public byte[] serialize(String topic, T data, boolean isKey);
+
+    /**
+     * Close this serializer
+     */
+    public void close();
+}

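Serializer extends Configurable, so an implementation provides configure(Map) alongside serialize() and close(), and needs a no-argument constructor so it can be instantiated from configuration. A hypothetical UTF-8 string serializer, shown only as a sketch (this patch ships ByteArraySerializer; no string serializer is included):

import java.io.UnsupportedEncodingException;
import java.util.Map;

import org.apache.kafka.clients.producer.Serializer;
import org.apache.kafka.common.errors.SerializationException;

public class Utf8StringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        if (data == null)
            return null;
        try {
            return data.getBytes("UTF8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error serializing string to byte[]", e);
        }
    }

    @Override
    public void close() {
        // no resources to release
    }
}
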
http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index 40e8234..483899d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -43,7 +43,7 @@ public class Partitioner {
      * @param record The record being sent
      * @param cluster The current cluster metadata
      */
-    public int partition(ProducerRecord record, Cluster cluster) {
+    public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
         List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
         int numPartitions = partitions.size();
         if (record.partition() != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
new file mode 100644
index 0000000..a543339
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ *  Any exception during deserialization in the consumer
+ */
+public class DeserializationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public DeserializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DeserializationException(String message) {
+        super(message);
+    }
+
+    public DeserializationException(Throwable cause) {
+        super(cause);
+    }
+
+    public DeserializationException() {
+        super();
+    }
+
+    /* avoid the expensive and useless stack trace for deserialization exceptions */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
new file mode 100644
index 0000000..00388d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ *  Any exception during serialization in the producer
+ */
+public class SerializationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SerializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SerializationException(String message) {
+        super(message);
+    }
+
+    public SerializationException(Throwable cause) {
+        super(cause);
+    }
+
+    public SerializationException() {
+        super();
+    }
+
+    /* avoid the expensive and useless stack trace for serialization exceptions */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
\ No newline at end of file

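Both new exception types override fillInStackTrace() to return this, so constructing them never walks the stack; per-record failures on the hot send/receive path stay cheap, at the cost of an empty trace in logs. A small demonstration of the effect (the message below is illustrative):

import org.apache.kafka.common.errors.SerializationException;

public class StackTraceSuppressionDemo {
    public static void main(String[] args) {
        SerializationException e = new SerializationException("serializer failure");
        // fillInStackTrace() returns `this` without capturing frames, so no trace is recorded.
        System.out.println("frames captured: " + e.getStackTrace().length);  // prints 0
        System.out.println("message: " + e.getMessage());
    }
}
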
http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/main/scala/kafka/producer/BaseProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
index b020793..8e00713 100644
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ b/core/src/main/scala/kafka/producer/BaseProducer.scala
@@ -33,10 +33,10 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
   // decide whether to send synchronously based on producer properties
   val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 
-  val producer = new KafkaProducer(producerProps)
+  val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
 
   override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
-    val record = new ProducerRecord(topic, key, value)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
     if(sync) {
       this.producer.send(record).get()
     } else {

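Call sites such as NewShinyProducer now name the key and value types when constructing the producer. A minimal Java sketch of the equivalent byte-array setup; the broker address is a placeholder, and the key.serializer / value.serializer property names are assumed from this patch's ProducerConfig changes:

import java.util.Properties;

import org.apache.kafka.clients.producer.ByteArraySerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ByteArrayProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        try {
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<byte[], byte[]>("my-topic", "key".getBytes(), "value".getBytes());
            // Blocking on the returned future mirrors the "sync" branch of NewShinyProducer.send().
            producer.send(record).get();
        } finally {
            producer.close();
        }
    }
}
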
http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 4b5b823..e194942 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -32,7 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var requiredNumAcks: Int = Int.MaxValue
   var syncSend: Boolean = false
 
-  private var producer: KafkaProducer = null
+  private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
 
   def getTopic: String = topic
   def setTopic(topic: String) { this.topic = topic }
@@ -60,7 +60,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
     if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
-    producer = new KafkaProducer(props)
+    producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     LogLog.debug("Kafka producer connected to " +  brokerList)
     LogLog.debug("Logging for topic: " + topic)
   }
@@ -68,7 +68,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   override def append(event: LoggingEvent)  {
     val message = subAppend(event)
     LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    val response = producer.send(new ProducerRecord(topic, message.getBytes()))
+    val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes()))
     if (syncSend) response.get
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index f399105..2126f6e 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -39,7 +39,7 @@ object MirrorMaker extends Logging {
   private var producerThreads: Seq[ProducerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
 
-  private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes)
+  private val shutdownMessage : ProducerRecord[Array[Byte],Array[Byte]] = new ProducerRecord[Array[Byte],Array[Byte]]("shutdown", "shutdown".getBytes)
 
   def main(args: Array[String]) {
     
@@ -187,9 +187,9 @@ object MirrorMaker extends Logging {
 
   class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup {
 
-    val queues = new Array[BlockingQueue[ProducerRecord]](numOutputs)
+    val queues = new Array[BlockingQueue[ProducerRecord[Array[Byte],Array[Byte]]]](numOutputs)
     for (i <- 0 until numOutputs)
-      queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity)
+      queues(i) = new ArrayBlockingQueue[ProducerRecord[Array[Byte],Array[Byte]]](capacity)
 
     private val counter = new AtomicInteger(new Random().nextInt())
 
@@ -201,7 +201,7 @@ object MirrorMaker extends Logging {
     private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
     private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size")
 
-    def put(record: ProducerRecord) {
+    def put(record: ProducerRecord[Array[Byte],Array[Byte]]) {
       // If the key of the message is empty, use round-robin to select the queue
       // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue
       val queueId =
@@ -213,7 +213,7 @@ object MirrorMaker extends Logging {
       put(record, queueId)
     }
 
-    def put(record: ProducerRecord, queueId: Int) {
+    def put(record: ProducerRecord[Array[Byte],Array[Byte]], queueId: Int) {
       val queue = queues(queueId)
 
       var putSucceed = false
@@ -225,9 +225,9 @@ object MirrorMaker extends Logging {
       channelSizeHist.update(queue.size)
     }
 
-    def take(queueId: Int): ProducerRecord = {
+    def take(queueId: Int): ProducerRecord[Array[Byte],Array[Byte]] = {
       val queue = queues(queueId)
-      var data: ProducerRecord = null
+      var data: ProducerRecord[Array[Byte],Array[Byte]] = null
       while (data == null) {
         val startTakeTime = SystemTime.nanoseconds
         data = queue.poll(500, TimeUnit.MILLISECONDS)
@@ -254,7 +254,7 @@ object MirrorMaker extends Logging {
       info("Starting mirror maker consumer thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
-          val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
+          val data = new ProducerRecord[Array[Byte],Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
           mirrorDataChannel.put(data)
         }
       } catch {
@@ -297,7 +297,7 @@ object MirrorMaker extends Logging {
       info("Starting mirror maker producer thread " + threadName)
       try {
         while (true) {
-          val data: ProducerRecord = dataChannel.take(threadId)
+          val data: ProducerRecord[Array[Byte],Array[Byte]] = dataChannel.take(threadId)
           trace("Sending message with value size %d".format(data.value().size))
           if(data eq shutdownMessage) {
             info("Received shutdown message")
@@ -345,5 +345,4 @@ object MirrorMaker extends Logging {
       }
     }
   }
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 3393a3d..f541987 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -124,7 +124,7 @@ object ReplayLogProducer extends Logging {
 
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
-    val producer = new KafkaProducer(config.producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps)
 
     override def run() {
       info("Starting consumer thread..")
@@ -137,7 +137,7 @@ object ReplayLogProducer extends Logging {
             stream
         for (messageAndMetadata <- iter) {
           try {
-            val response = producer.send(new ProducerRecord(config.outputTopic,
+            val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic,
                                             messageAndMetadata.key(), messageAndMetadata.message()))
             if(config.isSync) {
               response.get()

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 67196f3..2ebc7bf 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -56,7 +56,7 @@ object TestEndToEndLatency {
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
-    val producer = new KafkaProducer(producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
 
     // make sure the consumer fetcher has started before sending data since otherwise
     // the consumption from the tail will skip the first message and hence be blocked
@@ -67,7 +67,7 @@ object TestEndToEndLatency {
     val latencies = new Array[Long](numMessages)
     for (i <- 0 until numMessages) {
       val begin = System.nanoTime
-      producer.send(new ProducerRecord(topic, message))
+      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
       val received = iter.next
       val elapsed = System.nanoTime - begin
       // poor man's progress bar

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/main/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index 1d4ea93..b81010e 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -242,7 +242,7 @@ object TestLogCleaning {
     val producerProps = new Properties
     producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    val producer = new KafkaProducer(producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
     val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
@@ -254,9 +254,9 @@ object TestLogCleaning {
       val delete = i % 100 < percentDeletes
       val msg = 
         if(delete)
-          new ProducerRecord(topic, key.toString.getBytes(), null)
+          new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
         else
-          new ProducerRecord(topic, key.toString.getBytes(), i.toString.getBytes())
+          new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes())
       producer.send(msg)
       producedWriter.write(TestRecord(topic, key, i, delete).toString)
       producedWriter.newLine()

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 6379f2b..1505fd4 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -75,7 +75,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
     props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
-    var producer = new KafkaProducer(props)
+    var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
 
     try {
@@ -89,7 +89,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
 
       // make sure the returned messages are correct
       val responses = for (message <- messages)
-        yield producer.send(new ProducerRecord(topic, null, null, message))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, null, message))
       val futures = responses.toList
       for ((future, offset) <- futures zip (0 until numRecords)) {
         assertEquals(offset.toLong, future.get.offset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 209a409..a890316 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -52,10 +52,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
 
-  private var producer1: KafkaProducer = null
-  private var producer2: KafkaProducer = null
-  private var producer3: KafkaProducer = null
-  private var producer4: KafkaProducer = null
+  private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
 
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"
@@ -93,7 +93,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L)
   }
 
@@ -106,7 +106,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     intercept[ExecutionException] {
       producer2.send(record).get
     }
@@ -118,7 +118,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
   @Test
   def testNonExistentTopic() {
     // send a record with non-exist topic
-    val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic2, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
       producer1.send(record).get
     }
@@ -143,7 +143,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
 
     // send a record with incorrect broker list
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
       producer4.send(record).get
     }
@@ -160,7 +160,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // first send a message to make sure the metadata is refreshed
-    val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
     producer1.send(record1).get
     producer2.send(record1).get
 
@@ -180,7 +180,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     val msgSize = producerBufferSize / tooManyRecords
     val value = new Array[Byte](msgSize)
     new Random().nextBytes(value)
-    val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
+    val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value)
 
     intercept[KafkaException] {
       for (i <- 1 to tooManyRecords)
@@ -201,7 +201,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // create a record with incorrect partition id, send should fail
-    val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes)
     intercept[IllegalArgumentException] {
       producer1.send(record)
     }
@@ -221,7 +221,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     // create topic
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
 
     // first send a message to make sure the metadata is refreshed
     producer1.send(record).get
@@ -311,8 +311,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
 
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-
-    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     try {
       producer3.send(record).get
       fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
@@ -333,8 +332,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
 
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-
-    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     // This should work
     producer3.send(record).get
 
@@ -366,7 +364,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes
     override def doWork(): Unit = {
       val responses =
         for (i <- sent+1 to sent+numRecords)
-        yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes))
       val futures = responses.toList
 
       try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index d407af9..6196060 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -86,24 +86,24 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       TestUtils.createTopic(zkClient, topic, 1, 2, servers)
 
       // send a normal record
-      val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
       // send a record with null value should be ok
-      val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
+      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null)
       assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
       // send a record with null key should be ok
-      val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
+      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes)
       assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
       // send a record with null part id should be ok
-      val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
 
       // send a record with null topic should fail
       try {
-        val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
+        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes)
         producer.send(record4, callback)
         fail("Should not allow sending a record without topic")
       } catch {
@@ -140,7 +140,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       TestUtils.createTopic(zkClient, topic, 1, 2, servers)
 
       // non-blocking send a list of records
-      val record0 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       for (i <- 1 to numRecords)
         producer.send(record0)
       val response0 = producer.send(record0)
@@ -182,7 +182,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
       val responses =
         for (i <- 1 to numRecords)
-        yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
       val futures = responses.toList
       futures.map(_.get)
       for (future <- futures)
@@ -228,7 +228,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
     try {
       // Send a message to auto-create the topic
-      val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected

http://git-wip-us.apache.org/repos/asf/kafka/blob/03ad2482/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0da774d..94d0028 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -383,7 +383,7 @@ object TestUtils extends Logging {
                         metadataFetchTimeout: Long = 3000L,
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
-                        retries: Int = 0) : KafkaProducer = {
+                        retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -395,7 +395,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
-    return new KafkaProducer(producerProps)
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
   /**

