kafka-commits mailing list archives

From: j...@apache.org
Subject: [2/2] kafka git commit: MINOR: Improvements to Record related classes (part 1)
Date: Wed, 04 Jan 2017 03:53:41 GMT
MINOR: Improvements to Record related classes (part 1)

Jason recently cleaned things up significantly by consolidating the Message/Record classes
into the common Java code in the clients module. While reviewing that work, I noticed a few
things that could be improved further. To make reviewing easier, the changes are split across
multiple PRs.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Jason Gustafson <jason@confluent.io>

Closes #2271 from ijuma/records-minor-fixes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6d6c77a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6d6c77a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6d6c77a7

Branch: refs/heads/trunk
Commit: 6d6c77a7a9c102f7508e4bc48e0d6eba1fcbc9c6
Parents: ce1cb32
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Jan 3 19:53:20 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Tue Jan 3 19:53:20 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/MetadataUpdater.java   |   4 +-
 .../kafka/clients/producer/ProducerConfig.java  |   4 +-
 .../org/apache/kafka/common/Configurable.java   |   2 +-
 .../kafka/common/network/ByteBufferSend.java    |   7 +-
 .../apache/kafka/common/network/MultiSend.java  |  40 ++---
 .../kafka/common/network/NetworkSend.java       |  23 ++-
 .../apache/kafka/common/network/Receive.java    |   6 +-
 .../org/apache/kafka/common/network/Send.java   |   8 +-
 .../kafka/common/network/TransportLayers.java   |  34 ++++
 .../common/record/ByteBufferInputStream.java    |   6 +-
 .../common/record/ByteBufferLogInputStream.java |  12 +-
 .../apache/kafka/common/record/FileRecords.java |  23 +--
 .../kafka/common/record/MemoryRecords.java      |  27 +++-
 .../common/record/MemoryRecordsBuilder.java     |  17 +-
 .../org/apache/kafka/common/record/Records.java |   4 +-
 .../kafka/common/record/RecordsIterator.java    |  10 +-
 .../kafka/common/requests/FetchRequest.java     |  14 +-
 .../kafka/common/requests/FetchResponse.java    |  42 +++--
 .../kafka/common/requests/RecordsSend.java      |  15 +-
 .../authenticator/SaslServerAuthenticator.java  |   2 +-
 .../org/apache/kafka/common/utils/Utils.java    |   5 +-
 .../clients/consumer/KafkaConsumerTest.java     |   3 +-
 .../clients/consumer/internals/FetcherTest.java |   5 +-
 .../apache/kafka/common/record/RecordTest.java  |  14 +-
 .../common/requests/RequestResponseTest.java    |  12 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  12 +-
 core/src/main/scala/kafka/api/ApiUtils.scala    |   7 -
 .../src/main/scala/kafka/api/FetchRequest.scala |  14 +-
 .../main/scala/kafka/api/FetchResponse.scala    | 138 ----------------
 .../scala/kafka/api/OffsetCommitRequest.scala   |   2 +-
 .../main/scala/kafka/cluster/Partition.scala    | 113 +++++--------
 core/src/main/scala/kafka/cluster/Replica.scala |  75 ++++-----
 .../scala/kafka/common/TopicAndPartition.scala  |   4 +-
 .../controller/ControllerChannelManager.scala   |   2 +-
 .../coordinator/GroupMetadataManager.scala      |   2 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |   9 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 .../src/main/scala/kafka/log/LogValidator.scala | 158 ++++++++++---------
 core/src/main/scala/kafka/message/Message.scala |  16 +-
 .../kafka/server/AbstractFetcherThread.scala    |   2 +-
 .../kafka/server/DelayedOperationKey.scala      |   2 -
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../scala/kafka/server/LogOffsetMetadata.scala  |   2 +-
 .../kafka/server/ReplicaFetcherThread.scala     |   4 +-
 .../scala/kafka/server/ReplicaManager.scala     |  33 ++--
 core/src/main/scala/kafka/utils/CoreUtils.scala |  22 ---
 .../kafka/utils/NetworkClientBlockingOps.scala  |   2 +-
 core/src/main/scala/kafka/utils/Pool.scala      |  47 ++++--
 .../api/GroupCoordinatorIntegrationTest.scala   |  11 +-
 .../kafka/api/IntegrationTestHarness.scala      |   2 -
 .../kafka/message/BaseMessageSetTestCases.scala |  12 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |  36 +++--
 .../unit/kafka/server/SimpleFetchTest.scala     |   1 -
 .../InternalTopicIntegrationTest.java           |   4 +-
 54 files changed, 458 insertions(+), 617 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 37c77bb..34bdbf6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -41,8 +41,8 @@ interface MetadataUpdater {
      * Starts a cluster metadata update if needed and possible. Returns the time until the metadata update (which would
      * be 0 if an update has been started as a result of this call).
      *
-     * If the implementation relies on `NetworkClient` to send requests, the completed receive will be passed to
-     * `handleCompletedMetadataResponse`.
+     * If the implementation relies on `NetworkClient` to send requests, `handleCompletedMetadataResponse` will be
+     * invoked after the metadata response is received.
      *
      * The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of
      * factors like node availability, how long since the last metadata update, etc.

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 39446f5..73bf1c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -211,8 +211,8 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>request.timeout.ms</code> */
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
-                                                        + " request.timeout.ms should be larger than replica.lag.time.max.ms, a broker side configuration,"
-                                                        + " to reduce message duplication caused by unnecessary producer retry.";
+                                                        + " This should be larger than replica.lag.time.max.ms (a broker configuration)"
+                                                        + " to reduce the possibility of message duplication due to unnecessary producer retries.";
 
     /** <code>interceptor.classes</code> */
     public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";

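As an illustration of the recommendation in the updated doc string above (not part of this
commit), a minimal producer setup would keep request.timeout.ms above the broker-side
replica.lag.time.max.ms; the bootstrap address and timeout value below are placeholders only.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ProducerTimeoutConfigSketch {
        public static void main(String[] args) {
            // Placeholder values: request.timeout.ms is chosen to exceed the broker's
            // replica.lag.time.max.ms so that slow replication does not trigger
            // unnecessary producer retries (and hence duplicate messages).
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
            System.out.println(props);
        }
    }
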
http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/Configurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Configurable.java b/clients/src/main/java/org/apache/kafka/common/Configurable.java
index 8d0edc0..98774cc 100644
--- a/clients/src/main/java/org/apache/kafka/common/Configurable.java
+++ b/clients/src/main/java/org/apache/kafka/common/Configurable.java
@@ -26,6 +26,6 @@ public interface Configurable {
     /**
      * Configure this class with the given key-value pairs
      */
-    public void configure(Map<String, ?> configs);
+    void configure(Map<String, ?> configs);
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index 3683283..a7ba6bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -57,12 +57,7 @@ public class ByteBufferSend implements Send {
         if (written < 0)
             throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
         remaining -= written;
-        // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
-        // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
-        // GatheringByteChannel or ScatteringByteChannel.
-        if (channel instanceof TransportLayer)
-            pending = ((TransportLayer) channel).hasPendingWrites();
-
+        pending = TransportLayers.hasPendingWrites(channel);
         return written;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
index 11f5e07..0285fe3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
@@ -32,19 +32,22 @@ import java.util.List;
 public class MultiSend implements Send {
 
     private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
-    private String dest;
+
+    private final String dest;
+    private final Iterator<Send> sendsIterator;
+    private final long size;
+
     private long totalWritten = 0;
-    private Iterator<Send> sendsIterator;
     private Send current;
-    private boolean doneSends = false;
-    private long size = 0;
 
     public MultiSend(String dest, List<Send> sends) {
         this.dest = dest;
         this.sendsIterator = sends.iterator();
         nextSendOrDone();
-        for (Send send: sends)
-            this.size += send.size();
+        long size = 0;
+        for (Send send : sends)
+            size += send.size();
+        this.size = size;
     }
 
     @Override
@@ -59,40 +62,39 @@ public class MultiSend implements Send {
 
     @Override
     public boolean completed() {
-        if (doneSends) {
-            if (totalWritten != size)
-                log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten);
-            return true;
-        } else {
-            return false;
-        }
+        return current == null;
     }
 
     @Override
     public long writeTo(GatheringByteChannel channel) throws IOException {
         if (completed())
-            throw new KafkaException("This operation cannot be completed on a complete request.");
+            throw new KafkaException("This operation cannot be invoked on a complete request.");
 
         int totalWrittenPerCall = 0;
         boolean sendComplete;
         do {
             long written = current.writeTo(channel);
-            totalWritten += written;
             totalWrittenPerCall += written;
             sendComplete = current.completed();
             if (sendComplete)
                 nextSendOrDone();
         } while (!completed() && sendComplete);
-        if (log.isTraceEnabled())
-            log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall +  "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + size);
+
+        totalWritten += totalWrittenPerCall;
+
+        if (completed() && totalWritten != size)
+            log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten);
+
+        log.trace("Bytes written as part of multi-send call: {}, total bytes written so far: {}, expected bytes to write: {}",
+                totalWrittenPerCall, totalWritten, size);
+
         return totalWrittenPerCall;
     }
 
-    // update current if there's a next Send, mark sends as done if there isn't
     private void nextSendOrDone() {
         if (sendsIterator.hasNext())
             current = sendsIterator.next();
         else
-            doneSends = true;
+            current = null;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 5e4bf2c..4201579 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -23,20 +23,19 @@ import java.nio.ByteBuffer;
  */
 public class NetworkSend extends ByteBufferSend {
 
-    public NetworkSend(String destination, ByteBuffer... buffers) {
-        super(destination, sizeDelimit(buffers));
+    public NetworkSend(String destination, ByteBuffer buffer) {
+        super(destination, sizeDelimit(buffer));
     }
 
-    private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) {
-        int size = 0;
-        for (ByteBuffer buffer : buffers)
-            size += buffer.remaining();
-        ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1];
-        delimited[0] = ByteBuffer.allocate(4);
-        delimited[0].putInt(size);
-        delimited[0].rewind();
-        System.arraycopy(buffers, 0, delimited, 1, buffers.length);
-        return delimited;
+    private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {
+        return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};
+    }
+
+    private static ByteBuffer sizeBuffer(int size) {
+        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+        sizeBuffer.putInt(size);
+        sizeBuffer.rewind();
+        return sizeBuffer;
     }
 
 }

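For context (not part of the commit), the size-delimiting above produces a frame consisting of
a 4-byte big-endian length prefix followed by the payload; a standalone sketch of that framing:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class SizeDelimitSketch {
        public static void main(String[] args) {
            ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));

            // 4-byte size prefix, built the same way as NetworkSend.sizeBuffer above
            ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
            sizeBuffer.putInt(payload.remaining());
            sizeBuffer.rewind();

            // A gathering write sends the two buffers back to back; the receiver reads
            // the 4-byte prefix first to learn how many payload bytes follow.
            ByteBuffer[] framed = {sizeBuffer, payload};
            int frameSize = framed[0].remaining() + framed[1].remaining();
            System.out.println("frame size = " + frameSize); // 4 + 5 = 9
        }
    }
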
http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Receive.java b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
index 4b14431..57599f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Receive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
@@ -27,12 +27,12 @@ public interface Receive {
     /**
      * The numeric id of the source from which we are receiving data.
      */
-    public String source();
+    String source();
 
     /**
      * Are we done receiving data?
      */
-    public boolean complete();
+    boolean complete();
 
     /**
      * Read bytes into this receive from the given channel
@@ -40,6 +40,6 @@ public interface Receive {
      * @return The number of bytes read
      * @throws IOException If the reading fails
      */
-    public long readFrom(ScatteringByteChannel channel) throws IOException;
+    long readFrom(ScatteringByteChannel channel) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index e0d8831..628fb29 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -23,12 +23,12 @@ public interface Send {
     /**
      * The numeric id for the destination of this send
      */
-    public String destination();
+    String destination();
 
     /**
      * Is this send complete?
      */
-    public boolean completed();
+    boolean completed();
 
     /**
      * Write some as-yet unwritten bytes from this send to the provided channel. It may take multiple calls for the send
@@ -37,11 +37,11 @@ public interface Send {
      * @return The number of bytes written
      * @throws IOException If the write fails
      */
-    public long writeTo(GatheringByteChannel channel) throws IOException;
+    long writeTo(GatheringByteChannel channel) throws IOException;
 
     /**
      * Size of the send
      */
-    public long size();
+    long size();
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/network/TransportLayers.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayers.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayers.java
new file mode 100644
index 0000000..b77eae8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayers.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.network;
+
+import java.nio.channels.GatheringByteChannel;
+
+public final class TransportLayers {
+
+    private TransportLayers() {}
+
+    // This is temporary workaround as Send and Receive interfaces are used by BlockingChannel.
+    // Once BlockingChannel is removed we can make Send and Receive work with TransportLayer rather than
+    // GatheringByteChannel or ScatteringByteChannel.
+    public static boolean hasPendingWrites(GatheringByteChannel channel) {
+        if (channel instanceof TransportLayer)
+            return ((TransportLayer) channel).hasPendingWrites();
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
index b25f949..37e4766 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -23,14 +23,14 @@ import java.nio.ByteBuffer;
 /**
  * A byte buffer backed input inputStream
  */
-public class ByteBufferInputStream extends DataInputStream {
+public final class ByteBufferInputStream extends DataInputStream {
 
     public ByteBufferInputStream(ByteBuffer buffer) {
         super(new UnderlyingInputStream(buffer));
     }
 
-    private static class UnderlyingInputStream extends InputStream {
-        private ByteBuffer buffer;
+    private static final class UnderlyingInputStream extends InputStream {
+        private final ByteBuffer buffer;
 
         public UnderlyingInputStream(ByteBuffer buffer) {
             this.buffer = buffer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
index ae0c91b..ee7d308 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java
@@ -92,20 +92,18 @@ class ByteBufferLogInputStream implements LogInputStream<ByteBufferLogInputStrea
             // We don't need to recompute crc if the timestamp is not updated.
             if (record.timestampType() == TimestampType.CREATE_TIME && currentTimestamp == timestamp)
                 return;
-
-            byte attributes = record.attributes();
-            buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.CREATE_TIME.updateAttributes(attributes));
-            buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
-            long crc = record.computeChecksum();
-            Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);
+            setTimestampAndUpdateCrc(TimestampType.CREATE_TIME, timestamp);
         }
 
         public void setLogAppendTime(long timestamp) {
             if (record.magic() == Record.MAGIC_VALUE_V0)
                 throw new IllegalArgumentException("Cannot set timestamp for a record with magic = 0");
+            setTimestampAndUpdateCrc(TimestampType.LOG_APPEND_TIME, timestamp);
+        }
 
+        private void setTimestampAndUpdateCrc(TimestampType timestampType, long timestamp) {
             byte attributes = record.attributes();
-            buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, TimestampType.LOG_APPEND_TIME.updateAttributes(attributes));
+            buffer.put(LOG_OVERHEAD + Record.ATTRIBUTES_OFFSET, timestampType.updateAttributes(attributes));
             buffer.putLong(LOG_OVERHEAD + Record.TIMESTAMP_OFFSET, timestamp);
             long crc = record.computeChecksum();
             Utils.writeUnsignedInt(buffer, LOG_OVERHEAD + Record.CRC_OFFSET, crc);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 050711d..8a33dca 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -38,11 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class FileRecords extends AbstractRecords implements Closeable {
     private final boolean isSlice;
-    private final FileChannel channel;
     private final int start;
     private final int end;
-    private volatile File file;
-    private final AtomicInteger size;
 
     private final Iterable<FileChannelLogEntry> shallowEntries;
 
@@ -53,6 +50,15 @@ public class FileRecords extends AbstractRecords implements Closeable {
         }
     };
 
+    // mutable state
+    private final AtomicInteger size;
+    private final FileChannel channel;
+    private volatile File file;
+
+    /**
+     * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible.
+     * The constructor is visible for tests.
+     */
     public FileRecords(File file,
                        FileChannel channel,
                        int start,
@@ -65,14 +71,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
         this.isSlice = isSlice;
         this.size = new AtomicInteger();
 
-        // set the initial size of the buffer
-        resize();
-
-        shallowEntries = shallowEntriesFrom(start);
-    }
-
-    public void resize() throws IOException {
         if (isSlice) {
+            // don't check the file size if this is just a slice view
             size.set(end - start);
         } else {
             int limit = Math.min((int) channel.size(), end);
@@ -82,6 +82,8 @@ public class FileRecords extends AbstractRecords implements Closeable {
             // set the file position to the last byte in the file
             channel.position(limit);
         }
+
+        shallowEntries = shallowEntriesFrom(start);
     }
 
     @Override
@@ -137,6 +139,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             throw new IllegalArgumentException("Invalid size: " + size);
 
         final int end;
+        // handle integer overflow
         if (this.start + position + size < 0)
             end = sizeInBytes();
         else

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 65d91c6..6c31b25 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -56,8 +56,14 @@ public class MemoryRecords extends AbstractRecords {
 
     @Override
     public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException {
-        ByteBuffer dup = buffer.duplicate();
+        if (position > Integer.MAX_VALUE)
+            throw new IllegalArgumentException("position should not be greater than Integer.MAX_VALUE: " + position);
+        if (position + length > buffer.limit())
+            throw new IllegalArgumentException("position+length should not be greater than buffer.limit(), position: "
+                    + position + ", length: " + length + ", buffer.limit(): " + buffer.limit());
+
         int pos = (int) position;
+        ByteBuffer dup = buffer.duplicate();
         dup.position(pos);
         dup.limit(pos + length);
         return channel.write(dup);
@@ -98,10 +104,15 @@ public class MemoryRecords extends AbstractRecords {
     /**
      * Filter the records into the provided ByteBuffer.
      * @param filter The filter function
-     * @param buffer The byte buffer to write the filtered records to
+     * @param destinationBuffer The byte buffer to write the filtered records to
      * @return A FilterResult with a summary of the output (for metrics)
      */
-    public FilterResult filterTo(LogEntryFilter filter, ByteBuffer buffer) {
+    public FilterResult filterTo(LogEntryFilter filter, ByteBuffer destinationBuffer) {
+        return filterTo(shallowEntries(), filter, destinationBuffer);
+    }
+
+    private static FilterResult filterTo(Iterable<ByteBufferLogEntry> fromShallowEntries, LogEntryFilter filter,
+                                       ByteBuffer destinationBuffer) {
         long maxTimestamp = Record.NO_TIMESTAMP;
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
@@ -110,7 +121,7 @@ public class MemoryRecords extends AbstractRecords {
         int messagesRetained = 0;
         int bytesRetained = 0;
 
-        for (ByteBufferLogEntry shallowEntry : shallowEntries()) {
+        for (ByteBufferLogEntry shallowEntry : fromShallowEntries) {
             bytesRead += shallowEntry.sizeInBytes();
 
             // We use the absolute offset to decide whether to retain the message or not (this is handled by the
@@ -144,7 +155,7 @@ public class MemoryRecords extends AbstractRecords {
 
             if (writeOriginalEntry) {
                 // There are no messages compacted out and no message format conversion, write the original message set back
-                shallowEntry.writeTo(buffer);
+                shallowEntry.writeTo(destinationBuffer);
                 messagesRetained += retainedEntries.size();
                 bytesRetained += shallowEntry.sizeInBytes();
 
@@ -153,11 +164,11 @@ public class MemoryRecords extends AbstractRecords {
                     shallowOffsetOfMaxTimestamp = shallowEntry.offset();
                 }
             } else if (!retainedEntries.isEmpty()) {
-                ByteBuffer slice = buffer.slice();
+                ByteBuffer slice = destinationBuffer.slice();
                 MemoryRecordsBuilder builder = builderWithEntries(slice, shallowRecord.timestampType(), shallowRecord.compressionType(),
                         shallowRecord.timestamp(), retainedEntries);
                 MemoryRecords records = builder.build();
-                buffer.position(buffer.position() + slice.position());
+                destinationBuffer.position(destinationBuffer.position() + slice.position());
                 messagesRetained += retainedEntries.size();
                 bytesRetained += records.sizeInBytes();
 
@@ -388,7 +399,7 @@ public class MemoryRecords extends AbstractRecords {
                                                            long logAppendTime,
                                                            List<LogEntry> entries) {
         if (entries.isEmpty())
-            throw new IllegalArgumentException();
+            throw new IllegalArgumentException("entries must not be empty");
 
         LogEntry firstEntry = entries.iterator().next();
         long firstOffset = firstEntry.offset();

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index d60861b..69e9003 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -96,14 +96,15 @@ public class MemoryRecordsBuilder {
     private final int writeLimit;
     private final int initialCapacity;
 
-    private MemoryRecords builtRecords;
-    private long writtenUncompressed;
-    private long numRecords;
-    private float compressionRate;
-    private long maxTimestamp;
-    private long offsetOfMaxTimestamp;
+    private long writtenUncompressed = 0;
+    private long numRecords = 0;
+    private float compressionRate = 1;
+    private long maxTimestamp = Record.NO_TIMESTAMP;
+    private long offsetOfMaxTimestamp = -1;
     private long lastOffset = -1;
 
+    private MemoryRecords builtRecords;
+
     public MemoryRecordsBuilder(ByteBuffer buffer,
                                 byte magic,
                                 CompressionType compressionType,
@@ -117,10 +118,6 @@ public class MemoryRecordsBuilder {
         this.baseOffset = baseOffset;
         this.logAppendTime = logAppendTime;
         this.initPos = buffer.position();
-        this.numRecords = 0;
-        this.writtenUncompressed = 0;
-        this.compressionRate = 1;
-        this.maxTimestamp = Record.NO_TIMESTAMP;
         this.writeLimit = writeLimit;
         this.initialCapacity = buffer.capacity();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index f0dbf9e..9235f92 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -44,11 +44,11 @@ public interface Records {
     int sizeInBytes();
 
     /**
-     * Write the contents of this buffer to a channel.
+     * Attempts to write the contents of this buffer to a channel.
      * @param channel The channel to write to
      * @param position The position in the buffer to write from
      * @param length The number of bytes to write
-     * @return The number of bytes written
+     * @return The number of bytes actually written
      * @throws IOException For any IO errors
      */
     long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index 4a678d5..792a857 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -34,18 +34,18 @@ import java.util.Iterator;
 public class RecordsIterator extends AbstractIterator<LogEntry> {
     private final boolean shallow;
     private final boolean ensureMatchingMagic;
-    private final int masRecordSize;
+    private final int maxRecordSize;
     private final ShallowRecordsIterator<?> shallowIter;
     private DeepRecordsIterator innerIter;
 
     public RecordsIterator(LogInputStream<?> logInputStream,
                            boolean shallow,
                            boolean ensureMatchingMagic,
-                           int masRecordSize) {
+                           int maxRecordSize) {
         this.shallowIter = new ShallowRecordsIterator<>(logInputStream);
         this.shallow = shallow;
         this.ensureMatchingMagic = ensureMatchingMagic;
-        this.masRecordSize = masRecordSize;
+        this.maxRecordSize = maxRecordSize;
     }
 
     /**
@@ -76,7 +76,7 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
                 // would not try to further decompress underlying messages
                 // There will be at least one element in the inner iterator, so we don't
                 // need to call hasNext() here.
-                innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, masRecordSize);
+                innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize);
                 return innerIter.next();
             }
         } else {
@@ -88,7 +88,7 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
         return innerIter == null || !innerIter.hasNext();
     }
 
-    private static class DataLogInputStream implements LogInputStream<LogEntry> {
+    private static final class DataLogInputStream implements LogInputStream<LogEntry> {
         private final DataInputStream stream;
         protected final int maxMessageSize;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index fd4c747..b5770d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -182,7 +182,7 @@ public class FetchRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
@@ -191,17 +191,7 @@ public class FetchRequest extends AbstractRequest {
             responseData.put(entry.getKey(), partitionResponse);
         }
 
-        switch (versionId) {
-            case 0:
-                return new FetchResponse(responseData);
-            case 1:
-            case 2:
-            case 3:
-                return new FetchResponse(responseData, 0);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
-        }
+        return new FetchResponse(versionId, responseData, 0);
     }
 
     public int replicaId() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index ec2ab47..66b01a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -85,35 +85,29 @@ public class FetchResponse extends AbstractResponse {
     }
 
     /**
-     * Constructor for Version 0
-     * @param responseData fetched data grouped by topic-partition
-     */
-    public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        this(0, new LinkedHashMap<>(responseData), DEFAULT_THROTTLE_TIME);
-    }
-
-    /**
-     * Constructor for Version 1 and 2
+     * Constructor for version 3.
+     *
+     * The entries in `responseData` should be in the same order as the entries in `FetchRequest.fetchData`.
+     *
      * @param responseData fetched data grouped by topic-partition
      * @param throttleTime Time in milliseconds the response was throttled
      */
-    public FetchResponse(Map<TopicPartition, PartitionData> responseData, int throttleTime) {
-        // the schema for versions 1 and 2 is the same, so we pick 2 here
-        this(2, new LinkedHashMap<>(responseData), throttleTime);
+    public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
+        this(3, responseData, throttleTime);
     }
 
     /**
-     * Constructor for Version 3
+     * Constructor for all versions.
+     *
+     * From version 3, the entries in `responseData` should be in the same order as the entries in
+     * `FetchRequest.fetchData`.
+     *
      * @param responseData fetched data grouped by topic-partition
      * @param throttleTime Time in milliseconds the response was throttled
      */
-    public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
-        this(3, responseData, throttleTime);
-    }
-
     public FetchResponse(int version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)));
-        writeStruct(struct, version, responseData, throttleTime);
+        super(writeStruct(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)), version, responseData,
+                throttleTime));
         this.responseData = responseData;
         this.throttleTime = throttleTime;
     }
@@ -222,10 +216,10 @@ public class FetchResponse extends AbstractResponse {
         sends.add(new RecordsSend(dest, records));
     }
 
-    private static void writeStruct(Struct struct,
-                                    int version,
-                                    LinkedHashMap<TopicPartition, PartitionData> responseData,
-                                    int throttleTime) {
+    private static Struct writeStruct(Struct struct,
+                                      int version,
+                                      LinkedHashMap<TopicPartition, PartitionData> responseData,
+                                      int throttleTime) {
         List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();
         for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
@@ -250,6 +244,8 @@ public class FetchResponse extends AbstractResponse {
 
         if (version >= 1)
             struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+
+        return struct;
     }
 
     public static int sizeOf(int version, LinkedHashMap<TopicPartition, PartitionData> responseData) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
index 1c3bb0d..f1b6b25 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.network.Send;
-import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.network.TransportLayers;
 import org.apache.kafka.common.record.Records;
 
 import java.io.EOFException;
@@ -52,21 +52,18 @@ public class RecordsSend implements Send {
     @Override
     public long writeTo(GatheringByteChannel channel) throws IOException {
         long written = 0;
+
         if (remaining > 0) {
             written = records.writeTo(channel, size() - remaining, remaining);
             if (written < 0)
                 throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
+            remaining -= written;
         }
 
-        if (channel instanceof TransportLayer) {
-            TransportLayer tl = (TransportLayer) channel;
-            pending = tl.hasPendingWrites();
-
-            if (remaining <= 0 && pending)
-                channel.write(EMPTY_BYTE_BUFFER);
-        }
+        pending = TransportLayers.hasPendingWrites(channel);
+        if (remaining <= 0 && pending)
+            channel.write(EMPTY_BYTE_BUFFER);
 
-        remaining -= written;
         return written;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 223e798..27744ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -320,7 +320,7 @@ public class SaslServerAuthenticator implements Authenticator {
             }
         } catch (SchemaException | IllegalArgumentException e) {
             if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) {
-                // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException is thrown
+                // SchemaException is thrown if the request is not in Kafka format. IllegalArgumentException is thrown
                 // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
                 // starting with 0x60, revert to GSSAPI for both these exceptions.
                 if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index c5e6716..a502b32 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -606,7 +606,6 @@ public class Utils {
         return Arrays.asList(elems);
     }
 
-
     /*
      * Create a string from a collection
      * @param coll the collection
@@ -779,9 +778,7 @@ public class Utils {
      * @param size The number of bytes to include
      */
     public static long computeChecksum(ByteBuffer buffer, int start, int size) {
-        Crc32 crc = new Crc32();
-        crc.update(buffer.array(), buffer.arrayOffset() + start, size);
-        return crc.getValue();
+        return Crc32.crc32(buffer.array(), buffer.arrayOffset() + start, size);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8240d05..861dbf9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -70,6 +70,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -1320,7 +1321,7 @@ public class KafkaConsumerTest {
     }
 
     private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
-        Map<TopicPartition, PartitionData> tpResponses = new HashMap<>();
+        LinkedHashMap<TopicPartition, PartitionData> tpResponses = new LinkedHashMap<>();
         for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
             TopicPartition partition = fetchEntry.getKey();
             long fetchOffset = fetchEntry.getValue().offset;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0095697..706caf7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -63,6 +63,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -753,7 +754,9 @@ public class FetcherTest {
     }
 
     private FetchResponse fetchResponse(MemoryRecords records, short error, long hw, int throttleTime) {
-        return new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records)), throttleTime);
+        return new FetchResponse(
+                new LinkedHashMap<>(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, records))),
+                throttleTime);
     }
 
     private MetadataResponse newMetadataResponse(String topic, Errors error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 551d820..f785c72 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -35,13 +35,13 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(value = Parameterized.class)
 public class RecordTest {
 
-    private byte magic;
-    private long timestamp;
-    private ByteBuffer key;
-    private ByteBuffer value;
-    private CompressionType compression;
-    private TimestampType timestampType;
-    private Record record;
+    private final byte magic;
+    private final long timestamp;
+    private final ByteBuffer key;
+    private final ByteBuffer value;
+    private final CompressionType compression;
+    private final TimestampType timestampType;
+    private final Record record;
 
     public RecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
         this.magic = magic;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a5eb22a..5f4463f 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -184,13 +184,13 @@ public class RequestResponseTest {
 
     @Test
     public void fetchResponseVersionTest() {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
 
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
 
-        FetchResponse v0Response = new FetchResponse(responseData);
-        FetchResponse v1Response = new FetchResponse(responseData, 10);
+        FetchResponse v0Response = new FetchResponse(0, responseData, 0);
+        FetchResponse v1Response = new FetchResponse(1, responseData, 10);
         assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime());
         assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime());
         assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0), v0Response.toStruct().schema());
@@ -279,7 +279,7 @@ public class RequestResponseTest {
     }
 
     private FetchResponse createFetchResponse() {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
         return new FetchResponse(responseData, 25);
@@ -580,7 +580,9 @@ public class RequestResponseTest {
         private boolean closed = false;
 
         private ByteBufferChannel(long size) {
-            this.buf = ByteBuffer.allocate(Long.valueOf(size).intValue());
+            if (size > Integer.MAX_VALUE)
+                throw new IllegalArgumentException("size should be not be greater than Integer.MAX_VALUE");
+            this.buf = ByteBuffer.allocate((int) size);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index a7e7ebc..3845d8d 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -53,12 +53,12 @@ class AdminClient(val time: Time,
 
   private def sendAnyNode(api: ApiKeys, request: AbstractRequest): AbstractResponse = {
     bootstrapBrokers.foreach { broker =>
-        try {
-          return send(broker, api, request)
-        } catch {
-          case e: Exception =>
-            debug(s"Request $api failed against node $broker", e)
-        }
+      try {
+        return send(broker, api, request)
+      } catch {
+        case e: Exception =>
+          debug(s"Request $api failed against node $broker", e)
+      }
     }
     throw new RuntimeException(s"Request $api failed on brokers $bootstrapBrokers")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/api/ApiUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiUtils.scala b/core/src/main/scala/kafka/api/ApiUtils.scala
index 8e8ac36..2145e8c 100644
--- a/core/src/main/scala/kafka/api/ApiUtils.scala
+++ b/core/src/main/scala/kafka/api/ApiUtils.scala
@@ -17,9 +17,7 @@
 package kafka.api
 
 import java.nio._
-import java.nio.channels.GatheringByteChannel
 import kafka.common._
-import org.apache.kafka.common.network.TransportLayer
 
 /**
  * Helper functions specific to parsing or serializing requests and responses
@@ -98,10 +96,5 @@ object ApiUtils {
       throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
     else value
   }
-
-  private[api] def hasPendingWrites(channel: GatheringByteChannel): Boolean = channel match {
-    case t: TransportLayer => t.hasPendingWrites
-    case _ => false
-  }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 00897db..5e5360a 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -25,8 +25,12 @@ import kafka.network.RequestChannel
 import kafka.message.MessageSet
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
+import java.util
 
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{FetchResponse => JFetchResponse}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
@@ -196,12 +200,14 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, _) =>
-      (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
+    val responseData = new util.LinkedHashMap[TopicPartition, JFetchResponse.PartitionData]
+    requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
+      responseData.put(new TopicPartition(topic, partition),
+        new JFetchResponse.PartitionData(Errors.forException(e).code, -1, MemoryRecords.EMPTY))
     }
-    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, request.header.apiVersion)
+    val errorResponse = new JFetchResponse(versionId, responseData, 0)
     // Magic value does not matter here because the message set is empty
-    requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, errorResponse))
   }
 
   override def describe(details: Boolean): String = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 5e2a999..cd346cb 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -18,17 +18,13 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import java.nio.channels.GatheringByteChannel
 
 import kafka.common.TopicAndPartition
 import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.api.ApiUtils._
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.network.{MultiSend, Send}
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
-import scala.collection.JavaConverters._
 
 object FetchResponsePartitionData {
   def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
@@ -51,48 +47,6 @@ case class FetchResponsePartitionData(error: Short = Errors.NONE.code, hw: Long
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
 }
 
-// SENDS
-
-class PartitionDataSend(val partitionId: Int,
-                        val partitionData: FetchResponsePartitionData) extends Send {
-  private val emptyBuffer = ByteBuffer.allocate(0)
-  private val messageSize = partitionData.messages.sizeInBytes
-  private var messagesSentSize = 0
-  private var pending = false
-  private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
-  buffer.putInt(partitionId)
-  buffer.putShort(partitionData.error)
-  buffer.putLong(partitionData.hw)
-  buffer.putInt(partitionData.messages.sizeInBytes)
-  buffer.rewind()
-
-  override def completed = !buffer.hasRemaining && messagesSentSize >= messageSize && !pending
-
-  override def destination: String = ""
-
-  override def writeTo(channel: GatheringByteChannel): Long = {
-    var written = 0L
-    if (buffer.hasRemaining)
-      written += channel.write(buffer)
-    if (!buffer.hasRemaining) {
-      if (messagesSentSize < messageSize) {
-        val records = partitionData.messages.asRecords
-        val bytesSent = records.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
-        messagesSentSize += bytesSent
-        written += bytesSent
-      }
-      if (messagesSentSize >= messageSize && hasPendingWrites(channel))
-        channel.write(emptyBuffer)
-    }
-
-    pending = hasPendingWrites(channel)
-
-    written
-  }
-
-  override def size = buffer.capacity() + messageSize
-}
-
 object TopicData {
   def readFrom(buffer: ByteBuffer): TopicData = {
     val topic = readShortString(buffer)
@@ -119,50 +73,6 @@ case class TopicData(topic: String, partitionData: Seq[(Int, FetchResponsePartit
   val headerSize = TopicData.headerSize(topic)
 }
 
-class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
-
-  private val emptyBuffer = ByteBuffer.allocate(0)
-
-  private var sent = 0L
-
-  private var pending = false
-
-  override def completed: Boolean = sent >= size && !pending
-
-  override def destination: String = dest
-
-  override def size = topicData.headerSize + sends.size()
-
-  private val buffer = ByteBuffer.allocate(topicData.headerSize)
-  writeShortString(buffer, topicData.topic)
-  buffer.putInt(topicData.partitionData.size)
-  buffer.rewind()
-
-  private val sends = new MultiSend(dest,
-    topicData.partitionData.map(d => new PartitionDataSend(d._1, d._2): Send).asJava)
-
-  override def writeTo(channel: GatheringByteChannel): Long = {
-    if (completed)
-      throw new KafkaException("This operation cannot be completed on a complete request.")
-
-    var written = 0L
-    if (buffer.hasRemaining)
-      written += channel.write(buffer)
-    if (!buffer.hasRemaining) {
-      if (!sends.completed)
-        written += sends.writeTo(channel)
-      if (sends.completed && hasPendingWrites(channel))
-        written += channel.write(emptyBuffer)
-    }
-
-    pending = hasPendingWrites(channel)
-
-    sent += written
-    written
-  }
-}
-
-
 object FetchResponse {
 
   // The request version is used to determine which fields we can expect in the response
@@ -260,51 +170,3 @@ case class FetchResponse(correlationId: Int,
 
   def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
 }
-
-
-class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) extends Send {
-
-  private val emptyBuffer = ByteBuffer.allocate(0)
-
-  private val payloadSize = fetchResponse.sizeInBytes
-
-  private var sent = 0L
-
-  private var pending = false
-
-  override def size = 4 /* for size byte */ + payloadSize
-
-  override def completed = sent >= size && !pending
-
-  override def destination = dest
-
-  // The throttleTimeSize will be 0 if the request was made from a client sending a V0 style request
-  private val buffer = ByteBuffer.allocate(4 /* for size */ + fetchResponse.headerSizeInBytes)
-  fetchResponse.writeHeaderTo(buffer)
-  buffer.rewind()
-
-  private val sends = new MultiSend(dest, fetchResponse.dataGroupedByTopic.map {
-    case (topic, data) => new TopicDataSend(dest, TopicData(topic, data)): Send
-  }.asJava)
-
-  override def writeTo(channel: GatheringByteChannel): Long = {
-    if (completed)
-      throw new KafkaException("This operation cannot be completed on a complete request.")
-
-    var written = 0L
-
-    if (buffer.hasRemaining)
-      written += channel.write(buffer)
-    if (!buffer.hasRemaining) {
-      if (!sends.completed)
-        written += sends.writeTo(channel)
-      if (sends.completed && hasPendingWrites(channel))
-        written += channel.write(emptyBuffer)
-    }
-
-    sent += written
-    pending = hasPendingWrites(channel)
-
-    written
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index e7cd952..95663af 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -160,7 +160,7 @@ case class OffsetCommitRequest(groupId: String,
       })
     })
 
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val errorCode = Errors.forException(e).code
     val commitStatus = requestInfo.mapValues(_ => errorCode)
     val commitResponse = OffsetCommitResponse(commitStatus, correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 8a19f17..46815e1 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -52,7 +52,7 @@ class Partition(val topic: String,
   private val zkUtils = replicaManager.zkUtils
   private val assignedReplicaMap = new Pool[Int, Replica]
   // The read lock is only required when multiple reads are executed and needs to be in a consistent manner
-  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
+  private val leaderIsrUpdateLock = new ReentrantReadWriteLock
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   @volatile var leaderReplicaIdOpt: Option[Int] = None
@@ -102,51 +102,33 @@ class Partition(val topic: String,
     isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size
 
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
-    val replicaOpt = getReplica(replicaId)
-    replicaOpt match {
-      case Some(replica) => replica
-      case None =>
-        if (isReplicaLocal(replicaId)) {
-          val config = LogConfig.fromProps(logManager.defaultConfig.originals,
-                                           AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
-          val log = logManager.createLog(topicPartition, config)
-          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
-          val offsetMap = checkpoint.read
-          if (!offsetMap.contains(topicPartition))
-            info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
-          val offset = offsetMap.getOrElse(topicPartition, 0L).min(log.logEndOffset)
-          val localReplica = new Replica(replicaId, this, time, offset, Some(log))
-          addReplicaIfNotExists(localReplica)
-        } else {
-          val remoteReplica = new Replica(replicaId, this, time)
-          addReplicaIfNotExists(remoteReplica)
-        }
-        getReplica(replicaId).get
-    }
+    assignedReplicaMap.getAndMaybePut(replicaId, {
+      if (isReplicaLocal(replicaId)) {
+        val config = LogConfig.fromProps(logManager.defaultConfig.originals,
+                                         AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
+        val log = logManager.createLog(topicPartition, config)
+        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
+        val offsetMap = checkpoint.read
+        if (!offsetMap.contains(topicPartition))
+          info(s"No checkpointed highwatermark is found for partition $topicPartition")
+        val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
+        new Replica(replicaId, this, time, offset, Some(log))
+      } else new Replica(replicaId, this, time)
+    })
   }
 
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(assignedReplicaMap.get(replicaId))
 
-  def leaderReplicaIfLocal(): Option[Replica] = {
-    leaderReplicaIdOpt match {
-      case Some(leaderReplicaId) =>
-        if (leaderReplicaId == localBrokerId)
-          getReplica(localBrokerId)
-        else
-          None
-      case None => None
-    }
-  }
+  def leaderReplicaIfLocal: Option[Replica] =
+    leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
 
-  def addReplicaIfNotExists(replica: Replica) = {
+  def addReplicaIfNotExists(replica: Replica): Replica =
     assignedReplicaMap.putIfNotExists(replica.brokerId, replica)
-  }
 
-  def assignedReplicas(): Set[Replica] = {
+  def assignedReplicas: Set[Replica] =
     assignedReplicaMap.values.toSet
-  }
 
-  def removeReplica(replicaId: Int) {
+  private def removeReplica(replicaId: Int) {
     assignedReplicaMap.remove(replicaId)
   }
 
@@ -162,14 +144,12 @@ class Partition(val topic: String,
       } catch {
         case e: IOException =>
           fatal(s"Error deleting the log for partition $topicPartition", e)
-          Runtime.getRuntime().halt(1)
+          Runtime.getRuntime.halt(1)
       }
     }
   }
 
-  def getLeaderEpoch(): Int = {
-    this.leaderEpoch
-  }
+  def getLeaderEpoch: Int = this.leaderEpoch
 
   /**
    * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
@@ -186,7 +166,7 @@ class Partition(val topic: String,
       allReplicas.foreach(replica => getOrCreateReplica(replica))
       val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
       // remove assigned replicas that have been removed by the controller
-      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica)
+      (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
       leaderEpoch = partitionStateInfo.leaderEpoch
       zkVersion = partitionStateInfo.zkVersion
@@ -201,7 +181,7 @@ class Partition(val topic: String,
       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
       val curTimeMs = time.milliseconds
       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
-      (assignedReplicas() - leaderReplica).foreach{replica =>
+      (assignedReplicas - leaderReplica).foreach { replica =>
         val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
         replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
       }
@@ -234,7 +214,7 @@ class Partition(val topic: String,
       // add replicas that are new
       allReplicas.foreach(r => getOrCreateReplica(r))
       // remove assigned replicas that have been removed by the controller
-      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
+      (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = partitionStateInfo.leaderEpoch
       zkVersion = partitionStateInfo.zkVersion
@@ -268,7 +248,7 @@ class Partition(val topic: String,
           .format(localBrokerId,
                   replicaId,
                   logReadResult.info.fetchOffsetMetadata.messageOffset,
-                  assignedReplicas().map(_.brokerId).mkString(","),
+                  assignedReplicas.map(_.brokerId).mkString(","),
                   topicPartition))
     }
   }
@@ -286,7 +266,7 @@ class Partition(val topic: String,
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
     val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
-      leaderReplicaIfLocal() match {
+      leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
@@ -294,9 +274,8 @@ class Partition(val topic: String,
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
              replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
             val newInSyncReplicas = inSyncReplicas + replica
-            info("Expanding ISR for partition [%s,%d] from %s to %s"
-                         .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","),
-                                 newInSyncReplicas.map(_.brokerId).mkString(",")))
+            info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
+              s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
             // update ISR in ZK and cache
             updateIsr(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
@@ -324,7 +303,7 @@ class Partition(val topic: String,
    * produce request.
    */
   def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
-    leaderReplicaIfLocal() match {
+    leaderReplicaIfLocal match {
       case Some(leaderReplica) =>
         // keep the current immutable replica list reference
         val curInSyncReplicas = inSyncReplicas
@@ -380,7 +359,7 @@ class Partition(val topic: String,
    * since all callers of this private API acquire that lock
    */
   private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
-    val allLogEndOffsets = assignedReplicas.filter{replica =>
+    val allLogEndOffsets = assignedReplicas.filter { replica =>
       curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
     }.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
@@ -400,14 +379,14 @@ class Partition(val topic: String,
    * Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
    */
   private def tryCompleteDelayedRequests() {
-    val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
+    val requestKey = new TopicPartitionOperationKey(topicPartition)
     replicaManager.tryCompleteDelayedFetch(requestKey)
     replicaManager.tryCompleteDelayedProduce(requestKey)
   }
 
   def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
     val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
-      leaderReplicaIfLocal() match {
+      leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
           if(outOfSyncReplicas.nonEmpty) {
@@ -457,8 +436,7 @@ class Partition(val topic: String,
 
   def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
     val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
-      val leaderReplicaOpt = leaderReplicaIfLocal()
-      leaderReplicaOpt match {
+      leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val log = leaderReplica.log.get
           val minIsr = log.config.minInSyncReplicas
@@ -466,8 +444,8 @@ class Partition(val topic: String,
 
           // Avoid writing to leader if there are not enough insync replicas to make it safe
           if (inSyncSize < minIsr && requiredAcks == -1) {
-            throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
-              .format(topic, partitionId, inSyncSize, minIsr))
+            throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
+              .format(topicPartition, inSyncSize, minIsr))
           }
 
           val info = log.append(records, assignOffsets = true)
@@ -477,8 +455,8 @@ class Partition(val topic: String,
           (info, maybeIncrementLeaderHW(leaderReplica))
 
         case None =>
-          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
-            .format(topic, partitionId, localBrokerId))
+          throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+            .format(topicPartition, localBrokerId))
       }
     }
 
@@ -513,18 +491,13 @@ class Partition(val topic: String,
     removeMetric("ReplicasCount", tags)
   }
 
-  override def equals(that: Any): Boolean = {
-    if(!that.isInstanceOf[Partition])
-      return false
-    val other = that.asInstanceOf[Partition]
-    if(topic.equals(other.topic) && partitionId == other.partitionId)
-      return true
-    false
+  override def equals(that: Any): Boolean = that match {
+    case other: Partition => partitionId == other.partitionId && topic == other.topic
+    case _ => false
   }
 
-  override def hashCode(): Int = {
-    31 + topic.hashCode() + 17*partitionId
-  }
+  override def hashCode: Int =
+    31 + topic.hashCode + 17 * partitionId
 
   override def toString: String = {
     val partitionString = new StringBuilder
@@ -533,6 +506,6 @@ class Partition(val topic: String,
     partitionString.append("; Leader: " + leaderReplicaIdOpt)
     partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
     partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
-    partitionString.toString()
+    partitionString.toString
   }
 }
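
[Illustrative sketch, not part of the commit: the getAndMaybePut idiom that the rewritten getOrCreateReplica relies on, modelled with ConcurrentHashMap.computeIfAbsent. This is a stand-in, not kafka.utils.Pool.]

import java.util.concurrent.ConcurrentHashMap

// Minimal pool: the factory is evaluated only when the key is absent, and concurrent
// callers for the same key observe a single created instance.
class PoolSketch[K, V] {
  private val map = new ConcurrentHashMap[K, V]()

  def getAndMaybePut(key: K, createValue: => V): V =
    map.computeIfAbsent(key, _ => createValue)
}

object PoolSketchExample {
  def main(args: Array[String]): Unit = {
    val replicas = new PoolSketch[Int, String]
    println(replicas.getAndMaybePut(1, { println("creating replica 1"); "Replica(brokerId=1)" }))
    println(replicas.getAndMaybePut(1, { println("not evaluated: already present"); "Replica(brokerId=1)" }))
  }
}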

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 4d90815..346e5d6 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -31,46 +31,46 @@ class Replica(val brokerId: Int,
               initialHighWatermarkValue: Long = 0L,
               val log: Option[Log] = None) extends Logging {
   // the high watermark offset value, in non-leader replicas only its message offsets are kept
-  @volatile private[this] var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
+  @volatile private[this] var highWatermarkMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
   // the log end offset value, kept in all replicas;
   // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
-  @volatile private[this] var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+  @volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
 
-  // The log end offset value at the time the leader receives last FetchRequest from this follower.
+  // The log end offset value at the time the leader received the last FetchRequest from this follower
   // This is used to determine the lastCaughtUpTimeMs of the follower
-  @volatile private var lastFetchLeaderLogEndOffset: Long = 0L
+  @volatile private[this] var lastFetchLeaderLogEndOffset = 0L
 
-  // The time when the leader receives last FetchRequest from this follower
+  // The time when the leader received the last FetchRequest from this follower
   // This is used to determine the lastCaughtUpTimeMs of the follower
-  @volatile private var lastFetchTimeMs: Long = 0L
+  @volatile private[this] var lastFetchTimeMs = 0L
 
-  val topic = partition.topic
-  val partitionId = partition.partitionId
+  // lastCaughtUpTimeMs is the largest time t such that the offset of most recent FetchRequest from this follower >=
+  // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
+  @volatile private[this] var _lastCaughtUpTimeMs = 0L
 
-  def isLocal: Boolean = log.isDefined
+  val topicPartition = partition.topicPartition
 
-  // lastCaughtUpTimeMs is the largest time t such that the begin offset of most recent FetchRequest from this follower >=
-  // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
-  private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(0L)
+  def isLocal: Boolean = log.isDefined
 
-  def lastCaughtUpTimeMs = lastCaughtUpTimeMsUnderlying.get()
+  def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
 
   /*
-   * If the FetchRequest reads up to the log end offset of the leader when the current fetch request was received,
-   * set the lastCaughtUpTimeMsUnderlying to the time when the current fetch request was received.
+   * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
+   * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
    *
    * Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
-   * set the lastCaughtUpTimeMsUnderlying to the time when the previous fetch request was received.
+   * set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
    *
    * This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO
-   * by at most replicaLagTimeMaxMs. This semantics allows a follower to be added to the ISR even if offset of its fetch request is
-   * always smaller than leader's LEO, which can happen if there are constant small produce requests at high frequency.
+   * by at most `replicaLagTimeMaxMs`. These semantics allow a follower to be added to the ISR even if the offset of its
+   * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
+   * high frequency.
    */
   def updateLogReadResult(logReadResult : LogReadResult) {
     if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
-      lastCaughtUpTimeMsUnderlying.set(logReadResult.fetchTimeMs)
+      _lastCaughtUpTimeMs = logReadResult.fetchTimeMs
     else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
-      lastCaughtUpTimeMsUnderlying.set(lastFetchTimeMs)
+      _lastCaughtUpTimeMs = lastFetchTimeMs
 
     logEndOffset = logReadResult.info.fetchOffsetMetadata
     lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
@@ -80,16 +80,15 @@ class Replica(val brokerId: Int,
   def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long) {
     lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
     lastFetchTimeMs = curTimeMs
-    lastCaughtUpTimeMsUnderlying.set(lastCaughtUpTimeMs)
+    _lastCaughtUpTimeMs = lastCaughtUpTimeMs
   }
 
   private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
     if (isLocal) {
-      throw new KafkaException("Should not set log end offset on partition [%s,%d]'s local replica %d".format(topic, partitionId, brokerId))
+      throw new KafkaException(s"Should not set log end offset on partition $topicPartition's local replica $brokerId")
     } else {
       logEndOffsetMetadata = newLogEndOffset
-      trace("Setting log end offset for replica %d for partition [%s,%d] to [%s]"
-        .format(brokerId, topic, partitionId, logEndOffsetMetadata))
+      trace(s"Setting log end offset for replica $brokerId for partition $topicPartition to [$logEndOffsetMetadata]")
     }
   }
 
@@ -102,10 +101,9 @@ class Replica(val brokerId: Int,
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
       highWatermarkMetadata = newHighWatermark
-      trace("Setting high watermark for replica %d partition [%s,%d] on broker %d to [%s]"
-        .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
+      trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
     } else {
-      throw new KafkaException("Should not set high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+      throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")
     }
   }
 
@@ -115,31 +113,24 @@ class Replica(val brokerId: Int,
     if (isLocal) {
       highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
     } else {
-      throw new KafkaException("Should not construct complete high watermark on partition [%s,%d]'s non-local replica %d".format(topic, partitionId, brokerId))
+      throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
     }
   }
 
-  override def equals(that: Any): Boolean = {
-    if(!that.isInstanceOf[Replica])
-      return false
-    val other = that.asInstanceOf[Replica]
-    if(topic.equals(other.topic) && brokerId == other.brokerId && partition.equals(other.partition))
-      return true
-    false
-  }
-
-  override def hashCode(): Int = {
-    31 + topic.hashCode() + 17*brokerId + partition.hashCode()
+  override def equals(that: Any): Boolean = that match {
+    case other: Replica => brokerId == other.brokerId && topicPartition == other.topicPartition
+    case _ => false
   }
 
+  override def hashCode: Int = 31 + topicPartition.hashCode + 17 * brokerId
 
   override def toString: String = {
     val replicaString = new StringBuilder
     replicaString.append("ReplicaId: " + brokerId)
-    replicaString.append("; Topic: " + topic)
+    replicaString.append("; Topic: " + partition.topic)
     replicaString.append("; Partition: " + partition.partitionId)
     replicaString.append("; isLocal: " + isLocal)
-    if(isLocal) replicaString.append("; Highwatermark: " + highWatermark)
-    replicaString.toString()
+    if (isLocal) replicaString.append("; Highwatermark: " + highWatermark)
+    replicaString.toString
   }
 }
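
[Illustrative sketch, not part of the commit: the lastCaughtUpTimeMs rule documented above. A follower is credited with the current fetch time if it reads up to the leader's current LEO, with the previous fetch time if it reads up to the LEO seen at the previous fetch, and otherwise keeps its old value. Field and parameter names are illustrative.]

object CaughtUpTimeSketch {
  // Follower-tracking state kept by the leader (illustrative names).
  case class FollowerState(lastFetchLeaderLogEndOffset: Long,
                           lastFetchTimeMs: Long,
                           lastCaughtUpTimeMs: Long)

  def onFetch(state: FollowerState, fetchOffset: Long, leaderLogEndOffset: Long, fetchTimeMs: Long): FollowerState = {
    val caughtUpTimeMs =
      if (fetchOffset >= leaderLogEndOffset) fetchTimeMs                               // caught up to the current LEO
      else if (fetchOffset >= state.lastFetchLeaderLogEndOffset) state.lastFetchTimeMs // caught up to the previous LEO
      else state.lastCaughtUpTimeMs                                                    // still lagging
    FollowerState(leaderLogEndOffset, fetchTimeMs, caughtUpTimeMs)
  }

  def main(args: Array[String]): Unit = {
    val s0 = FollowerState(lastFetchLeaderLogEndOffset = 100L, lastFetchTimeMs = 1000L, lastCaughtUpTimeMs = 500L)
    println(onFetch(s0, fetchOffset = 90L,  leaderLogEndOffset = 120L, fetchTimeMs = 2000L)) // lagging: keeps 500
    println(onFetch(s0, fetchOffset = 120L, leaderLogEndOffset = 120L, fetchTimeMs = 2000L)) // caught up now: 2000
  }
}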

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index cf892c4..35b6bcd 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -30,10 +30,10 @@ case class TopicAndPartition(topic: String, partition: Int) {
 
   def this(partition: Partition) = this(partition.topic, partition.partitionId)
 
-  def this(replica: Replica) = this(replica.topic, replica.partitionId)
-
   def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
 
+  def this(replica: Replica) = this(replica.topicPartition)
+
   def asTuple = (topic, partition)
 
   override def toString = "[%s,%d]".format(topic, partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 40071b2..a820171 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -186,7 +186,7 @@ class RequestSendThread(val controllerId: Int,
             else {
               val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
               val clientRequest = new ClientRequest(brokerNode.idString, time.milliseconds(), true, requestHeader, request, null)
-              clientResponse = networkClient.blockingSendAndReceive(clientRequest, request)(time)
+              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
               isSendSuccessful = true
             }
           } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index e649946..637b0c4 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -606,7 +606,7 @@ class GroupMetadataManager(val brokerId: Int,
     val partitionOpt = replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, partitionId))
 
     val hw = partitionOpt.map { partition =>
-      partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
+      partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset).getOrElse(-1L)
     }.getOrElse(-1L)
 
     hw

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 620ae9b..5f06a73 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -451,11 +451,8 @@ private[log] class Cleaner(val id: Int,
                              maxLogMessageSize: Int,
                              stats: CleanerStats) {
 
-    def shouldRetainEntry(logEntry: LogEntry): Boolean =
-      shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
-
-    class LogCleanerFilter extends LogEntryFilter {
-      def shouldRetain(logEntry: LogEntry): Boolean = shouldRetainEntry(logEntry)
+    val logCleanerFilter = new LogEntryFilter {
+      def shouldRetain(logEntry: LogEntry): Boolean = shouldRetainMessage(source, map, retainDeletes, logEntry, stats)
     }
 
     var position = 0
@@ -468,7 +465,7 @@ private[log] class Cleaner(val id: Int,
       source.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
-      val result = records.filterTo(new LogCleanerFilter, writeBuffer)
+      val result = records.filterTo(logCleanerFilter, writeBuffer)
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)
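
[Illustrative sketch, not part of the commit: the anonymous-instance idiom used for logCleanerFilter above, which replaces a one-off named class with a val instantiating the filter interface inline. EntryFilter is a stand-in for LogEntryFilter.]

object FilterIdiomSketch {
  // Stand-in for a single-method filter interface.
  trait EntryFilter {
    def shouldRetain(entry: Int): Boolean
  }

  // One reusable anonymous instance, closing over whatever state the retention decision needs.
  val retainEven: EntryFilter = new EntryFilter {
    def shouldRetain(entry: Int): Boolean = entry % 2 == 0
  }

  def main(args: Array[String]): Unit =
    println((1 to 6).filter(retainEven.shouldRetain)) // Vector(2, 4, 6)
}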
 

