kafka-commits mailing list archives

From j...@apache.org
Subject [3/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)
Date Tue, 28 Mar 2017 16:59:51 GMT
KAFKA-4586; Add purgeDataBefore() API (KIP-107)

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jiangjie Qin <becket.qin@gmail.com>

Closes #2476 from lindong28/KAFKA-4586


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b05ad40
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b05ad40
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b05ad40

Branch: refs/heads/trunk
Commit: 8b05ad406d4cba6a75d1683b6d8699c3ab28f9d6
Parents: f3f9a9e
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Mar 28 09:59:44 2017 -0700
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Tue Mar 28 09:59:44 2017 -0700

----------------------------------------------------------------------
 bin/kafka-delete-records.sh                     |  17 ++
 .../clients/consumer/internals/Fetcher.java     |   4 +-
 .../consumer/internals/RequestFuture.java       |  36 ++--
 .../kafka/common/network/DualSocketChannel.java |   4 +
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Protocol.java  | 112 +++++++++++-
 .../apache/kafka/common/record/FileRecords.java |  15 +-
 .../kafka/common/requests/AbstractRequest.java  |   3 +
 .../kafka/common/requests/AbstractResponse.java |   2 +
 .../common/requests/DeleteRecordsRequest.java   | 151 +++++++++++++++
 .../common/requests/DeleteRecordsResponse.java  | 135 ++++++++++++++
 .../kafka/common/requests/FetchRequest.java     |  30 +--
 .../kafka/common/requests/FetchResponse.java    |  35 ++--
 .../clients/consumer/KafkaConsumerTest.java     |   2 +-
 .../clients/consumer/internals/FetcherTest.java |   4 +-
 .../common/requests/RequestResponseTest.java    |  16 +-
 .../main/scala/kafka/admin/AdminClient.scala    | 183 +++++++++++++++++--
 .../kafka/admin/BrokerApiVersionsCommand.scala  |   1 +
 .../kafka/admin/DeleteRecordsCommand.scala      | 117 ++++++++++++
 core/src/main/scala/kafka/api/ApiVersion.scala  |  10 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  58 +++++-
 core/src/main/scala/kafka/cluster/Replica.scala |  34 +++-
 .../kafka/consumer/ConsumerFetcherThread.scala  |   3 +-
 core/src/main/scala/kafka/log/Log.scala         |  98 +++++++---
 core/src/main/scala/kafka/log/LogManager.scala  |  56 +++++-
 core/src/main/scala/kafka/log/LogSegment.scala  |  33 ++--
 .../kafka/server/AbstractFetcherThread.scala    |  26 +--
 .../kafka/server/DelayedDeleteRecords.scala     | 129 +++++++++++++
 .../main/scala/kafka/server/DelayedFetch.scala  |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  67 ++++++-
 .../main/scala/kafka/server/KafkaConfig.scala   |  10 +
 .../main/scala/kafka/server/KafkaServer.scala   |   5 +-
 .../main/scala/kafka/server/MetadataCache.scala |   6 +
 .../kafka/server/ReplicaFetcherThread.scala     |  15 +-
 .../scala/kafka/server/ReplicaManager.scala     | 155 ++++++++++++++--
 .../integration/kafka/api/AdminClientTest.scala | 179 +++++++++++++++---
 .../kafka/api/AuthorizerIntegrationTest.scala   |   2 +-
 .../kafka/api/ConsumerBounceTest.scala          |   1 +
 .../ReplicaFetcherThreadFatalErrorTest.scala    |   4 +-
 .../test/scala/other/kafka/StressTestLog.scala  |   1 +
 .../other/kafka/TestLinearWriteSpeed.scala      |   2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |   2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   1 +
 .../log/LogCleanerLagIntegrationTest.scala      |   1 +
 .../unit/kafka/log/LogCleanerManagerTest.scala  |   3 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |   2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |  97 +++++++---
 .../server/AbstractFetcherThreadTest.scala      |   4 +-
 .../unit/kafka/server/FetchRequestTest.scala    |   2 +-
 .../server/HighwatermarkPersistenceTest.scala   |   6 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |  12 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   2 +
 .../scala/unit/kafka/server/LogOffsetTest.scala |  40 +++-
 .../kafka/server/ReplicaManagerQuotasTest.scala |   5 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  42 +++--
 .../unit/kafka/server/SimpleFetchTest.scala     |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 docs/upgrade.html                               |   6 +
 59 files changed, 1732 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/bin/kafka-delete-records.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-delete-records.sh b/bin/kafka-delete-records.sh
new file mode 100755
index 0000000..8726f91
--- /dev/null
+++ b/bin/kafka-delete-records.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.DeleteRecordsCommand "$@"
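
The wrapper above just delegates to the new kafka.admin.DeleteRecordsCommand (its Scala source is listed in the diffstat but not shown in this part of the patch). As a rough usage sketch, with option names that are assumptions rather than something shown in this excerpt, the tool would be run along the lines of:

    bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file offsets.json

where offsets.json would list, per topic partition, the offset before which records should be deleted.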

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7236653..c2456cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -200,7 +200,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
 
                             for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                 TopicPartition partition = entry.getKey();
-                                long fetchOffset = request.fetchData().get(partition).offset;
+                                long fetchOffset = request.fetchData().get(partition).fetchOffset;
                                 FetchResponse.PartitionData fetchData = entry.getValue();
                                 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                         resp.requestHeader().apiVersion()));
@@ -722,7 +722,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
                 }
 
                 long position = this.subscriptions.position(partition);
-                fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
+                fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize));
                 log.trace("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
             } else {
                 log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 2b7c8f3..8515c95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -18,7 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.protocol.Errors;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -46,6 +47,7 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
     private static final Object INCOMPLETE_SENTINEL = new Object();
     private final AtomicReference<Object> result = new AtomicReference<>(INCOMPLETE_SENTINEL);
     private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
+    private final CountDownLatch completedLatch = new CountDownLatch(1);
 
     /**
      * Check whether the response is ready to be handled
@@ -55,6 +57,10 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
         return result.get() != INCOMPLETE_SENTINEL;
     }
 
+    public boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
+        return completedLatch.await(timeout, unit);
+    }
+
     /**
      * Get the value corresponding to this request (only available if the request succeeded)
      * @return the value set in {@link #complete(Object)}
@@ -112,12 +118,16 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
      * @throws IllegalArgumentException if the argument is an instance of {@link RuntimeException}
      */
     public void complete(T value) {
-        if (value instanceof RuntimeException)
-            throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
+        try {
+            if (value instanceof RuntimeException)
+                throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
 
-        if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
-            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
-        fireSuccess();
+            if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
+                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
+            fireSuccess();
+        } finally {
+            completedLatch.countDown();
+        }
     }
 
     /**
@@ -127,13 +137,17 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
      * @throws IllegalStateException if the future has already been completed
      */
     public void raise(RuntimeException e) {
-        if (e == null)
-            throw new IllegalArgumentException("The exception passed to raise must not be null");
+        try {
+            if (e == null)
+                throw new IllegalArgumentException("The exception passed to raise must not be null");
 
-        if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
-            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
+            if (!result.compareAndSet(INCOMPLETE_SENTINEL, e))
+                throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
 
-        fireFailure();
+            fireFailure();
+        } finally {
+            completedLatch.countDown();
+        }
     }
 
     /**
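
The new awaitDone(timeout, unit) method lets a caller block until complete() or raise() has run, instead of polling isDone(). Below is a minimal, hypothetical sketch (not part of this patch; the class name is made up, and it only assumes the kafka-clients classes shown above are on the classpath):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.internals.RequestFuture;

    public class AwaitDoneSketch {
        public static void main(String[] args) throws InterruptedException {
            final RequestFuture<String> future = new RequestFuture<>();

            // Another thread completes the future; countDown() now fires inside complete().
            new Thread(new Runnable() {
                @Override
                public void run() {
                    future.complete("done");
                }
            }).start();

            // Blocks until complete()/raise() has been called, or the timeout expires.
            if (future.awaitDone(5, TimeUnit.SECONDS) && future.succeeded())
                System.out.println(future.value());
        }
    }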

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java b/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
new file mode 100644
index 0000000..411dd50
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/DualSocketChannel.java
@@ -0,0 +1,4 @@
+package org.apache.kafka.common.network;
+
+public class DualSocketChannel {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index b3c59a1..89b2000 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -45,7 +45,8 @@ public enum ApiKeys {
     SASL_HANDSHAKE(17, "SaslHandshake"),
     API_VERSIONS(18, "ApiVersions"),
     CREATE_TOPICS(19, "CreateTopics"),
-    DELETE_TOPICS(20, "DeleteTopics");
+    DELETE_TOPICS(20, "DeleteTopics"),
+    DELETE_RECORDS(21, "DeleteRecords");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 8c3e08c..5d7004a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -492,11 +492,31 @@ public class Protocol {
                                                                                  INT32,
                                                                                  "Maximum bytes to fetch."));
 
+    // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+    public static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema(new Field("partition",
+                                                                                 INT32,
+                                                                                 "Topic partition id."),
+                                                                       new Field("fetch_offset",
+                                                                                 INT64,
+                                                                                 "Message offset."),
+                                                                       new Field("log_start_offset",
+                                                                                 INT64,
+                                                                                 "Earliest available offset of the follower replica. " +
+                                                                                 "The field is only used when request is sent by follower. "),
+                                                                       new Field("max_bytes",
+                                                                                 INT32,
+                                                                                 "Maximum bytes to fetch."));
+
     public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
                                                                    new Field("partitions",
                                                                              new ArrayOf(FETCH_REQUEST_PARTITION_V0),
                                                                              "Partitions to fetch."));
 
+    public static final Schema FETCH_REQUEST_TOPIC_V5 = new Schema(new Field("topic", STRING, "Topic to fetch."),
+                                                                   new Field("partitions",
+                                                                             new ArrayOf(FETCH_REQUEST_PARTITION_V5),
+                                                                             "Partitions to fetch."));
+
     public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
                                                                        INT32,
                                                                        "Broker id of the follower. For normal consumers, use -1."),
@@ -565,6 +585,34 @@ public class Protocol {
                     new ArrayOf(FETCH_REQUEST_TOPIC_V0),
                     "Topics to fetch in the order provided."));
 
+    // FETCH_REQUEST_V5 added a per-partition log_start_offset field - the earliest available offset of partition data that can be consumed.
+    public static final Schema FETCH_REQUEST_V5 = new Schema(
+            new Field("replica_id",
+                    INT32,
+                    "Broker id of the follower. For normal consumers, use -1."),
+            new Field("max_wait_time",
+                    INT32,
+                    "Maximum time in ms to wait for the response."),
+            new Field("min_bytes",
+                    INT32,
+                    "Minimum bytes to accumulate in the response."),
+            new Field("max_bytes",
+                    INT32,
+                    "Maximum bytes to accumulate in the response. Note that this is not an absolute maximum, " +
+                    "if the first message in the first non-empty partition of the fetch is larger than this " +
+                    "value, the message will still be returned to ensure that progress can be made."),
+            new Field("isolation_level",
+                    INT8,
+                    "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " +
+                    "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " +
+                     "non-transactional and COMMITTED transactional records are visible. To be more concrete, " +
+                     "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " +
+                     "and enables the inclusion of the list of aborted transactions in the result, which allows " +
+                     "consumers to discard ABORTED transactional records"),
+            new Field("topics",
+                    new ArrayOf(FETCH_REQUEST_TOPIC_V5),
+                    "Topics to fetch in the order provided."));
+
     public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition",
                                                                                          INT32,
                                                                                          "Topic partition id."),
@@ -602,6 +650,8 @@ public class Protocol {
             new Field("pid", INT64, "The producer ID (PID) associated with the aborted transactions"),
             new Field("first_offset", INT64, "The first offset in the aborted transaction"));
 
+    public static final Schema FETCH_RESPONSE_ABORTED_TRANSACTION_V5 = FETCH_RESPONSE_ABORTED_TRANSACTION_V4;
+
     public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V4 = new Schema(
             new Field("partition",
                     INT32,
@@ -617,14 +667,41 @@ public class Protocol {
             new Field("aborted_transactions",
                     ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
 
+    // FETCH_RESPONSE_PARTITION_HEADER_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed.
+    public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V5 = new Schema(
+            new Field("partition",
+                    INT32,
+                    "Topic partition id."),
+            new Field("error_code", INT16),
+            new Field("high_watermark",
+                    INT64,
+                    "Last committed offset."),
+            new Field("last_stable_offset",
+                    INT64,
+                    "The last stable offset (or LSO) of the partition. This is the last offset such that the state " +
+                    "of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)"),
+            new Field("log_start_offset",
+                    INT64,
+                    "Earliest available offset."),
+            new Field("aborted_transactions",
+                    ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V5)));
+
     public static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
             new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V4),
             new Field("record_set", RECORDS));
 
+    public static final Schema FETCH_RESPONSE_PARTITION_V5 = new Schema(
+            new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V5),
+            new Field("record_set", RECORDS));
+
     public static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
             new Field("topic", STRING),
             new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
 
+    public static final Schema FETCH_RESPONSE_TOPIC_V5 = new Schema(
+            new Field("topic", STRING),
+            new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
+
     public static final Schema FETCH_RESPONSE_V4 = new Schema(
             new Field("throttle_time_ms",
                     INT32,
@@ -633,8 +710,16 @@ public class Protocol {
                     0),
             new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
 
-    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4};
-    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4};
+    public static final Schema FETCH_RESPONSE_V5 = new Schema(
+            new Field("throttle_time_ms",
+                    INT32,
+                    "Duration in milliseconds for which the request was throttled " +
+                    "due to quota violation (zero if the request did not violate any quota).",
+                    0),
+            new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
+
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2, FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5};
 
     /* List groups api */
     public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
@@ -1070,6 +1155,27 @@ public class Protocol {
     public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0};
     public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0};
 
+    public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
+                                                                                new Field("offset", INT64, "The offset before which the messages will be deleted."));
+
+    public static final Schema DELETE_RECORDS_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+                                                                            new Field("partitions", new ArrayOf(DELETE_RECORDS_REQUEST_PARTITION_V0)));
+
+    public static final Schema DELETE_RECORDS_REQUEST_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_REQUEST_TOPIC_V0)),
+                                                                      new Field("timeout", INT32, "The maximum time to await a response in ms."));
+
+    public static final Schema DELETE_RECORDS_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."),
+                                                                                 new Field("low_watermark", INT64, "Smallest available offset of all live replicas"),
+                                                                                 new Field("error_code", INT16, "The error code for the given partition."));
+
+    public static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."),
+                                                                             new Field("partitions", new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0)));
+
+    public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
+    public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1101,6 +1207,7 @@ public class Protocol {
         REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST;
         REQUESTS[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_REQUEST;
         REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
+        REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1123,6 +1230,7 @@ public class Protocol {
         RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE;
         RESPONSES[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
+        RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index b0dcebf..dcd7845 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -286,24 +286,25 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     /**
-     * Search forward for the message whose timestamp is greater than or equals to the target timestamp.
+     * Search forward for the first message that meets the following requirements:
+     * - Message's timestamp is greater than or equals to the targetTimestamp.
+     * - Message's position in the log file is greater than or equals to the startingPosition.
+     * - Message's offset is greater than or equals to the startingOffset.
      *
      * @param targetTimestamp The timestamp to search for.
      * @param startingPosition The starting position to search.
-     * @return The timestamp and offset of the message found. None, if no message is found.
+     * @param startingOffset The starting offset to search.
+     * @return The timestamp and offset of the message found. Null if no message is found.
      */
-    public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
+    public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition, long startingOffset) {
         for (RecordBatch batch : batchesFrom(startingPosition)) {
             if (batch.maxTimestamp() >= targetTimestamp) {
                 // We found a message
                 for (Record record : batch) {
                     long timestamp = record.timestamp();
-                    if (timestamp >= targetTimestamp)
+                    if (timestamp >= targetTimestamp && record.offset() >= startingOffset)
                         return new TimestampAndOffset(timestamp, record.offset());
                 }
-                throw new IllegalStateException(String.format("The message set (max timestamp = %s, max offset = %s)" +
-                        " should contain target timestamp %s but it does not.", batch.maxTimestamp(),
-                        batch.lastOffset(), targetTimestamp));
             }
         }
         return null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 7dc3b62..3a99a8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -168,6 +168,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case DELETE_TOPICS:
                 request = new DeleteTopicsRequest(struct, version);
                 break;
+            case DELETE_RECORDS:
+                request = new DeleteRecordsRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index d534daf..a5d0dc4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -91,6 +91,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new CreateTopicsResponse(struct);
             case DELETE_TOPICS:
                 return new DeleteTopicsResponse(struct);
+            case DELETE_RECORDS:
+                return new DeleteRecordsResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
new file mode 100644
index 0000000..f204c44
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.apache.kafka.common.utils.Utils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteRecordsRequest extends AbstractRequest {
+
+    public static final long HIGH_WATERMARK = -1L;
+
+    // request level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+    private static final String TIMEOUT_KEY_NAME = "timeout";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String OFFSET_KEY_NAME = "offset";
+
+    private final int timeout;
+    private final Map<TopicPartition, Long> partitionOffsets;
+
+    public static class Builder extends AbstractRequest.Builder<DeleteRecordsRequest> {
+        private final int timeout;
+        private final Map<TopicPartition, Long> partitionOffsets;
+
+        public Builder(int timeout, Map<TopicPartition, Long> partitionOffsets) {
+            super(ApiKeys.DELETE_RECORDS);
+            this.timeout = timeout;
+            this.partitionOffsets = partitionOffsets;
+        }
+
+        @Override
+        public DeleteRecordsRequest build(short version) {
+            return new DeleteRecordsRequest(timeout, partitionOffsets, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("(type=DeleteRecordsRequest")
+                   .append(", timeout=").append(timeout)
+                   .append(", partitionOffsets=(").append(Utils.mkString(partitionOffsets))
+                   .append("))");
+            return builder.toString();
+        }
+    }
+
+
+    public DeleteRecordsRequest(Struct struct, short version) {
+        super(version);
+        partitionOffsets = new HashMap<>();
+        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicStructObj;
+            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionStructObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                long offset = partitionStruct.getLong(OFFSET_KEY_NAME);
+                partitionOffsets.put(new TopicPartition(topic, partition), offset);
+            }
+        }
+        timeout = struct.getInt(TIMEOUT_KEY_NAME);
+    }
+
+    public DeleteRecordsRequest(int timeout, Map<TopicPartition, Long> partitionOffsets, short version) {
+        super(version);
+        this.timeout = timeout;
+        this.partitionOffsets = partitionOffsets;
+    }
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.DELETE_RECORDS.requestSchema(version()));
+        Map<String, Map<Integer, Long>> offsetsByTopic = CollectionUtils.groupDataByTopic(partitionOffsets);
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, Long>> offsetsByTopicEntry : offsetsByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, offsetsByTopicEntry.getKey());
+            List<Struct> partitionStructArray = new ArrayList<>();
+            for (Map.Entry<Integer, Long> offsetsByPartitionEntry : offsetsByTopicEntry.getValue().entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                partitionStruct.set(PARTITION_KEY_NAME, offsetsByPartitionEntry.getKey());
+                partitionStruct.set(OFFSET_KEY_NAME, offsetsByPartitionEntry.getValue());
+                partitionStructArray.add(partitionStruct);
+            }
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(Throwable e) {
+        Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> responseMap = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
+            responseMap.put(entry.getKey(), new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.forException(e)));
+        }
+
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+                return new DeleteRecordsResponse(responseMap);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                    versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion()));
+        }
+    }
+
+    public int timeout() {
+        return timeout;
+    }
+
+    public Map<TopicPartition, Long> partitionOffsets() {
+        return partitionOffsets;
+    }
+
+    public static DeleteRecordsRequest parse(ByteBuffer buffer, short version) {
+        return new DeleteRecordsRequest(ApiKeys.DELETE_RECORDS.parseRequest(version, buffer), version);
+    }
+}
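
A minimal, hypothetical sketch of building the new request (not part of this patch; the class name is made up, and it only uses the Builder and accessors defined above):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.DeleteRecordsRequest;

    public class DeleteRecordsRequestSketch {
        public static void main(String[] args) {
            // Ask the leader of my-topic-0 to delete all records before offset 42, with a 30s timeout.
            // Passing DeleteRecordsRequest.HIGH_WATERMARK (-1L) instead of an explicit offset appears
            // to mean "delete up to the partition's high watermark" (per KIP-107).
            Map<TopicPartition, Long> partitionOffsets = new HashMap<>();
            partitionOffsets.put(new TopicPartition("my-topic", 0), 42L);

            DeleteRecordsRequest request =
                    new DeleteRecordsRequest.Builder(30000, partitionOffsets).build((short) 0);

            System.out.println(request.partitionOffsets()); // {my-topic-0=42}
        }
    }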

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
new file mode 100644
index 0000000..45b518b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteRecordsResponse extends AbstractResponse {
+
+    public static final long INVALID_LOW_WATERMARK = -1L;
+
+    // request level key names
+    private static final String TOPICS_KEY_NAME = "topics";
+
+    // topic level key names
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+
+    // partition level key names
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String LOW_WATERMARK_KEY_NAME = "low_watermark";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    private final Map<TopicPartition, PartitionResponse> responses;
+
+    /**
+     * Possible error code:
+     *
+     * OFFSET_OUT_OF_RANGE (1)
+     * UNKNOWN_TOPIC_OR_PARTITION (3)
+     * NOT_LEADER_FOR_PARTITION (6)
+     * REQUEST_TIMED_OUT (7)
+     * NOT_ENOUGH_REPLICAS (19)
+     * UNKNOWN (-1)
+     */
+
+    public static final class PartitionResponse {
+        public long lowWatermark;
+        public Errors error;
+
+        public PartitionResponse(long lowWatermark, Errors error) {
+            this.lowWatermark = lowWatermark;
+            this.error = error;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append('{')
+                   .append("low_watermark: ")
+                   .append(lowWatermark)
+                   .append(", error: ")
+                   .append(error.toString())
+                   .append('}');
+            return builder.toString();
+        }
+    }
+
+    public DeleteRecordsResponse(Struct struct) {
+        responses = new HashMap<>();
+        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicStruct = (Struct) topicStructObj;
+            String topic = topicStruct.getString(TOPIC_KEY_NAME);
+            for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionStruct = (Struct) partitionStructObj;
+                int partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                long lowWatermark = partitionStruct.getLong(LOW_WATERMARK_KEY_NAME);
+                Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                responses.put(new TopicPartition(topic, partition), new PartitionResponse(lowWatermark, error));
+            }
+        }
+    }
+
+    /**
+     * Constructor for version 0.
+     */
+    public DeleteRecordsResponse(Map<TopicPartition, PartitionResponse> responses) {
+        this.responses = responses;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version));
+        Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses);
+        List<Struct> topicStructArray = new ArrayList<>();
+        for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) {
+            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
+            topicStruct.set(TOPIC_KEY_NAME, responsesByTopicEntry.getKey());
+            List<Struct> partitionStructArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionResponse> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) {
+                Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
+                PartitionResponse response = responsesByPartitionEntry.getValue();
+                partitionStruct.set(PARTITION_KEY_NAME, responsesByPartitionEntry.getKey());
+                partitionStruct.set(LOW_WATERMARK_KEY_NAME, response.lowWatermark);
+                partitionStruct.set(ERROR_CODE_KEY_NAME, response.error.code());
+                partitionStructArray.add(partitionStruct);
+            }
+            topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray());
+            topicStructArray.add(topicStruct);
+        }
+        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
+        return struct;
+    }
+
+    public Map<TopicPartition, PartitionResponse> responses() {
+        return this.responses;
+    }
+
+    public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) {
+        return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer));
+    }
+}
\ No newline at end of file
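
And a matching hypothetical sketch for the response side (again not part of this patch, class name made up): each partition entry carries the partition's new low watermark plus one of the error codes documented above.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.DeleteRecordsResponse;

    public class DeleteRecordsResponseSketch {
        public static void main(String[] args) {
            Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> results = new HashMap<>();
            results.put(new TopicPartition("my-topic", 0),
                        new DeleteRecordsResponse.PartitionResponse(42L, Errors.NONE));

            DeleteRecordsResponse response = new DeleteRecordsResponse(results);
            for (Map.Entry<TopicPartition, DeleteRecordsResponse.PartitionResponse> entry : response.responses().entrySet()) {
                if (entry.getValue().error == Errors.NONE)
                    System.out.println(entry.getKey() + " -> low watermark " + entry.getValue().lowWatermark);
            }
        }
    }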

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 7c029ca..b843c66 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -47,9 +47,11 @@ public class FetchRequest extends AbstractRequest {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+    private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
 
     // default values for older versions where a request level limit did not exist
     public static final int DEFAULT_RESPONSE_MAX_BYTES = Integer.MAX_VALUE;
+    public static final long INVALID_LOG_START_OFFSET = -1L;
 
     private final int replicaId;
     private final int maxWait;
@@ -59,17 +61,19 @@ public class FetchRequest extends AbstractRequest {
     private final LinkedHashMap<TopicPartition, PartitionData> fetchData;
 
     public static final class PartitionData {
-        public final long offset;
+        public final long fetchOffset;
+        public final long logStartOffset;
         public final int maxBytes;
 
-        public PartitionData(long offset, int maxBytes) {
-            this.offset = offset;
+        public PartitionData(long fetchOffset, long logStartOffset, int maxBytes) {
+            this.fetchOffset = fetchOffset;
+            this.logStartOffset = logStartOffset;
             this.maxBytes = maxBytes;
         }
 
         @Override
         public String toString() {
-            return "(offset=" + offset + ", maxBytes=" + maxBytes + ")";
+            return "(offset=" + fetchOffset + ", logStartOffset=" + logStartOffset + ", maxBytes=" + maxBytes + ")";
         }
     }
 
@@ -188,7 +192,9 @@ public class FetchRequest extends AbstractRequest {
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
                 int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, maxBytes);
+                long logStartOffset = partitionResponse.hasField(LOG_START_OFFSET_KEY_NAME) ?
+                    partitionResponse.getLong(LOG_START_OFFSET_KEY_NAME) : INVALID_LOG_START_OFFSET;
+                PartitionData partitionData = new PartitionData(offset, logStartOffset, maxBytes);
                 fetchData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
@@ -200,7 +206,8 @@ public class FetchRequest extends AbstractRequest {
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e),
-                    FetchResponse.INVALID_LSO, FetchResponse.INVALID_HIGHWATERMARK, null, MemoryRecords.EMPTY);
+                FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET,
+                null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
         return new FetchResponse(responseData, 0);
@@ -240,16 +247,15 @@ public class FetchRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        short version = version();
-        Struct struct = new Struct(ApiKeys.FETCH.requestSchema(version));
+        Struct struct = new Struct(ApiKeys.FETCH.requestSchema(version()));
         List<TopicAndPartitionData<PartitionData>> topicsData = TopicAndPartitionData.batchByTopic(fetchData);
 
         struct.set(REPLICA_ID_KEY_NAME, replicaId);
         struct.set(MAX_WAIT_KEY_NAME, maxWait);
         struct.set(MIN_BYTES_KEY_NAME, minBytes);
-        if (version >= 3)
+        if (struct.hasField(MAX_BYTES_KEY_NAME))
             struct.set(MAX_BYTES_KEY_NAME, maxBytes);
-        if (version >= 4)
+        if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
             struct.set(ISOLATION_LEVEL_KEY_NAME, IsolationLevel.READ_UNCOMMITTED.id());
 
         List<Struct> topicArray = new ArrayList<>();
@@ -261,7 +267,9 @@ public class FetchRequest extends AbstractRequest {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
+                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.fetchOffset);
+                if (partitionData.hasField(LOG_START_OFFSET_KEY_NAME))
+                    partitionData.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset);
                 partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
                 partitionArray.add(partitionData);
             }
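
For reference, a hypothetical sketch of the new three-argument PartitionData (not part of this patch, class name made up): consumers pass INVALID_LOG_START_OFFSET, while a follower uses the second argument to report its own earliest available offset, per the field description added to Protocol.java above.

    import org.apache.kafka.common.requests.FetchRequest;

    public class FetchPartitionDataSketch {
        public static void main(String[] args) {
            // Consumer-side entry: no log start offset to report.
            FetchRequest.PartitionData consumerEntry =
                    new FetchRequest.PartitionData(100L, FetchRequest.INVALID_LOG_START_OFFSET, 1048576);
            // Follower-side entry: propagates the replica's local log start offset (42 here).
            FetchRequest.PartitionData followerEntry =
                    new FetchRequest.PartitionData(100L, 42L, 1048576);
            System.out.println(consumerEntry + " " + followerEntry);
        }
    }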

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f0a0eee..56eb838 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -51,6 +51,7 @@ public class FetchResponse extends AbstractResponse {
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
     private static final String LAST_STABLE_OFFSET_KEY_NAME = "last_stable_offset";
+    private static final String LOG_START_OFFSET_KEY_NAME = "log_start_offset";
     private static final String ABORTED_TRANSACTIONS_KEY_NAME = "aborted_transactions";
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
@@ -60,7 +61,8 @@ public class FetchResponse extends AbstractResponse {
 
     private static final int DEFAULT_THROTTLE_TIME = 0;
     public static final long INVALID_HIGHWATERMARK = -1L;
-    public static final long INVALID_LSO = -1L;
+    public static final long INVALID_LAST_STABLE_OFFSET = -1L;
+    public static final long INVALID_LOG_START_OFFSET = -1L;
 
     /**
      * Possible error codes:
@@ -111,19 +113,22 @@ public class FetchResponse extends AbstractResponse {
 
     public static final class PartitionData {
         public final Errors error;
-        public final long lastStableOffset;
         public final long highWatermark;
+        public final long lastStableOffset;
+        public final long logStartOffset;
         public final List<AbortedTransaction> abortedTransactions;
         public final Records records;
 
         public PartitionData(Errors error,
                              long highWatermark,
                              long lastStableOffset,
+                             long logStartOffset,
                              List<AbortedTransaction> abortedTransactions,
                              Records records) {
             this.error = error;
             this.highWatermark = highWatermark;
             this.lastStableOffset = lastStableOffset;
+            this.logStartOffset = logStartOffset;
             this.abortedTransactions = abortedTransactions;
             this.records = records;
         }
@@ -140,6 +145,7 @@ public class FetchResponse extends AbstractResponse {
             return error == that.error &&
                     highWatermark == that.highWatermark &&
                     lastStableOffset == that.lastStableOffset &&
+                    logStartOffset == that.logStartOffset &&
                     (abortedTransactions == null ? that.abortedTransactions == null : abortedTransactions.equals(that.abortedTransactions)) &&
                     (records == null ? that.records == null : records.equals(that.records));
         }
@@ -147,8 +153,9 @@ public class FetchResponse extends AbstractResponse {
         @Override
         public int hashCode() {
             int result = error != null ? error.hashCode() : 0;
-            result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
             result = 31 * result + (int) (highWatermark ^ (highWatermark >>> 32));
+            result = 31 * result + (int) (lastStableOffset ^ (lastStableOffset >>> 32));
+            result = 31 * result + (int) (logStartOffset ^ (logStartOffset >>> 32));
             result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
             result = 31 * result + (records != null ? records.hashCode() : 0);
             return result;
@@ -157,15 +164,16 @@ public class FetchResponse extends AbstractResponse {
         @Override
         public String toString() {
             return "(error=" + error + ", highWaterMark=" + highWatermark +
-                    ", lastStableOffset = " + lastStableOffset + ", " +
-                    "abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
+                    ", lastStableOffset = " + lastStableOffset +
+                    ", logStartOffset = " + logStartOffset +
+                    ", abortedTransactions = " + abortedTransactions + ", records=" + records + ")";
         }
     }
 
     /**
      * Constructor for all versions.
      *
-     * From version 3, the entries in `responseData` should be in the same order as the entries in
+     * From version 3 or later, the entries in `responseData` should be in the same order as the entries in
      * `FetchRequest.fetchData`.
      *
      * @param responseData fetched data grouped by topic-partition
@@ -187,10 +195,13 @@ public class FetchResponse extends AbstractResponse {
                 int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
                 Errors error = Errors.forCode(partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME));
                 long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
-                long lastStableOffset = INVALID_LSO;
+                long lastStableOffset = INVALID_LAST_STABLE_OFFSET;
                 if (partitionResponseHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME))
                     lastStableOffset = partitionResponseHeader.getLong(LAST_STABLE_OFFSET_KEY_NAME);
-
+                long logStartOffset = INVALID_LOG_START_OFFSET;
+                if (partitionResponseHeader.hasField(LOG_START_OFFSET_KEY_NAME))
+                    logStartOffset = partitionResponseHeader.getLong(LOG_START_OFFSET_KEY_NAME);
+                
                 Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
 
                 List<AbortedTransaction> abortedTransactions = null;
@@ -207,7 +218,7 @@ public class FetchResponse extends AbstractResponse {
                     }
                 }
 
-                PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset,
+                PartitionData partitionData = new PartitionData(error, highWatermark, lastStableOffset, logStartOffset,
                         abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
@@ -319,7 +330,7 @@ public class FetchResponse extends AbstractResponse {
                 partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code());
                 partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
 
-                if (version >= 4) {
+                if (partitionDataHeader.hasField(LAST_STABLE_OFFSET_KEY_NAME)) {
                     partitionDataHeader.set(LAST_STABLE_OFFSET_KEY_NAME, fetchPartitionData.lastStableOffset);
 
                     if (fetchPartitionData.abortedTransactions == null) {
@@ -335,6 +346,8 @@ public class FetchResponse extends AbstractResponse {
                         partitionDataHeader.set(ABORTED_TRANSACTIONS_KEY_NAME, abortedTransactionStructs.toArray());
                     }
                 }
+                if (partitionDataHeader.hasField(LOG_START_OFFSET_KEY_NAME))
+                    partitionDataHeader.set(LOG_START_OFFSET_KEY_NAME, fetchPartitionData.logStartOffset);
 
                 partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
                 partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
@@ -345,7 +358,7 @@ public class FetchResponse extends AbstractResponse {
         }
         struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
 
-        if (version >= 1)
+        if (struct.hasField(THROTTLE_TIME_KEY_NAME))
             struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
 
         return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7f20472..3dd3983 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1467,7 +1467,7 @@ public class KafkaConsumerTest {
                     builder.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
                 records = builder.build();
             }
-            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LSO,
+            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L,
                     null, records));
         }
         return new FetchResponse(tpResponses, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 92150a6..224f83c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -222,7 +222,7 @@ public class FetcherTest {
             public boolean matches(AbstractRequest body) {
                 FetchRequest fetch = (FetchRequest) body;
                 return fetch.fetchData().containsKey(tp) &&
-                        fetch.fetchData().get(tp).offset == offset;
+                        fetch.fetchData().get(tp).fetchOffset == offset;
             }
         };
     }
@@ -966,7 +966,7 @@ public class FetcherTest {
 
     private FetchResponse fetchResponse(MemoryRecords records, Errors error, long hw, int throttleTime) {
         Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp,
-                new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LSO, null, records));
+                new FetchResponse.PartitionData(error, hw, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8a6d69a..ad7260e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -348,7 +348,7 @@ public class RequestResponseTest {
 
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000,
-                FetchResponse.INVALID_LSO, null, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
 
         FetchResponse v0Response = new FetchResponse(responseData, 0);
         FetchResponse v1Response = new FetchResponse(responseData, 10);
@@ -373,11 +373,11 @@ public class RequestResponseTest {
                 new FetchResponse.AbortedTransaction(15, 50)
         );
         responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(Errors.NONE, 100000,
-                FetchResponse.INVALID_LSO, abortedTransactions, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, abortedTransactions, records));
         responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData(Errors.NONE, 900000,
-                5, null, records));
+                5, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
         responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(Errors.NONE, 70000,
-                6, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, Collections.<FetchResponse.AbortedTransaction>emptyList(), records));
 
         FetchResponse response = new FetchResponse(responseData, 10);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -458,8 +458,8 @@ public class RequestResponseTest {
 
     private FetchRequest createFetchRequest(int version) {
         LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
-        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
-        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
+        fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 0L, 1000000));
+        fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 0L, 1000000));
         return FetchRequest.Builder.forConsumer(100, 100000, fetchData).setMaxBytes(1000).build((short) version);
     }
 
@@ -467,12 +467,12 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE,
-                1000000, FetchResponse.INVALID_LSO, null, records));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
                 new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData(Errors.NONE,
-                1000000, FetchResponse.INVALID_LSO, abortedTransactions, MemoryRecords.EMPTY));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, abortedTransactions, MemoryRecords.EMPTY));
 
         return new FetchResponse(responseData, 25);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 45ba58b..8a9660b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -12,16 +12,18 @@
  */
 package kafka.admin
 
+import java.io.IOException
 import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
-
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
+import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.KafkaException
 import kafka.coordinator.GroupOverview
 import kafka.utils.Logging
 
 import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
+import org.apache.kafka.clients.consumer.internals.{RequestFutureAdapter, ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.TimeoutException
@@ -32,7 +34,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import org.apache.kafka.common.requests.DescribeGroupsResponse.GroupMetadata
 import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
@@ -40,18 +42,44 @@ import scala.util.Try
 
 class AdminClient(val time: Time,
                   val requestTimeoutMs: Int,
-                  val retryBackoffMs: Int,
+                  val retryBackoffMs: Long,
                   val client: ConsumerNetworkClient,
                   val bootstrapBrokers: List[Node]) extends Logging {
 
+  @volatile var running: Boolean = true
+  val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()
+
+  val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
+    override def run() {
+      try {
+        while (running) {
+          client.poll(Long.MaxValue)
+        }
+      } catch {
+        case t : Throwable =>
+          error("admin-client-network-thread exited", t)
+      } finally {
+        pendingFutures.asScala.foreach { future =>
+          try {
+            future.raise(Errors.UNKNOWN)
+          } catch {
+            case _: IllegalStateException => // It is OK if the future has been completed
+          }
+        }
+        pendingFutures.clear()
+      }
+    }
+  }, true)
+
+  networkThread.start()
+
   private def send(target: Node,
                    api: ApiKeys,
                    request: AbstractRequest.Builder[_ <: AbstractRequest]): AbstractResponse = {
-    var future: RequestFuture[ClientResponse] = null
-
-    future = client.send(target, request)
-    client.poll(future)
-
+    val future: RequestFuture[ClientResponse] = client.send(target, request)
+    pendingFutures.add(future)
+    future.awaitDone(Long.MaxValue, TimeUnit.MILLISECONDS)
+    pendingFutures.remove(future)
     if (future.succeeded())
       future.value().responseBody()
     else
@@ -163,6 +191,73 @@ class AdminClient(val time: Time,
       broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
     }.toMap
 
+  /*
+   * Remove all the messages whose offset is smaller than the given offset of the corresponding partition
+   *
+   * DeleteRecordsResult contains either the lowWatermark of the partition or an exception. We list the possible
+   * exceptions and their interpretations below:
+   *
+   * - DisconnectException if the leader node of the partition is not available; the user needs to retry.
+   * - PolicyViolationException if the topic is configured as non-deletable.
+   * - TopicAuthorizationException if the topic doesn't exist and the user doesn't have the authority to create the topic
+   * - TimeoutException if the response is not available within the timeout specified by either the Future's timeout or the AdminClient's request timeout
+   * - UnknownTopicOrPartitionException if the partition doesn't exist or if the user doesn't have the authority to describe the topic
+   * - NotLeaderForPartitionException if the broker is not the leader of the partition; the user needs to retry.
+   * - OffsetOutOfRangeException if the offset is larger than the high watermark of this partition
+   *
+   */
+
+  def deleteRecordsBefore(offsets: Map[TopicPartition, Long]): Future[Map[TopicPartition, DeleteRecordsResult]] = {
+    val metadataRequest = new MetadataRequest.Builder(offsets.keys.map(_.topic()).toSet.toList.asJava)
+    val response = sendAnyNode(ApiKeys.METADATA, metadataRequest).asInstanceOf[MetadataResponse]
+    val errors = response.errors
+    if (!errors.isEmpty)
+      error(s"Metadata request contained errors: $errors")
+
+    val (partitionsWithoutError, partitionsWithError) = offsets.partition{ partitionAndOffset =>
+      !response.errors().containsKey(partitionAndOffset._1.topic())}
+
+    val (partitionsWithLeader, partitionsWithoutLeader) = partitionsWithoutError.partition{ partitionAndOffset =>
+      response.cluster().leaderFor(partitionAndOffset._1) != null}
+
+    val partitionsWithErrorResults = partitionsWithError.keys.map( partition =>
+      partition -> DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, response.errors().get(partition.topic()).exception())).toMap
+
+    val partitionsWithoutLeaderResults = partitionsWithoutLeader.mapValues( _ =>
+      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.LEADER_NOT_AVAILABLE.exception()))
+
+    val partitionsGroupByLeader = partitionsWithLeader.groupBy(partitionAndOffset =>
+      response.cluster().leaderFor(partitionAndOffset._1))
+
+    // prepare requests and generate Future objects
+    val futures = partitionsGroupByLeader.map{ case (node, partitionAndOffsets) =>
+      val convertedMap: java.util.Map[TopicPartition, java.lang.Long] = partitionAndOffsets.mapValues(_.asInstanceOf[java.lang.Long]).asJava
+      val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
+      pendingFutures.add(future)
+      future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
+          override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
+            val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
+            val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
+            future.complete(result)
+            pendingFutures.remove(future)
+          }
+
+          override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
+            val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
+            future.complete(result)
+            pendingFutures.remove(future)
+          }
+
+        })
+    }
+
+    // default results to return if no DeleteRecordsResponse is received before the timeout
+    val defaultResults = offsets.mapValues(_ =>
+      DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.REQUEST_TIMED_OUT.exception())) ++ partitionsWithErrorResults ++ partitionsWithoutLeaderResults
+
+    new CompositeFuture(time, defaultResults, futures.toList)
+  }
+
   /**
    * Case class used to represent a consumer of a consumer group
    */
@@ -221,11 +316,53 @@ class AdminClient(val time: Time,
   }
 
   def close() {
-    client.close()
+    running = false
+    try {
+      client.close()
+    } catch {
+      case e: IOException =>
+        error("Exception closing nioSelector:", e)
+    }
   }
 
 }
 
+/*
+ * CompositeFuture assumes that the future objects in the futures list do not raise errors
+ */
+class CompositeFuture[T](time: Time,
+                         defaultResults: Map[TopicPartition, T],
+                         futures: List[RequestFuture[Map[TopicPartition, T]]]) extends Future[Map[TopicPartition, T]] {
+
+  override def isCancelled = false
+
+  override def cancel(interrupt: Boolean) = false
+
+  override def get(): Map[TopicPartition, T] = {
+    get(Long.MaxValue, TimeUnit.MILLISECONDS)
+  }
+
+  override def get(timeout: Long, unit: TimeUnit): Map[TopicPartition, T] = {
+    val start: Long = time.milliseconds()
+    val timeoutMs = unit.toMillis(timeout)
+    var remaining: Long = timeoutMs
+
+    val observedResults = futures.flatMap{ future =>
+      val elapsed = time.milliseconds() - start
+      remaining = if (timeoutMs - elapsed > 0) timeoutMs - elapsed else 0L
+
+      if (future.awaitDone(remaining, TimeUnit.MILLISECONDS)) future.value()
+      else Map.empty[TopicPartition, T]
+    }.toMap
+
+    defaultResults ++ observedResults
+  }
+
+  override def isDone: Boolean = {
+    futures.forall(_.isDone)
+  }
+}
+
 object AdminClient {
   val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
   val DefaultRequestTimeoutMs = 5000
@@ -249,11 +386,25 @@ object AdminClient {
         CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
         ConfigDef.Importance.MEDIUM,
         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+      .define(
+        CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+        ConfigDef.Type.INT,
+        DefaultRequestTimeoutMs,
+        ConfigDef.Importance.MEDIUM,
+        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+      .define(
+        CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+        ConfigDef.Type.LONG,
+        DefaultRetryBackoffMs,
+        ConfigDef.Importance.MEDIUM,
+        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
       .withClientSslSupport()
       .withClientSaslSupport()
     config
   }
 
+  case class DeleteRecordsResult(lowWatermark: Long, error: Exception)
+
   class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
 
   def createSimplePlaintext(brokerUrl: String): AdminClient = {
@@ -270,6 +421,8 @@ object AdminClient {
     val metrics = new Metrics(time)
     val metadata = new Metadata
     val channelBuilder = ClientUtils.createChannelBuilder(config)
+    val requestTimeoutMs = config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)
+    val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
 
     val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
     val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
@@ -291,7 +444,7 @@ object AdminClient {
       DefaultReconnectBackoffMs,
       DefaultSendBufferBytes,
       DefaultReceiveBufferBytes,
-      DefaultRequestTimeoutMs,
+      requestTimeoutMs,
       time,
       true,
       new ApiVersions)
@@ -300,13 +453,13 @@ object AdminClient {
       networkClient,
       metadata,
       time,
-      DefaultRetryBackoffMs,
-      DefaultRequestTimeoutMs)
+      retryBackoffMs,
+      requestTimeoutMs.toLong)
 
     new AdminClient(
       time,
-      DefaultRequestTimeoutMs,
-      DefaultRetryBackoffMs,
+      requestTimeoutMs,
+      retryBackoffMs,
       highLevelClient,
       bootstrapCluster.nodes.asScala.toList)
   }
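
For reference, a minimal sketch of how a caller might drive the new deleteRecordsBefore() API added above; the bootstrap address, topic, partition and offset values are illustrative, not part of this patch:

    import java.util.concurrent.TimeUnit
    import kafka.admin.AdminClient
    import org.apache.kafka.common.TopicPartition

    // Hypothetical broker address and partition/offset values.
    val adminClient = AdminClient.createSimplePlaintext("localhost:9092")
    val offsets = Map(new TopicPartition("my-topic", 0) -> 10L)
    try {
      // CompositeFuture.get(timeout, unit) fills in REQUEST_TIMED_OUT for any
      // partition whose DeleteRecordsResponse has not arrived within the timeout.
      val results = adminClient.deleteRecordsBefore(offsets).get(30000L, TimeUnit.MILLISECONDS)
      results.foreach { case (tp, result) =>
        if (result.error == null)
          println(s"partition: $tp low_watermark: ${result.lowWatermark}")
        else
          println(s"partition: $tp error: ${result.error}")
      }
    } finally {
      adminClient.close()
    }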

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index ac94a7e..b87c856 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -48,6 +48,7 @@ object BrokerApiVersionsCommand {
         case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
       }
     }
+    adminClient.close()
   }
 
   private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
new file mode 100644
index 0000000..b85a6ff
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.io.PrintStream
+import java.util.Properties
+
+import kafka.admin.AdminClient.DeleteRecordsResult
+import kafka.common.AdminCommandFailedException
+import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.CommonClientConfigs
+import joptsimple._
+
+import scala.util.{Failure, Success}
+
+/**
+ * A command to delete records of the given partitions down to the specified offset.
+ */
+object DeleteRecordsCommand {
+
+  def main(args: Array[String]): Unit = {
+    execute(args, System.out)
+  }
+
+  def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
+    Json.parseFull(jsonData) match {
+      case Some(m) =>
+        m.asInstanceOf[Map[String, Any]].get("partitions") match {
+          case Some(partitionsSeq) =>
+            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
+              val topic = p.get("topic").get.asInstanceOf[String]
+              val partition = p.get("partition").get.asInstanceOf[Int]
+              val offset = p.get("offset").get.asInstanceOf[Int].toLong
+              new TopicPartition(topic, partition) -> offset
+            })
+          case None =>
+            Seq.empty
+        }
+      case None =>
+        Seq.empty
+    }
+  }
+
+  def execute(args: Array[String], out: PrintStream): Unit = {
+    val opts = new DeleteRecordsCommandOptions(args)
+    val adminClient = createAdminClient(opts)
+    val offsetJsonFile =  opts.options.valueOf(opts.offsetJsonFileOpt)
+    val offsetJsonString = Utils.readFileAsString(offsetJsonFile)
+    val offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString)
+
+    val duplicatePartitions = CoreUtils.duplicates(offsetSeq.map { case (tp, _) => tp })
+    if (duplicatePartitions.nonEmpty)
+      throw new AdminCommandFailedException("Offset json file contains duplicate topic partitions: %s".format(duplicatePartitions.mkString(",")))
+
+    out.println("Executing records delete operation")
+    val deleteRecordsResult: Map[TopicPartition, DeleteRecordsResult] = adminClient.deleteRecordsBefore(offsetSeq.toMap).get()
+    out.println("Records delete operation completed:")
+
+    deleteRecordsResult.foreach{ case (tp, partitionResult) => {
+      if (partitionResult.error == null)
+        out.println(s"partition: $tp\tlow_watermark: ${partitionResult.lowWatermark}")
+      else
+        out.println(s"partition: $tp\terror: ${partitionResult.error.toString}")
+    }}
+    adminClient.close()
+  }
+
+  private def createAdminClient(opts: DeleteRecordsCommandOptions): AdminClient = {
+    val props = if (opts.options.has(opts.commandConfigOpt))
+      Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+    else
+      new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+    AdminClient.create(props)
+  }
+
+  class DeleteRecordsCommandOptions(args: Array[String]) {
+    val BootstrapServerDoc = "REQUIRED: The server to connect to."
+    val offsetJsonFileDoc = "REQUIRED: The JSON file with offset per partition. The format to use is:\n" +
+                                 "{\"partitions\":\n  [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}"
+    val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
+
+    val parser = new OptionParser
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+                                   .withRequiredArg
+                                   .describedAs("server(s) to use for bootstrapping")
+                                   .ofType(classOf[String])
+    val offsetJsonFileOpt = parser.accepts("offset-json-file", offsetJsonFileDoc)
+                                   .withRequiredArg
+                                   .describedAs("Offset json file path")
+                                   .ofType(classOf[String])
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+                                   .withRequiredArg
+                                   .describedAs("command config property file path")
+                                   .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+    CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt)
+  }
+}
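
For reference, a minimal usage sketch of the command above; the topic, partition, offset, file name and broker address are illustrative. Given an offsets file such as

    {"partitions": [{"topic": "my-topic", "partition": 0, "offset": 10}], "version": 1}

the tool can be invoked through the new shell wrapper as

    bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file offsets.json

and prints, per partition, either the new low_watermark or the error, as produced by the result loop in execute() above.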

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 2ed6452..6101d2a 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -65,7 +65,9 @@ object ApiVersion {
     "0.10.2" -> KAFKA_0_10_2_IV0,
     // KIP-98 (idempotent and transactional producer support)
     "0.11.0-IV0" -> KAFKA_0_11_0_IV0,
-    "0.11.0" -> KAFKA_0_11_0_IV0
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
+    "0.11.0" -> KAFKA_0_11_0_IV1
   )
 
   private val versionPattern = "\\.".r
@@ -155,3 +157,9 @@ case object KAFKA_0_11_0_IV0 extends ApiVersion {
   val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
   val id: Int = 10
 }
+
+case object KAFKA_0_11_0_IV1 extends ApiVersion {
+  val version: String = "0.11.0-IV1"
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val id: Int = 11
+}
\ No newline at end of file
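
The new "0.11.0-IV1" mapping is selected via the broker's standard inter.broker.protocol.version setting, e.g. in server.properties:

    inter.broker.protocol.version=0.11.0-IV1

(a sketch; the property name is the existing KafkaConfig key, not something added here). Brokers compare this ApiVersion to decide, for example, which FetchRequest version the replica fetcher may send to other brokers.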

http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f91a3c3..39da605 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -204,7 +204,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
       responseData.put(new TopicPartition(topic, partition),
         new JFetchResponse.PartitionData(Errors.forException(e), JFetchResponse.INVALID_HIGHWATERMARK,
-          JFetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+          JFetchResponse.INVALID_LAST_STABLE_OFFSET, JFetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
     }
     val errorResponse = new JFetchResponse(responseData, 0)
     // Magic value does not matter here because the message set is empty

