kafka-commits mailing list archives

From jun...@apache.org
Subject [2/2] kafka git commit: kafka-1797; add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede
Date Thu, 18 Dec 2014 00:29:19 GMT
kafka-1797; add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92d1d4cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92d1d4cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92d1d4cd

Branch: refs/heads/trunk
Commit: 92d1d4cd31e1045f0e000e8d2d777c73f7529743
Parents: ae0bb84
Author: Jun Rao <junrao@gmail.com>
Authored: Wed Dec 17 16:29:09 2014 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Dec 17 16:29:09 2014 -0800

----------------------------------------------------------------------
 .../clients/consumer/ByteArrayDeserializer.java |  34 +++++
 .../apache/kafka/clients/consumer/Consumer.java |   4 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  14 ++-
 .../consumer/ConsumerRebalanceCallback.java     |   4 +-
 .../kafka/clients/consumer/ConsumerRecord.java  |  16 +--
 .../kafka/clients/consumer/ConsumerRecords.java |  14 +--
 .../kafka/clients/consumer/Deserializer.java    |  38 ++++++
 .../kafka/clients/consumer/KafkaConsumer.java   | 124 ++++++++++++-------
 .../kafka/clients/consumer/MockConsumer.java    |   6 +-
 .../clients/producer/ByteArraySerializer.java   |  34 +++++
 .../kafka/clients/producer/KafkaProducer.java   |  76 ++++++++++--
 .../kafka/clients/producer/MockProducer.java    |  14 +--
 .../apache/kafka/clients/producer/Producer.java |   6 +-
 .../kafka/clients/producer/ProducerConfig.java  |  15 ++-
 .../kafka/clients/producer/ProducerRecord.java  |  20 +--
 .../kafka/clients/producer/Serializer.java      |  38 ++++++
 .../clients/producer/internals/Partitioner.java |   2 +-
 .../clients/tools/ProducerPerformance.java      |   4 +-
 .../common/errors/DeserializationException.java |  47 +++++++
 .../common/errors/SerializationException.java   |  46 +++++++
 .../clients/producer/MockProducerTest.java      |   6 +-
 .../kafka/clients/producer/PartitionerTest.java |  12 +-
 .../scala/kafka/producer/BaseProducer.scala     |   4 +-
 .../kafka/producer/KafkaLog4jAppender.scala     |   6 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |   3 +-
 .../scala/kafka/tools/ReplayLogProducer.scala   |   4 +-
 .../scala/kafka/tools/TestEndToEndLatency.scala |   4 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |   6 +-
 .../kafka/api/ProducerCompressionTest.scala     |   4 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  32 ++---
 .../kafka/api/ProducerSendTest.scala            |  16 +--
 .../test/scala/unit/kafka/utils/TestUtils.scala |   4 +-
 32 files changed, 499 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
new file mode 100644
index 0000000..514cbd2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import java.util.Map;
+
+public class ByteArrayDeserializer implements Deserializer<byte[]> {
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // nothing to do
+    }
+
+    @Override
+    public byte[] deserialize(String topic, byte[] data, boolean isKey) {
+        return data;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
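
ByteArrayDeserializer is the identity implementation, and since the ConsumerConfig change below makes it the default for both key.deserializer and value.deserializer, existing byte[]-oriented callers keep working unchanged. A quick illustration of the pass-through behavior (the harness class is just a sketch):

    import org.apache.kafka.clients.consumer.ByteArrayDeserializer;

    public class IdentityCheck {
        public static void main(String[] args) {
            ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
            byte[] payload = "hello".getBytes();
            // deserialize() hands back the very same array; topic and isKey are ignored
            System.out.println(deserializer.deserialize("my-topic", payload, false) == payload); // true
        }
    }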

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 227f564..1bce501 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
  * @see KafkaConsumer
  * @see MockConsumer
  */
-public interface Consumer extends Closeable {
+public interface Consumer<K,V> extends Closeable {
 
     /**
      * Incrementally subscribe to the given list of topics. This API is mutually exclusive to 
@@ -63,7 +63,7 @@ public interface Consumer extends Closeable {
      *         of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}.
      *         If no data is available for timeout ms, returns an empty list
      */
-    public Map<String, ConsumerRecords> poll(long timeout);
+    public Map<String, ConsumerRecords<K,V>> poll(long timeout);
 
     /**
      * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 46efc0c..1d64f08 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -151,6 +151,14 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
     private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
+    /** <code>key.deserializer</code> */
+    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
+    private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
+
+    /** <code>value.deserializer</code> */
+    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
+    private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
+
     static {
         /* TODO: add config docs */
         config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
@@ -176,8 +184,10 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         METRICS_SAMPLE_WINDOW_MS_DOC)
                                 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
-                                
+                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+                                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
+                                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC);
+
     }
 
     ConsumerConfig(Map<? extends Object, ? extends Object> props) {
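
With the two new keys, deserializers can also be supplied through configuration; on that path the consumer instantiates the class reflectively and does invoke configure(). A minimal sketch of the config-driven route (the broker address is illustrative):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    // both keys are new in this patch and default to ByteArrayDeserializer
    props.put("key.deserializer", "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.clients.consumer.ByteArrayDeserializer");
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);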

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
index f026ae4..e4cf7d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -35,7 +35,7 @@ public interface ConsumerRebalanceCallback {
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}  
      * @param partitions The list of partitions that are assigned to the consumer after rebalance
      */
-    public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions);
+    public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
     
     /**
      * A callback method the user can implement to provide handling of offset commits to a customized store on the 
@@ -46,5 +46,5 @@ public interface ConsumerRebalanceCallback {
      * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}  
      * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
      */
-    public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions);
+    public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
index 436d8a4..16af70a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -18,10 +18,10 @@ import org.apache.kafka.common.TopicPartition;
  * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the 
  * record is being received and an offset that points to the record in a Kafka partition. 
  */
-public final class ConsumerRecord {
+public final class ConsumerRecord<K,V> {
     private final TopicPartition partition; 
-    private final byte[] key;
-    private final byte[] value;
+    private final K key;
+    private final V value;
     private final long offset;
     private volatile Exception error;
     
@@ -34,7 +34,7 @@ public final class ConsumerRecord {
      * @param value     The record contents
      * @param offset    The offset of this record in the corresponding Kafka partition
      */
-    public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) {
+    public ConsumerRecord(String topic, int partitionId, K key, V value, long offset) {
         this(topic, partitionId, key, value, offset, null);
     }
 
@@ -46,7 +46,7 @@ public final class ConsumerRecord {
      * @param value The record contents
      * @param offset The offset of this record in the corresponding Kafka partition
      */
-    public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) {
+    public ConsumerRecord(String topic, int partitionId, V value, long offset) {
         this(topic, partitionId, null, value, offset);
     }
 
@@ -60,7 +60,7 @@ public final class ConsumerRecord {
         this(topic, partitionId, null, null, -1L, error);
     }
     
-    private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) {
+    private ConsumerRecord(String topic, int partitionId, K key, V value, long offset, Exception error) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.partition = new TopicPartition(topic, partitionId);
@@ -95,7 +95,7 @@ public final class ConsumerRecord {
      * The key (or null if no key is specified)
      * @throws Exception The exception thrown while fetching this record.
      */
-    public byte[] key() throws Exception {
+    public K key() throws Exception {
         if (this.error != null)
             throw this.error;
         return key;
@@ -105,7 +105,7 @@ public final class ConsumerRecord {
      * The value
      * @throws Exception The exception thrown while fetching this record.
      */
-    public byte[] value() throws Exception {
+    public V value() throws Exception {
         if (this.error != null)
             throw this.error;
         return value;

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 2ecfc8a..bdf4b26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -21,12 +21,12 @@ import java.util.Map.Entry;
  * A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a 
  * {@link Consumer#poll(long)} operation. 
  */
-public class ConsumerRecords {
+public class ConsumerRecords<K,V> {
 
     private final String topic;
-    private final Map<Integer, List<ConsumerRecord>> recordsPerPartition;
+    private final Map<Integer, List<ConsumerRecord<K,V>>> recordsPerPartition;
     
-    public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord>> records) {
+    public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord<K,V>>> records) {
         this.topic = topic;
         this.recordsPerPartition = records;
     }
@@ -36,16 +36,16 @@ public class ConsumerRecords {
      * specified, returns records for all partitions
      * @return The list of {@link ConsumerRecord}s associated with the given partitions.
      */
-    public List<ConsumerRecord> records(int... partitions) {
-        List<ConsumerRecord> recordsToReturn = new ArrayList<ConsumerRecord>(); 
+    public List<ConsumerRecord<K,V>> records(int... partitions) {
+        List<ConsumerRecord<K,V>> recordsToReturn = new ArrayList<ConsumerRecord<K,V>>();
         if(partitions.length == 0) {
             // return records for all partitions
-            for(Entry<Integer, List<ConsumerRecord>> record : recordsPerPartition.entrySet()) {
+            for(Entry<Integer, List<ConsumerRecord<K,V>>> record : recordsPerPartition.entrySet()) {
                 recordsToReturn.addAll(record.getValue());
             }
         } else {
            for(int partition : partitions) {
-               List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(partition);
+               List<ConsumerRecord<K,V>> recordsForThisPartition = recordsPerPartition.get(partition);
                recordsToReturn.addAll(recordsForThisPartition);
            }
         }
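
With the container parameterized end to end, code iterating the result of poll() stays typed without casts. A short sketch against the byte[] defaults, given a consumer like the ones in the KafkaConsumer examples below:

    Map<String, ConsumerRecords<byte[], byte[]>> byTopic = consumer.poll(100);
    for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : byTopic.entrySet()) {
        // records() with no arguments flattens the lists from all partitions
        for (ConsumerRecord<byte[], byte[]> record : entry.getValue().records()) {
            System.out.println(entry.getKey() + " @ offset " + record.offset());
        }
    }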

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
new file mode 100644
index 0000000..fa857d4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ *
+ * @param <T> Type to be deserialized into.
+ *
+ * A class that implements this interface is expected to have a no-argument constructor.
+ */
+public interface Deserializer<T> extends Configurable {
+    /**
+     *
+     * @param topic Topic associated with the data
+     * @param data Serialized bytes
+     * @param isKey Whether the data is for a key or a value
+     * @return deserialized typed data
+     */
+    public T deserialize(String topic, byte[] data, boolean isKey);
+
+    /**
+     * Close this deserializer
+     */
+    public void close();
+}
\ No newline at end of file
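
Since the interface is typed and extends Configurable, a custom implementation stays small. A hypothetical UTF-8 string deserializer as a sketch — StringDeserializer is not part of this patch, and the (String, Throwable) constructor is assumed on the new DeserializationException class:

    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    import org.apache.kafka.common.errors.DeserializationException;

    public class StringDeserializer implements Deserializer<String> {

        @Override
        public void configure(Map<String, ?> configs) {
            // no settings needed for this sketch
        }

        @Override
        public String deserialize(String topic, byte[] data, boolean isKey) {
            if (data == null)
                return null;
            try {
                return new String(data, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                // assumed constructor, see note above
                throw new DeserializationException("UTF-8 is unsupported on this JVM", e);
            }
        }

        @Override
        public void close() {
            // nothing to do
        }
    }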

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fe93afa..a43b160 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -12,19 +12,6 @@
 */
 package org.apache.kafka.clients.consumer;
 
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Map.Entry;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -36,6 +23,9 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.util.*;
+
 /**
  * A Kafka client that consumes records from a Kafka cluster.
  * <P>
@@ -50,12 +40,12 @@ import org.slf4j.LoggerFactory;
  * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
  * <pre>
  * {@code
- * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
+ * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords<byte[], byte[]>> records) {
  *     Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
- *     for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
- *          List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
+ *     for(Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
+ *          List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
  *          for(int i = 0;i < recordsPerTopic.size();i++) {
- *               ConsumerRecord record = recordsPerTopic.get(i);
+ *               ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
  *               // process record
  *               try {
  *               	processedOffsets.put(record.topicAndpartition(), record.offset());
@@ -80,11 +70,11 @@ import org.slf4j.LoggerFactory;
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * consumer.subscribe("foo", "bar");
  * boolean isRunning = true;
  * while(isRunning) {
- *   Map<String, ConsumerRecords> records = consumer.poll(100);
+ *   Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *   process(records);
  * }
  * consumer.close();
@@ -102,14 +92,14 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * consumer.subscribe("foo", "bar");
  * int commitInterval = 100;
  * int numRecords = 0;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     try {
  *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *         consumedOffsets.putAll(lastConsumedOffsets);
@@ -156,16 +146,17 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
  *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
  *                                                    if(rewindOffsets)
  *                                                        Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
  *                                                    consumer.seek(newOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
  *                                                    consumer.commit(true);
  *                                                }
  *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
@@ -183,7 +174,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -211,13 +202,14 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
  *                                                    consumer.seek(lastCommittedOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+ *                                                public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
  *                                                    Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
  *                                                    commitOffsetsToCustomStore(offsets); 
  *                                                }
@@ -234,7 +226,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -259,7 +251,7 @@ import org.slf4j.LoggerFactory;
  * props.put("group.id", "test");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -276,7 +268,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     for(TopicPartition partition : partitions) {
@@ -298,7 +290,7 @@ import org.slf4j.LoggerFactory;
  * {@code  
  * Properties props = new Properties();
  * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -314,7 +306,7 @@ import org.slf4j.LoggerFactory;
  * boolean isRunning = true;
  * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
  * while(isRunning) {
- *     Map<String, ConsumerRecords> records = consumer.poll(100);
+ *     Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
  *     Map<TopicPartition, Long> lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     // commit offsets for partitions 0,1 for topic foo to custom store
@@ -331,7 +323,7 @@ import org.slf4j.LoggerFactory;
  * }
  * </pre>
  */
-public class KafkaConsumer implements Consumer {
+public class KafkaConsumer<K,V> implements Consumer<K,V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
 
@@ -340,7 +332,9 @@ public class KafkaConsumer implements Consumer {
     private final Metrics metrics;
     private final Set<String> subscribedTopics;
     private final Set<TopicPartition> subscribedPartitions;
-    
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+
     /**
      * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs">here</a>. Values can be
@@ -351,7 +345,7 @@ public class KafkaConsumer implements Consumer {
      * @param configs   The consumer configs
      */
     public KafkaConsumer(Map<String, Object> configs) {
-        this(new ConsumerConfig(configs), null);
+        this(new ConsumerConfig(configs), null, null, null);
     }
 
     /**
@@ -364,7 +358,24 @@ public class KafkaConsumer implements Consumer {
      *                  every rebalance operation.  
      */
     public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
-        this(new ConsumerConfig(configs), callback);
+        this(new ConsumerConfig(configs), callback, null, null);
+    }
+
+    /**
+     * A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback}
+     * implementation, a key and a value {@link Deserializer}.
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * @param configs   The consumer configs
+     * @param callback  A callback interface that the user can implement to manage customized offsets on the start and end of
+     *                  every rebalance operation.
+     * @param keyDeserializer  The deserializer for key that implements {@link Deserializer}. The configure() method won't
+     *                         be called in the consumer when the deserializer is passed in directly.
+     * @param valueDeserializer  The deserializer for value that implements {@link Deserializer}. The configure() method
+     *                           won't be called in the consumer when the deserializer is passed in directly.
+     */
+    public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer);
     }
 
     /**
@@ -372,12 +383,12 @@ public class KafkaConsumer implements Consumer {
      * Valid configuration strings are documented at {@link ConsumerConfig}
      */
     public KafkaConsumer(Properties properties) {
-        this(new ConsumerConfig(properties), null);
+        this(new ConsumerConfig(properties), null, null, null);
     }
 
     /**
      * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a 
-     * {@link ConsumerRebalanceCallback} implementation. 
+     * {@link ConsumerRebalanceCallback} implementation.
      * <p>
      * Valid configuration strings are documented at {@link ConsumerConfig}
      * @param properties The consumer configuration properties
@@ -385,14 +396,27 @@ public class KafkaConsumer implements Consumer {
      *                   every rebalance operation.  
      */
     public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
-        this(new ConsumerConfig(properties), callback);
+        this(new ConsumerConfig(properties), callback, null, null);
     }
 
-    private KafkaConsumer(ConsumerConfig config) {
-        this(config, null);
+    /**
+     * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
+     * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * @param properties The consumer configuration properties
+     * @param callback   A callback interface that the user can implement to manage customized offsets on the start and end of
+     *                   every rebalance operation.
+     * @param keyDeserializer  The deserializer for key that implements {@link Deserializer}. The configure() method won't
+     *                         be called in the consumer when the deserializer is passed in directly.
+     * @param valueDeserializer  The deserializer for value that implements {@link Deserializer}. The configure() method
+     *                           won't be called in the consumer when the deserializer is passed in directly.
+     */
+    public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer);
     }
 
-    private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) {
+    private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
         log.trace("Starting the Kafka consumer");
         subscribedTopics = new HashSet<String>();
         subscribedPartitions = new HashSet<TopicPartition>();
@@ -402,6 +426,18 @@ public class KafkaConsumer implements Consumer {
         this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
         this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
+        if (keyDeserializer == null)
+            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                                                Deserializer.class);
+        else
+            this.keyDeserializer = keyDeserializer;
+        if (valueDeserializer == null)
+            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                                                  Deserializer.class);
+        else
+            this.valueDeserializer = valueDeserializer;
+
         config.logUnused();
         log.debug("Kafka consumer started");
     }
@@ -488,7 +524,7 @@ public class KafkaConsumer implements Consumer {
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
      */
     @Override
-    public Map<String, ConsumerRecords> poll(long timeout) {
+    public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
         // TODO Auto-generated method stub
         return null;
     }
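
The new four-argument constructors let typed deserializers be passed in directly; as the javadoc above notes, configure() is then not called on them. A sketch wiring in the hypothetical StringDeserializer from earlier (null means no rebalance callback):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    KafkaConsumer<String, String> consumer =
        new KafkaConsumer<String, String>(props, null, new StringDeserializer(), new StringDeserializer());
    consumer.subscribe("foo", "bar");
    Map<String, ConsumerRecords<String, String>> records = consumer.poll(100);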

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c3aad3b..8cab16c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition;
  * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it
  * needs to communicate with. Failure to close the consumer after use will leak these resources.
  */
-public class MockConsumer implements Consumer {
+public class MockConsumer implements Consumer<byte[], byte[]> {
 
     private final Set<TopicPartition> subscribedPartitions;
     private final Set<String> subscribedTopics;
@@ -90,10 +90,10 @@ public class MockConsumer implements Consumer {
     }
 
     @Override
-    public Map<String, ConsumerRecords> poll(long timeout) {
+    public Map<String, ConsumerRecords<byte[], byte[]>> poll(long timeout) {
         // hand out one dummy record, 1 per topic
         Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>();
-        Map<String, ConsumerRecords> recordMetadata = new HashMap<String, ConsumerRecords>();
+        Map<String, ConsumerRecords<byte[], byte[]>> recordMetadata = new HashMap<String, ConsumerRecords<byte[], byte[]>>();
         for(TopicPartition partition : subscribedPartitions) {
             // get the last consumed offset
             long messageSequence = consumedOffsets.get(partition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
new file mode 100644
index 0000000..9005b74
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer;
+
+import java.util.Map;
+
+public class ByteArraySerializer implements Serializer<byte[]> {
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // nothing to do
+    }
+
+    @Override
+    public byte[] serialize(String topic, byte[] data, boolean isKey) {
+        return data;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
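
The mirror image on the write path: a hypothetical UTF-8 string serializer against the new Serializer<T> interface. Again a sketch only — StringSerializer is not part of this patch, and the (String, Throwable) constructor is assumed on the new SerializationException class:

    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    import org.apache.kafka.common.errors.SerializationException;

    public class StringSerializer implements Serializer<String> {

        @Override
        public void configure(Map<String, ?> configs) {
            // no settings needed for this sketch
        }

        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            if (data == null)
                return null;
            try {
                return data.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                // assumed constructor, see note above
                throw new SerializationException("UTF-8 is unsupported on this JVM", e);
            }
        }

        @Override
        public void close() {
            // nothing to do
        }
    }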

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 32f444e..f61efb3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
  * needs to communicate with. Failure to close the producer after use will leak these resources.
  */
-public class KafkaProducer implements Producer {
+public class KafkaProducer<K,V> implements Producer<K,V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
 
@@ -75,26 +75,59 @@ public class KafkaProducer implements Producer {
     private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
      * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
      * string "42" or the integer 42).
+     * @param configs   The producer configs
+     *
      */
     public KafkaProducer(Map<String, Object> configs) {
-        this(new ProducerConfig(configs));
+        this(new ProducerConfig(configs), null, null);
+    }
+
+    /**
+     * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
+     * Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+     * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
+     * either the string "42" or the integer 42).
+     * @param configs   The producer configs
+     * @param keySerializer  The serializer for key that implements {@link Serializer}. The configure() method won't be
+     *                       called in the producer when the serializer is passed in directly.
+     * @param valueSerializer  The serializer for value that implements {@link Serializer}. The configure() method won't
+     *                         be called in the producer when the serializer is passed in directly.
+     */
+    public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this(new ProducerConfig(configs), keySerializer, valueSerializer);
     }
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
      * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+     * @param properties   The producer configs
      */
     public KafkaProducer(Properties properties) {
-        this(new ProducerConfig(properties));
+        this(new ProducerConfig(properties), null, null);
     }
 
-    private KafkaProducer(ProducerConfig config) {
+    /**
+     * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
+     * Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+     * @param properties   The producer configs
+     * @param keySerializer  The serializer for key that implements {@link Serializer}. The configure() method won't be
+     *                       called in the producer when the serializer is passed in directly.
+     * @param valueSerializer  The serializer for value that implements {@link Serializer}. The configure() method won't
+     *                         be called in the producer when the serializer is passed in directly.
+     */
+    public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this(new ProducerConfig(properties), keySerializer, valueSerializer);
+    }
+
+    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         log.trace("Starting the Kafka producer");
         this.time = new SystemTime();
         MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
@@ -145,6 +178,17 @@ public class KafkaProducer implements Producer {
 
         this.errors = this.metrics.sensor("errors");
 
+        if (keySerializer == null)
+            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                                                              Serializer.class);
+        else
+            this.keySerializer = keySerializer;
+        if (valueSerializer == null)
+            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                                                                Serializer.class);
+        else
+            this.valueSerializer = valueSerializer;
+
         config.logUnused();
         log.debug("Kafka producer started");
     }
@@ -159,9 +203,10 @@ public class KafkaProducer implements Producer {
 
     /**
      * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
+     * @param record  The record to be sent
      */
     @Override
-    public Future<RecordMetadata> send(ProducerRecord record) {
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record) {
         return send(record, null);
     }
 
@@ -183,14 +228,14 @@ public class KafkaProducer implements Producer {
      * If you want to simulate a simple blocking call you can do the following:
      * 
      * <pre>
-     *   producer.send(new ProducerRecord("the-topic", "key, "value")).get();
+     *   producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get();
      * </pre>
      * <p>
      * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
      * 
      * <pre>
-     *   ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
+     *   ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
      *   producer.send(myRecord,
      *                 new Callback() {
      *                     public void onCompletion(RecordMetadata metadata, Exception e) {
@@ -205,8 +250,8 @@ public class KafkaProducer implements Producer {
      * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
      * 
      * <pre>
-     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
-     * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
+     * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key, value), callback1);
+     * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
      * </pre>
      * <p>
      * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
@@ -226,16 +271,19 @@ public class KafkaProducer implements Producer {
      *        indicates no callback)
      */
     @Override
-    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
             waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
-            int partition = partitioner.partition(record, metadata.fetch());
-            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
+            byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true);
+            byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false);
+            ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
+            int partition = partitioner.partition(serializedRecord, metadata.fetch());
+            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -324,6 +372,8 @@ public class KafkaProducer implements Producer {
             throw new KafkaException(e);
         }
         this.metrics.close();
+        this.keySerializer.close();
+        this.valueSerializer.close();
         log.debug("The Kafka producer has closed.");
     }
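
Putting the producer side together: serialization now happens inside send(), right before partitioning and appending to the accumulator, so callers stay fully typed. A sketch using the hypothetical StringSerializer from above via the new constructor:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    KafkaProducer<String, String> producer =
        new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer());
    producer.send(new ProducerRecord<String, String>("the-topic", "key", "value"));
    // close() now also closes both serializers
    producer.close();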
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index c0f1d57..34624c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -40,11 +40,11 @@ import org.apache.kafka.common.TopicPartition;
  * By default this mock will synchronously complete each send call successfully. However it can be configured to allow
  * the user to control the completion of the call and supply an optional error for the producer to throw.
  */
-public class MockProducer implements Producer {
+public class MockProducer implements Producer<byte[], byte[]> {
 
     private final Cluster cluster;
     private final Partitioner partitioner = new Partitioner();
-    private final List<ProducerRecord> sent;
+    private final List<ProducerRecord<byte[], byte[]>> sent;
     private final Deque<Completion> completions;
     private boolean autoComplete;
     private Map<TopicPartition, Long> offsets;
@@ -62,7 +62,7 @@ public class MockProducer implements Producer {
         this.cluster = cluster;
         this.autoComplete = autoComplete;
         this.offsets = new HashMap<TopicPartition, Long>();
-        this.sent = new ArrayList<ProducerRecord>();
+        this.sent = new ArrayList<ProducerRecord<byte[], byte[]>>();
         this.completions = new ArrayDeque<Completion>();
     }
 
@@ -90,7 +90,7 @@ public class MockProducer implements Producer {
      * @see #history()
      */
     @Override
-    public synchronized Future<RecordMetadata> send(ProducerRecord record) {
+    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) {
         return send(record, null);
     }
 
@@ -100,7 +100,7 @@ public class MockProducer implements Producer {
      * @see #history()
      */
     @Override
-    public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
+    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
         int partition = 0;
         if (this.cluster.partitionsForTopic(record.topic()) != null)
             partition = partitioner.partition(record, this.cluster);
@@ -147,8 +147,8 @@ public class MockProducer implements Producer {
     /**
      * Get the list of sent records since the last call to {@link #clear()}
      */
-    public synchronized List<ProducerRecord> history() {
-        return new ArrayList<ProducerRecord>(this.sent);
+    public synchronized List<ProducerRecord<byte[], byte[]>> history() {
+        return new ArrayList<ProducerRecord<byte[], byte[]>>(this.sent);
     }
 
     /**
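
MockProducer is now pinned to Producer<byte[], byte[]>, so tests written against the raw-bytes contract need no extra generics. A sketch, assuming the class's existing no-argument constructor (which auto-completes sends):

    MockProducer producer = new MockProducer();
    producer.send(new ProducerRecord<byte[], byte[]>("the-topic", "key".getBytes(), "value".getBytes()));
    List<ProducerRecord<byte[], byte[]>> sent = producer.history();
    System.out.println(sent.size()); // 1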

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 36e8398..5baa606 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
  * @see KafkaProducer
  * @see MockProducer
  */
-public interface Producer extends Closeable {
+public interface Producer<K,V> extends Closeable {
 
     /**
      * Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -39,12 +39,12 @@ public interface Producer extends Closeable {
      * @param record The record to send
      * @return A future which will eventually contain the response information
      */
-    public Future<RecordMetadata> send(ProducerRecord record);
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
 
     /**
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
-    public Future<RecordMetadata> send(ProducerRecord record, Callback callback);
+    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
 
     /**
      * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 72d3ddd..a893d88 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -17,7 +17,6 @@ import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.AbstractConfig;
@@ -175,6 +174,14 @@ public class ProducerConfig extends AbstractConfig {
                                                                             + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
                                                                             + " message re-ordering due to retries (i.e., if retries are enabled).";
 
+    /** <code>key.serializer</code> */
+    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
+    private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface.";
+
+    /** <code>value.serializer</code> */
+    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+    private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
+
     static {
         config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -182,7 +189,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(ACKS_CONFIG,
                                         Type.STRING,
                                         "1",
-                                        in(Arrays.asList("all","-1", "0", "1")),
+                                        in(Arrays.asList("all", "-1", "0", "1")),
                                         Importance.HIGH,
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
@@ -221,7 +228,9 @@ public class ProducerConfig extends AbstractConfig {
                                         5,
                                         atLeast(1),
                                         Importance.LOW,
-                                        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
+                                        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
+                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {

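The two new settings land alongside the existing producer configs; a short sketch of wiring them up explicitly (the broker address is an assumption, and both values simply restate the defaults defined above):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class SerializerConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            // Spelling out the defaults shows where a custom Serializer would be plugged in.
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.clients.producer.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.clients.producer.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.close();
        }
    }
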
http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
index c3181b3..065d4e6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -20,12 +20,12 @@ package org.apache.kafka.clients.producer;
  * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
  * present a partition will be assigned in a round-robin fashion.
  */
-public final class ProducerRecord {
+public final class ProducerRecord<K, V> {
 
     private final String topic;
     private final Integer partition;
-    private final byte[] key;
-    private final byte[] value;
+    private final K key;
+    private final V value;
 
     /**
      * Creates a record to be sent to a specified topic and partition
@@ -35,7 +35,7 @@ public final class ProducerRecord {
      * @param key The key that will be included in the record
      * @param value The record contents
      */
-    public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
+    public ProducerRecord(String topic, Integer partition, K key, V value) {
         if (topic == null)
             throw new IllegalArgumentException("Topic cannot be null");
         this.topic = topic;
@@ -51,7 +51,7 @@ public final class ProducerRecord {
      * @param key The key that will be included in the record
      * @param value The record contents
      */
-    public ProducerRecord(String topic, byte[] key, byte[] value) {
+    public ProducerRecord(String topic, K key, V value) {
         this(topic, null, key, value);
     }
 
@@ -61,7 +61,7 @@ public final class ProducerRecord {
      * @param topic The topic this record should be sent to
      * @param value The record contents
      */
-    public ProducerRecord(String topic, byte[] value) {
+    public ProducerRecord(String topic, V value) {
         this(topic, null, value);
     }
 
@@ -75,14 +75,14 @@ public final class ProducerRecord {
     /**
      * The key (or null if no key is specified)
      */
-    public byte[] key() {
+    public K key() {
         return key;
     }
 
     /**
      * @return The value
      */
-    public byte[] value() {
+    public V value() {
         return value;
     }
 
@@ -95,8 +95,8 @@ public final class ProducerRecord {
 
     @Override
     public String toString() {
-        String key = this.key == null ? "null" : ("byte[" + this.key.length + "]");
-        String value = this.value == null ? "null" : ("byte[" + this.value.length + "]");
+        String key = this.key == null ? "null" : this.key.toString();
+        String value = this.value == null ? "null" : this.value.toString();
         return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
     }
 }

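The three constructors correspond one-to-one to the partition-selection rules in the class Javadoc (explicit partition, key hash, round-robin). A brief illustration; the topic and payloads are assumptions:

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerRecordSketch {
        public static void main(String[] args) {
            byte[] key = "k".getBytes();
            byte[] value = "v".getBytes();

            // Explicit partition: always goes to partition 0.
            ProducerRecord<byte[], byte[]> fixed =
                new ProducerRecord<byte[], byte[]>("events", 0, key, value);

            // Key but no partition: the partition is chosen by hashing the key.
            ProducerRecord<byte[], byte[]> keyed =
                new ProducerRecord<byte[], byte[]>("events", key, value);

            // Neither key nor partition: partitions are assigned round-robin.
            ProducerRecord<byte[], byte[]> roundRobin =
                new ProducerRecord<byte[], byte[]>("events", value);

            System.out.println(fixed + " / " + keyed + " / " + roundRobin);
        }
    }
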
http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
new file mode 100644
index 0000000..0378683
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ *
+ * A class that implements this interface is expected to have a no-argument constructor.
+ *
+ * @param <T> The type to be serialized from
+ */
+public interface Serializer<T> extends Configurable {
+    /**
+     * Serialize the given data into a byte array.
+     * @param topic The topic associated with the data
+     * @param data The typed data to serialize
+     * @param isKey Whether the data is a key or a value
+     * @return The serialized bytes
+     */
+    public byte[] serialize(String topic, T data, boolean isKey);
+
+    /**
+     * Close this serializer
+     */
+    public void close();
+}

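To see what implementing the new interface looks like, here is a hypothetical UTF-8 string serializer; it is not part of this patch, and assumes only Configurable's configure(Map) method and the SerializationException added elsewhere in this commit:

    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    import org.apache.kafka.clients.producer.Serializer;
    import org.apache.kafka.common.errors.SerializationException;

    public class StringSerializer implements Serializer<String> {

        @Override
        public void configure(Map<String, ?> configs) {
            // No configuration needed; required because Serializer extends Configurable.
        }

        @Override
        public byte[] serialize(String topic, String data, boolean isKey) {
            try {
                return data == null ? null : data.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new SerializationException("Error serializing string to byte[]", e);
            }
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }
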
http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index 40e8234..483899d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -43,7 +43,7 @@ public class Partitioner {
      * @param record The record being sent
      * @param cluster The current cluster metadata
      */
-    public int partition(ProducerRecord record, Cluster cluster) {
+    public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
         List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
         int numPartitions = partitions.size();
         if (record.partition() != null) {

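Only the signature change is visible in this hunk. As a rough sketch of the selection order the surrounding class implements (the hash function and counter below are assumptions, not the actual Partitioner code):

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PartitionChoiceSketch {
        private final AtomicInteger counter = new AtomicInteger(0);

        public int choose(Integer requestedPartition, byte[] key, int numPartitions) {
            if (requestedPartition != null)
                return requestedPartition;                   // caller picked the partition
            if (key != null)                                 // stable hash of the key
                return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions; // round-robin
        }
    }
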
http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 28175fb..1b82800 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -46,12 +46,12 @@ public class ProducerPerformance {
                 throw new IllegalArgumentException("Invalid property: " + args[i]);
             props.put(pieces[0], pieces[1]);
         }
-        KafkaProducer producer = new KafkaProducer(props);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[],byte[]>(props);
 
         /* setup perf test */
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
-        ProducerRecord record = new ProducerRecord(topicName, payload);
+        ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>(topicName, payload);
         long sleepTime = NS_PER_SEC / throughput;
         long sleepDeficitNs = 0;
         Stats stats = new Stats(numRecords, 5000);

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
new file mode 100644
index 0000000..a543339
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ *  Any exception during deserialization in the consumer
+ */
+public class DeserializationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public DeserializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DeserializationException(String message) {
+        super(message);
+    }
+
+    public DeserializationException(Throwable cause) {
+        super(cause);
+    }
+
+    public DeserializationException() {
+        super();
+    }
+
+    /* avoid the expensive and useless stack trace for deserialization exceptions */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
\ No newline at end of file

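The fillInStackTrace() override above is a standard optimization: returning this skips the stack walk Throwable's constructor would otherwise perform, keeping per-record failures cheap. The same pattern in isolation (the class name is hypothetical):

    public class CheapException extends RuntimeException {

        private static final long serialVersionUID = 1L;

        public CheapException(String message) {
            super(message);
        }

        @Override
        public Throwable fillInStackTrace() {
            return this; // skip capturing a stack trace entirely
        }
    }
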
http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
new file mode 100644
index 0000000..00388d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ *  Any exception during serialization in the producer
+ */
+public class SerializationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SerializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SerializationException(String message) {
+        super(message);
+    }
+
+    public SerializationException(Throwable cause) {
+        super(cause);
+    }
+
+    public SerializationException() {
+        super();
+    }
+
+    /* avoid the expensive and useless stack trace for serialization exceptions */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 9a9411f..1e2ca03 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -37,7 +37,7 @@ public class MockProducerTest {
     @Test
     public void testAutoCompleteMock() throws Exception {
         MockProducer producer = new MockProducer(true);
-        ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
+        ProducerRecord record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
         Future<RecordMetadata> metadata = producer.send(record);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
@@ -51,8 +51,8 @@ public class MockProducerTest {
     @Test
     public void testManualCompletion() throws Exception {
         MockProducer producer = new MockProducer(false);
-        ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
-        ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
+        ProducerRecord record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
+        ProducerRecord record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index f06e28c..1d077fd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -50,22 +50,22 @@ public class PartitionerTest {
     public void testUserSuppliedPartitioning() {
         assertEquals("If the user supplies a partition we should use it.",
                      0,
-                     partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
+                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test", 0, key, value), cluster));
     }
 
     @Test
     public void testKeyPartitionIsStable() {
-        int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
+        int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, value), cluster);
         assertEquals("Same key should yield same partition",
                      partition,
-                     partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster));
+                     partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, "value2".getBytes()), cluster));
     }
 
     @Test
     public void testRoundRobinIsStable() {
-        int startPart = partitioner.partition(new ProducerRecord("test", value), cluster);
+        int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
         for (int i = 1; i <= 100; i++) {
-            int partition = partitioner.partition(new ProducerRecord("test", value), cluster);
+            int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
             assertEquals("Should yield a different partition each call with round-robin partitioner",
                 partition, (startPart + i) % 2);
       }
@@ -74,7 +74,7 @@ public class PartitionerTest {
     @Test
     public void testRoundRobinWithDownNode() {
         for (int i = 0; i < partitions.size(); i++) {
-            int part = partitioner.partition(new ProducerRecord("test", value), cluster);
+            int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
             assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/main/scala/kafka/producer/BaseProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala
index b020793..8e00713 100644
--- a/core/src/main/scala/kafka/producer/BaseProducer.scala
+++ b/core/src/main/scala/kafka/producer/BaseProducer.scala
@@ -33,10 +33,10 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
   // decide whether to send synchronously based on producer properties
   val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 
-  val producer = new KafkaProducer(producerProps)
+  val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
 
   override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
-    val record = new ProducerRecord(topic, key, value)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
     if(sync) {
       this.producer.send(record).get()
     } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index 4b5b823..e194942 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -32,7 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   var requiredNumAcks: Int = Int.MaxValue
   var syncSend: Boolean = false
 
-  private var producer: KafkaProducer = null
+  private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null
 
   def getTopic: String = topic
   def setTopic(topic: String) { this.topic = topic }
@@ -60,7 +60,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
     if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
-    producer = new KafkaProducer(props)
+    producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     LogLog.debug("Kafka producer connected to " +  brokerList)
     LogLog.debug("Logging for topic: " + topic)
   }
@@ -68,7 +68,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
   override def append(event: LoggingEvent)  {
     val message = subAppend(event)
     LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
-    val response = producer.send(new ProducerRecord(topic, message.getBytes()))
+    val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes()))
     if (syncSend) response.get
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 77d951d..53cb16c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -27,7 +27,6 @@ import kafka.producer.{OldProducer, NewShinyProducer}
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord}
-import org.apache.kafka.common.KafkaException
 
 import scala.collection.JavaConversions._
 
@@ -547,7 +546,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer {
 
     override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) {
-      val record = new ProducerRecord(topicPartition.topic, key, value)
+      val record = new ProducerRecord[Array[Byte],Array[Byte]](topicPartition.topic, key, value)
       if(sync) {
         topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset)
       } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 3393a3d..f541987 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -124,7 +124,7 @@ object ReplayLogProducer extends Logging {
 
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
     val shutdownLatch = new CountDownLatch(1)
-    val producer = new KafkaProducer(config.producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps)
 
     override def run() {
       info("Starting consumer thread..")
@@ -137,7 +137,7 @@ object ReplayLogProducer extends Logging {
             stream
         for (messageAndMetadata <- iter) {
           try {
-            val response = producer.send(new ProducerRecord(config.outputTopic,
+            val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic,
                                             messageAndMetadata.key(), messageAndMetadata.message()))
             if(config.isSync) {
               response.get()

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 67196f3..2ebc7bf 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -56,7 +56,7 @@ object TestEndToEndLatency {
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
-    val producer = new KafkaProducer(producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
 
     // make sure the consumer fetcher has started before sending data since otherwise
     // the consumption from the tail will skip the first message and hence be blocked
@@ -67,7 +67,7 @@ object TestEndToEndLatency {
     val latencies = new Array[Long](numMessages)
     for (i <- 0 until numMessages) {
       val begin = System.nanoTime
-      producer.send(new ProducerRecord(topic, message))
+      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
       val received = iter.next
       val elapsed = System.nanoTime - begin
       // poor man's progress bar

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/main/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index 1d4ea93..b81010e 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -242,7 +242,7 @@ object TestLogCleaning {
     val producerProps = new Properties
     producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
-    val producer = new KafkaProducer(producerProps)
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt
     val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
@@ -254,9 +254,9 @@ object TestLogCleaning {
       val delete = i % 100 < percentDeletes
       val msg = 
         if(delete)
-          new ProducerRecord(topic, key.toString.getBytes(), null)
+          new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
         else
-          new ProducerRecord(topic, key.toString.getBytes(), i.toString.getBytes())
+          new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes())
       producer.send(msg)
       producedWriter.write(TestRecord(topic, key, i, delete).toString)
       producedWriter.newLine()

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 6379f2b..1505fd4 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -75,7 +75,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
     props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
-    var producer = new KafkaProducer(props)
+    var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
 
     try {
@@ -89,7 +89,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
 
       // make sure the returned messages are correct
       val responses = for (message <- messages)
-        yield producer.send(new ProducerRecord(topic, null, null, message))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, null, message))
       val futures = responses.toList
       for ((future, offset) <- futures zip (0 until numRecords)) {
         assertEquals(offset.toLong, future.get.offset)

