kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-1797; (follow-up patch) add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Jay Kreps
Date Tue, 06 Jan 2015 19:07:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.2 53329583a -> f32e5ce24


kafka-1797; (follow-up patch) add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f32e5ce2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f32e5ce2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f32e5ce2

Branch: refs/heads/0.8.2
Commit: f32e5ce247eb07ce33cfcb876771293e8a2eacdc
Parents: 5332958
Author: Jun Rao <junrao@gmail.com>
Authored: Tue Jan 6 11:07:46 2015 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue Jan 6 11:07:46 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../clients/consumer/ByteArrayDeserializer.java | 34 ----------
 .../kafka/clients/consumer/ConsumerConfig.java  |  4 +-
 .../kafka/clients/consumer/Deserializer.java    | 38 -----------
 .../kafka/clients/consumer/KafkaConsumer.java   | 37 +++++++++--
 .../clients/producer/ByteArraySerializer.java   | 34 ----------
 .../kafka/clients/producer/KafkaProducer.java   | 59 ++++++++++++++---
 .../kafka/clients/producer/ProducerConfig.java  |  4 +-
 .../kafka/clients/producer/Serializer.java      | 38 -----------
 .../kafka/common/config/AbstractConfig.java     | 13 ++--
 .../common/errors/DeserializationException.java | 47 --------------
 .../serialization/ByteArrayDeserializer.java    | 34 ++++++++++
 .../serialization/ByteArraySerializer.java      | 34 ++++++++++
 .../common/serialization/Deserializer.java      | 45 +++++++++++++
 .../kafka/common/serialization/Serializer.java  | 45 +++++++++++++
 .../serialization/StringDeserializer.java       | 50 ++++++++++++++
 .../common/serialization/StringSerializer.java  | 50 ++++++++++++++
 .../common/serialization/SerializationTest.java | 68 ++++++++++++++++++++
 .../kafka/producer/KafkaLog4jAppender.scala     |  2 +
 .../scala/kafka/tools/ConsoleProducer.scala     |  2 +
 .../main/scala/kafka/tools/MirrorMaker.scala    |  7 +-
 .../scala/kafka/tools/ProducerPerformance.scala |  2 +
 .../scala/kafka/tools/ReplayLogProducer.scala   |  2 +
 .../scala/kafka/tools/TestEndToEndLatency.scala |  2 +
 .../scala/kafka/tools/TestLogCleaning.scala     |  2 +
 .../kafka/api/ProducerCompressionTest.scala     |  2 +
 .../kafka/api/ProducerSendTest.scala            | 53 +++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +
 28 files changed, 492 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 18f86e4..ba52288 100644
--- a/build.gradle
+++ b/build.gradle
@@ -371,6 +371,7 @@ project(':clients') {
   javadoc {
     include "**/org/apache/kafka/clients/producer/*"
     include "**/org/apache/kafka/common/errors/*"
+    include "**/org/apache/kafka/common/serialization/*"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
deleted file mode 100644
index 514cbd2..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer;
-
-import java.util.Map;
-
-public class ByteArrayDeserializer implements Deserializer<byte[]> {
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-        // nothing to do
-    }
-
-    @Override
-    public byte[] deserialize(String topic, byte[] data, boolean isKey) {
-        return data;
-    }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1d64f08..57c1807 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -185,8 +185,8 @@ public class ConsumerConfig extends AbstractConfig {
                                         METRICS_SAMPLE_WINDOW_MS_DOC)
                                 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
-                                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
-                                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC);
+                                .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
+                                .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC);
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
deleted file mode 100644
index c774a19..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.common.Configurable;
-
-/**
- *
- * @param <T> Type to be deserialized into.
- *
- * A class that implements this interface is expected to have a constructor with no parameter.
- */
-public interface Deserializer<T> extends Configurable {
-    /**
-     *
-     * @param topic Topic associated with the data
-     * @param data Serialized bytes
-     * @param isKey Is data for key or value
-     * @return
-     */
-    public T deserialize(String topic, byte[] data, boolean isKey);
-
-    /**
-     * Close this deserializer
-     */
-    public void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fe90663..7f8a41c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -18,6 +18,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.ClientUtils;
 import org.apache.kafka.common.utils.SystemTime;
 import org.slf4j.Logger;
@@ -345,7 +346,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
      * @param configs   The consumer configs
      */
     public KafkaConsumer(Map<String, Object> configs) {
-        this(new ConsumerConfig(configs), null, null, null);
+        this(configs, null);
     }
 
     /**
@@ -358,7 +359,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
      *                  every rebalance operation.  
      */
     public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
-        this(new ConsumerConfig(configs), callback, null, null);
+        this(configs, callback, null, null);
     }
 
     /**
@@ -375,7 +376,19 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
      *                           won't be called when the deserializer is passed in directly.
      */
     public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer);
+        this(new ConsumerConfig(addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
+             callback, keyDeserializer, valueDeserializer);
+    }
+
+    private static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
+                                                               Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
+        // Work on a copy of the caller's map; record the class of each deserializer instance that was passed in (null = leave config as-is).
+        Map<String, Object> newConfigs = new HashMap<String, Object>(configs);
+        if (keyDeserializer != null)
+            newConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
+        if (valueDeserializer != null)
+            newConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
+        return newConfigs;
     }
 
     /**
@@ -383,7 +396,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
      * Valid configuration strings are documented at {@link ConsumerConfig}
      */
     public KafkaConsumer(Properties properties) {
-        this(new ConsumerConfig(properties), null, null, null);
+        this(properties, null);
     }
 
     /**
@@ -396,7 +409,7 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
      *                   every rebalance operation.  
      */
     public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
-        this(new ConsumerConfig(properties), callback, null, null);
+        this(properties, callback, null, null);
     }
 
     /**
@@ -413,7 +426,19 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
      *                           won't be called when the deserializer is passed in directly.
      */
     public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer);
+        this(new ConsumerConfig(addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
+             callback, keyDeserializer, valueDeserializer);
+    }
+
+    private static Properties addDeserializerToConfig(Properties properties, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
+        // Work on a copy of the caller's properties; record the class name of each deserializer instance that was passed in (null = leave as-is).
+        Properties newProperties = new Properties();
+        newProperties.putAll(properties);
+        if (keyDeserializer != null)
+            newProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        if (valueDeserializer != null)
+            newProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return newProperties;
     }
 
     private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
deleted file mode 100644
index 9005b74..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.producer;
-
-import java.util.Map;
-
-public class ByteArraySerializer implements Serializer<byte[]> {
-
-    @Override
-    public void configure(Map<String, ?> configs) {
-        // nothing to do
-    }
-
-    @Override
-    public byte[] serialize(String topic, byte[] data, boolean isKey) {
-        return data;
-    }
-
-    @Override
-    public void close() {
-        // nothing to do
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d859fc5..db23a12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -13,10 +13,7 @@
 package org.apache.kafka.clients.producer;
 
 import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +31,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -44,6 +42,7 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.ClientUtils;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
@@ -102,7 +101,19 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
      *                         be called when the serializer is passed in directly.
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(new ProducerConfig(configs), keySerializer, valueSerializer);
+        this(new ProducerConfig(addSerializerToConfig(configs, keySerializer, valueSerializer)),
+             keySerializer, valueSerializer);
+    }
+
+    private static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
+                                                      Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+        Map<String, Object> newConfigs = new HashMap<String, Object>();
+        newConfigs.putAll(configs);
+        if (keySerializer != null)
+            newConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
+        if (valueSerializer != null)
+            newConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass());
+        return newConfigs;
     }
 
     /**
@@ -124,7 +135,19 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
      *                         be called when the serializer is passed in directly.
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        this(new ProducerConfig(properties), keySerializer, valueSerializer);
+        this(new ProducerConfig(addSerializerToConfig(properties, keySerializer, valueSerializer)),
+             keySerializer, valueSerializer);
+    }
+
+    private static Properties addSerializerToConfig(Properties properties,
+                                                    Serializer<?> keySerializer, Serializer<?> valueSerializer) {
+        Properties newProperties = new Properties();
+        newProperties.putAll(properties);
+        if (keySerializer != null)
+            newProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName());
+        if (valueSerializer != null)
+            newProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName());
+        return newProperties;
     }
 
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
@@ -178,14 +201,18 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
 
         this.errors = this.metrics.sensor("errors");
 
-        if (keySerializer == null)
+        if (keySerializer == null) {
             this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                               Serializer.class);
+            this.keySerializer.configure(config.originals(), true);
+        }
         else
             this.keySerializer = keySerializer;
-        if (valueSerializer == null)
+        if (valueSerializer == null) {
             this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                 Serializer.class);
+            this.valueSerializer.configure(config.originals(), false);
+        }
         else
             this.valueSerializer = valueSerializer;
 
@@ -275,8 +302,20 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
         try {
             // first make sure the metadata for the topic is available
             waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
-            byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true);
-            byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false);
+            byte[] serializedKey;
+            try {
+                serializedKey = keySerializer.serialize(record.topic(), record.key());
+            } catch (ClassCastException cce) {
+                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
+                        " to the one specified in key.serializer");
+            }
+            byte[] serializedValue;
+            try {
+                serializedValue = valueSerializer.serialize(record.topic(), record.value());
+            } catch (ClassCastException cce) {
+                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
+                        " to the one specified in value.serializer");
+            }
             ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue);
             int partition = partitioner.partition(serializedRecord, metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 9cdc13d..c3d810c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -227,8 +227,8 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.LOW,
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
-                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
+                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
deleted file mode 100644
index de87f9c..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.clients.producer;
-
-import org.apache.kafka.common.Configurable;
-
-/**
- *
- * @param <T> Type to be serialized from.
- *
- * A class that implements this interface is expected to have a constructor with no parameter.
- */
-public interface Serializer<T> extends Configurable {
-    /**
-     *
-     * @param topic Topic associated with data
-     * @param data Typed data
-     * @param isKey Is data for key or value
-     * @return
-     */
-    public byte[] serialize(String topic, T data, boolean isKey);
-
-    /**
-     * Close this serializer
-     */
-    public void close();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 3d4ab72..c4fa058 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -12,12 +12,7 @@
  */
 package org.apache.kafka.common.config;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
@@ -97,6 +92,12 @@ public class AbstractConfig {
         return keys;
     }
 
+    public Map<String, ?> originals() {
+        Map<String, Object> copy = new HashMap<String, Object>();
+        copy.putAll(originals);
+        return copy;
+    }
+
     private void logAll() {
         StringBuilder b = new StringBuilder();
         b.append(getClass().getSimpleName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
deleted file mode 100644
index a543339..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.common.errors;
-
-import org.apache.kafka.common.KafkaException;
-
-/**
- *  Any exception during deserialization in the consumer
- */
-public class DeserializationException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public DeserializationException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public DeserializationException(String message) {
-        super(message);
-    }
-
-    public DeserializationException(Throwable cause) {
-        super(cause);
-    }
-
-    public DeserializationException() {
-        super();
-    }
-
-    /* avoid the expensive and useless stack trace for deserialization exceptions */
-    @Override
-    public Throwable fillInStackTrace() {
-        return this;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
new file mode 100644
index 0000000..d89b3ff
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+public class ByteArrayDeserializer implements Deserializer<byte[]> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    @Override
+    public byte[] deserialize(String topic, byte[] data) {
+        return data;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
new file mode 100644
index 0000000..beaef94
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+public class ByteArraySerializer implements Serializer<byte[]> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    @Override
+    public byte[] serialize(String topic, byte[] data) {
+        return data;
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
new file mode 100644
index 0000000..94659f7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+/**
+ *
+ * @param <T> Type to be deserialized into.
+ *
+ * A class that implements this interface is expected to have a constructor with no parameter.
+ */
+public interface Deserializer<T> {
+
+    /**
+     * Configure this class.
+     * @param configs configs in key/value pairs
+     * @param isKey whether this deserializer is used for keys (true) or for values (false)
+     */
+    public void configure(Map<String, ?> configs, boolean isKey);
+
+    /**
+     *
+     * @param topic topic associated with the data
+     * @param data serialized bytes
+     * @return deserialized object
+     */
+    public T deserialize(String topic, byte[] data);
+
+    /**
+     * Close this deserializer
+     */
+    public void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
new file mode 100644
index 0000000..0e98136
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+/**
+ *
+ * @param <T> Type to be serialized from.
+ *
+ * A class that implements this interface is expected to have a constructor with no parameter.
+ */
+public interface Serializer<T> {
+
+    /**
+     * Configure this class.
+     * @param configs configs in key/value pairs
+     * @param isKey whether this serializer is used for keys (true) or for values (false)
+     */
+    public void configure(Map<String, ?> configs, boolean isKey);
+
+    /**
+     *
+     * @param topic topic associated with data
+     * @param data typed data
+     * @return serialized bytes
+     */
+    public byte[] serialize(String topic, T data);
+
+    /**
+     * Close this serializer
+     */
+    public void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
new file mode 100644
index 0000000..a3b3700
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+/**
+ *  String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding,
+ *  value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
+ */
+public class StringDeserializer implements Deserializer<String> {
+    private String encoding = "UTF8";
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
+        Object encodingValue = configs.get(propertyName);
+        if (encodingValue == null)
+            encodingValue = configs.get("deserializer.encoding");
+        if (encodingValue != null && encodingValue instanceof String)
+            encoding = (String) encodingValue;
+    }
+
+    @Override
+    public String deserialize(String topic, byte[] data) {
+        try {
+            return new String(data, encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
+        }
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
new file mode 100644
index 0000000..02db47f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+/**
+ *  String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding,
+ *  value.serializer.encoding or serializer.encoding. The first two take precedence over the last.
+ */
+public class StringSerializer implements Serializer<String> {
+    private String encoding = "UTF8";
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
+        Object encodingValue = configs.get(propertyName);
+        if (encodingValue == null)
+            encodingValue = configs.get("serializer.encoding");
+        if (encodingValue != null && encodingValue instanceof String)
+            encoding = (String) encodingValue;
+    }
+
+    @Override
+    public byte[] serialize(String topic, String data) {
+        try {
+            return data.getBytes(encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
+        }
+    }
+
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
new file mode 100644
index 0000000..d550a31
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SerializationTest {
+
+    private static class SerDeser<T> {
+        final Serializer<T> serializer;
+        final Deserializer<T> deserializer;
+
+        public SerDeser(Serializer<T> serializer, Deserializer<T> deserializer) {
+            this.serializer = serializer;
+            this.deserializer = deserializer;
+        }
+    }
+
+    @Test
+    public void testStringSerializer() {
+       String str = "my string";
+        String mytopic = "testTopic";
+        List<String> encodings = new ArrayList<String>();
+        encodings.add("UTF8");
+        encodings.add("UTF-16");
+
+        for ( String encoding : encodings) {
+            SerDeser<String> serDeser = getStringSerDeser(encoding);
+            Serializer<String> serializer = serDeser.serializer;
+            Deserializer<String> deserializer = serDeser.deserializer;
+
+            assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding,
+                    str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str)));
+        }
+
+    }
+
+    private SerDeser<String> getStringSerDeser(String encoder) {
+        Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+        serializerConfigs.put("key.serializer.encoding", encoder);
+        Serializer<String> serializer = new StringSerializer();
+        serializer.configure(serializerConfigs, true);
+
+        Map<String, Object> deserializerConfigs = new HashMap<String, Object>();
+        deserializerConfigs.put("key.deserializer.encoding", encoder);
+        Deserializer<String> deserializer = new StringDeserializer();
+        deserializer.configure(deserializerConfigs, true);
+
+        return new SerDeser<String>(serializer, deserializer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
index e194942..652dfb8 100644
--- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
+++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
@@ -60,6 +60,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
       throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
     if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
     if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString)
+    props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     LogLog.debug("Kafka producer connected to " +  brokerList)
     LogLog.debug("Logging for topic: " + topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 397d80d..a680b62 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -59,6 +59,8 @@ object ConsoleProducer {
             props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
             props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
             props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
 
             new NewShinyProducer(props)
           } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 2126f6e..cd6ccb5 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -22,7 +22,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer}
 import kafka.serializer._
 import kafka.utils._
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 
 import java.util.Random
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
@@ -133,8 +133,11 @@ object MirrorMaker extends Logging {
     producerThreads = (0 until numProducers).map(i => {
       producerProps.setProperty("client.id", clientId + "-" + i)
       val producer =
-      if (useNewProducer)
+      if (useNewProducer) {
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
         new NewShinyProducer(producerProps)
+      }
       else
         new OldProducer(producerProps)
       new ProducerThread(mirrorDataChannel, producer, i)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index f2dc4ed..bc25cd2 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -188,6 +188,8 @@ object ProducerPerformance extends Logging {
         props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
         props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
         props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
         new NewShinyProducer(props)
       } else {
         props.put("metadata.broker.list", config.brokerList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index f541987..2b8537b 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -120,6 +120,8 @@ object ReplayLogProducer extends Logging {
     import scala.collection.JavaConversions._
     val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
   }
 
   class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 2ebc7bf..48cff20 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -56,6 +56,8 @@ object TestEndToEndLatency {
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
 
     // make sure the consumer fetcher has started before sending data since otherwise

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/main/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
index b81010e..af496f7 100644
--- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala
@@ -242,6 +242,8 @@ object TestLogCleaning {
     val producerProps = new Properties
     producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
     val rand = new Random(1)
     val keyCount = (messages / dups).toInt

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 1505fd4..e635588 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -75,6 +75,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
     props.put(ProducerConfig.LINGER_MS_CONFIG, "200")
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
     val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 6196060..b15237b 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -30,6 +30,10 @@ import kafka.consumer.SimpleConsumer
 import kafka.api.FetchRequestBuilder
 import kafka.message.Message
 import kafka.integration.KafkaServerTestHarness
+import org.apache.kafka.common.errors.SerializationException
+import java.util.Properties
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.serialization.ByteArraySerializer
 
 
 class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -126,6 +130,55 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
     }
   }
 
+  @Test
+  def testSerializer() {
+    // sending a record with a wrong type should result in a SerializationException
+    try {
+      val producer = createNewProducerWithWrongSerializer(brokerList)
+      val record5 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
+      producer.send(record5)
+      fail("Should have gotten a SerializationException")
+    } catch {
+      case se: SerializationException => // this is ok
+    }
+
+    try {
+      createNewProducerWithNoSerializer(brokerList)
+      fail("Instantiating a producer without specifying a serializer should cause a ConfigException")
+    } catch {
+      case ce : ConfigException => // this is ok
+    }
+
+    // creating a producer with explicit serializers should succeed
+    createNewProducerWithExplicitSerializer(brokerList)
+  }
+
+  private def createNewProducerWithWrongSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+    import org.apache.kafka.clients.producer.ProducerConfig
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+  }
+
+  private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+    import org.apache.kafka.clients.producer.ProducerConfig
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+  }
+
+  private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+    import org.apache.kafka.clients.producer.ProducerConfig
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
+  }
+
   /**
    * testClose checks the closing behavior
    *

http://git-wip-us.apache.org/repos/asf/kafka/blob/f32e5ce2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 94d0028..c9e8ba2 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -395,6 +395,8 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 


Mime
View raw message