kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5534; KafkaConsumer `offsetForTimes` result should include partitions with no offset
Date Fri, 21 Jul 2017 00:38:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cdff011fe -> 5bb53e034


KAFKA-5534; KafkaConsumer `offsetForTimes` result should include partitions with no offset

For topics that support timestamp search, if no offset is found for a partition, the partition should still be included in the result with a `null` offset value. This `KafkaConsumer` method currently excludes such partitions from the result.

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3460 from vahidhashemian/KAFKA-5534


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bb53e03
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bb53e03
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bb53e03

Branch: refs/heads/trunk
Commit: 5bb53e034e4f8a06550dd06377fae7b3c2137ce2
Parents: cdff011
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Thu Jul 20 17:31:24 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Jul 20 17:38:30 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  22 +-
 .../clients/consumer/internals/FetcherTest.java | 590 ++++++++++---------
 .../kafka/admin/ConsumerGroupCommand.scala      |   2 +-
 .../kafka/api/LegacyAdminClientTest.scala       |   6 +
 docs/upgrade.html                               |   2 +
 5 files changed, 327 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2703823..f5ebac5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -429,14 +429,17 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
                                                                      long timeout) {
         Map<TopicPartition, OffsetData> offsetData = retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
-        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(offsetData.size());
+
+        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
+        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet())
+            offsetsByTimes.put(entry.getKey(), null);
+
         for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) {
-            OffsetData data = entry.getValue();
-            if (data == null)
-                offsetsByTimes.put(entry.getKey(), null);
-            else
-                offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(data.offset, data.timestamp));
+            // 'entry.getValue().timestamp' will not be null since we are guaranteed
+            // to work with a v1 (or later) ListOffset request
+            offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(entry.getValue().offset, entry.getValue().timestamp));
         }
+
         return offsetsByTimes;
     }
 
@@ -683,8 +686,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * @param listOffsetResponse The response from the server.
      * @param future The future to be completed when the response returns. Note that any partition-level errors will
      *               generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT,
-     *               which indicates that the broker does not support the v1 message format and results in a null
-     *               being inserted into the resulting map.
+     *               which indicates that the broker does not support the v1 message format. Partitions with this
+     *               particular error are simply left out of the future map. Note that the corresponding timestamp
+     *               value of each partition may be null only for v0. In v1 and later the ListOffset API would not
+     *               return a null timestamp (-1 is returned instead when necessary).
      */
     @SuppressWarnings("deprecation")
     private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch,
@@ -727,7 +732,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 // The message format on the broker side is before 0.10.0, we simply put null in the response.
                 log.debug("Cannot search by timestamp for partition {} because the message format version " +
                         "is before 0.10.0", topicPartition);
-                timestampOffsetMap.put(topicPartition, null);
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                 log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                         topicPartition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0801979..c0edcfd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -111,8 +111,8 @@ public class FetcherTest {
     private String topicName = "test";
     private String groupId = "test-group";
     private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
-    private TopicPartition tp1 = new TopicPartition(topicName, 0);
-    private TopicPartition tp2 = new TopicPartition(topicName, 1);
+    private TopicPartition tp0 = new TopicPartition(topicName, 0);
+    private TopicPartition tp1 = new TopicPartition(topicName, 1);
     private int minBytes = 1;
     private int maxBytes = Integer.MAX_VALUE;
     private int maxWaitMs = 0;
@@ -164,23 +164,23 @@ public class FetcherTest {
 
     @Test
     public void testFetchNormal() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
-        assertTrue(partitionRecords.containsKey(tp1));
+        assertTrue(partitionRecords.containsKey(tp0));
 
-        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp0);
         assertEquals(3, records.size());
-        assertEquals(4L, subscriptions.position(tp1).longValue()); // this is the next fetching position
+        assertEquals(4L, subscriptions.position(tp0).longValue()); // this is the next fetching position
         long offset = 1;
         for (ConsumerRecord<byte[], byte[]> record : records) {
             assertEquals(offset, record.offset());
@@ -190,8 +190,8 @@ public class FetcherTest {
 
     @Test
     public void testFetcherIgnoresControlRecords() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -213,16 +213,16 @@ public class FetcherTest {
 
         buffer.flip();
 
-        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
-        assertTrue(partitionRecords.containsKey(tp1));
+        assertTrue(partitionRecords.containsKey(tp0));
 
-        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
+        List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp0);
         assertEquals(1, records.size());
-        assertEquals(2L, subscriptions.position(tp1).longValue());
+        assertEquals(2L, subscriptions.position(tp0).longValue());
 
         ConsumerRecord<byte[], byte[]> record = records.get(0);
         assertArrayEquals("key".getBytes(), record.key());
@@ -230,18 +230,18 @@ public class FetcherTest {
 
     @Test
     public void testFetchError() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
-        assertFalse(partitionRecords.containsKey(tp1));
+        assertFalse(partitionRecords.containsKey(tp0));
     }
 
     private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
@@ -274,10 +274,10 @@ public class FetcherTest {
 
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
 
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 1);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
 
-        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
@@ -288,7 +288,7 @@ public class FetcherTest {
                 fail("fetchedRecords should have raised");
             } catch (SerializationException e) {
                 // the position should not advance since no data has been returned
-                assertEquals(1, subscriptions.position(tp1).longValue());
+                assertEquals(1, subscriptions.position(tp0).longValue());
             }
         }
     }
@@ -325,17 +325,17 @@ public class FetcherTest {
 
         buffer.flip();
 
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         // the first fetchedRecords() should return the first valid message
-        assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
-        assertEquals(1, subscriptions.position(tp1).longValue());
+        assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
+        assertEquals(1, subscriptions.position(tp0).longValue());
 
         // the fetchedRecords() should always throw exception due to the second invalid message
         for (int i = 0; i < 2; i++) {
@@ -343,22 +343,22 @@ public class FetcherTest {
                 fetcher.fetchedRecords();
                 fail("fetchedRecords should have raised KafkaException");
             } catch (KafkaException e) {
-                assertEquals(1, subscriptions.position(tp1).longValue());
+                assertEquals(1, subscriptions.position(tp0).longValue());
             }
         }
 
         // Seek to skip the bad record and fetch again.
-        subscriptions.seek(tp1, 2);
+        subscriptions.seek(tp0, 2);
         // Should not throw exception after the seek.
         fetcher.fetchedRecords();
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
-        List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1);
+        List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp0);
         assertEquals(1, records.size());
         assertEquals(2L, records.get(0).offset());
-        assertEquals(3, subscriptions.position(tp1).longValue());
+        assertEquals(3, subscriptions.position(tp0).longValue());
     }
 
     @Test
@@ -380,12 +380,12 @@ public class FetcherTest {
         buffer.put("beef".getBytes());
         buffer.position(0);
 
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         // the fetchedRecords() should always throw exception due to the bad batch.
@@ -394,7 +394,7 @@ public class FetcherTest {
                 fetcher.fetchedRecords();
                 fail("fetchedRecords should have raised KafkaException");
             } catch (KafkaException e) {
-                assertEquals(0, subscriptions.position(tp1).longValue());
+                assertEquals(0, subscriptions.position(tp0).longValue());
             }
         }
     }
@@ -411,19 +411,19 @@ public class FetcherTest {
         // flip some bits to fail the crc
         buffer.putInt(32, buffer.get(32) ^ 87238423);
 
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
             fail("fetchedRecords should have raised");
         } catch (KafkaException e) {
             // the position should not advance since no data has been returned
-            assertEquals(0, subscriptions.position(tp1).longValue());
+            assertEquals(0, subscriptions.position(tp0).longValue());
         }
     }
 
@@ -446,14 +446,14 @@ public class FetcherTest {
         MemoryRecords memoryRecords = builder.build();
 
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 1);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
 
-        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, memoryRecords, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, memoryRecords, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp1);
+        records = fetcher.fetchedRecords().get(tp0);
 
         assertEquals(3, records.size());
 
@@ -476,32 +476,32 @@ public class FetcherTest {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
 
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 1);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
 
-        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
-        client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp0, 4), fetchResponse(tp0, this.nextRecords, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp1);
+        records = fetcher.fetchedRecords().get(tp0);
         assertEquals(2, records.size());
-        assertEquals(3L, subscriptions.position(tp1).longValue());
+        assertEquals(3L, subscriptions.position(tp0).longValue());
         assertEquals(1, records.get(0).offset());
         assertEquals(2, records.get(1).offset());
 
         assertEquals(0, fetcher.sendFetches());
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp1);
+        records = fetcher.fetchedRecords().get(tp0);
         assertEquals(1, records.size());
-        assertEquals(4L, subscriptions.position(tp1).longValue());
+        assertEquals(4L, subscriptions.position(tp0).longValue());
         assertEquals(3, records.get(0).offset());
 
         assertTrue(fetcher.sendFetches() > 0);
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp1);
+        records = fetcher.fetchedRecords().get(tp0);
         assertEquals(2, records.size());
-        assertEquals(6L, subscriptions.position(tp1).longValue());
+        assertEquals(6L, subscriptions.position(tp0).longValue());
         assertEquals(4, records.get(0).offset());
         assertEquals(5, records.get(1).offset());
     }
@@ -516,31 +516,31 @@ public class FetcherTest {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);
 
         List<ConsumerRecord<byte[], byte[]>> records;
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 1);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
 
         // Returns 3 records while `max.poll.records` is configured to 2
-        client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
-        records = fetcher.fetchedRecords().get(tp1);
+        records = fetcher.fetchedRecords().get(tp0);
         assertEquals(2, records.size());
-        assertEquals(3L, subscriptions.position(tp1).longValue());
+        assertEquals(3L, subscriptions.position(tp0).longValue());
         assertEquals(1, records.get(0).offset());
         assertEquals(2, records.get(1).offset());
 
-        subscriptions.assignFromUser(singleton(tp2));
-        client.prepareResponse(matchesOffset(tp2, 4), fetchResponse(tp2, this.nextRecords, Errors.NONE, 100L, 0));
-        subscriptions.seek(tp2, 4);
+        subscriptions.assignFromUser(singleton(tp1));
+        client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));
+        subscriptions.seek(tp1, 4);
 
         assertEquals(1, fetcher.sendFetches());
         consumerClient.poll(0);
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
-        assertNull(fetchedRecords.get(tp1));
-        records = fetchedRecords.get(tp2);
+        assertNull(fetchedRecords.get(tp0));
+        records = fetchedRecords.get(tp1);
         assertEquals(2, records.size());
-        assertEquals(6L, subscriptions.position(tp2).longValue());
+        assertEquals(6L, subscriptions.position(tp1).longValue());
         assertEquals(4, records.get(0).offset());
         assertEquals(5, records.get(1).offset());
     }
@@ -558,16 +558,16 @@ public class FetcherTest {
         MemoryRecords records = builder.build();
 
         List<ConsumerRecord<byte[], byte[]>> consumerRecords;
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
-        consumerRecords = fetcher.fetchedRecords().get(tp1);
+        consumerRecords = fetcher.fetchedRecords().get(tp0);
         assertEquals(3, consumerRecords.size());
-        assertEquals(31L, subscriptions.position(tp1).longValue()); // this is the next fetching position
+        assertEquals(31L, subscriptions.position(tp0).longValue()); // this is the next fetching position
 
         assertEquals(15L, consumerRecords.get(0).offset());
         assertEquals(20L, consumerRecords.get(1).offset());
@@ -590,7 +590,7 @@ public class FetcherTest {
             } catch (RecordTooLargeException e) {
                 assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: "));
                 // the position should not advance since no data has been returned
-                assertEquals(0, subscriptions.position(tp1).longValue());
+                assertEquals(0, subscriptions.position(tp0).longValue());
             }
         } finally {
             client.setNodeApiVersions(NodeApiVersions.create());
@@ -612,30 +612,30 @@ public class FetcherTest {
         } catch (KafkaException e) {
             assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
             // the position should not advance since no data has been returned
-            assertEquals(0, subscriptions.position(tp1).longValue());
+            assertEquals(0, subscriptions.position(tp0).longValue());
         }
     }
 
     private void makeFetchRequestWithIncompleteRecord() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
         MemoryRecords partialRecord = MemoryRecords.readableRecords(
             ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0}));
-        client.prepareResponse(fetchResponse(tp1, partialRecord, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, partialRecord, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
     }
 
     @Test
     public void testUnauthorizedTopic() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         // resize the limit of the buffer to pretend it is only fetch-size large
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
         consumerClient.poll(0);
         try {
             fetcher.fetchedRecords();
@@ -648,14 +648,14 @@ public class FetcherTest {
     @Test
     public void testFetchDuringRebalance() {
         subscriptions.subscribe(singleton(topicName), listener);
-        subscriptions.assignFromSubscribed(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromSubscribed(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
 
         // Now the rebalance happens and fetch positions are cleared
-        subscriptions.assignFromSubscribed(singleton(tp1));
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+        subscriptions.assignFromSubscribed(singleton(tp0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
         // The active fetch should be ignored since its position is no longer valid
@@ -664,34 +664,34 @@ public class FetcherTest {
 
     @Test
     public void testInFlightFetchOnPausedPartition() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        subscriptions.pause(tp1);
+        subscriptions.pause(tp0);
 
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
-        assertNull(fetcher.fetchedRecords().get(tp1));
+        assertNull(fetcher.fetchedRecords().get(tp0));
     }
 
     @Test
     public void testFetchOnPausedPartition() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
-        subscriptions.pause(tp1);
+        subscriptions.pause(tp0);
         assertFalse(fetcher.sendFetches() > 0);
         assertTrue(client.requests().isEmpty());
     }
 
     @Test
     public void testFetchNotLeaderForPartition() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -699,11 +699,11 @@ public class FetcherTest {
 
     @Test
     public void testFetchUnknownTopicOrPartition() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -711,62 +711,62 @@ public class FetcherTest {
 
     @Test
     public void testFetchOffsetOutOfRange() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertTrue(subscriptions.isOffsetResetNeeded(tp1));
-        assertEquals(null, subscriptions.position(tp1));
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertEquals(null, subscriptions.position(tp0));
     }
 
     @Test
     public void testStaleOutOfRangeError() {
         // verify that an out of range error which arrives after a seek
         // does not cause us to reset our position or throw an exception
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
-        subscriptions.seek(tp1, 1);
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        subscriptions.seek(tp0, 1);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertEquals(1, subscriptions.position(tp1).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertEquals(1, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testFetchedRecordsAfterSeek() {
-        subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
-        subscriptionsNoAutoReset.seek(tp1, 0);
+        subscriptionsNoAutoReset.assignFromUser(singleton(tp0));
+        subscriptionsNoAutoReset.seek(tp0, 0);
 
         assertTrue(fetcherNoAutoReset.sendFetches() > 0);
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
-        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
-        subscriptionsNoAutoReset.seek(tp1, 2);
+        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp0));
+        subscriptionsNoAutoReset.seek(tp0, 2);
         assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
     }
 
     @Test
     public void testFetchOffsetOutOfRangeException() {
-        subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
-        subscriptionsNoAutoReset.seek(tp1, 0);
+        subscriptionsNoAutoReset.assignFromUser(singleton(tp0));
+        subscriptionsNoAutoReset.seek(tp0, 0);
 
         fetcherNoAutoReset.sendFetches();
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
         consumerClient.poll(0);
 
-        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
+        assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp0));
         for (int i = 0; i < 2; i++) {
             try {
                 fetcherNoAutoReset.fetchedRecords();
                 fail("Should have thrown OffsetOutOfRangeException");
             } catch (OffsetOutOfRangeException e) {
-                assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
+                assertTrue(e.offsetOutOfRangePartitions().containsKey(tp0));
                 assertEquals(e.offsetOutOfRangePartitions().size(), 1);
             }
         }
@@ -776,16 +776,16 @@ public class FetcherTest {
     public void testFetchPositionAfterException() {
         // verify the advancement in the next fetch offset equals to the number of fetched records when
         // some fetched partitions cause Exception. This ensures that consumer won't lose record upon exception
-        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0, tp1));
+        subscriptionsNoAutoReset.seek(tp0, 1);
         subscriptionsNoAutoReset.seek(tp1, 1);
-        subscriptionsNoAutoReset.seek(tp2, 1);
 
         assertEquals(1, fetcherNoAutoReset.sendFetches());
 
         Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>();
-        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
             FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
-        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+        partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
                 FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
         consumerClient.poll(0);
@@ -796,7 +796,7 @@ public class FetcherTest {
         for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
             fetchedRecords.addAll(records);
 
-        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2) - 1);
+        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp1) - 1);
 
         try {
             for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
@@ -805,13 +805,13 @@ public class FetcherTest {
             exceptions.add(e);
         }
 
-        assertEquals(4, subscriptionsNoAutoReset.position(tp2).longValue());
+        assertEquals(4, subscriptionsNoAutoReset.position(tp1).longValue());
         assertEquals(3, fetchedRecords.size());
 
         // Should have received one OffsetOutOfRangeException for partition tp1
         assertEquals(1, exceptions.size());
         OffsetOutOfRangeException e = exceptions.get(0);
-        assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
+        assertTrue(e.offsetOutOfRangePartitions().containsKey(tp0));
         assertEquals(e.offsetOutOfRangePartitions().size(), 1);
     }
 
@@ -819,51 +819,51 @@ public class FetcherTest {
     public void testSeekBeforeException() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2);
 
-        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1));
-        subscriptionsNoAutoReset.seek(tp1, 1);
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0));
+        subscriptionsNoAutoReset.seek(tp0, 1);
         assertEquals(1, fetcher.sendFetches());
         Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
-        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+        partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100,
             FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
 
-        assertEquals(2, fetcher.fetchedRecords().get(tp1).size());
+        assertEquals(2, fetcher.fetchedRecords().get(tp0).size());
 
-        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
-        subscriptionsNoAutoReset.seek(tp2, 1);
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0, tp1));
+        subscriptionsNoAutoReset.seek(tp1, 1);
         assertEquals(1, fetcher.sendFetches());
         partitions = new HashMap<>();
-        partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
             FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
         consumerClient.poll(0);
-        assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
+        assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
 
-        subscriptionsNoAutoReset.seek(tp2, 10);
+        subscriptionsNoAutoReset.seek(tp1, 10);
         // Should not throw OffsetOutOfRangeException after the seek
         assertEquals(0, fetcher.fetchedRecords().size());
     }
 
     @Test
     public void testFetchDisconnected() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0), true);
+        client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
 
         // disconnects should have no affect on subscription state
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertTrue(subscriptions.isFetchable(tp1));
-        assertEquals(0, subscriptions.position(tp1).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(0, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() {
-        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp0, tp1));
         subscriptionsNoAutoReset.assignFromUser(tps);
         try {
             fetcherNoAutoReset.updateFetchPositions(tps);
@@ -879,38 +879,38 @@ public class FetcherTest {
     public void testUpdateFetchPositionToCommitted() {
         // unless a specific reset is expected, the default behavior is to reset to the committed
         // position if one is present
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.committed(tp1, new OffsetAndMetadata(5));
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.committed(tp0, new OffsetAndMetadata(5));
 
-        fetcher.updateFetchPositions(singleton(tp1));
-        assertTrue(subscriptions.isFetchable(tp1));
-        assertEquals(5, subscriptions.position(tp1).longValue());
+        fetcher.updateFetchPositions(singleton(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionResetToDefaultOffset() {
-        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.assignFromUser(singleton(tp0));
         // with no commit position, we should reset using the default strategy defined above (EARLIEST)
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp1));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertTrue(subscriptions.isFetchable(tp1));
-        assertEquals(5, subscriptions.position(tp1).longValue());
+        fetcher.updateFetchPositions(singleton(tp0));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionResetToLatestOffset() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp1));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertTrue(subscriptions.isFetchable(tp1));
-        assertEquals(5, subscriptions.position(tp1).longValue());
+        fetcher.updateFetchPositions(singleton(tp0));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
     @Test
@@ -919,8 +919,8 @@ public class FetcherTest {
             Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
                     new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
 
-            subscriptions.assignFromUser(singleton(tp1));
-            subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
+            subscriptions.assignFromUser(singleton(tp0));
+            subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
             client.prepareResponse(new MockClient.RequestMatcher() {
                 @Override
@@ -929,30 +929,30 @@ public class FetcherTest {
                     return request.isolationLevel() == isolationLevel;
                 }
             }, listOffsetResponse(Errors.NONE, 1L, 5L));
-            fetcher.updateFetchPositions(singleton(tp1));
-            assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-            assertTrue(subscriptions.isFetchable(tp1));
-            assertEquals(5, subscriptions.position(tp1).longValue());
+            fetcher.updateFetchPositions(singleton(tp0));
+            assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+            assertTrue(subscriptions.isFetchable(tp0));
+            assertEquals(5, subscriptions.position(tp0).longValue());
         }
     }
 
     @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp1));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertTrue(subscriptions.isFetchable(tp1));
-        assertEquals(5, subscriptions.position(tp1).longValue());
+        fetcher.updateFetchPositions(singleton(tp0));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionDisconnect() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         // First request gets a disconnect
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
@@ -961,72 +961,72 @@ public class FetcherTest {
         // Next one succeeds
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 5L));
-        fetcher.updateFetchPositions(singleton(tp1));
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertTrue(subscriptions.isFetchable(tp1));
-        assertEquals(5, subscriptions.position(tp1).longValue());
+        fetcher.updateFetchPositions(singleton(tp0));
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.committed(tp1, new OffsetAndMetadata(0));
-        subscriptions.pause(tp1); // paused partition does not have a valid position
-        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.committed(tp0, new OffsetAndMetadata(0));
+        subscriptions.pause(tp0); // paused partition does not have a valid position
+        subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST);
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 10L));
-        fetcher.updateFetchPositions(singleton(tp1));
+        fetcher.updateFetchPositions(singleton(tp0));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp1));
-        assertEquals(10, subscriptions.position(tp1).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp0));
+        assertEquals(10, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.pause(tp1); // paused partition does not have a valid position
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.pause(tp0); // paused partition does not have a valid position
 
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
                                listOffsetResponse(Errors.NONE, 1L, 0L));
-        fetcher.updateFetchPositions(singleton(tp1));
+        fetcher.updateFetchPositions(singleton(tp0));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp1));
-        assertEquals(0, subscriptions.position(tp1).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp0));
+        assertEquals(0, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.committed(tp1, new OffsetAndMetadata(0));
-        subscriptions.pause(tp1); // paused partition does not have a valid position
-        subscriptions.seek(tp1, 10);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.committed(tp0, new OffsetAndMetadata(0));
+        subscriptions.pause(tp0); // paused partition does not have a valid position
+        subscriptions.seek(tp0, 10);
 
-        fetcher.updateFetchPositions(singleton(tp1));
+        fetcher.updateFetchPositions(singleton(tp0));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp1));
-        assertEquals(10, subscriptions.position(tp1).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp0));
+        assertEquals(10, subscriptions.position(tp0).longValue());
     }
 
     @Test
     public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.committed(tp1, new OffsetAndMetadata(0));
-        subscriptions.seek(tp1, 10);
-        subscriptions.pause(tp1); // paused partition already has a valid position
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.committed(tp0, new OffsetAndMetadata(0));
+        subscriptions.seek(tp0, 10);
+        subscriptions.pause(tp0); // paused partition already has a valid position
 
-        fetcher.updateFetchPositions(singleton(tp1));
+        fetcher.updateFetchPositions(singleton(tp0));
 
-        assertFalse(subscriptions.isOffsetResetNeeded(tp1));
-        assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
-        assertTrue(subscriptions.hasValidPosition(tp1));
-        assertEquals(10, subscriptions.position(tp1).longValue());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp0));
+        assertEquals(10, subscriptions.position(tp0).longValue());
     }
 
     @Test
@@ -1117,7 +1117,7 @@ public class FetcherTest {
             ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
-            FetchResponse response = fetchResponse(tp1, nextRecords, Errors.NONE, i, throttleTimeMs);
+            FetchResponse response = fetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs);
             buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()));
             selector.completeReceive(new NetworkReceive(node.idString(), buffer));
             client.poll(1, time.milliseconds());
@@ -1137,11 +1137,11 @@ public class FetcherTest {
      */
     @Test
     public void testFetcherMetrics() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
-        MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
+        MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1150,7 +1150,7 @@ public class FetcherTest {
         assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
 
         // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse
-        fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
+        fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
         assertEquals(100, recordsFetchLagMax.value(), EPSILON);
 
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
@@ -1161,7 +1161,7 @@ public class FetcherTest {
                 TimestampType.CREATE_TIME, 0L);
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
-        fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 0);
+        fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 0);
         assertEquals(197, recordsFetchLagMax.value(), EPSILON);
         assertEquals(197, partitionLag.value(), EPSILON);
 
@@ -1176,11 +1176,11 @@ public class FetcherTest {
         fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
 
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax);
-        MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
+        MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
@@ -1189,7 +1189,7 @@ public class FetcherTest {
         assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON);
 
         // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse
-        fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
+        fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
         assertEquals(50, recordsFetchLagMax.value(), EPSILON);
 
         KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
@@ -1200,7 +1200,7 @@ public class FetcherTest {
                 TimestampType.CREATE_TIME, 0L);
         for (int v = 0; v < 3; v++)
             builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
-        fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 150L, 0);
+        fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 150L, 0);
         assertEquals(147, recordsFetchLagMax.value(), EPSILON);
         assertEquals(147, partitionLag.value(), EPSILON);
 
@@ -1211,8 +1211,8 @@ public class FetcherTest {
 
     @Test
     public void testFetchResponseMetrics() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
@@ -1228,15 +1228,15 @@ public class FetcherTest {
         for (Record record : records.records())
             expectedBytes += record.sizeInBytes();
 
-        fetchRecords(tp1, records, Errors.NONE, 100L, 0);
+        fetchRecords(tp0, records, Errors.NONE, 100L, 0);
         assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
         assertEquals(3, recordsCountAverage.value(), EPSILON);
     }
 
     @Test
     public void testFetchResponseMetricsPartialResponse() {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 1);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 1);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
@@ -1254,16 +1254,16 @@ public class FetcherTest {
                 expectedBytes += record.sizeInBytes();
         }
 
-        fetchRecords(tp1, records, Errors.NONE, 100L, 0);
+        fetchRecords(tp0, records, Errors.NONE, 100L, 0);
         assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
         assertEquals(2, recordsCountAverage.value(), EPSILON);
     }
 
     @Test
     public void testFetchResponseMetricsWithOnePartitionError() {
-        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
+        subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
+        subscriptions.seek(tp0, 0);
         subscriptions.seek(tp1, 0);
-        subscriptions.seek(tp2, 0);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
@@ -1276,9 +1276,9 @@ public class FetcherTest {
         MemoryRecords records = builder.build();
 
         Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
-        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+        partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100,
                 FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
-        partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
                 FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, MemoryRecords.EMPTY));
 
         assertEquals(1, fetcher.sendFetches());
@@ -1296,9 +1296,9 @@ public class FetcherTest {
 
     @Test
     public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
-        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
+        subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
+        subscriptions.seek(tp0, 0);
         subscriptions.seek(tp1, 0);
-        subscriptions.seek(tp2, 0);
 
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg));
@@ -1306,7 +1306,7 @@ public class FetcherTest {
 
         // send the fetch and then seek to a new offset
         assertEquals(1, fetcher.sendFetches());
-        subscriptions.seek(tp2, 5);
+        subscriptions.seek(tp1, 5);
 
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
                 TimestampType.CREATE_TIME, 0L);
@@ -1315,9 +1315,9 @@ public class FetcherTest {
         MemoryRecords records = builder.build();
 
         Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
-        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+        partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100,
                 FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
-        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
                 FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null,
                 MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes()))));
 
@@ -1361,6 +1361,8 @@ public class FetcherTest {
     public void testGetOffsetsForTimes() {
         // Empty map
         assertTrue(fetcher.getOffsetsByTimes(new HashMap<TopicPartition, Long>(), 100L).isEmpty());
+        // Unknown Offset
+        testGetOffsetsForTimesWithUnknownOffset();
         // Error code none with unknown offset
         testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
         // Error code none with known offset
@@ -1379,15 +1381,15 @@ public class FetcherTest {
     @Test(expected = TimeoutException.class)
     public void testBatchedListOffsetsMetadataErrors() {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
-        partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION,
+        partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION,
                 ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET));
-        partitionData.put(tp2, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+        partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
                 ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET));
         client.prepareResponse(new ListOffsetResponse(0, partitionData));
 
         Map<TopicPartition, Long> offsetsToSearch = new HashMap<>();
+        offsetsToSearch.put(tp0, ListOffsetRequest.EARLIEST_TIMESTAMP);
         offsetsToSearch.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP);
-        offsetsToSearch.put(tp2, ListOffsetRequest.EARLIEST_TIMESTAMP);
 
         fetcher.getOffsetsByTimes(offsetsToSearch, 0);
     }
@@ -1410,9 +1412,9 @@ public class FetcherTest {
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
         abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
         MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.assignFromUser(singleton(tp0));
 
-        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -1423,7 +1425,7 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
-        assertFalse(fetchedRecords.containsKey(tp1));
+        assertFalse(fetchedRecords.containsKey(tp0));
     }
 
     @Test
@@ -1442,9 +1444,9 @@ public class FetcherTest {
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
         MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.assignFromUser(singleton(tp0));
 
-        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -1462,8 +1464,8 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
-        assertTrue(fetchedRecords.containsKey(tp1));
-        assertEquals(fetchedRecords.get(tp1).size(), 2);
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(fetchedRecords.get(tp0).size(), 2);
     }
 
     @Test
@@ -1519,9 +1521,9 @@ public class FetcherTest {
         buffer.flip();
 
         MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.assignFromUser(singleton(tp0));
 
-        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -1532,9 +1534,9 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
-        assertTrue(fetchedRecords.containsKey(tp1));
+        assertTrue(fetchedRecords.containsKey(tp0));
         // There are only 3 committed records
-        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp1);
+        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp0);
         Set<String> fetchedKeys = new HashSet<>();
         for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
             fetchedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
@@ -1566,9 +1568,9 @@ public class FetcherTest {
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
         abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
         MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.assignFromUser(singleton(tp0));
 
-        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -1579,9 +1581,9 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
-        assertTrue(fetchedRecords.containsKey(tp1));
-        assertEquals(fetchedRecords.get(tp1).size(), 2);
-        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp1);
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(fetchedRecords.get(tp0).size(), 2);
+        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp0);
         Set<String> committedKeys = new HashSet<>(Arrays.asList("commit1-1", "commit1-2"));
         Set<String> actuallyCommittedKeys = new HashSet<>();
         for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
@@ -1610,8 +1612,8 @@ public class FetcherTest {
         buffer.flip();
 
         // send the fetch
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
         assertEquals(1, fetcher.sendFetches());
 
         // prepare the response. the aborted transactions begin at offsets which are no longer in the log
@@ -1624,8 +1626,8 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = fetcher.fetchedRecords();
-        assertTrue(allFetchedRecords.containsKey(tp1));
-        List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp1);
+        assertTrue(allFetchedRecords.containsKey(tp0));
+        List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp0);
         assertEquals(3, fetchedRecords.size());
         assertEquals(Arrays.asList(6L, 7L, 8L), collectRecordOffsets(fetchedRecords));
     }
@@ -1639,7 +1641,7 @@ public class FetcherTest {
                 new SimpleRecord(null, "value".getBytes()));
 
         // Remove the last record to simulate compaction
-        MemoryRecords.FilterResult result = records.filterTo(tp1, new MemoryRecords.RecordFilter() {
+        MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter() {
             @Override
             protected BatchRetention checkBatchRetention(RecordBatch batch) {
                 return BatchRetention.DELETE_EMPTY;
@@ -1653,16 +1655,16 @@ public class FetcherTest {
         result.output.flip();
         MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.output);
 
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, compactedRecords, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, compactedRecords, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> allFetchedRecords = fetcher.fetchedRecords();
-        assertTrue(allFetchedRecords.containsKey(tp1));
-        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = allFetchedRecords.get(tp1);
+        assertTrue(allFetchedRecords.containsKey(tp0));
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = allFetchedRecords.get(tp0);
         assertEquals(3, fetchedRecords.size());
 
         for (int i = 0; i < 3; i++) {
@@ -1670,7 +1672,7 @@ public class FetcherTest {
         }
 
         // The next offset should point to the next batch
-        assertEquals(4L, subscriptions.position(tp1).longValue());
+        assertEquals(4L, subscriptions.position(tp0).longValue());
     }
 
     @Test
@@ -1688,10 +1690,10 @@ public class FetcherTest {
         buffer.flip();
         MemoryRecords recordsWithEmptyBatch = MemoryRecords.readableRecords(buffer);
 
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fetchResponse(tp1, recordsWithEmptyBatch, Errors.NONE, 100L, 0));
+        client.prepareResponse(fetchResponse(tp0, recordsWithEmptyBatch, Errors.NONE, 100L, 0));
         consumerClient.poll(0);
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -1699,7 +1701,7 @@ public class FetcherTest {
         assertTrue(allFetchedRecords.isEmpty());
 
         // The next offset should point to the next batch
-        assertEquals(lastOffset + 1, subscriptions.position(tp1).longValue());
+        assertEquals(lastOffset + 1, subscriptions.position(tp0).longValue());
     }
 
     @Test
@@ -1743,8 +1745,8 @@ public class FetcherTest {
         buffer.flip();
 
         // send the fetch
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
         assertEquals(1, fetcher.sendFetches());
 
         // prepare the response. the aborted transactions begin at offsets which are no longer in the log
@@ -1758,8 +1760,8 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = fetcher.fetchedRecords();
-        assertTrue(allFetchedRecords.containsKey(tp1));
-        List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp1);
+        assertTrue(allFetchedRecords.containsKey(tp0));
+        List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp0);
         assertEquals(5, fetchedRecords.size());
         assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), collectRecordOffsets(fetchedRecords));
     }
@@ -1782,9 +1784,9 @@ public class FetcherTest {
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
         abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
         MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.assignFromUser(singleton(tp0));
 
-        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -1795,7 +1797,7 @@ public class FetcherTest {
         assertTrue(fetcher.hasCompletedFetches());
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
-        assertTrue(fetchedRecords.containsKey(tp1));
+        assertTrue(fetchedRecords.containsKey(tp0));
     }
 
     @Test
@@ -1815,9 +1817,9 @@ public class FetcherTest {
         List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
         abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0));
         MemoryRecords records = MemoryRecords.readableRecords(buffer);
-        subscriptions.assignFromUser(singleton(tp1));
+        subscriptions.assignFromUser(singleton(tp0));
 
-        subscriptions.seek(tp1, 0);
+        subscriptions.seek(tp0, 0);
 
         // normal fetch
         assertEquals(1, fetcher.sendFetches());
@@ -1830,8 +1832,8 @@ public class FetcherTest {
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
 
         // Ensure that we don't return any of the aborted records, but yet advance the consumer position.
-        assertFalse(fetchedRecords.containsKey(tp1));
-        assertEquals(currentOffset, (long) subscriptions.position(tp1));
+        assertFalse(fetchedRecords.containsKey(tp0));
+        assertEquals(currentOffset, (long) subscriptions.position(tp0));
     }
 
     private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, int baseSequence, SimpleRecord... records) {
@@ -1873,8 +1875,6 @@ public class FetcherTest {
                                                  Long expectedOffsetForTp0,
                                                  Long expectedOffsetForTp1) {
         client.reset();
-        TopicPartition tp0 = tp1;
-        TopicPartition tp1 = new TopicPartition(topicName, 1);
         // Ensure metadata has both partitions.
         Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
@@ -1906,19 +1906,39 @@ public class FetcherTest {
         }
     }
 
+    private void testGetOffsetsForTimesWithUnknownOffset() {
+        client.reset();
+        // Ensure the metadata has the partition.
+        Cluster cluster = TestUtils.clusterWith(1, topicName, 1);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
+        partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE,
+                ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET));
+
+        client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), cluster.leaderFor(tp0));
+
+        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+        timestampToSearch.put(tp0, 0L);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
+
+        assertTrue(offsetAndTimestampMap.containsKey(tp0));
+        assertNull(offsetAndTimestampMap.get(tp0));
+    }
+
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
         return new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
                 ListOffsetRequest req = (ListOffsetRequest) body;
-                return timestamp == req.partitionTimestamps().get(tp1);
+                return timestamp == req.partitionTimestamps().get(tp0);
             }
         };
     }
 
     private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
-        return listOffsetResponse(tp1, error, timestamp, offset);
+        return listOffsetResponse(tp0, error, timestamp, offset);
     }
 
     private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
@@ -1934,7 +1954,7 @@ public class FetcherTest {
                                                                long lastStableOffset,
                                                                long hw,
                                                                int throttleTime) {
-        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp1,
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp0,
                 new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, abortedTransactions, records));
         return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4c1f593..77f11df 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -480,7 +480,7 @@ object ConsumerGroupCommand extends Logging {
       val consumer = getConsumer()
       consumer.assign(List(topicPartition).asJava)
       val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava)
-      if (offsetsForTimes != null && !offsetsForTimes.isEmpty)
+      if (offsetsForTimes != null && !offsetsForTimes.isEmpty && offsetsForTimes.get(topicPartition) != null)
         LogOffsetResult.LogOffset(offsetsForTimes.get(topicPartition).offset)
       else {
         getLogEndOffset(topicPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index ddac764..c9c40a9 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -158,6 +158,12 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testOffsetsForTimesWhenOffsetNotFound() {
+    val consumer = consumers.head
+    assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp))
+  }
+
+  @Test
   def testOffsetsForTimesAfterDeleteRecords() {
     val consumer = consumers.head
     subscribeAndWaitForAssignment(topic, consumer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index bfe5419..e61f6c7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -28,6 +28,8 @@ To upgrade from earlier versions, please review the <a href="#upgrade_11_0_0">0.
 <ul>
     <li>Topic deletion is now enabled by default, since the functionality is now stable. Users who wish to
         retain the previous behavior should set the broker config <code>delete.topic.enable</code> to <code>false</code>. Keep in mind that topic deletion removes data and the operation is not reversible (i.e. there is no "undelete" operation)</li>
+    <li>For topics that support timestamp search, if no offset can be found for a partition, that partition is now included in the search result with a null offset value. Previously, the partition was not included in the map.
+        This change was made to make the search behavior consistent with the case of topics not supporting timestamp search.</li>
 </ul>
 
 <h4><a id="upgrade_11_0_0" href="#upgrade_11_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.11.0.0</a></h4>


Mime
View raw message