kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [7/7] git commit: KAFKA-1227 New producer!
Date Tue, 28 Jan 2014 19:16:33 GMT
KAFKA-1227 New producer!


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/269d16d3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/269d16d3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/269d16d3

Branch: refs/heads/trunk
Commit: 269d16d3c915d09f650ae32aa81542bd8522ca68
Parents: 26a02c3
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Thu Jan 23 12:41:11 2014 -0800
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Tue Jan 28 11:15:05 2014 -0800

----------------------------------------------------------------------
 clients/build.sbt                               |   11 +
 .../producer/BufferExhaustedException.java      |   17 +
 .../java/kafka/clients/producer/Callback.java   |   15 +
 .../clients/producer/DefaultPartitioner.java    |   35 +
 .../kafka/clients/producer/KafkaProducer.java   |  240 ++
 .../kafka/clients/producer/MockProducer.java    |  201 ++
 .../kafka/clients/producer/Partitioner.java     |   30 +
 .../java/kafka/clients/producer/Producer.java   |   38 +
 .../kafka/clients/producer/ProducerConfig.java  |  148 ++
 .../kafka/clients/producer/ProducerRecord.java  |   84 +
 .../java/kafka/clients/producer/RecordSend.java |   88 +
 .../clients/producer/internals/BufferPool.java  |  223 ++
 .../clients/producer/internals/Metadata.java    |  120 +
 .../internals/ProduceRequestResult.java         |   83 +
 .../producer/internals/RecordAccumulator.java   |  236 ++
 .../clients/producer/internals/RecordBatch.java |   84 +
 .../clients/producer/internals/Sender.java      |  503 ++++
 .../clients/tools/ProducerPerformance.java      |   70 +
 .../java/kafka/common/ByteSerialization.java    |   18 +
 clients/src/main/java/kafka/common/Cluster.java |  102 +
 .../main/java/kafka/common/Configurable.java    |   15 +
 .../main/java/kafka/common/Deserializer.java    |   18 +
 .../main/java/kafka/common/KafkaException.java  |   26 +
 clients/src/main/java/kafka/common/Metric.java  |   23 +
 clients/src/main/java/kafka/common/Node.java    |   76 +
 .../main/java/kafka/common/PartitionInfo.java   |   58 +
 .../src/main/java/kafka/common/Serializer.java  |   21 +
 .../java/kafka/common/StringSerialization.java  |   58 +
 .../main/java/kafka/common/TopicPartition.java  |   61 +
 .../kafka/common/config/AbstractConfig.java     |   93 +
 .../java/kafka/common/config/ConfigDef.java     |  253 ++
 .../kafka/common/config/ConfigException.java    |   24 +
 .../java/kafka/common/errors/ApiException.java  |   35 +
 .../common/errors/CorruptMessageException.java  |   23 +
 .../errors/LeaderNotAvailableException.java     |   19 +
 .../common/errors/MessageTooLargeException.java |   23 +
 .../kafka/common/errors/NetworkException.java   |   23 +
 .../errors/NotLeaderForPartitionException.java  |   23 +
 .../common/errors/OffsetMetadataTooLarge.java   |   22 +
 .../errors/OffsetOutOfRangeException.java       |   22 +
 .../kafka/common/errors/RetryableException.java |   31 +
 .../kafka/common/errors/TimeoutException.java   |   23 +
 .../common/errors/UnknownServerException.java   |   22 +
 .../UnknownTopicOrPartitionException.java       |   22 +
 .../java/kafka/common/metrics/CompoundStat.java |   40 +
 .../java/kafka/common/metrics/JmxReporter.java  |  184 ++
 .../java/kafka/common/metrics/KafkaMetric.java  |   55 +
 .../java/kafka/common/metrics/Measurable.java   |   16 +
 .../kafka/common/metrics/MeasurableStat.java    |   10 +
 .../java/kafka/common/metrics/MetricConfig.java |   71 +
 .../main/java/kafka/common/metrics/Metrics.java |  190 ++
 .../kafka/common/metrics/MetricsReporter.java   |   27 +
 .../main/java/kafka/common/metrics/Quota.java   |   36 +
 .../common/metrics/QuotaViolationException.java |   16 +
 .../main/java/kafka/common/metrics/Sensor.java  |  171 ++
 .../main/java/kafka/common/metrics/Stat.java    |   16 +
 .../java/kafka/common/metrics/stats/Avg.java    |   33 +
 .../java/kafka/common/metrics/stats/Count.java  |   29 +
 .../kafka/common/metrics/stats/Histogram.java   |  137 ++
 .../java/kafka/common/metrics/stats/Max.java    |   29 +
 .../java/kafka/common/metrics/stats/Min.java    |   29 +
 .../kafka/common/metrics/stats/Percentile.java  |   32 +
 .../kafka/common/metrics/stats/Percentiles.java |   76 +
 .../java/kafka/common/metrics/stats/Rate.java   |   85 +
 .../kafka/common/metrics/stats/SampledStat.java |  106 +
 .../java/kafka/common/metrics/stats/Total.java  |   31 +
 .../kafka/common/network/ByteBufferReceive.java |   43 +
 .../kafka/common/network/ByteBufferSend.java    |   54 +
 .../kafka/common/network/NetworkReceive.java    |   74 +
 .../java/kafka/common/network/NetworkSend.java  |   26 +
 .../main/java/kafka/common/network/Receive.java |   35 +
 .../java/kafka/common/network/Selectable.java   |   68 +
 .../java/kafka/common/network/Selector.java     |  349 +++
 .../main/java/kafka/common/network/Send.java    |   41 +
 .../java/kafka/common/protocol/ApiKeys.java     |   35 +
 .../main/java/kafka/common/protocol/Errors.java |   97 +
 .../java/kafka/common/protocol/ProtoUtils.java  |   95 +
 .../java/kafka/common/protocol/Protocol.java    |  130 ++
 .../kafka/common/protocol/types/ArrayOf.java    |   63 +
 .../java/kafka/common/protocol/types/Field.java |   48 +
 .../kafka/common/protocol/types/Schema.java     |  134 ++
 .../common/protocol/types/SchemaException.java  |   13 +
 .../kafka/common/protocol/types/Struct.java     |  227 ++
 .../java/kafka/common/protocol/types/Type.java  |  216 ++
 .../kafka/common/record/CompressionType.java    |   40 +
 .../common/record/InvalidRecordException.java   |   11 +
 .../main/java/kafka/common/record/LogEntry.java |   28 +
 .../java/kafka/common/record/MemoryRecords.java |  102 +
 .../main/java/kafka/common/record/Record.java   |  286 +++
 .../main/java/kafka/common/record/Records.java  |   29 +
 .../kafka/common/requests/RequestHeader.java    |   68 +
 .../java/kafka/common/requests/RequestSend.java |   38 +
 .../kafka/common/requests/ResponseHeader.java   |   45 +
 .../kafka/common/utils/AbstractIterator.java    |   72 +
 .../java/kafka/common/utils/CopyOnWriteMap.java |  130 ++
 .../src/main/java/kafka/common/utils/Crc32.java | 2169 ++++++++++++++++++
 .../java/kafka/common/utils/KafkaThread.java    |   18 +
 .../java/kafka/common/utils/SystemTime.java     |   26 +
 .../src/main/java/kafka/common/utils/Time.java  |   23 +
 .../src/main/java/kafka/common/utils/Utils.java |  230 ++
 .../clients/common/network/SelectorTest.java    |  292 +++
 .../kafka/clients/producer/BufferPoolTest.java  |  170 ++
 .../kafka/clients/producer/MetadataTest.java    |   55 +
 .../clients/producer/MockProducerTest.java      |   66 +
 .../clients/producer/RecordAccumulatorTest.java |  135 ++
 .../kafka/clients/producer/RecordSendTest.java  |   76 +
 .../java/kafka/clients/producer/SenderTest.java |   92 +
 .../java/kafka/common/config/ConfigDefTest.java |   88 +
 .../kafka/common/metrics/JmxReporterTest.java   |   21 +
 .../java/kafka/common/metrics/MetricsTest.java  |  176 ++
 .../common/metrics/stats/HistogramTest.java     |   56 +
 .../types/ProtocolSerializationTest.java        |   96 +
 .../kafka/common/record/MemoryRecordsTest.java  |   44 +
 .../java/kafka/common/record/RecordTest.java    |   87 +
 .../common/utils/AbstractIteratorTest.java      |   54 +
 .../test/java/kafka/common/utils/MockTime.java  |   28 +
 .../src/test/java/kafka/test/MetricsBench.java  |   38 +
 .../test/java/kafka/test/Microbenchmarks.java   |  143 ++
 .../src/test/java/kafka/test/MockSelector.java  |   87 +
 clients/src/test/java/kafka/test/TestUtils.java |   73 +
 project/Build.scala                             |    1 +
 121 files changed, 11720 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/build.sbt
----------------------------------------------------------------------
diff --git a/clients/build.sbt b/clients/build.sbt
new file mode 100644
index 0000000..ca3c8ee
--- /dev/null
+++ b/clients/build.sbt
@@ -0,0 +1,11 @@
+import sbt._
+import Keys._
+import AssemblyKeys._
+
+name := "clients"
+
+libraryDependencies ++= Seq(
+  "com.novocode"          % "junit-interface" % "0.9" % "test"
+)
+
+assemblySettings

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java
new file mode 100644
index 0000000..d1aa4b6
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/BufferExhaustedException.java
@@ -0,0 +1,17 @@
+package kafka.clients.producer;
+
+import kafka.common.KafkaException;
+
+/**
+ * This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
+ * which data can be sent for long enough for the allotted buffer to be exhausted.
+ */
+public class BufferExhaustedException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public BufferExhaustedException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/Callback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Callback.java b/clients/src/main/java/kafka/clients/producer/Callback.java
new file mode 100644
index 0000000..47e5af3
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/Callback.java
@@ -0,0 +1,15 @@
+package kafka.clients.producer;
+
+/**
+ * A callback interface that the user can implement to allow code to execute when the request is complete. This callback
+ * will execute in the background I/O thread so it should be fast.
+ */
+public interface Callback {
+
+    /**
+     * A callback method the user should implement. This method will be called when the send to the server has
+     * completed.
+     * @param send The results of the call. This send is guaranteed to be completed so none of its methods will block.
+     */
+    public void onCompletion(RecordSend send);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java b/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
new file mode 100644
index 0000000..b82fcfb
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
@@ -0,0 +1,35 @@
+package kafka.clients.producer;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.common.Cluster;
+import kafka.common.utils.Utils;
+
+/**
+ * A simple partitioning strategy that will work for messages with or without keys.
+ * <p>
+ * If there is a partition key specified in the record the partitioner will use that for partitioning. Otherwise, if
+ * there is no partitionKey but there is a normal key that will be used. If neither key is specified the
+ * partitioner will round-robin over partitions in the topic.
+ * <p>
+ * For the cases where there is some key present the partition is computed based on the murmur2 hash of the serialized
+ * key.
+ */
+public class DefaultPartitioner implements Partitioner {
+
+    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+
+    /**
+     * Compute the partition
+     */
+    @Override
+    public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions) {
+        byte[] keyToUse = partitionKey != null ? partitionKey : key;
+        if (keyToUse == null)
+            return Utils.abs(counter.getAndIncrement()) % numPartitions;
+        else
+            return Utils.abs(Utils.murmur2(keyToUse)) % numPartitions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
new file mode 100644
index 0000000..58eee0c
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
@@ -0,0 +1,240 @@
+package kafka.clients.producer;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.clients.producer.internals.Metadata;
+import kafka.clients.producer.internals.RecordAccumulator;
+import kafka.clients.producer.internals.Sender;
+import kafka.common.Cluster;
+import kafka.common.KafkaException;
+import kafka.common.Metric;
+import kafka.common.Serializer;
+import kafka.common.TopicPartition;
+import kafka.common.config.ConfigException;
+import kafka.common.errors.MessageTooLargeException;
+import kafka.common.metrics.JmxReporter;
+import kafka.common.metrics.MetricConfig;
+import kafka.common.metrics.Metrics;
+import kafka.common.metrics.MetricsReporter;
+import kafka.common.network.Selector;
+import kafka.common.record.CompressionType;
+import kafka.common.record.Record;
+import kafka.common.record.Records;
+import kafka.common.utils.KafkaThread;
+import kafka.common.utils.SystemTime;
+
+/**
+ * A Kafka producer that can be used to send data to the Kafka cluster.
+ * <p>
+ * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
+ * <p>
+ * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
+ * needs to communicate with. Failure to close the producer after use will leak these.
+ */
+public class KafkaProducer implements Producer {
+
+    private final int maxRequestSize;
+    private final long metadataFetchTimeoutMs;
+    private final long totalMemorySize;
+    private final Partitioner partitioner;
+    private final Metadata metadata;
+    private final RecordAccumulator accumulator;
+    private final Sender sender;
+    private final Serializer keySerializer;
+    private final Serializer valueSerializer;
+    private final Metrics metrics;
+    private final Thread ioThread;
+
+    /**
+     * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+     * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be
+     * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
+     * string "42" or the integer 42).
+     */
+    public KafkaProducer(Map<String, Object> configs) {
+        this(new ProducerConfig(configs));
+    }
+
+    /**
+     * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+     * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
+     */
+    public KafkaProducer(Properties properties) {
+        this(new ProducerConfig(properties));
+    }
+
+    private KafkaProducer(ProducerConfig config) {
+        this.metrics = new Metrics(new MetricConfig(),
+                                   Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
+                                   new SystemTime());
+        this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+        this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
+        this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+        this.metadata = new Metadata();
+        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
+        this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
+                                                 this.totalMemorySize,
+                                                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                                                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL),
+                                                 metrics,
+                                                 new SystemTime());
+        List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
+        this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis());
+        this.sender = new Sender(new Selector(),
+                                 this.metadata,
+                                 this.accumulator,
+                                 config.getString(ProducerConfig.CLIENT_ID_CONFIG),
+                                 config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
+                                 config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                                 (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
+                                 config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
+                                 new SystemTime());
+        this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
+        this.ioThread.start();
+    }
+
+    private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+        for (String url : urls) {
+            if (url != null && url.length() > 0) {
+                String[] pieces = url.split(":");
+                if (pieces.length != 2)
+                    throw new ConfigException("Invalid url in metadata.broker.list: " + url);
+                try {
+                    InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+                    if (address.isUnresolved())
+                        throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+                    addresses.add(address);
+                } catch (NumberFormatException e) {
+                    throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+                }
+            }
+        }
+        if (addresses.size() < 1)
+            throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+        return addresses;
+    }
+
+    /**
+     * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
+     */
+    @Override
+    public RecordSend send(ProducerRecord record) {
+        return send(record, null);
+    }
+
+    /**
+     * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
+     * <p>
+     * The send is asynchronous and this method will return immediately once the record has been serialized and stored
+     * in the buffer of messages waiting to be sent. This allows sending many records in parallel without necessitating
+     * blocking to wait for the response after each one.
+     * <p>
+     * The {@link RecordSend} returned by this call will hold the future response data including the offset assigned to
+     * the message and the error (if any) when the request has completed (or returned an error), and this object can be
+     * used to block awaiting the response. If you want the equivalent of a simple blocking send you can easily achieve
+     * that using the {@link kafka.clients.producer.RecordSend#await() await()} method on the {@link RecordSend} this
+     * call returns:
+     * 
+     * <pre>
+     *   ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
+     *   producer.send(record, null).await();
+     * </pre>
+     * 
+     * Note that the send method will not throw an exception if the request fails while communicating with the cluster,
+     * rather that exception will be thrown when accessing the {@link RecordSend} that is returned.
+     * <p>
+     * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
+     * will be invoked when the request is complete. Note that the callback will execute in the I/O thread of the
+     * producer and so should be reasonably fast. An example usage of an inline callback would be the following:
+     * 
+     * <pre>
+     *   ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
+     *   producer.send(record,
+     *                 new Callback() {
+     *                     public void onCompletion(RecordSend send) {
+     *                         try {
+     *                             System.out.println("The offset of the message we just sent is: " + send.offset());
+     *                         } catch(KafkaException e) {
+     *                             e.printStackTrace();
+     *                         }
+     *                     }
+     *                 });
+     * </pre>
+     * <p>
+     * This call enqueues the message in the buffer of outgoing messages to be sent. This buffer has a hard limit on
+     * its size controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called
+     * faster than the I/O thread can send data to the brokers we will eventually run out of buffer space. The default
+     * behavior in this case is to block the send call until the I/O thread catches up and more buffer space is
+     * available. However if non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will
+     * cause the producer to instead throw an exception when this occurs.
+     * 
+     * @param record The record to send
+     * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
+     *        indicates no callback)
+     * @throws BufferExhaustedException This exception is thrown if the buffer is full and blocking has been disabled.
+     * @throws MessageTooLargeException This exception is thrown if the serialized size of the message is larger than
+     *         the maximum buffer memory or maximum request size that has been configured (whichever is smaller).
+     */
+    @Override
+    public RecordSend send(ProducerRecord record, Callback callback) {
+        Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+        byte[] key = keySerializer.toBytes(record.key());
+        byte[] value = valueSerializer.toBytes(record.value());
+        byte[] partitionKey = keySerializer.toBytes(record.partitionKey());
+        int partition = partitioner.partition(record, key, partitionKey, value, cluster, cluster.partitionsFor(record.topic()).size());
+        ensureValidSize(key, value);
+        try {
+            TopicPartition tp = new TopicPartition(record.topic(), partition);
+            RecordSend send = accumulator.append(tp, key, value, CompressionType.NONE, callback);
+            this.sender.wakeup();
+            return send;
+        } catch (InterruptedException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Check that this key-value pair will have a serialized size small enough
+     */
+    private void ensureValidSize(byte[] key, byte[] value) {
+        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
+        if (serializedSize > this.maxRequestSize)
+            throw new MessageTooLargeException("The message is " + serializedSize
+                                               + " bytes when serialized which is larger than the maximum request size you have configured with the "
+                                               + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+                                               + " configuration.");
+        if (serializedSize > this.totalMemorySize)
+            throw new MessageTooLargeException("The message is " + serializedSize
+                                               + " bytes when serialized which is larger than the total memory buffer you have configured with the "
+                                               + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
+                                               + " configuration.");
+    }
+
+    @Override
+    public Map<String, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
+
+    /**
+     * Close this producer. This method blocks until all in-flight requests complete.
+     */
+    @Override
+    public void close() {
+        this.sender.initiateClose();
+        try {
+            this.ioThread.join();
+        } catch (InterruptedException e) {
+            throw new KafkaException(e);
+        }
+        this.metrics.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/MockProducer.java b/clients/src/main/java/kafka/clients/producer/MockProducer.java
new file mode 100644
index 0000000..2ea2030
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/MockProducer.java
@@ -0,0 +1,201 @@
+package kafka.clients.producer;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import kafka.clients.producer.internals.ProduceRequestResult;
+import kafka.common.Cluster;
+import kafka.common.Metric;
+import kafka.common.Serializer;
+import kafka.common.TopicPartition;
+
+/**
+ * A mock of the producer interface you can use for testing code that uses Kafka.
+ * <p>
+ * By default this mock will synchronously complete each send call successfully. However it can be configured to allow
+ * the user to control the completion of the call and supply an optional error for the producer to throw.
+ */
+public class MockProducer implements Producer {
+
+    private final Serializer keySerializer;
+    private final Serializer valueSerializer;
+    private final Partitioner partitioner;
+    private final Cluster cluster;
+    private final List<ProducerRecord> sent;
+    private final Deque<Completion> completions;
+    private boolean autoComplete;
+    private Map<TopicPartition, Long> offsets;
+
+    /**
+     * Create a mock producer
+     * 
+     * @param keySerializer A serializer to use on keys (useful to test your serializer on the keys)
+     * @param valueSerializer A serializer to use on values (useful to test your serializer on the values)
+     * @param partitioner A partitioner to choose partitions (if null the partition will always be 0)
+     * @param cluster The cluster to pass to the partitioner (can be null if partitioner is null)
+     * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
+     *        the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
+     *        {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link RecordSend} that is
+     *        returned.
+     */
+    public MockProducer(Serializer keySerializer, Serializer valueSerializer, Partitioner partitioner, Cluster cluster, boolean autoComplete) {
+        if (partitioner != null && (cluster == null | keySerializer == null | valueSerializer == null))
+            throw new IllegalArgumentException("If a partitioner is provided a cluster instance and key and value serializer for partitioning must also be given.");
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.partitioner = partitioner;
+        this.cluster = cluster;
+        this.autoComplete = autoComplete;
+        this.offsets = new HashMap<TopicPartition, Long>();
+        this.sent = new ArrayList<ProducerRecord>();
+        this.completions = new ArrayDeque<Completion>();
+    }
+
+    /**
+     * Create a new mock producer with no serializers or partitioner and the given autoComplete setting.
+     * 
+     * Equivalent to {@link #MockProducer(Serializer, Serializer, Partitioner, Cluster, boolean) new MockProducer(null,
+     * null, null, null, autoComplete)}
+     */
+    public MockProducer(boolean autoComplete) {
+        this(null, null, null, null, autoComplete);
+    }
+
+    /**
+     * Create a new auto completing mock producer with no serializers or partitioner.
+     * 
+     * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
+     */
+    public MockProducer() {
+        this(true);
+    }
+
+    /**
+     * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied.
+     * 
+     * @see #history()
+     */
+    @Override
+    public synchronized RecordSend send(ProducerRecord record) {
+        return send(record, null);
+    }
+
+    /**
+     * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied and
+     * the callback will be synchronously executed.
+     * 
+     * @see #history()
+     */
+    @Override
+    public synchronized RecordSend send(ProducerRecord record, Callback callback) {
+        byte[] key = keySerializer == null ? null : keySerializer.toBytes(record.key());
+        byte[] partitionKey = keySerializer == null ? null : keySerializer.toBytes(record.partitionKey());
+        byte[] value = valueSerializer == null ? null : valueSerializer.toBytes(record.value());
+        int numPartitions = partitioner == null ? 0 : this.cluster.partitionsFor(record.topic()).size();
+        int partition = partitioner == null ? 0 : partitioner.partition(record, key, partitionKey, value, this.cluster, numPartitions);
+        ProduceRequestResult result = new ProduceRequestResult();
+        RecordSend send = new RecordSend(0, result);
+        TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
+        long offset = nextOffset(topicPartition);
+        Completion completion = new Completion(topicPartition, offset, send, result, callback);
+        this.sent.add(record);
+        if (autoComplete)
+            completion.complete(null);
+        else
+            this.completions.addLast(completion);
+        return send;
+    }
+
+    /**
+     * Get the next offset for this topic/partition
+     */
+    private long nextOffset(TopicPartition tp) {
+        Long offset = this.offsets.get(tp);
+        if (offset == null) {
+            this.offsets.put(tp, 1L);
+            return 0L;
+        } else {
+            Long next = offset + 1;
+            this.offsets.put(tp, next);
+            return offset;
+        }
+    }
+
+    public Map<String, Metric> metrics() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * "Closes" the producer
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Get the list of sent records since the last call to {@link #clear()}
+     */
+    public synchronized List<ProducerRecord> history() {
+        return new ArrayList<ProducerRecord>(this.sent);
+    }
+
+    /**
+     * Clear the stored history of sent records
+     */
+    public synchronized void clear() {
+        this.sent.clear();
+        this.completions.clear();
+    }
+
+    /**
+     * Complete the earliest uncompleted call successfully.
+     * 
+     * @return true if there was an uncompleted call to complete
+     */
+    public synchronized boolean completeNext() {
+        return errorNext(null);
+    }
+
+    /**
+     * Complete the earliest uncompleted call with the given error.
+     * 
+     * @return true if there was an uncompleted call to complete
+     */
+    public synchronized boolean errorNext(RuntimeException e) {
+        Completion completion = this.completions.pollFirst();
+        if (completion != null) {
+            completion.complete(e);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private static class Completion {
+        private final long offset;
+        private final RecordSend send;
+        private final ProduceRequestResult result;
+        private final Callback callback;
+        private final TopicPartition topicPartition;
+
+        public Completion(TopicPartition topicPartition, long offset, RecordSend send, ProduceRequestResult result, Callback callback) {
+            this.send = send;
+            this.offset = offset;
+            this.result = result;
+            this.callback = callback;
+            this.topicPartition = topicPartition;
+        }
+
+        public void complete(RuntimeException e) {
+            result.done(topicPartition, e == null ? offset : -1L, e);
+            if (callback != null)
+                callback.onCompletion(send);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Partitioner.java b/clients/src/main/java/kafka/clients/producer/Partitioner.java
new file mode 100644
index 0000000..1b8e51f
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/Partitioner.java
@@ -0,0 +1,30 @@
+package kafka.clients.producer;
+
+import kafka.common.Cluster;
+
+/**
+ * An interface by which clients can override the default partitioning behavior that maps records to topic partitions.
+ * <p>
+ * A partitioner can use either the original java object the user provided or the serialized bytes.
+ * <p>
+ * It is expected that the partitioner will make use of the key for partitioning, but there is no requirement that an
+ * implementation do so. An implementation can use the key, the value, the state of the cluster, or any other side data.
+ */
+public interface Partitioner {
+
+    /**
+     * Compute the partition for the given record. This partition number must be in the range [0...numPartitions). The
+     * cluster state provided is the most up-to-date view that the client has but leadership can change at any time so
+     * there is no guarantee that the node that is the leader for a particular partition at the time the partition
+     * function is called will still be the leader by the time the request is sent.
+     * 
+     * @param record The record being sent
+     * @param key The serialized bytes of the key (null if no key is given or the serialized form is null)
+     * @param partitionKey The serialized bytes of the partition-override key (null if no override key is given or its
+     *        serialized form is null)
+     * @param value The serialized bytes of the value (null if no value is given or the serialized form is null)
+     * @param cluster The current state of the cluster
+     * @param numPartitions The total number of partitions for the given topic
+     * @return The partition to send this record to
+     */
+    public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions);
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/Producer.java b/clients/src/main/java/kafka/clients/producer/Producer.java
new file mode 100644
index 0000000..6ba6633
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/Producer.java
@@ -0,0 +1,38 @@
+package kafka.clients.producer;
+
+import java.util.Map;
+
+import kafka.common.Metric;
+
+/**
+ * The interface for the {@link KafkaProducer}
+ * 
+ * @see KafkaProducer
+ * @see MockProducer
+ */
+public interface Producer {
+
+    /**
+     * Send the given record asynchronously and return a future which will eventually contain the response information.
+     * 
+     * @param record The record to send
+     * @return A future which will eventually contain the response information
+     */
+    public RecordSend send(ProducerRecord record);
+
+    /**
+     * Send a message and invoke the given callback when the send is complete
+     * 
+     * @param record The record to send
+     * @param callback The callback to invoke when the send is complete
+     * @return A future which will eventually contain the response information
+     */
+    public RecordSend send(ProducerRecord record, Callback callback);
+
+    /**
+     * Return a map of metrics maintained by the producer, keyed by metric name
+     */
+    public Map<String, ? extends Metric> metrics();
+
+    /**
+     * Close this producer
+     */
+    public void close();
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
new file mode 100644
index 0000000..9758293
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
@@ -0,0 +1,148 @@
+package kafka.clients.producer;
+
+import static kafka.common.config.ConfigDef.Range.atLeast;
+import static kafka.common.config.ConfigDef.Range.between;
+
+import java.util.Map;
+
+import kafka.common.config.AbstractConfig;
+import kafka.common.config.ConfigDef;
+import kafka.common.config.ConfigDef.Type;
+
+/**
+ * The producer configuration keys
+ */
+public class ProducerConfig extends AbstractConfig {
+
+    private static final ConfigDef config;
+
+    /**
+     * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
+     * <code>host1:port1,host2:port2,...</code>. These urls are just used for the initial connection to discover the
+     * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
+     * may want more than one, though, in case a server is down).
+     */
+    public static final String BROKER_LIST_CONFIG = "metadata.broker.list";
+
+    /**
+     * The amount of time to block waiting to fetch metadata about a topic the first time a message is sent to that
+     * topic.
+     */
+    public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
+
+    /**
+     * The buffer size allocated for a partition. When messages are received which are smaller than this size the
+     * producer will attempt to optimistically group them together until this size is reached.
+     */
+    public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes";
+
+    /**
+     * The total memory used by the producer to buffer messages waiting to be sent to the server. If messages are sent
+     * faster than they can be delivered to the server the producer will either block or throw an exception based on the
+     * preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
+     */
+    public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
+
+    /**
+     * The number of acknowledgments the producer requires from the server before considering a request complete.
+     */
+    public static final String REQUIRED_ACKS_CONFIG = "request.required.acks";
+
+    /**
+     * The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment
+     * requirements the producer has specified. If the requested number of acknowledgments are not met an error will be
+     * returned.
+     */
+    public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";
+
+    /**
+     * The producer groups together any messages that arrive in between request sends. Normally this occurs only under
+     * load when messages arrive faster than they can be sent out. However the client can reduce the number of requests
+     * and increase throughput by adding a small amount of artificial delay to force more messages to batch together.
+     * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of messages
+     * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many
+     * bytes accumulated for this partition we will "linger" for the specified time waiting for more messages to show
+     * up. This setting defaults to 0.
+     */
+    public static final String LINGER_MS_CONFIG = "linger.ms";
+
+    /**
+     * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record values.
+     */
+    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer.class";
+
+    /**
+     * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record keys.
+     */
+    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer.class";
+
+    /**
+     * The class to use for choosing a partition to send the message to
+     */
+    public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
+
+    /**
+     * Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
+     * partitions or other settings will by taken up by producers without restart.
+     */
+    public static final String METADATA_REFRESH_MS_CONFIG = "topic.metadata.refresh.interval.ms";
+
+    /**
+     * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
+     * of requests beyond just ip/port by allowing a logical application name to be included.
+     */
+    public static final String CLIENT_ID_CONFIG = "client.id";
+
+    /**
+     * The size of the TCP send buffer to use when sending data
+     */
+    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
+
+    /**
+     * The maximum size of a request. This is also effectively a cap on the maximum message size. Note that the server
+     * has its own cap on message size which may be different from this.
+     */
+    public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
+
+    /**
+     * The amount of time to wait before attempting to reconnect to a given host. This avoids repeated connecting to a
+     * host in a tight loop.
+     */
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+
+    /**
+     * When our memory buffer is exhausted we must either stop accepting new messages (block) or throw errors. By
+     * default this setting is true and we block, however users who want to guarantee we never block can turn this into
+     * an error.
+     */
+    public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
+
+    /* NOTE(review): undocumented flag — presumably controls JMX metrics registration; confirm before documenting. */
+    public static final String ENABLE_JMX = "enable.jmx";
+
+    static {
+        config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST,
+                                        "A list of <code>host:port</code> URLs to use for establishing the initial connection to the cluster.")
+                                .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0),
+                                        "The amount of time to block waiting to fetch metadata about a topic the first time a message is sent to that topic.")
+                                .define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0),
+                                        "The buffer size allocated for a partition; smaller messages are optimistically grouped together until this size is reached.")
+                                .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L),
+                                        "The total memory used by the producer to buffer messages waiting to be sent to the server.")
+                                /* TODO: should be a string to handle acks=in-sync */
+                                .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE),
+                                        "The number of acknowledgments the producer requires from the server before considering a request complete.")
+                                .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0),
+                                        "The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified.")
+                                .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L),
+                                        "An upper bound on the artificial delay used to force messages arriving between request sends to batch together.")
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
+                                        "The fully qualified name of the Serializer class to use for serializing record values.")
+                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
+                                        "The fully qualified name of the Serializer class to use for serializing record keys.")
+                                .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, DefaultPartitioner.class.getName(),
+                                        "The class to use for choosing a partition to send each message to.")
+                                .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L),
+                                        "Force a refresh of the cluster metadata after this period of time so that changes to the cluster are taken up without a restart.")
+                                .define(CLIENT_ID_CONFIG, Type.STRING, "",
+                                        "An id string to pass to the server when making requests, used to track the source of requests beyond just ip/port.")
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0),
+                                        "The size of the TCP send buffer to use when sending data.")
+                                .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0),
+                                        "The maximum size of a request; this is also effectively a cap on the maximum message size.")
+                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L),
+                                        "The amount of time to wait before attempting to reconnect to a given host, to avoid reconnecting in a tight loop.")
+                                .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true,
+                                        "If true, block when the memory buffer is exhausted rather than throwing an exception.")
+                                .define(ENABLE_JMX, Type.BOOLEAN, true, "");
+    }
+
+    /**
+     * Create a producer config from the given key-value properties, validated against the definitions above.
+     */
+    ProducerConfig(Map<? extends Object, ? extends Object> props) {
+        super(config, props);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
new file mode 100644
index 0000000..5fddbef
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
@@ -0,0 +1,84 @@
+package kafka.clients.producer;
+
+/**
+ * An unserialized key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent,
+ * a value (which can be null) which is the contents of the record and an optional key (which can also be null). In
+ * cases where the key used for choosing a partition should be different from the key retained in the record, the user
+ * can specify a partition key which will be used only for computing the partition to which this record will be sent
+ * and will not be retained with the record.
+ */
+public final class ProducerRecord {
+
+    private final String topic;
+    private final Object key;
+    private final Object partitionKey;
+    private final Object value;
+
+    /**
+     * Creates a record to be sent to Kafka using a special override key for partitioning that is different from the
+     * key retained in the record
+     * 
+     * @param topic The topic the record will be appended to (may not be null)
+     * @param key The key that will be included in the record
+     * @param partitionKey An override for the key to be used only for partitioning purposes in the client. This key
+     *        will not be retained or available to downstream consumers.
+     * @param value The record contents
+     * @throws IllegalArgumentException if topic is null
+     */
+    public ProducerRecord(String topic, Object key, Object partitionKey, Object value) {
+        if (topic == null)
+            throw new IllegalArgumentException("Topic cannot be null");
+        this.topic = topic;
+        this.key = key;
+        this.partitionKey = partitionKey;
+        this.value = value;
+    }
+
+    /**
+     * Create a record to be sent to Kafka. The key is used both as the record key and as the partitioning key.
+     * 
+     * @param topic The topic the record will be appended to
+     * @param key The key that will be included in the record
+     * @param value The record contents
+     */
+    public ProducerRecord(String topic, Object key, Object value) {
+        this(topic, key, key, value);
+    }
+
+    /**
+     * Create a record with no key
+     * 
+     * @param topic The topic this record should be sent to
+     * @param value The record contents
+     */
+    public ProducerRecord(String topic, Object value) {
+        this(topic, null, value);
+    }
+
+    /**
+     * The topic this record is being sent to
+     */
+    public String topic() {
+        return topic;
+    }
+
+    /**
+     * The key (or null if no key is specified)
+     */
+    public Object key() {
+        return key;
+    }
+
+    /**
+     * An override key to use instead of the main record key when computing the partition
+     */
+    public Object partitionKey() {
+        return partitionKey;
+    }
+
+    /**
+     * @return The value (the record contents, may be null)
+     */
+    public Object value() {
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/RecordSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/RecordSend.java b/clients/src/main/java/kafka/clients/producer/RecordSend.java
new file mode 100644
index 0000000..1883dab
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/RecordSend.java
@@ -0,0 +1,88 @@
+package kafka.clients.producer;
+
+import java.util.concurrent.TimeUnit;
+
+import kafka.clients.producer.internals.ProduceRequestResult;
+import kafka.common.errors.ApiException;
+import kafka.common.errors.TimeoutException;
+
+/**
+ * An asynchronously computed response from sending a record. Calling <code>await()</code> or most of the other accessor
+ * methods will block until the response for this record is available. If you wish to avoid blocking provide a
+ * {@link kafka.clients.producer.Callback Callback} with the record send.
+ */
+public final class RecordSend {
+
+    private final long relativeOffset;
+    private final ProduceRequestResult result;
+
+    public RecordSend(long relativeOffset, ProduceRequestResult result) {
+        this.relativeOffset = relativeOffset;
+        this.result = result;
+    }
+
+    /**
+     * Block until this send has completed successfully. If the request fails, throw the error that occurred in sending
+     * the request.
+     * @return the same object for chaining of calls
+     * @throws TimeoutException if the thread is interrupted while waiting
+     * @throws ApiException if the request failed.
+     */
+    public RecordSend await() {
+        result.await();
+        if (result.error() != null)
+            throw result.error();
+        return this;
+    }
+
+    /**
+     * Block until this send is complete or the given timeout elapses
+     * @param timeout the time to wait
+     * @param unit the units of the time given
+     * @return the same object for chaining
+     * @throws TimeoutException if the request isn't satisfied in the time period given or the thread is interrupted
+     *         while waiting
+     * @throws ApiException if the request failed.
+     */
+    public RecordSend await(long timeout, TimeUnit unit) {
+        boolean success = result.await(timeout, unit);
+        if (!success)
+            throw new TimeoutException("Request did not complete after " + timeout + " " + unit);
+        if (result.error() != null)
+            throw result.error();
+        return this;
+    }
+
+    /**
+     * Get the offset for the given message. This method will block until the request is complete and will throw an
+     * exception if the request fails.
+     * @return The offset
+     */
+    public long offset() {
+        await();
+        return this.result.baseOffset() + this.relativeOffset;
+    }
+
+    /**
+     * Check if the request is complete without blocking
+     */
+    public boolean completed() {
+        return this.result.completed();
+    }
+
+    /**
+     * Block on request completion and return true if there was an error.
+     */
+    public boolean hasError() {
+        result.await();
+        return this.result.error() != null;
+    }
+
+    /**
+     * Return the error thrown
+     */
+    public Exception error() {
+        result.await();
+        return this.result.error();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
new file mode 100644
index 0000000..c222ca0
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/BufferPool.java
@@ -0,0 +1,223 @@
+package kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import kafka.clients.producer.BufferExhaustedException;
+
+/**
+ * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
+ * particular it has the following properties:
+ * <ol>
+ * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
+ * <li>It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This
+ * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
+ * buffers are deallocated.
+ * </ol>
+ */
+public final class BufferPool {
+
+    private final long totalMemory;
+    private final int poolableSize;
+    private final boolean blockOnExhaustion;
+    private final ReentrantLock lock;
+    private final Deque<ByteBuffer> free;
+    private final Deque<Condition> waiters;
+    private long availableMemory;
+
+    /**
+     * Create a new buffer pool
+     * 
+     * @param memory The maximum amount of memory that this buffer pool can allocate
+     * @param poolableSize The buffer size to cache in the free list rather than deallocating
+     * @param blockOnExhaustion This controls the behavior when the buffer pool is out of memory. If true the
+     *        {@link #allocate(int)} call will block and wait for memory to be returned to the pool. If false
+     *        {@link #allocate(int)} will throw an exception if the buffer is out of memory.
+     */
+    public BufferPool(long memory, int poolableSize, boolean blockOnExhaustion) {
+        this.poolableSize = poolableSize;
+        this.blockOnExhaustion = blockOnExhaustion;
+        this.lock = new ReentrantLock();
+        this.free = new ArrayDeque<ByteBuffer>();
+        this.waiters = new ArrayDeque<Condition>();
+        this.totalMemory = memory;
+        this.availableMemory = memory;
+    }
+
+    /**
+     * Allocate a buffer of the given size
+     * 
+     * @param size The buffer size to allocate in bytes
+     * @return The buffer
+     * @throws InterruptedException If the thread is interrupted while blocked
+     * @throws IllegalArgument if size is larger than the total memory controlled by the pool (and hence we would block
+     *         forever)
+     * @throws BufferExhaustedException if the pool is in non-blocking mode and size exceeds the free memory in the pool
+     */
+    public ByteBuffer allocate(int size) throws InterruptedException {
+        if (size > this.totalMemory)
+            throw new IllegalArgumentException("Attempt to allocate " + size
+                                               + " bytes, but there is a hard limit of "
+                                               + this.totalMemory
+                                               + " on memory allocations.");
+
+        this.lock.lock();
+        try {
+            // check if we have a free buffer of the right size pooled
+            if (size == poolableSize && !this.free.isEmpty())
+                return this.free.pollFirst();
+
+            // now check if the request is immediately satisfiable with the
+            // memory on hand or if we need to block
+            int freeListSize = this.free.size() * this.poolableSize;
+            if (this.availableMemory + freeListSize >= size) {
+                // we have enough unallocated or pooled memory to immediately
+                // satisfy the request
+                freeUp(size);
+                this.availableMemory -= size;
+                lock.unlock();
+                return ByteBuffer.allocate(size);
+            } else if (!blockOnExhaustion) {
+                throw new BufferExhaustedException("You have exhausted the " + this.totalMemory
+                                                   + " bytes of memory you configured for the client and the client is configured to error"
+                                                   + " rather than block when memory is exhausted.");
+            } else {
+                // we are out of memory and will have to block
+                int accumulated = 0;
+                ByteBuffer buffer = null;
+                Condition moreMemory = this.lock.newCondition();
+                this.waiters.addLast(moreMemory);
+                // loop over and over until we have a buffer or have reserved
+                // enough memory to allocate one
+                while (accumulated < size) {
+                    moreMemory.await();
+                    // check if we can satisfy this request from the free list,
+                    // otherwise allocate memory
+                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
+                        // just grab a buffer from the free list
+                        buffer = this.free.pollFirst();
+                        accumulated = size;
+                    } else {
+                        // we'll need to allocate memory, but we may only get
+                        // part of what we need on this iteration
+                        freeUp(size - accumulated);
+                        int got = (int) Math.min(size - accumulated, this.availableMemory);
+                        this.availableMemory -= got;
+                        accumulated += got;
+                    }
+                }
+
+                // remove the condition for this thread to let the next thread
+                // in line start getting memory
+                Condition removed = this.waiters.removeFirst();
+                if (removed != moreMemory)
+                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");
+
+                // signal any additional waiters if there is more memory left
+                // over for them
+                if (this.availableMemory > 0 || !this.free.isEmpty()) {
+                    if (!this.waiters.isEmpty())
+                        this.waiters.peekFirst().signal();
+                }
+
+                // unlock and return the buffer
+                lock.unlock();
+                if (buffer == null)
+                    return ByteBuffer.allocate(size);
+                else
+                    return buffer;
+            }
+        } finally {
+            if (lock.isHeldByCurrentThread())
+                lock.unlock();
+        }
+    }
+
+    /**
+     * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled
+     * buffers (if needed)
+     */
+    private void freeUp(int size) {
+        while (!this.free.isEmpty() && this.availableMemory < size)
+            this.availableMemory += this.free.pollLast().capacity();
+    }
+
+    /**
+     * Return buffers to the pool. If they are of the poolable size add them to the free list, otherwise just mark the
+     * memory as free.
+     * 
+     * @param buffers The buffers to return
+     */
+    public void deallocate(ByteBuffer... buffers) {
+        lock.lock();
+        try {
+            for (int i = 0; i < buffers.length; i++) {
+                int size = buffers[i].capacity();
+                if (size == this.poolableSize) {
+                    buffers[i].clear();
+                    this.free.add(buffers[i]);
+                } else {
+                    this.availableMemory += size;
+                }
+                Condition moreMem = this.waiters.peekFirst();
+                if (moreMem != null)
+                    moreMem.signal();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * the total free memory both unallocated and in the free list
+     */
+    public long availableMemory() {
+        lock.lock();
+        try {
+            return this.availableMemory + this.free.size() * this.poolableSize;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Get the unallocated memory (not in the free list or in use)
+     */
+    public long unallocatedMemory() {
+        lock.lock();
+        try {
+            return this.availableMemory;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * The number of threads blocked waiting on memory
+     */
+    public int queued() {
+        lock.lock();
+        try {
+            return this.waiters.size();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * The buffer size that will be retained in the free list after use
+     */
+    public int poolableSize() {
+        return this.poolableSize;
+    }
+
+    /**
+     * The total memory managed by this pool
+     */
+    public long totalMemory() {
+        return this.totalMemory;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/kafka/clients/producer/internals/Metadata.java
new file mode 100644
index 0000000..f5f8b9b
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/Metadata.java
@@ -0,0 +1,120 @@
+package kafka.clients.producer.internals;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import kafka.common.Cluster;
+import kafka.common.PartitionInfo;
+import kafka.common.errors.TimeoutException;
+
+/**
+ * A class encapsulating some of the logic around metadata.
+ * <p>
+ * This class is shared by the client thread (for partitioning) and the background sender thread.
+ * 
+ * Metadata is maintained for only a subset of topics, which can be added to over time. Requesting metadata for a
+ * topic we don't have any metadata for will trigger a metadata update.
+ */
+public final class Metadata {
+
+    private final long refreshBackoffMs;
+    private final long metadataExpireMs;
+    private long lastRefresh;
+    private Cluster cluster;
+    private boolean forceUpdate;
+    private final Set<String> topics;
+
+    /**
+     * Create a metadata instance with reasonable defaults
+     */
+    public Metadata() {
+        this(100L, 60 * 60 * 1000L);
+    }
+
+    /**
+     * Create a new Metadata instance
+     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+     *        polling
+     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+     */
+    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+        this.refreshBackoffMs = refreshBackoffMs;
+        this.metadataExpireMs = metadataExpireMs;
+        this.lastRefresh = 0L;
+        this.cluster = Cluster.empty();
+        this.forceUpdate = false;
+        this.topics = new HashSet<String>();
+    }
+
+    /**
+     * Get the current cluster info without blocking
+     */
+    public synchronized Cluster fetch() {
+        return this.cluster;
+    }
+
+    /**
+     * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
+     * block waiting for an update.
+     * @param topic The topic we want metadata for
+     * @param maxWaitMs The maximum amount of time to block waiting for metadata
+     * @throws TimeoutException if metadata for the topic is not available within maxWaitMs
+     */
+    public synchronized Cluster fetch(String topic, long maxWaitMs) {
+        List<PartitionInfo> partitions = cluster.partitionsFor(topic);
+        if (partitions != null)
+            return cluster;
+        long begin = System.currentTimeMillis();
+        long remaining = maxWaitMs;
+        topics.add(topic);
+        while (partitions == null) {
+            // BUG FIX: track the deadline across iterations. Previously each pass waited up to
+            // maxWaitMs again, so repeated wakeups (e.g. updates that don't include this topic)
+            // could extend the total wait without bound. Also never pass 0 to wait(), which would
+            // block indefinitely.
+            if (remaining <= 0)
+                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+            forceUpdate = true;
+            try {
+                wait(remaining);
+            } catch (InterruptedException e) { /* this is fine, just try again */
+            }
+            remaining = maxWaitMs - (System.currentTimeMillis() - begin);
+            partitions = cluster.partitionsFor(topic);
+        }
+        return cluster;
+    }
+
+    /**
+     * Does the current cluster info need to be updated? An update is needed if it has been at least refreshBackoffMs
+     * since our last update and either (1) an update has been requested or (2) the current metadata has expired (more
+     * than metadataExpireMs has passed since the last refresh)
+     */
+    public synchronized boolean needsUpdate(long now) {
+        long msSinceLastUpdate = now - this.lastRefresh;
+        boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
+        boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs;
+        return updateAllowed && updateNeeded;
+    }
+
+    /**
+     * Force an update of the current cluster info
+     */
+    public synchronized void forceUpdate() {
+        this.forceUpdate = true;
+    }
+
+    /**
+     * Get a copy of the set of topics we are currently maintaining metadata for
+     */
+    public synchronized Set<String> topics() {
+        return new HashSet<String>(this.topics);
+    }
+
+    /**
+     * Update the cluster metadata and wake any threads blocked in {@link #fetch(String, long)}
+     */
+    public synchronized void update(Cluster cluster, long now) {
+        this.forceUpdate = false;
+        this.lastRefresh = now;
+        this.cluster = cluster;
+        notifyAll();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
new file mode 100644
index 0000000..1049b61
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -0,0 +1,83 @@
+package kafka.clients.producer.internals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import kafka.clients.producer.RecordSend;
+import kafka.common.TopicPartition;
+import kafka.common.errors.TimeoutException;
+
+/**
+ * A class that models the future completion of a produce request for a single partition. There is one of these per
+ * partition in a produce request and it is shared by all the {@link RecordSend} instances that are batched together for
+ * the same partition in the request.
+ */
+public final class ProduceRequestResult {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private volatile TopicPartition topicPartition;
+    private volatile long baseOffset = -1L;
+    private volatile RuntimeException error;
+
+    public ProduceRequestResult() {
+    }
+
+    /**
+     * Mark this request as complete and unblock any threads waiting on its completion.
+     * @param topicPartition The topic and partition to which this record set was sent was sent
+     * @param baseOffset The base offset assigned to the message
+     * @param error The error that occurred if there was one, or null.
+     */
+    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
+        this.topicPartition = topicPartition;
+        this.baseOffset = baseOffset;
+        this.error = error;
+        this.latch.countDown();
+    }
+
+    /**
+     * Await the completion of this request
+     */
+    public void await() {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            throw new TimeoutException("Interrupted while waiting for request to complete.");
+        }
+    }
+
+    /**
+     * Await the completion of this request (up to the given time interval)
+     * @param timeout The maximum time to wait
+     * @param unit The unit for the max time
+     * @return true if the request completed, false if we timed out
+     */
+    public boolean await(long timeout, TimeUnit unit) {
+        try {
+            return latch.await(timeout, unit);
+        } catch (InterruptedException e) {
+            throw new TimeoutException("Interrupted while waiting for request to complete.");
+        }
+    }
+
+    /**
+     * The base offset for the request (the first offset in the message set)
+     */
+    public long baseOffset() {
+        return baseOffset;
+    }
+
+    /**
+     * The error thrown (generally on the server) while processing this request
+     */
+    public RuntimeException error() {
+        return error;
+    }
+
+    /**
+     * Has the request completed?
+     */
+    public boolean completed() {
+        return this.latch.getCount() == 0L;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
new file mode 100644
index 0000000..a2b536c
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
@@ -0,0 +1,236 @@
+package kafka.clients.producer.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import kafka.clients.producer.Callback;
+import kafka.clients.producer.RecordSend;
+import kafka.common.TopicPartition;
+import kafka.common.metrics.Measurable;
+import kafka.common.metrics.MetricConfig;
+import kafka.common.metrics.Metrics;
+import kafka.common.record.CompressionType;
+import kafka.common.record.MemoryRecords;
+import kafka.common.record.Record;
+import kafka.common.record.Records;
+import kafka.common.utils.CopyOnWriteMap;
+import kafka.common.utils.Time;
+import kafka.common.utils.Utils;
+
+/**
+ * This class acts as a queue that accumulates records into {@link kafka.common.record.MemoryRecords} instances to be
+ * sent to the server.
+ * <p>
+ * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
+ * this behavior is explicitly disabled.
+ */
+public final class RecordAccumulator {
+
+    private volatile boolean closed;
+    private int drainIndex;
+    private final int batchSize;
+    private final long lingerMs;
+    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+    private final BufferPool free;
+    private final Time time;
+
+    /**
+     * Create a new record accumulator
+     * 
+     * @param batchSize The size to use when allocating {@link kafka.common.record.MemoryRecords} instances
+     * @param totalSize The maximum memory the record accumulator can use.
+     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
+     *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
+     *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
+     * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
+     *        memory
+     * @param metrics The metrics
+     * @param time The time instance to use
+     */
+    public RecordAccumulator(int batchSize, long totalSize, long lingerMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
+        this.drainIndex = 0;
+        this.closed = false;
+        this.batchSize = batchSize;
+        this.lingerMs = lingerMs;
+        this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
+        this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
+        this.time = time;
+        registerMetrics(metrics);
+    }
+
+    private void registerMetrics(Metrics metrics) {
+        metrics.addMetric("blocked_threads",
+                          "The number of user threads blocked waiting for buffer memory to enqueue their messages",
+                          new Measurable() {
+                              public double measure(MetricConfig config, long now) {
+                                  return free.queued();
+                              }
+                          });
+        metrics.addMetric("buffer_total_bytes",
+                          "The total amount of buffer memory that is available (not currently used for buffering messages).",
+                          new Measurable() {
+                              public double measure(MetricConfig config, long now) {
+                                  return free.totalMemory();
+                              }
+                          });
+        metrics.addMetric("buffer_available_bytes",
+                          "The total amount of buffer memory that is available (not currently used for buffering messages).",
+                          new Measurable() {
+                              public double measure(MetricConfig config, long now) {
+                                  return free.availableMemory();
+                              }
+                          });
+    }
+
+    /**
+     * Add a record to the accumulator.
+     * <p>
+     * This method will block if sufficient memory isn't available for the record unless blocking has been disabled.
+     * 
+     * @param tp The topic/partition to which this record is being sent
+     * @param key The key for the record
+     * @param value The value for the record
+     * @param compression The compression codec for the record
+     * @param callback The user-supplied callback to execute when the request is complete
+     */
+    public RecordSend append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+        if (closed)
+            throw new IllegalStateException("Cannot send after the producer is closed.");
+        // check if we have an in-progress batch
+        Deque<RecordBatch> dq = dequeFor(tp);
+        synchronized (dq) {
+            RecordBatch batch = dq.peekLast();
+            if (batch != null) {
+                RecordSend send = batch.tryAppend(key, value, compression, callback);
+                if (send != null)
+                    return send;
+            }
+        }
+
+        // we don't have an in-progress record batch try to allocate a new batch
+        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
+        ByteBuffer buffer = free.allocate(size);
+        synchronized (dq) {
+            RecordBatch first = dq.peekLast();
+            if (first != null) {
+                RecordSend send = first.tryAppend(key, value, compression, callback);
+                if (send != null) {
+                    // somebody else found us a batch, return the one we waited for!
+                    // Hopefully this doesn't happen
+                    // often...
+                    free.deallocate(buffer);
+                    return send;
+                }
+            }
+            RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
+            RecordSend send = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+            dq.addLast(batch);
+            return send;
+        }
+    }
+
+    /**
+     * Get a list of topic-partitions which are ready to be sent.
+     * <p>
+     * A partition is ready if ANY of the following are true:
+     * <ol>
+     * <li>The record set is full
+     * <li>The record set has sat in the accumulator for at least lingerMs milliseconds
+     * <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are
+     * immediately considered ready).
+     * <li>The accumulator has been closed
+     * </ol>
+     */
+    public List<TopicPartition> ready(long now) {
+        List<TopicPartition> ready = new ArrayList<TopicPartition>();
+        boolean exhausted = this.free.queued() > 0;
+        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+            Deque<RecordBatch> deque = entry.getValue();
+            synchronized (deque) {
+                RecordBatch batch = deque.peekFirst();
+                if (batch != null) {
+                    boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
+                    boolean expired = now - batch.created >= lingerMs;
+                    if (full | expired | exhausted | closed)
+                        ready.add(batch.topicPartition);
+                }
+            }
+        }
+        return ready;
+    }
+
+    /**
+     * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
+     * to avoid choosing the same topic-partitions over and over.
+     * 
+     * @param partitions The list of partitions to drain
+     * @param maxSize The maximum number of bytes to drain
+     * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize.
+     *         TODO: There may be a starvation issue due to iteration order
+     */
+    public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize) {
+        if (partitions.isEmpty())
+            return Collections.emptyList();
+        int size = 0;
+        List<RecordBatch> ready = new ArrayList<RecordBatch>();
+        /* to make starvation less likely this loop doesn't start at 0 */
+        int start = drainIndex = drainIndex % partitions.size();
+        do {
+            TopicPartition tp = partitions.get(drainIndex);
+            Deque<RecordBatch> deque = dequeFor(tp);
+            if (deque != null) {
+                synchronized (deque) {
+                    if (size + deque.peekFirst().records.sizeInBytes() > maxSize) {
+                        return ready;
+                    } else {
+                        RecordBatch batch = deque.pollFirst();
+                        size += batch.records.sizeInBytes();
+                        ready.add(batch);
+                    }
+                }
+            }
+            this.drainIndex = (this.drainIndex + 1) % partitions.size();
+        } while (start != drainIndex);
+        return ready;
+    }
+
+    /**
+     * Get the deque for the given topic-partition, creating it if necessary. Since new topics will only be added rarely
+     * we copy-on-write the hashmap
+     */
+    private Deque<RecordBatch> dequeFor(TopicPartition tp) {
+        Deque<RecordBatch> d = this.batches.get(tp);
+        if (d != null)
+            return d;
+        this.batches.putIfAbsent(tp, new ArrayDeque<RecordBatch>());
+        return this.batches.get(tp);
+    }
+
+    /**
+     * Deallocate the list of record batches
+     */
+    public void deallocate(Collection<RecordBatch> batches) {
+        ByteBuffer[] buffers = new ByteBuffer[batches.size()];
+        int i = 0;
+        for (RecordBatch batch : batches) {
+            buffers[i] = batch.records.buffer();
+            i++;
+        }
+        free.deallocate(buffers);
+    }
+
+    /**
+     * Close this accumulator and force all the record buffers to be drained
+     */
+    public void close() {
+        this.closed = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/269d16d3/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
new file mode 100644
index 0000000..4a536a2
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
@@ -0,0 +1,84 @@
+package kafka.clients.producer.internals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import kafka.clients.producer.Callback;
+import kafka.clients.producer.RecordSend;
+import kafka.common.TopicPartition;
+import kafka.common.record.CompressionType;
+import kafka.common.record.MemoryRecords;
+
+/**
+ * A batch of records that is or will be sent.
+ * 
+ * This class is not thread safe and external synchronization must be used when modifying it
+ */
+public final class RecordBatch {
+    public int recordCount = 0;
+    public final long created;
+    public final MemoryRecords records;
+    public final TopicPartition topicPartition;
+    private final ProduceRequestResult produceFuture;
+    private final List<Thunk> thunks;
+
+    public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
+        this.created = now;
+        this.records = records;
+        this.topicPartition = tp;
+        this.produceFuture = new ProduceRequestResult();
+        this.thunks = new ArrayList<Thunk>();
+    }
+
+    /**
+     * Append the message to the current message set and return the relative offset within that message set
+     * 
+     * @return The RecordSend corresponding to this message or null if there isn't sufficient room.
+     */
+    public RecordSend tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+        if (!this.records.hasRoomFor(key, value)) {
+            return null;
+        } else {
+            this.records.append(0L, key, value, compression);
+            RecordSend send = new RecordSend(this.recordCount++, this.produceFuture);
+            if (callback != null)
+                thunks.add(new Thunk(callback, send));
+            return send;
+        }
+    }
+
+    /**
+     * Complete the request
+     * 
+     * @param offset The offset
+     * @param errorCode The error code or 0 if no error
+     */
+    public void done(long offset, RuntimeException exception) {
+        this.produceFuture.done(topicPartition, offset, exception);
+        // execute callbacks
+        for (int i = 0; i < this.thunks.size(); i++) {
+            try {
+                this.thunks.get(i).execute();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * A callback and the associated RecordSend argument to pass to it.
+     */
+    final private static class Thunk {
+        final Callback callback;
+        final RecordSend send;
+
+        public Thunk(Callback callback, RecordSend send) {
+            this.callback = callback;
+            this.send = send;
+        }
+
+        public void execute() {
+            this.callback.onCompletion(this.send);
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message