kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: MINOR: Rename RecordBatch to ProducerBatch to free the name for KIP-98
Date Tue, 07 Mar 2017 01:31:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5cf491c27 -> 81f9e1376


MINOR: Rename RecordBatch to ProducerBatch to free the name for KIP-98

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2646 from hachikuji/rename-record-batch


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81f9e137
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81f9e137
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81f9e137

Branch: refs/heads/trunk
Commit: 81f9e1376cefed57022de62cd3abf5641c46e4aa
Parents: 5cf491c
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Mar 7 01:29:56 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Mar 7 01:30:01 2017 +0000

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       | 246 +++++++++++++++++++
 .../producer/internals/RecordAccumulator.java   | 100 ++++----
 .../clients/producer/internals/RecordBatch.java | 246 -------------------
 .../clients/producer/internals/Sender.java      |  40 +--
 .../producer/internals/ProducerBatchTest.java   |  73 ++++++
 .../internals/RecordAccumulatorTest.java        |  36 +--
 .../producer/internals/RecordBatchTest.java     |  72 ------
 7 files changed, 407 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/81f9e137/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
new file mode 100644
index 0000000..46273a1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A batch of records that is or will be sent.
+ *
+ * This class is not thread safe and external synchronization must be used when modifying it
+ */
+public final class ProducerBatch {
+
+    private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
+
+    final long createdMs;
+    final TopicPartition topicPartition;
+    final ProduceRequestResult produceFuture;
+
+    private final List<Thunk> thunks = new ArrayList<>();
+    private final MemoryRecordsBuilder recordsBuilder;
+
+    private volatile int attempts;
+    int recordCount;
+    int maxRecordSize;
+    private long lastAttemptMs;
+    private long lastAppendTime;
+    private long drainedMs;
+    private String expiryErrorMessage;
+    private AtomicBoolean completed;
+    private boolean retry;
+
+    public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
+        this.createdMs = now;
+        this.lastAttemptMs = now;
+        this.recordsBuilder = recordsBuilder;
+        this.topicPartition = tp;
+        this.lastAppendTime = createdMs;
+        this.produceFuture = new ProduceRequestResult(topicPartition);
+        this.completed = new AtomicBoolean();
+    }
+
+    /**
+     * Append the record to the current record set and return the relative offset within that record set
+     *
+     * @return The FutureRecordMetadata corresponding to this record, or null if there isn't sufficient room.
+     */
+    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
+        if (!recordsBuilder.hasRoomFor(key, value)) {
+            return null;
+        } else {
+            long checksum = this.recordsBuilder.append(timestamp, key, value);
+            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
+            this.lastAppendTime = now;
+            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
+                                                                   timestamp, checksum,
+                                                                   key == null ? -1 : key.length,
+                                                                   value == null ? -1 : value.length);
+            if (callback != null)
+                thunks.add(new Thunk(callback, future));
+            this.recordCount++;
+            return future;
+        }
+    }
+
+    /**
+     * Complete the request.
+     *
+     * @param baseOffset The base offset of the messages assigned by the server
+     * @param logAppendTime The log append time or -1 if CreateTime is being used
+     * @param exception The exception that occurred (or null if the request was successful)
+     */
+    public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
+        log.trace("Produced messages to topic-partition {} with base offset {} and error: {}.",
+                  topicPartition, baseOffset, exception);
+
+        if (completed.getAndSet(true))
+            throw new IllegalStateException("Batch has already been completed");
+
+        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
+        produceFuture.set(baseOffset, logAppendTime, exception);
+
+        // execute callbacks
+        for (Thunk thunk : thunks) {
+            try {
+                if (exception == null) {
+                    RecordMetadata metadata = thunk.future.value();
+                    thunk.callback.onCompletion(metadata, null);
+                } else {
+                    thunk.callback.onCompletion(null, exception);
+                }
+            } catch (Exception e) {
+                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
+            }
+        }
+
+        produceFuture.done();
+    }
+
+    /**
+     * A callback and the associated FutureRecordMetadata argument to pass to it.
+     */
+    final private static class Thunk {
+        final Callback callback;
+        final FutureRecordMetadata future;
+
+        public Thunk(Callback callback, FutureRecordMetadata future) {
+            this.callback = callback;
+            this.future = future;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ProducerBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
+    }
+
+    /**
+     * A batch whose metadata is not available should be expired if one of the following is true:
+     * <ol>
+     *     <li> the batch is not in retry AND the request timeout has elapsed since it became ready (full or linger.ms has been reached).
+     *     <li> the batch is in retry AND the request timeout has elapsed since the backoff period ended.
+     * </ol>
+     * This method closes this batch and sets {@code expiryErrorMessage} if the batch has timed out.
+     * {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks.
+     */
+    public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
+            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
+        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
+            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
+        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
+            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
+
+        boolean expired = expiryErrorMessage != null;
+        if (expired)
+            close();
+        return expired;
+    }
+
+    /**
+     * Completes the produce future with a timeout exception and invokes callbacks.
+     * This method should be invoked only if {@link #maybeExpire(int, long, long, long, boolean)}
+     * returned true.
+     */
+    void expirationDone() {
+        if (expiryErrorMessage == null)
+            throw new IllegalStateException("Batch has not expired");
+        this.done(-1L, Record.NO_TIMESTAMP,
+                  new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
+    }
+
+    int attempts() {
+        return attempts;
+    }
+
+    void reenqueued(long now) {
+        attempts++;
+        lastAttemptMs = Math.max(lastAppendTime, now);
+        lastAppendTime = Math.max(lastAppendTime, now);
+        retry = true;
+    }
+
+    long queueTimeMs() {
+        return drainedMs - createdMs;
+    }
+
+    long createdTimeMs(long nowMs) {
+        return Math.max(0, nowMs - createdMs);
+    }
+
+    long waitedTimeMs(long nowMs) {
+        return Math.max(0, nowMs - lastAttemptMs);
+    }
+
+    void drained(long nowMs) {
+        this.drainedMs = Math.max(drainedMs, nowMs);
+    }
+
+    /**
+     * Returns whether the batch has been retried for sending to Kafka
+     */
+    private boolean inRetry() {
+        return this.retry;
+    }
+
+    public MemoryRecords records() {
+        return recordsBuilder.build();
+    }
+
+    public int sizeInBytes() {
+        return recordsBuilder.sizeInBytes();
+    }
+
+    public double compressionRate() {
+        return recordsBuilder.compressionRate();
+    }
+
+    public boolean isFull() {
+        return recordsBuilder.isFull();
+    }
+
+    public void close() {
+        recordsBuilder.close();
+    }
+
+    public ByteBuffer buffer() {
+        return recordsBuilder.buffer();
+    }
+
+    public int initialCapacity() {
+        return recordsBuilder.initialCapacity();
+    }
+
+    public boolean isWritable() {
+        return !recordsBuilder.isClosed();
+    }
+
+}

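The maybeExpire javadoc above lists three conditions under which a batch whose metadata never arrived is given up on. As a quick illustration, here is a minimal, self-contained sketch of that decision in plain Java; the parameter names are local stand-ins for the ProducerBatch fields rather than the real class:

    // Sketch of the expiration decision described in ProducerBatch#maybeExpire; names are illustrative.
    public class ExpirySketch {
        static String maybeExpire(boolean inRetry, boolean isFull, int requestTimeoutMs, long retryBackoffMs,
                                  long lingerMs, long nowMs, long createdMs, long lastAppendMs, long lastAttemptMs) {
            long sinceCreate = Math.max(0, nowMs - createdMs);          // createdTimeMs(now)
            long sinceLastAttempt = Math.max(0, nowMs - lastAttemptMs); // waitedTimeMs(now)
            if (!inRetry && isFull && requestTimeoutMs < (nowMs - lastAppendMs))
                return (nowMs - lastAppendMs) + " ms has passed since last append";
            else if (!inRetry && requestTimeoutMs < (sinceCreate - lingerMs))
                return (sinceCreate - lingerMs) + " ms has passed since batch creation plus linger time";
            else if (inRetry && requestTimeoutMs < (sinceLastAttempt - retryBackoffMs))
                return (sinceLastAttempt - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
            return null; // not expired
        }

        public static void main(String[] args) {
            // Full batch, not in retry, 40s since the last append with a 30s request timeout: expired.
            System.out.println(maybeExpire(false, true, 30000, 100L, 5L, 40000L, 0L, 0L, 0L));
            // Fresh batch: none of the conditions hold, so null (not expired) is returned.
            System.out.println(maybeExpire(false, false, 30000, 100L, 5L, 1L, 0L, 0L));
        }
    }

When the real method sets a non-null expiryErrorMessage it closes the batch, and expirationDone() later completes the produce future with a TimeoutException carrying that message.
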
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f9e137/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 3306820..5d95f53 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -73,8 +73,8 @@ public final class RecordAccumulator {
     private final long retryBackoffMs;
     private final BufferPool free;
     private final Time time;
-    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
-    private final IncompleteRecordBatches incomplete;
+    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
+    private final IncompleteBatches incomplete;
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
     private final Set<TopicPartition> muted;
     private int drainIndex;
@@ -111,7 +111,7 @@ public final class RecordAccumulator {
         this.batches = new CopyOnWriteMap<>();
         String metricGrpName = "producer-metrics";
         this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
-        this.incomplete = new IncompleteRecordBatches();
+        this.incomplete = new IncompleteBatches();
         this.muted = new HashSet<>();
         this.time = time;
         registerMetrics(metrics, metricGrpName);
@@ -172,7 +172,7 @@ public final class RecordAccumulator {
         ByteBuffer buffer = null;
         try {
             // check if we have an in-progress batch
-            Deque<RecordBatch> dq = getOrCreateDeque(tp);
+            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
             synchronized (dq) {
                 if (closed)
                     throw new IllegalStateException("Cannot send after the producer is closed.");
@@ -197,7 +197,7 @@ public final class RecordAccumulator {
                 }
 
                 MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
-                RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
+                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
 
                 dq.addLast(batch);
@@ -216,11 +216,11 @@ public final class RecordAccumulator {
     }
 
     /**
-     * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
+     * If `ProducerBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
      * resources (like compression streams buffers).
      */
-    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
-        RecordBatch last = deque.peekLast();
+    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
+        ProducerBatch last = deque.peekLast();
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
             if (future == null)
@@ -235,11 +235,11 @@ public final class RecordAccumulator {
      * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
      * due to metadata being unavailable
      */
-    public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
-        List<RecordBatch> expiredBatches = new ArrayList<>();
+    public List<ProducerBatch> abortExpiredBatches(int requestTimeout, long now) {
+        List<ProducerBatch> expiredBatches = new ArrayList<>();
         int count = 0;
-        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
-            Deque<RecordBatch> dq = entry.getValue();
+        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
+            Deque<ProducerBatch> dq = entry.getValue();
             TopicPartition tp = entry.getKey();
             // We only check if the batch should be expired if the partition does not have a batch in flight.
             // This is to prevent later batches from being expired while an earlier batch is still in progress.
@@ -248,10 +248,10 @@ public final class RecordAccumulator {
             if (!muted.contains(tp)) {
                 synchronized (dq) {
                     // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
-                    RecordBatch lastBatch = dq.peekLast();
-                    Iterator<RecordBatch> batchIterator = dq.iterator();
+                    ProducerBatch lastBatch = dq.peekLast();
+                    Iterator<ProducerBatch> batchIterator = dq.iterator();
                     while (batchIterator.hasNext()) {
-                        RecordBatch batch = batchIterator.next();
+                        ProducerBatch batch = batchIterator.next();
                         boolean isFull = batch != lastBatch || batch.isFull();
                         // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks
                         // are invoked after completing the iterations, since sends invoked from callbacks
@@ -271,7 +271,7 @@ public final class RecordAccumulator {
         }
         if (!expiredBatches.isEmpty()) {
             log.trace("Expired {} batches in accumulator", count);
-            for (RecordBatch batch : expiredBatches) {
+            for (ProducerBatch batch : expiredBatches) {
                 batch.expirationDone();
                 deallocate(batch);
             }
@@ -283,9 +283,9 @@ public final class RecordAccumulator {
     /**
      * Re-enqueue the given record batch in the accumulator to retry
      */
-    public void reenqueue(RecordBatch batch, long now) {
+    public void reenqueue(ProducerBatch batch, long now) {
         batch.reenqueued(now);
-        Deque<RecordBatch> deque = getOrCreateDeque(batch.topicPartition);
+        Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
         }
@@ -318,9 +318,9 @@ public final class RecordAccumulator {
         Set<String> unknownLeaderTopics = new HashSet<>();
 
         boolean exhausted = this.free.queued() > 0;
-        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
+        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
             TopicPartition part = entry.getKey();
-            Deque<RecordBatch> deque = entry.getValue();
+            Deque<ProducerBatch> deque = entry.getValue();
 
             Node leader = cluster.leaderFor(part);
             synchronized (deque) {
@@ -329,7 +329,7 @@ public final class RecordAccumulator {
                     // Note that entries are currently not removed from batches when deque is empty.
                     unknownLeaderTopics.add(part.topic());
                 } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
-                    RecordBatch batch = deque.peekFirst();
+                    ProducerBatch batch = deque.peekFirst();
                     if (batch != null) {
                         long waitedTimeMs = batch.waitedTimeMs(nowMs);
                         boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
@@ -358,8 +358,8 @@ public final class RecordAccumulator {
      * @return Whether there is any unsent record in the accumulator.
      */
     public boolean hasUnsent() {
-        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
-            Deque<RecordBatch> deque = entry.getValue();
+        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
+            Deque<ProducerBatch> deque = entry.getValue();
             synchronized (deque) {
                 if (!deque.isEmpty())
                     return true;
@@ -376,20 +376,20 @@ public final class RecordAccumulator {
      * @param nodes The list of node to drain
      * @param maxSize The maximum number of bytes to drain
      * @param now The current unix time in milliseconds
-     * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
+     * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
      */
-    public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
-                                                 Set<Node> nodes,
-                                                 int maxSize,
-                                                 long now) {
+    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
+                                                   Set<Node> nodes,
+                                                   int maxSize,
+                                                   long now) {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
-        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
+        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
         for (Node node : nodes) {
             int size = 0;
             List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
-            List<RecordBatch> ready = new ArrayList<>();
+            List<ProducerBatch> ready = new ArrayList<>();
             /* to make starvation less likely this loop doesn't start at 0 */
             int start = drainIndex = drainIndex % parts.size();
             do {
@@ -397,10 +397,10 @@ public final class RecordAccumulator {
                 TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                 // Only proceed if the partition has no in-flight batches.
                 if (!muted.contains(tp)) {
-                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
+                    Deque<ProducerBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                     if (deque != null) {
                         synchronized (deque) {
-                            RecordBatch first = deque.peekFirst();
+                            ProducerBatch first = deque.peekFirst();
                             if (first != null) {
                                 boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                                 // Only drain the batch if it is not during backoff period.
@@ -411,7 +411,7 @@ public final class RecordAccumulator {
                                         // request
                                         break;
                                     } else {
-                                        RecordBatch batch = deque.pollFirst();
+                                        ProducerBatch batch = deque.pollFirst();
                                         batch.close();
                                         size += batch.sizeInBytes();
                                         ready.add(batch);
@@ -429,19 +429,19 @@ public final class RecordAccumulator {
         return batches;
     }
 
-    private Deque<RecordBatch> getDeque(TopicPartition tp) {
+    private Deque<ProducerBatch> getDeque(TopicPartition tp) {
         return batches.get(tp);
     }
 
     /**
      * Get the deque for the given topic-partition, creating it if necessary.
      */
-    private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
-        Deque<RecordBatch> d = this.batches.get(tp);
+    private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
+        Deque<ProducerBatch> d = this.batches.get(tp);
         if (d != null)
             return d;
         d = new ArrayDeque<>();
-        Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
+        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
         if (previous == null)
             return d;
         else
@@ -451,7 +451,7 @@ public final class RecordAccumulator {
     /**
      * Deallocate the record batch
      */
-    public void deallocate(RecordBatch batch) {
+    public void deallocate(ProducerBatch batch) {
         incomplete.remove(batch);
         free.deallocate(batch.buffer(), batch.initialCapacity());
     }
@@ -466,7 +466,7 @@ public final class RecordAccumulator {
     }
 
     /* Visible for testing */
-    Map<TopicPartition, Deque<RecordBatch>> batches() {
+    Map<TopicPartition, Deque<ProducerBatch>> batches() {
         return Collections.unmodifiableMap(batches);
     }
 
@@ -489,7 +489,7 @@ public final class RecordAccumulator {
      */
     public void awaitFlushCompletion() throws InterruptedException {
         try {
-            for (RecordBatch batch : this.incomplete.all())
+            for (ProducerBatch batch : this.incomplete.all())
                 batch.produceFuture.await();
         } finally {
             this.flushesInProgress.decrementAndGet();
@@ -519,8 +519,8 @@ public final class RecordAccumulator {
      * Go through incomplete batches and abort them.
      */
     private void abortBatches() {
-        for (RecordBatch batch : incomplete.all()) {
-            Deque<RecordBatch> dq = getDeque(batch.topicPartition);
+        for (ProducerBatch batch : incomplete.all()) {
+            Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             // Close the batch before aborting
             synchronized (dq) {
                 batch.close();
@@ -577,22 +577,22 @@ public final class RecordAccumulator {
     }
 
     /*
-     * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
+     * A threadsafe helper class to hold batches that haven't been ack'd yet
      */
-    private final static class IncompleteRecordBatches {
-        private final Set<RecordBatch> incomplete;
+    private final static class IncompleteBatches {
+        private final Set<ProducerBatch> incomplete;
 
-        public IncompleteRecordBatches() {
-            this.incomplete = new HashSet<RecordBatch>();
+        public IncompleteBatches() {
+            this.incomplete = new HashSet<>();
         }
 
-        public void add(RecordBatch batch) {
+        public void add(ProducerBatch batch) {
             synchronized (incomplete) {
                 this.incomplete.add(batch);
             }
         }
 
-        public void remove(RecordBatch batch) {
+        public void remove(ProducerBatch batch) {
             synchronized (incomplete) {
                 boolean removed = this.incomplete.remove(batch);
                 if (!removed)
@@ -600,7 +600,7 @@ public final class RecordAccumulator {
             }
         }
 
-        public Iterable<RecordBatch> all() {
+        public Iterable<ProducerBatch> all() {
             synchronized (incomplete) {
                 return new ArrayList<>(this.incomplete);
             }

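Most of the changes above are mechanical renames, but the append path they touch is easiest to see in isolation. Below is a minimal, self-contained sketch in plain Java of the getOrCreateDeque / tryAppend pattern, with String keys and a toy Batch standing in for TopicPartition and ProducerBatch: records go into the last open batch of a per-partition deque, and a new batch is started only when tryAppend reports there is no more room:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class AccumulatorSketch {
        // Toy batch: mirrors ProducerBatch.tryAppend signalling "no room" when full.
        static class Batch {
            private final StringBuilder data = new StringBuilder();
            private final int capacity;
            Batch(int capacity) { this.capacity = capacity; }
            boolean tryAppend(String record) {
                if (data.length() + record.length() > capacity)
                    return false;
                data.append(record);
                return true;
            }
        }

        private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
        private final int batchCapacity = 16;

        // Mirrors getOrCreateDeque: under contention only the first putIfAbsent wins.
        private Deque<Batch> getOrCreateDeque(String partition) {
            Deque<Batch> d = batches.get(partition);
            if (d != null)
                return d;
            d = new ArrayDeque<>();
            Deque<Batch> previous = batches.putIfAbsent(partition, d);
            return previous == null ? d : previous;
        }

        // Mirrors append/tryAppend: use the last open batch, or start a new one if it is full.
        public void append(String partition, String record) {
            Deque<Batch> dq = getOrCreateDeque(partition);
            synchronized (dq) {
                Batch last = dq.peekLast();
                if (last == null || !last.tryAppend(record)) {
                    Batch batch = new Batch(batchCapacity);
                    batch.tryAppend(record);
                    dq.addLast(batch);
                }
            }
        }

        public static void main(String[] args) {
            AccumulatorSketch accum = new AccumulatorSketch();
            for (int i = 0; i < 5; i++)
                accum.append("topic-0", "record-" + i);
            // Five 8-character records with a 16-character batch capacity end up in 3 batches.
            System.out.println("batches for topic-0: " + accum.batches.get("topic-0").size());
        }
    }

The real accumulator additionally tracks incomplete batches, expires stuck batches via abortExpiredBatches, and drains ready batches per node; only the data-structure skeleton is shown here.
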
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f9e137/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
deleted file mode 100644
index 8dacaf5..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A batch of records that is or will be sent.
- *
- * This class is not thread safe and external synchronization must be used when modifying it
- */
-public final class RecordBatch {
-
-    private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);
-
-    final long createdMs;
-    final TopicPartition topicPartition;
-    final ProduceRequestResult produceFuture;
-
-    private final List<Thunk> thunks = new ArrayList<>();
-    private final MemoryRecordsBuilder recordsBuilder;
-
-    private volatile int attempts;
-    int recordCount;
-    int maxRecordSize;
-    private long lastAttemptMs;
-    private long lastAppendTime;
-    private long drainedMs;
-    private String expiryErrorMessage;
-    private AtomicBoolean completed;
-    private boolean retry;
-
-    public RecordBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) {
-        this.createdMs = now;
-        this.lastAttemptMs = now;
-        this.recordsBuilder = recordsBuilder;
-        this.topicPartition = tp;
-        this.lastAppendTime = createdMs;
-        this.produceFuture = new ProduceRequestResult(topicPartition);
-        this.completed = new AtomicBoolean();
-    }
-
-    /**
-     * Append the record to the current record set and return the relative offset within that record set
-     *
-     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
-     */
-    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
-        if (!recordsBuilder.hasRoomFor(key, value)) {
-            return null;
-        } else {
-            long checksum = this.recordsBuilder.append(timestamp, key, value);
-            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
-            this.lastAppendTime = now;
-            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
-                                                                   timestamp, checksum,
-                                                                   key == null ? -1 : key.length,
-                                                                   value == null ? -1 : value.length);
-            if (callback != null)
-                thunks.add(new Thunk(callback, future));
-            this.recordCount++;
-            return future;
-        }
-    }
-
-    /**
-     * Complete the request.
-     *
-     * @param baseOffset The base offset of the messages assigned by the server
-     * @param logAppendTime The log append time or -1 if CreateTime is being used
-     * @param exception The exception that occurred (or null if the request was successful)
-     */
-    public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
-                  topicPartition, baseOffset, exception);
-
-        if (completed.getAndSet(true))
-            throw new IllegalStateException("Batch has already been completed");
-
-        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
-        produceFuture.set(baseOffset, logAppendTime, exception);
-
-        // execute callbacks
-        for (Thunk thunk : thunks) {
-            try {
-                if (exception == null) {
-                    RecordMetadata metadata = thunk.future.value();
-                    thunk.callback.onCompletion(metadata, null);
-                } else {
-                    thunk.callback.onCompletion(null, exception);
-                }
-            } catch (Exception e) {
-                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
-            }
-        }
-
-        produceFuture.done();
-    }
-
-    /**
-     * A callback and the associated FutureRecordMetadata argument to pass to it.
-     */
-    final private static class Thunk {
-        final Callback callback;
-        final FutureRecordMetadata future;
-
-        public Thunk(Callback callback, FutureRecordMetadata future) {
-            this.callback = callback;
-            this.future = future;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
-    }
-
-    /**
-     * A batch whose metadata is not available should be expired if one of the following is true:
-     * <ol>
-     *     <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached).
-     *     <li> the batch is in retry AND request timeout has elapsed after the backoff period ended.
-     * </ol>
-     * This methods closes this batch and sets {@code expiryErrorMessage} if the batch has timed out.
-     * {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks.
-     */
-    public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
-            expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
-        else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs))
-            expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time";
-        else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs))
-            expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time";
-
-        boolean expired = expiryErrorMessage != null;
-        if (expired)
-            close();
-        return expired;
-    }
-
-    /**
-     * Completes the produce future with timeout exception and invokes callbacks.
-     * This method should be invoked only if {@link #maybeExpire(int, long, long, long, boolean)}
-     * returned true.
-     */
-    void expirationDone() {
-        if (expiryErrorMessage == null)
-            throw new IllegalStateException("Batch has not expired");
-        this.done(-1L, Record.NO_TIMESTAMP,
-                  new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage));
-    }
-
-    int attempts() {
-        return attempts;
-    }
-
-    void reenqueued(long now) {
-        attempts++;
-        lastAttemptMs = Math.max(lastAppendTime, now);
-        lastAppendTime = Math.max(lastAppendTime, now);
-        retry = true;
-    }
-
-    long queueTimeMs() {
-        return drainedMs - createdMs;
-    }
-
-    long createdTimeMs(long nowMs) {
-        return Math.max(0, nowMs - createdMs);
-    }
-
-    long waitedTimeMs(long nowMs) {
-        return Math.max(0, nowMs - lastAttemptMs);
-    }
-
-    void drained(long nowMs) {
-        this.drainedMs = Math.max(drainedMs, nowMs);
-    }
-
-    /**
-     * Returns if the batch is been retried for sending to kafka
-     */
-    private boolean inRetry() {
-        return this.retry;
-    }
-
-    public MemoryRecords records() {
-        return recordsBuilder.build();
-    }
-
-    public int sizeInBytes() {
-        return recordsBuilder.sizeInBytes();
-    }
-
-    public double compressionRate() {
-        return recordsBuilder.compressionRate();
-    }
-
-    public boolean isFull() {
-        return recordsBuilder.isFull();
-    }
-
-    public void close() {
-        recordsBuilder.close();
-    }
-
-    public ByteBuffer buffer() {
-        return recordsBuilder.buffer();
-    }
-
-    public int initialCapacity() {
-        return recordsBuilder.initialCapacity();
-    }
-
-    public boolean isWritable() {
-        return !recordsBuilder.isClosed();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f9e137/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7f27d36..3604f68 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -192,21 +192,21 @@ public class Sender implements Runnable {
         }
 
         // create produce requests
-        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
+        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster,
                                                                          result.readyNodes,
                                                                          this.maxRequestSize,
                                                                          now);
         if (guaranteeMessageOrder) {
             // Mute all the partitions drained
-            for (List<RecordBatch> batchList : batches.values()) {
-                for (RecordBatch batch : batchList)
+            for (List<ProducerBatch> batchList : batches.values()) {
+                for (ProducerBatch batch : batchList)
                     this.accumulator.mutePartition(batch.topicPartition);
             }
         }
 
-        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
+        List<ProducerBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
         // update sensors
-        for (RecordBatch expiredBatch : expiredBatches)
+        for (ProducerBatch expiredBatch : expiredBatches)
             this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
 
         sensors.updateProduceRequestMetrics(batches);
@@ -251,16 +251,16 @@ public class Sender implements Runnable {
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
+    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
         int correlationId = response.requestHeader().correlationId();
         if (response.wasDisconnected()) {
             log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination());
-            for (RecordBatch batch : batches.values())
+            for (ProducerBatch batch : batches.values())
                 completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
         } else if (response.versionMismatch() != null) {
             log.warn("Cancelled request {} due to a version mismatch with node {}",
                     response, response.destination(), response.versionMismatch());
-            for (RecordBatch batch : batches.values())
+            for (ProducerBatch batch : batches.values())
                 completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST), correlationId, now);
         } else {
             log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
@@ -270,14 +270,14 @@ public class Sender implements Runnable {
                 for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                     TopicPartition tp = entry.getKey();
                     ProduceResponse.PartitionResponse partResp = entry.getValue();
-                    RecordBatch batch = batches.get(tp);
+                    ProducerBatch batch = batches.get(tp);
                     completeBatch(batch, partResp, correlationId, now);
                 }
                 this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
                 this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
             } else {
                 // this is the acks = 0 case, just complete all requests
-                for (RecordBatch batch : batches.values()) {
+                for (ProducerBatch batch : batches.values()) {
                     completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
                 }
             }
@@ -292,7 +292,7 @@ public class Sender implements Runnable {
      * @param correlationId The correlation id for the request
      * @param now The current POSIX timestamp in milliseconds
      */
-    private void completeBatch(RecordBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
+    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now) {
         Errors error = response.error;
         if (error != Errors.NONE && canRetry(batch, error)) {
@@ -331,25 +331,25 @@ public class Sender implements Runnable {
     /**
      * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
      */
-    private boolean canRetry(RecordBatch batch, Errors error) {
+    private boolean canRetry(ProducerBatch batch, Errors error) {
         return batch.attempts() < this.retries && error.exception() instanceof RetriableException;
     }
 
     /**
      * Transfer the record batches into a list of produce requests on a per-node basis
      */
-    private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
-        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
+    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
             sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
     }
 
     /**
      * Create a produce request from the given record batches
      */
-    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
+    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
         Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
-        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
-        for (RecordBatch batch : batches) {
+        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
+        for (ProducerBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
             produceRecordsByPartition.put(tp, batch.records());
             recordsByPartition.put(tp, batch);
@@ -493,11 +493,11 @@ public class Sender implements Runnable {
             }
         }
 
-        public void updateProduceRequestMetrics(Map<Integer, List<RecordBatch>> batches) {
+        public void updateProduceRequestMetrics(Map<Integer, List<ProducerBatch>> batches) {
             long now = time.milliseconds();
-            for (List<RecordBatch> nodeBatch : batches.values()) {
+            for (List<ProducerBatch> nodeBatch : batches.values()) {
                 int records = 0;
-                for (RecordBatch batch : nodeBatch) {
+                for (ProducerBatch batch : nodeBatch) {
                     // register all per-topic metrics at once
                     String topic = batch.topicPartition.topic();
                     maybeRegisterTopicMetrics(topic);

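The Sender changes are again pure renames, but they show the shape of the data being moved. As a rough, self-contained sketch in plain Java (a toy Batch stands in for ProducerBatch and a String payload for MemoryRecords), drained batches arrive collated per node id, and each node's list is flattened into one produce request keyed by topic-partition:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SenderSketch {
        static class Batch {
            final String topicPartition;
            final String records;
            Batch(String topicPartition, String records) {
                this.topicPartition = topicPartition;
                this.records = records;
            }
        }

        // Mirrors sendProduceRequests/sendProduceRequest: one request per destination node,
        // holding that node's batches keyed by topic-partition.
        static Map<Integer, Map<String, String>> toProduceRequests(Map<Integer, List<Batch>> collated) {
            Map<Integer, Map<String, String>> requests = new HashMap<>();
            for (Map.Entry<Integer, List<Batch>> entry : collated.entrySet()) {
                Map<String, String> recordsByPartition = new HashMap<>(entry.getValue().size());
                for (Batch batch : entry.getValue())
                    recordsByPartition.put(batch.topicPartition, batch.records);
                requests.put(entry.getKey(), recordsByPartition);
            }
            return requests;
        }

        public static void main(String[] args) {
            Map<Integer, List<Batch>> collated = new HashMap<>();
            collated.put(0, Arrays.asList(new Batch("test-0", "recordsA"), new Batch("test-1", "recordsB")));
            // Prints a single request for node 0 containing both partitions.
            System.out.println(toProduceRequests(collated));
        }
    }

The real code also keeps a map from topic-partition back to the original ProducerBatch so that handleProduceResponse and completeBatch can finish or retry each batch when the broker's response arrives.
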
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f9e137/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
new file mode 100644
index 0000000..3258ba3
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertFalse;
+
+public class ProducerBatchTest {
+
+    private final long now = 1488748346917L;
+
+    private final MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(ByteBuffer.allocate(0),
+            Record.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, 0);
+
+    /**
+     * A {@link ProducerBatch} configured with a very large linger value and a {@code now} timestamp preceding its
+     * create time is correctly treated as not expired by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}
+     * when the linger time is larger than the difference between now and the create time.
+     */
+    @Test
+    public void testLargeLingerOldNowExpire() {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        // Set `now` to 2ms before the create time.
+        assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, false));
+    }
+
+    /**
+     * A {@link ProducerBatch} configured with a very large retryBackoff value, retry = true and a {@code now} timestamp
+     * preceding its create time is correctly treated as not expired by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}
+     * when the retryBackoff time is larger than the difference between now and the create time.
+     */
+    @Test
+    public void testLargeRetryBackoffOldNowExpire() {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        // Set batch.retry = true
+        batch.reenqueued(now);
+        // Set `now` to 2ms before the create time.
+        assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, false));
+    }
+
+    /**
+     * A {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)} call with a now value before the create
+     * time of the ProducerBatch is correctly recognized as not expired when invoked with parameter isFull = true.
+     */
+    @Test
+    public void testLargeFullOldNowExpire() {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+        // Set `now` to 2ms before the create time.
+        assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f9e137/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 1cb510e..42dc4c4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -89,7 +89,7 @@ public class RecordAccumulatorTest {
         for (int i = 0; i < appends; i++) {
             // append to the first batch
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
-            Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+            Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
             assertEquals(1, partitionBatches.size());
             assertTrue(partitionBatches.peekFirst().isWritable());
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
@@ -97,16 +97,16 @@ public class RecordAccumulatorTest {
 
         // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
-        Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
+        Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
         assertEquals(2, partitionBatches.size());
-        Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
+        Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
         assertFalse(partitionBatchesIterator.next().isWritable());
         assertTrue(partitionBatchesIterator.next().isWritable());
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
+        List<ProducerBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
-        RecordBatch batch = batches.get(0);
+        ProducerBatch batch = batches.get(0);
 
         Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
         for (int i = 0; i < appends; i++) {
@@ -133,9 +133,9 @@ public class RecordAccumulatorTest {
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
+        List<ProducerBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
-        RecordBatch batch = batches.get(0);
+        ProducerBatch batch = batches.get(0);
 
         Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
         LogEntry entry = iter.next();
@@ -155,7 +155,7 @@ public class RecordAccumulatorTest {
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
-        List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id());
+        List<ProducerBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id());
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
     }
 
@@ -186,9 +186,9 @@ public class RecordAccumulatorTest {
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
             Set<Node> nodes = accum.ready(cluster, now).readyNodes;
-            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
+            List<ProducerBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
-                for (RecordBatch batch : batches) {
+                for (ProducerBatch batch : batches) {
                     for (LogEntry entry : batch.records().deepEntries())
                         read++;
                     accum.deallocate(batch);
@@ -245,7 +245,7 @@ public class RecordAccumulatorTest {
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
-        Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
+        Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
         assertEquals("Node1 should be the only ready node.", 1, batches.size());
         assertEquals("Partition 0 should only have one batch drained.", 1, batches.get(0).size());
 
@@ -286,9 +286,9 @@ public class RecordAccumulatorTest {
         result = accum.ready(cluster, time.milliseconds());
         
         // drain and deallocate all batches
-        Map<Integer, List<RecordBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
-        for (List<RecordBatch> batches: results.values())
-            for (RecordBatch batch: batches)
+        Map<Integer, List<ProducerBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        for (List<ProducerBatch> batches: results.values())
+            for (ProducerBatch batch: batches)
                 accum.deallocate(batch);
         
         // should be complete with no unsent records.
@@ -368,7 +368,7 @@ public class RecordAccumulatorTest {
         // Advance the clock to expire the batch.
         time.sleep(requestTimeout + 1);
         accum.mutePartition(tp1);
-        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
+        List<ProducerBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
 
         accum.unmutePartition(tp1);
@@ -396,7 +396,7 @@ public class RecordAccumulatorTest {
         time.sleep(lingerMs);
         readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
-        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
         assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1);
         time.sleep(1000L);
         accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
@@ -450,7 +450,7 @@ public class RecordAccumulatorTest {
 
         // Advance the clock to expire the first batch.
         time.sleep(requestTimeout + 1);
-        List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
+        List<ProducerBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
         assertEquals("The batch was not expired", 1, expiredBatches.size());
         assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get());
         assertNull("Unexpected exception", unexpectedException.get());
@@ -481,7 +481,7 @@ public class RecordAccumulatorTest {
 
         // Test drain with muted partition
         accum.mutePartition(tp1);
-        Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
         assertEquals("No batch should have been drained", 0, drained.get(node1.id()).size());
 
         // Test drain without muted partition.

http://git-wip-us.apache.org/repos/asf/kafka/blob/81f9e137/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
deleted file mode 100644
index 6404451..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordBatchTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import java.nio.ByteBuffer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.MemoryRecordsBuilder;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.common.record.TimestampType;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-
-public class RecordBatchTest {
-
-    private final long now = 1488748346917L;
-
-    private final MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(ByteBuffer.allocate(0),
-            Record.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, 0);
-
-    /**
-     * A RecordBatch configured using a very large linger value and a timestamp preceding its create
-     * time is interpreted correctly as not expired when the linger time is larger than the difference
-     * between now and create time by RecordBatch#maybeExpire.
-     */
-    @Test
-    public void testLargeLingerOldNowExpire() {
-        RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, false));
-    }
-
-    /**
-     * A RecordBatch configured using a very large retryBackoff value with retry = true and a timestamp preceding its
-     * create time is interpreted correctly as not expired when the retryBackoff time is larger than the difference
-     * between now and create time by RecordBatch#maybeExpire.
-     */
-    @Test
-    public void testLargeRetryBackoffOldNowExpire() {
-        RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        // Set batch.retry = true
-        batch.reenqueued(now);
-        // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, false));
-    }
-
-    /**
-     * A RecordBatch#maybeExpire call with a now value before the create time of the RecordBatch is correctly recognized
-     * as not expired when invoked with parameter isFull = true.
-     */
-    @Test
-    public void testLargeFullOldNowExpire() {
-        RecordBatch batch = new RecordBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        // Set `now` to 2ms before the create time.
-        assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
-    }
-}

