kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: KAFKA-3006: standardize KafkaConsumer API to use Collection
Date Fri, 18 Mar 2016 23:07:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 07360cb0d -> 4332175c1


KAFKA-3006: standardize KafkaConsumer API to use Collection

Author: Pierre-Yves Ritschard <pyr@spootnik.org>

Reviewers: Jason Gustafson, Gwen Shapira

Closes #1098 from hachikuji/KAFKA-3006


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4332175c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4332175c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4332175c

Branch: refs/heads/trunk
Commit: 4332175c11dda5deb491f27a6ecf66661676ca47
Parents: 07360cb
Author: Pierre-Yves Ritschard <pyr@spootnik.org>
Authored: Fri Mar 18 16:07:20 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Fri Mar 18 16:07:20 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java | 35 ++++----
 .../kafka/clients/consumer/KafkaConsumer.java   | 85 ++++++++++----------
 .../kafka/clients/consumer/MockConsumer.java    | 16 ++--
 .../consumer/internals/SubscriptionState.java   |  7 +-
 .../clients/consumer/KafkaConsumerTest.java     | 29 ++++---
 .../kafka/connect/runtime/WorkerSinkTask.java   |  9 ++-
 .../connect/runtime/WorkerSinkTaskContext.java  |  5 +-
 .../kafka/connect/util/KafkaBasedLog.java       |  6 +-
 .../connect/runtime/WorkerSinkTaskTest.java     | 14 ++--
 .../runtime/WorkerSinkTaskThreadedTest.java     |  8 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |  2 +-
 .../scala/kafka/tools/ConsumerPerformance.scala |  3 +-
 .../scala/kafka/tools/EndToEndLatency.scala     |  5 +-
 .../kafka/api/BaseConsumerTest.scala            |  6 +-
 .../kafka/api/ConsumerBounceTest.scala          |  4 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 17 ++--
 .../internals/ProcessorStateManager.java        |  6 +-
 .../streams/processor/internals/StreamTask.java |  6 +-
 .../processor/internals/StreamThread.java       |  8 +-
 .../internals/ProcessorStateManagerTest.java    | 14 ++--
 .../processor/internals/StandbyTaskTest.java    |  5 +-
 .../streams/smoketest/SmokeTestDriver.java      |  2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |  5 +-
 23 files changed, 156 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
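
For illustration before the per-file diffs: the List-typed subscribe/assign
signatures and the TopicPartition... varargs methods are replaced by
Collection parameters. A minimal sketch of how caller code migrates under
the new API (broker address, group id, and topic name are placeholders):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class CollectionApiSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "example-group");           // placeholder
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer());

            // subscribe(List<String>) becomes subscribe(Collection<String>);
            // List-based callers compile unchanged
            consumer.subscribe(Arrays.asList("example-topic"));

            // pause/resume/seekToBeginning/seekToEnd drop their varargs form,
            // so a single partition is wrapped in a singleton collection
            TopicPartition tp = new TopicPartition("example-topic", 0);
            consumer.pause(Collections.singleton(tp));
            consumer.resume(Collections.singleton(tp));

            // the old no-arg "seek all assigned" call is now spelled explicitly
            consumer.seekToBeginning(consumer.assignment());

            consumer.close();
        }
    }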


http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index c0f3030..0862c32 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,7 +31,7 @@ import java.util.regex.Pattern;
  */
 @InterfaceStability.Unstable
 public interface Consumer<K, V> extends Closeable {
-    
+
     /**
      * @see KafkaConsumer#assignment()
      */
@@ -42,19 +43,19 @@ public interface Consumer<K, V> extends Closeable {
     public Set<String> subscription();
 
     /**
-     * @see KafkaConsumer#subscribe(List)
+     * @see KafkaConsumer#subscribe(Collection)
      */
-    public void subscribe(List<String> topics);
+    public void subscribe(Collection<String> topics);
 
     /**
-     * @see KafkaConsumer#subscribe(List, ConsumerRebalanceListener)
+     * @see KafkaConsumer#subscribe(Collection, ConsumerRebalanceListener)
      */
-    public void subscribe(List<String> topics, ConsumerRebalanceListener callback);
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
 
     /**
-     * @see KafkaConsumer#assign(List)
+     * @see KafkaConsumer#assign(Collection)
      */
-    public void assign(List<TopicPartition> partitions);
+    public void assign(Collection<TopicPartition> partitions);
 
     /**
     * @see KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener)
@@ -102,14 +103,14 @@ public interface Consumer<K, V> extends Closeable {
     public void seek(TopicPartition partition, long offset);
 
     /**
-     * @see KafkaConsumer#seekToBeginning(TopicPartition...)
+     * @see KafkaConsumer#seekToBeginning(Collection)
      */
-    public void seekToBeginning(TopicPartition... partitions);
+    public void seekToBeginning(Collection<TopicPartition> partitions);
 
     /**
-     * @see KafkaConsumer#seekToEnd(TopicPartition...)
+     * @see KafkaConsumer#seekToEnd(Collection)
      */
-    public void seekToEnd(TopicPartition... partitions);
+    public void seekToEnd(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#position(TopicPartition)
@@ -137,19 +138,19 @@ public interface Consumer<K, V> extends Closeable {
     public Map<String, List<PartitionInfo>> listTopics();
 
     /**
-     * @see KafkaConsumer#pause(TopicPartition...)
+     * @see KafkaConsumer#paused()
      */
-    public void pause(TopicPartition... partitions);
+    public Set<TopicPartition> paused();
 
     /**
-     * @see KafkaConsumer#paused()
+     * @see KafkaConsumer#pause(Collection)
      */
-    public Set<TopicPartition> paused();
+    public void pause(Collection<TopicPartition> partitions);
 
     /**
-     * @see KafkaConsumer#resume(TopicPartition...)
+     * @see KafkaConsumer#resume(Collection)
      */
-    public void resume(TopicPartition... partitions);
+    public void resume(Collection<TopicPartition> partitions);
 
     /**
      * @see KafkaConsumer#close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 804a160..b7eafbe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,12 +16,12 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -45,7 +45,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
@@ -100,7 +99,7 @@ import java.util.regex.Pattern;
  * distributed over many machines to provide additional scalability and fault tolerance for processing.
  * <p>
  * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
- * list of topics it wants to subscribe to through {@link #subscribe(List, ConsumerRebalanceListener)},
+ * list of topics it wants to subscribe to through {@link #subscribe(Collection, ConsumerRebalanceListener)},
  * or subscribe to all topics matching certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
  * Kafka will deliver each message in the
  * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
@@ -127,7 +126,7 @@ import java.util.regex.Pattern;
  * commits (note that offsets are always committed for a given consumer group), etc.
  * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details
  * <p>
- * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(List)},
+ * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(Collection)},
  * which disables this dynamic partition assignment.
  *
  * <h3>Usage Examples</h3>
@@ -313,7 +312,7 @@ import java.util.regex.Pattern;
  * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
  * search index use case described above). If the partition assignment is done automatically special care is
  * needed to handle the case where partition assignments change. This can be done by providing a
- * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}
+ * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(Collection, ConsumerRebalanceListener)}
  * and {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
  * For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
  * implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
@@ -342,7 +341,7 @@ import java.util.regex.Pattern;
  * <p>
  * Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
  * methods for seeking to the earliest and latest offset the server maintains are also available (
- * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
+ * {@link #seekToBeginning(Collection)} and {@link #seekToEnd(Collection)} respectively).
  *
  * <h4>Consumption Flow Control</h4>
  *
@@ -359,7 +358,7 @@ import java.util.regex.Pattern;
  * fetching other topics.
  *
  * <p>
- * Kafka supports dynamic controlling of consumption flows by using {@link #pause(TopicPartition...)} and {@link #resume(TopicPartition...)}
+ * Kafka supports dynamic controlling of consumption flows by using {@link #pause(Collection)} and {@link #resume(Collection)}
  * to pause the consumption on the specified assigned partitions and resume the consumption
  * on the specified paused partitions respectively in the future {@link #poll(long)} calls.
  *
@@ -660,7 +659,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
-     * partitions using {@link #assign(List)} then this will simply return the same partitions that
+     * partitions using {@link #assign(Collection)} then this will simply return the same partitions that
      * were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
      * to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
      * process of getting reassigned).
@@ -677,7 +676,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Get the current subscription. Will return the same topics used in the most recent call to
-     * {@link #subscribe(List, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
+     * {@link #subscribe(Collection, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
      * @return The set of topics currently subscribed to
      */
     public Set<String> subscription() {
@@ -693,7 +692,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Subscribe to the given list of topics to get dynamically
      * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
      * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management
-     * with manual partition assignment through {@link #assign(List)}.
+     * with manual partition assignment through {@link #assign(Collection)}.
      *
      * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
      *
@@ -718,7 +717,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *                 subscribed topics
      */
     @Override
-    public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
         acquire();
         try {
             if (topics.isEmpty()) {
@@ -738,21 +737,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Subscribe to the given list of topics to get dynamically assigned partitions.
      * <b>Topic subscriptions are not incremental. This list will replace the current
      * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
-     * with manual partition assignment through {@link #assign(List)}.
+     * with manual partition assignment through {@link #assign(Collection)}.
      *
      * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
      *
      * <p>
-     * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
+     * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which
      * uses a noop listener. If you need the ability to either seek to particular offsets, you should prefer
-     * {@link #subscribe(List, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
+     * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
      * to be reset. You should also prefer to provide your own listener if you are doing your own offset
      * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
      *
      * @param topics The list of topics to subscribe to
      */
     @Override
-    public void subscribe(List<String> topics) {
+    public void subscribe(Collection<String> topics) {
         subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
@@ -785,8 +784,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Unsubscribe from topics currently subscribed with {@link #subscribe(List)}. This
-     * also clears any partitions directly assigned through {@link #assign(List)}.
+     * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This
+     * also clears any partitions directly assigned through {@link #assign(Collection)}.
      */
     public void unsubscribe() {
         acquire();
@@ -806,13 +805,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * <p>
      * Manual topic assignment through this method does not use the consumer's group management
      * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
-     * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(List)}
-     * and group assignment with {@link #subscribe(List, ConsumerRebalanceListener)}.
+     * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
+     * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
      *
      * @param partitions The list of partitions to assign this consumer
      */
     @Override
-    public void assign(List<TopicPartition> partitions) {
+    public void assign(Collection<TopicPartition> partitions) {
         acquire();
         try {
             log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
@@ -931,7 +930,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * encountered (in which case it is thrown to the caller).
      *
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
-     *             This can only occur if you are using automatic group management with {@link #subscribe(List)},
+     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
      *             or if there is an active group with the same groupId which is using group management.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
@@ -963,7 +962,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param offsets A map of offsets by partition with associated metadata
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
-     *             This can only occur if you are using automatic group management with {@link #subscribe(List)},
+     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
      *             or if there is an active group with the same groupId which is using group management.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
@@ -1063,11 +1062,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
      * If no partition is provided, seek to the first offset for all of the currently assigned partitions.
      */
-    public void seekToBeginning(TopicPartition... partitions) {
+    public void seekToBeginning(Collection<TopicPartition> partitions) {
         acquire();
         try {
-            Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
-                    : Arrays.asList(partitions);
+            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to beginning of partition {}", tp);
                 subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
@@ -1082,11 +1080,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
      * If no partition is provided, seek to the final offset for all of the currently assigned partitions.
      */
-    public void seekToEnd(TopicPartition... partitions) {
+    public void seekToEnd(Collection<TopicPartition> partitions) {
         acquire();
         try {
-            Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
-                    : Arrays.asList(partitions);
+            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to end of partition {}", tp);
                 subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
@@ -1222,13 +1219,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
-     * any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}.
+     * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
      * Note that this method does not affect partition subscription. In particular, it does not cause a group
      * rebalance when automatic assignment is used.
      * @param partitions The partitions which should be paused
      */
     @Override
-    public void pause(TopicPartition... partitions) {
+    public void pause(Collection<TopicPartition> partitions) {
         acquire();
         try {
             for (TopicPartition partition: partitions) {
@@ -1241,34 +1238,34 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Get the set of partitions that were previously paused by a call to {@link #pause(TopicPartition...)}.
-     *
-     * @return The set of paused partitions
+     * Resume specified partitions which have been paused with {@link #pause(Collection)}. New calls to
+     * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
+     * If the partitions were not previously paused, this method is a no-op.
+     * @param partitions The partitions which should be resumed
      */
     @Override
-    public Set<TopicPartition> paused() {
+    public void resume(Collection<TopicPartition> partitions) {
         acquire();
         try {
-            return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+            for (TopicPartition partition: partitions) {
+                log.debug("Resuming partition {}", partition);
+                subscriptions.resume(partition);
+            }
         } finally {
             release();
         }
     }
 
     /**
-     * Resume specified partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
-     * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
-     * If the partitions were not previously paused, this method is a no-op.
-     * @param partitions The partitions which should be resumed
+     * Get the set of partitions that were previously paused by a call to {@link #pause(Collection)}.
+     *
+     * @return The set of paused partitions
      */
     @Override
-    public void resume(TopicPartition... partitions) {
+    public Set<TopicPartition> paused() {
         acquire();
         try {
-            for (TopicPartition partition: partitions) {
-                log.debug("Resuming partition {}", partition);
-                subscriptions.resume(partition);
-            }
+            return Collections.unmodifiableSet(subscriptions.pausedPartitions());
         } finally {
             release();
         }
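
The javadoc above recommends the listener overload whenever offsets are
managed by the application. A minimal sketch of that pattern against the new
Collection signature (topic name and generic types are illustrative):

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ListenerSubscribeSketch {
        static void subscribeWithCommit(final KafkaConsumer<byte[], byte[]> consumer) {
            consumer.subscribe(Collections.singleton("example-topic"),
                    new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                            // commit before ownership moves so the rebalance
                            // does not rewind the group's progress
                            consumer.commitSync();
                        }

                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            // nothing to do; positions are fetched on the next poll()
                        }
                    });
        }
    }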

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c7f0a46..8dce1f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -67,7 +67,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.exception = null;
         this.wakeup = new AtomicBoolean(false);
     }
-    
+
     @Override
     public Set<TopicPartition> assignment() {
         return this.subscriptions.assignedPartitions();
@@ -86,7 +86,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void subscribe(List<String> topics) {
+    public void subscribe(Collection<String> topics) {
         subscribe(topics, new NoOpConsumerRebalanceListener());
     }
 
@@ -105,13 +105,13 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
+    public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
         this.subscriptions.subscribe(topics, listener);
     }
 
     @Override
-    public void assign(List<TopicPartition> partitions) {
+    public void assign(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         this.subscriptions.assignFromUser(partitions);
     }
@@ -238,7 +238,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void seekToBeginning(TopicPartition... partitions) {
+    public void seekToBeginning(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         for (TopicPartition tp : partitions)
             subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
@@ -249,7 +249,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void seekToEnd(TopicPartition... partitions) {
+    public void seekToEnd(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         for (TopicPartition tp : partitions)
             subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
@@ -287,7 +287,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void pause(TopicPartition... partitions) {
+    public void pause(Collection<TopicPartition> partitions) {
         for (TopicPartition partition : partitions) {
             subscriptions.pause(partition);
             paused.add(partition);
@@ -295,7 +295,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public void resume(TopicPartition... partitions) {
+    public void resume(Collection<TopicPartition> partitions) {
         for (TopicPartition partition : partitions) {
             subscriptions.resume(partition);
             paused.remove(partition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index af26357..e72a476 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -89,7 +88,7 @@ public class SubscriptionState {
         this.subscribedPattern = null;
     }
 
-    public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
+    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
         if (listener == null)
             throw new IllegalArgumentException("RebalanceListener cannot be null");
 
@@ -101,7 +100,7 @@ public class SubscriptionState {
         changeSubscription(topics);
     }
 
-    public void changeSubscription(List<String> topicsToSubscribe) {
+    public void changeSubscription(Collection<String> topicsToSubscribe) {
         if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) {
             this.subscription.clear();
             this.subscription.addAll(topicsToSubscribe);
@@ -415,4 +414,4 @@ public class SubscriptionState {
 
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2ac024f..ff07461 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -29,6 +29,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Properties;
 
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+
 public class KafkaConsumerTest {
 
     private final String topic = "test";
@@ -47,9 +50,9 @@ public class KafkaConsumerTest {
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
                     props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         } catch (KafkaException e) {
-            Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
-            Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
-            Assert.assertEquals("Failed to construct kafka consumer", e.getMessage());
+            assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
+            assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
+            assertEquals("Failed to construct kafka consumer", e.getMessage());
             return;
         }
         Assert.fail("should have caught an exception and returned");
@@ -60,7 +63,7 @@ public class KafkaConsumerTest {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
 
         consumer.subscribe(Collections.singletonList(topic));
-        Assert.assertEquals(Collections.singleton(topic), consumer.subscription());
+        assertEquals(singleton(topic), consumer.subscription());
         Assert.assertTrue(consumer.assignment().isEmpty());
 
         consumer.subscribe(Collections.<String>emptyList());
@@ -69,7 +72,7 @@ public class KafkaConsumerTest {
 
         consumer.assign(Collections.singletonList(tp0));
         Assert.assertTrue(consumer.subscription().isEmpty());
-        Assert.assertEquals(Collections.singleton(tp0), consumer.assignment());
+        assertEquals(singleton(tp0), consumer.assignment());
 
         consumer.unsubscribe();
         Assert.assertTrue(consumer.subscription().isEmpty());
@@ -98,12 +101,12 @@ public class KafkaConsumerTest {
 
             KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
                     props, new StringDeserializer(), new StringDeserializer());
-            Assert.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
-            Assert.assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
+            assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
+            assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
 
             consumer.close();
-            Assert.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
-            Assert.assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
+            assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
+            assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
         } finally {
             // cleanup since we are using mutable static variables in MockConsumerInterceptor
             MockConsumerInterceptor.resetCounters();
@@ -115,13 +118,13 @@ public class KafkaConsumerTest {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
 
         consumer.assign(Collections.singletonList(tp0));
-        Assert.assertEquals(Collections.singleton(tp0), consumer.assignment());
+        assertEquals(singleton(tp0), consumer.assignment());
         Assert.assertTrue(consumer.paused().isEmpty());
 
-        consumer.pause(tp0);
-        Assert.assertEquals(Collections.singleton(tp0), consumer.paused());
+        consumer.pause(singleton(tp0));
+        assertEquals(singleton(tp0), consumer.paused());
 
-        consumer.resume(tp0);
+        consumer.resume(singleton(tp0));
         Assert.assertTrue(consumer.paused().isEmpty());
 
         consumer.unsubscribe();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index eb64355..6293455 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -49,6 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Collections.singleton;
+
 /**
  * WorkerTask that uses a SinkTask to export data from Kafka.
  */
@@ -350,7 +352,7 @@ class WorkerSinkTask extends WorkerTask {
             if (pausedForRedelivery) {
                 for (TopicPartition tp : consumer.assignment())
                     if (!context.pausedPartitions().contains(tp))
-                        consumer.resume(tp);
+                        consumer.resume(singleton(tp));
                 pausedForRedelivery = false;
             }
         } catch (RetriableException e) {
@@ -358,8 +360,7 @@ class WorkerSinkTask extends WorkerTask {
             // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
             // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
             pausedForRedelivery = true;
-            for (TopicPartition tp : consumer.assignment())
-                consumer.pause(tp);
+            consumer.pause(consumer.assignment());
             // Let this exit normally, the batch will be reprocessed on the next loop.
         } catch (Throwable t) {
             log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
@@ -419,7 +420,7 @@ class WorkerSinkTask extends WorkerTask {
 
                 for (TopicPartition tp : partitions) {
                     if (!taskPaused.contains(tp))
-                        consumer.resume(tp);
+                        consumer.resume(singleton(tp));
                 }
 
                 Iterator<TopicPartition> tpIter = taskPaused.iterator();
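
The redelivery path above shows the ergonomic win of the new signatures: the
per-partition varargs loop collapses into one bulk call. A hedged sketch of
the resulting shape:

    import org.apache.kafka.clients.consumer.Consumer;

    public class BulkPauseSketch {
        // pause every currently assigned partition in a single call, as the
        // RetriableException branch above now does
        static void pauseAll(Consumer<byte[], byte[]> consumer) {
            consumer.pause(consumer.assignment());
        }
    }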

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 06f4838..c762bdd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -16,6 +16,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -85,7 +86,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         try {
             for (TopicPartition partition : partitions)
                 pausedPartitions.add(partition);
-            consumer.pause(partitions);
+            consumer.pause(Arrays.asList(partitions));
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
         }
@@ -99,7 +100,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
         try {
             for (TopicPartition partition : partitions)
                 pausedPartitions.remove(partition);
-            consumer.resume(partitions);
+            consumer.resume(Arrays.asList(partitions));
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 5ab60cd..290f8a2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -30,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -46,6 +46,8 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
 
+import static java.util.Collections.singleton;
+
 /**
  * <p>
  *     KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
@@ -267,7 +269,7 @@ public class KafkaBasedLog<K, V> {
         for (TopicPartition tp : assignment) {
             long offset = consumer.position(tp);
             offsets.put(tp, offset);
-            consumer.seekToEnd(tp);
+            consumer.seekToEnd(singleton(tp));
         }
 
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
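
The logic above relies on seekToEnd being lazy: the seek is only performed
when position() is next called, which is what resolves the log-end offset. A
sketch of the single-partition probe under the new signature (method name is
illustrative):

    import static java.util.Collections.singleton;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class EndOffsetSketch {
        // seekToEnd evaluates lazily, so the position() call below is what
        // actually performs the lookup
        static long endOffset(Consumer<byte[], byte[]> consumer, TopicPartition tp) {
            consumer.seekToEnd(singleton(tp));
            return consumer.position(tp);
        }
    }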

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index f419a7b..7bc83de 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -57,6 +57,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -136,10 +137,9 @@ public class WorkerSinkTaskTest {
         sinkTask.put(EasyMock.capture(records));
         EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
         // Pause
-        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
-        consumer.pause(TOPIC_PARTITION);
-        PowerMock.expectLastCall();
-        consumer.pause(TOPIC_PARTITION2);
+        HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+        EasyMock.expect(consumer.assignment()).andReturn(partitions);
+        consumer.pause(partitions);
         PowerMock.expectLastCall();
 
         // Retry delivery should succeed
@@ -147,10 +147,10 @@ public class WorkerSinkTaskTest {
         sinkTask.put(EasyMock.capture(records));
         EasyMock.expectLastCall();
         // And unpause
-        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
-        consumer.resume(TOPIC_PARTITION);
+        EasyMock.expect(consumer.assignment()).andReturn(partitions);
+        consumer.resume(singleton(TOPIC_PARTITION));
         PowerMock.expectLastCall();
-        consumer.resume(TOPIC_PARTITION2);
+        consumer.resume(singleton(TOPIC_PARTITION2));
         PowerMock.expectLastCall();
 
         PowerMock.replayAll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 1099d7a..25f0bf4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -377,9 +377,9 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                 return null;
             }
         });
-        consumer.pause(UNASSIGNED_TOPIC_PARTITION);
+        consumer.pause(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
         PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
-        consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+        consumer.pause(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
         PowerMock.expectLastCall();
 
         expectOnePoll().andAnswer(new IAnswer<Object>() {
@@ -396,9 +396,9 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                 return null;
             }
         });
-        consumer.resume(UNASSIGNED_TOPIC_PARTITION);
+        consumer.resume(Arrays.asList(UNASSIGNED_TOPIC_PARTITION));
         PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
-        consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+        consumer.resume(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
         PowerMock.expectLastCall();
 
         expectStopTask(0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4f9aca3..0386404 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -335,7 +335,7 @@ object ConsumerGroupCommand {
       val consumer = getConsumer()
       val topicPartition = new TopicPartition(topic, partition)
       consumer.assign(List(topicPartition).asJava)
-      consumer.seekToEnd(topicPartition)
+      consumer.seekToEnd(List(topicPartition).asJava)
       val logEndOffset = consumer.position(topicPartition)
       LogEndOffsetResult.LogEndOffset(logEndOffset)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index c7f9072..a38c04b 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -28,6 +28,7 @@ import org.apache.log4j.Logger
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.TopicPartition
 import kafka.utils.CommandLineUtils
 import java.util.{ Random, Properties }
 import kafka.consumer.Consumer
@@ -121,7 +122,7 @@ object ConsumerPerformance {
       }
       consumer.poll(100)
     }
-    consumer.seekToBeginning()
+    consumer.seekToBeginning(List[TopicPartition]())
 
     // Now start the benchmark
     val startMs = System.currentTimeMillis
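
The Scala tools keep the old "seek everything" behavior by passing an empty
collection, which the new KafkaConsumer implementation treats as "all
currently assigned partitions" (the size() == 0 branch above). The Java
equivalent, as a sketch:

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekAllSketch {
        // an empty collection preserves the old no-arg semantics: every
        // currently assigned partition is seeked to the beginning
        static void seekAllToBeginning(Consumer<byte[], byte[]> consumer) {
            consumer.seekToBeginning(Collections.<TopicPartition>emptyList());
        }
    }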

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index e670d82..584d4fb 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -22,6 +22,7 @@ import java.util.{Arrays, Properties}
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.JavaConversions._
 
@@ -85,7 +86,7 @@ object EndToEndLatency {
 
     //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
     //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
-    consumer.seekToEnd()
+    consumer.seekToEnd(List[TopicPartition]())
     consumer.poll(0)
 
     var totalTime = 0.0
@@ -143,4 +144,4 @@ object EndToEndLatency {
   def randomBytesOfLen(len: Int): Array[Byte] = {
     Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index f576be5..9939309 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -99,7 +99,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     val rebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
         // keep partitions paused in this test so that we can verify the commits based on specific seeks
-        partitions.asScala.foreach(consumer0.pause(_))
+        consumer0.pause(partitions)
       }
 
       override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
@@ -145,7 +145,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(3, this.consumers(0).committed(tp).offset)
     assertNull(this.consumers(0).committed(tp2))
 
-    // positions should not change
+    // Positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
     this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
@@ -244,7 +244,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     sendRecords(5)
     consumer0.subscribe(List(topic).asJava)
     consumeAndVerifyRecords(consumer = consumer0, numRecords = 5, startingOffset = 0)
-    consumer0.pause(tp)
+    consumer0.pause(List(tp).asJava)
 
     // subscribe to a new topic to trigger a rebalance
     consumer0.subscribe(List("topic2").asJava)
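
A side effect visible in this test: the Collection handed to the rebalance
callback can now be forwarded straight to pause() with no per-partition loop.
Sketched in Java (topic name is illustrative):

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class PauseOnAssignSketch {
        static void subscribePaused(final KafkaConsumer<byte[], byte[]> consumer) {
            consumer.subscribe(Collections.singleton("example-topic"),
                    new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                            // the callback's Collection matches pause()'s
                            // parameter type, so it passes through directly
                            consumer.pause(partitions);
                        }

                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        }
                    });
        }
    }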

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 029eaf4..8424340 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -104,7 +104,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
         assertEquals(consumer.position(tp), consumer.committed(tp).offset)
 
         if (consumer.position(tp) == numRecords) {
-          consumer.seekToBeginning()
+          consumer.seekToBeginning(List[TopicPartition]())
           consumed = 0
         }
       } catch {
@@ -140,7 +140,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       val coin = TestUtils.random.nextInt(3)
       if (coin == 0) {
         info("Seeking to end of log")
-        consumer.seekToEnd()
+        consumer.seekToEnd(List[TopicPartition]())
         assertEquals(numRecords.toLong, consumer.position(tp))
       } else if (coin == 1) {
         val pos = TestUtils.random.nextInt(numRecords).toLong

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index ca0497b..9c56010 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -300,11 +300,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendRecords(totalRecords.toInt, tp)
     consumer.assign(List(tp).asJava)
 
-    consumer.seekToEnd(tp)
+    consumer.seekToEnd(List(tp).asJava)
     assertEquals(totalRecords, consumer.position(tp))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
-    consumer.seekToBeginning(tp)
+    consumer.seekToBeginning(List(tp).asJava)
     assertEquals(0, consumer.position(tp), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
 
@@ -318,11 +318,11 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     sendCompressedMessages(totalRecords.toInt, tp2)
     consumer.assign(List(tp2).asJava)
 
-    consumer.seekToEnd(tp2)
+    consumer.seekToEnd(List(tp2).asJava)
     assertEquals(totalRecords, consumer.position(tp2))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
-    consumer.seekToBeginning(tp2)
+    consumer.seekToBeginning(List(tp2).asJava)
     assertEquals(0, consumer.position(tp2), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2)
 
@@ -375,13 +375,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @Test
   def testPartitionPauseAndResume() {
+    val partitions = List(tp).asJava
     sendRecords(5)
-    this.consumers(0).assign(List(tp).asJava)
+    this.consumers(0).assign(partitions)
     consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 0)
-    this.consumers(0).pause(tp)
+    this.consumers(0).pause(partitions)
     sendRecords(5)
     assertTrue(this.consumers(0).poll(0).isEmpty)
-    this.consumers(0).resume(tp)
+    this.consumers(0).resume(partitions)
     consumeAndVerifyRecords(consumer = this.consumers(0), numRecords = 5, startingOffset = 5)
   }
 
@@ -632,7 +633,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val rebalanceListener = new ConsumerRebalanceListener {
       override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
         // keep partitions paused in this test so that we can verify the commits based on specific seeks
-        partitions.asScala.foreach(testConsumer.pause(_))
+        testConsumer.pause(partitions)
       }
 
       override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index df8516c..665d39f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -43,6 +43,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Collections.singleton;
+
 public class ProcessorStateManager {
 
     private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
@@ -204,7 +206,7 @@ public class ProcessorStateManager {
         try {
             // calculate the end offset of the partition
             // TODO: this is a bit hacky to first seek then position to get the end offset
-            restoreConsumer.seekToEnd(storePartition);
+            restoreConsumer.seekToEnd(singleton(storePartition));
             long endOffset = restoreConsumer.position(storePartition);
 
             // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
@@ -212,7 +214,7 @@ public class ProcessorStateManager {
             if (checkpointedOffsets.containsKey(storePartition)) {
                 restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
             } else {
-                restoreConsumer.seekToBeginning(storePartition);
+                restoreConsumer.seekToBeginning(singleton(storePartition));
             }
 
             // restore its state from changelog records

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 54a25c1..c4cc2ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -34,6 +34,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Collections.singleton;
+
 /**
  * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
  */
@@ -136,7 +138,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         // if after adding these records, its partition queue's buffered size has been
         // increased beyond the threshold, we can then pause the consumption for this partition
         if (queueSize > this.maxBufferedSize) {
-            consumer.pause(partition);
+            consumer.pause(singleton(partition));
         }
     }
 
@@ -178,7 +180,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 // after processing this record, if its partition queue's buffered size has been
                 // decreased to the threshold, we can then resume the consumption on this partition
                 if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
-                    consumer.resume(partition);
+                    consumer.resume(singleton(partition));
                     requiresPoll = true;
                 }
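
StreamTask's buffer-based flow control now wraps single partitions with
singleton(): pause when a partition's queue grows past the threshold, resume
once it drains back to it. A condensed sketch (the buffered-count arguments
stand in for StreamTask's PartitionGroup bookkeeping):

    import static java.util.Collections.singleton;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class BackpressureSketch {
        // called after enqueuing fetched records: stop fetching this
        // partition once its queue overflows the threshold
        static void afterEnqueue(Consumer<byte[], byte[]> consumer,
                                 TopicPartition partition, int buffered, int maxBuffered) {
            if (buffered > maxBuffered)
                consumer.pause(singleton(partition));
        }

        // called after processing a record: resume fetching once the queue
        // drains back down to the threshold
        static void afterProcess(Consumer<byte[], byte[]> consumer,
                                 TopicPartition partition, int buffered, int maxBuffered) {
            if (buffered == maxBuffered)
                consumer.resume(singleton(partition));
        }
    }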
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e1a518d..7d6b98f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -66,6 +66,8 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.util.Collections.singleton;
+
 public class StreamThread extends Thread {
 
     private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
@@ -382,7 +384,7 @@ public class StreamThread extends Thread {
                             if (remaining != null) {
                                 remainingStandbyRecords.put(partition, remaining);
                             } else {
-                                restoreConsumer.resume(partition);
+                                restoreConsumer.resume(singleton(partition));
                             }
                         }
                     }
@@ -405,7 +407,7 @@ public class StreamThread extends Thread {
 
                     List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
                     if (remaining != null) {
-                        restoreConsumer.pause(partition);
+                        restoreConsumer.pause(singleton(partition));
                         standbyRecords.put(partition, remaining);
                     }
                 }
@@ -690,7 +692,7 @@ public class StreamThread extends Thread {
             if (offset >= 0) {
                 restoreConsumer.seek(partition, offset);
             } else {
-                restoreConsumer.seekToBeginning(partition);
+                restoreConsumer.seekToBeginning(singleton(partition));
             }
         }
     }

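Taken together, the StreamThread hunks above implement a pause/drain/resume cycle for standby partitions: pause when the task cannot absorb a batch, buffer the remainder, resume once the buffer drains. A rough sketch that collapses the two loops into one step (StandbyDrainSketch, StandbyUpdater, and feed are illustrative names, not Kafka API):

    import static java.util.Collections.singleton;

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    class StandbyDrainSketch {
        interface StandbyUpdater {
            // returns the records it could not absorb, or null when fully caught up
            List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition,
                                                        List<ConsumerRecord<byte[], byte[]>> records);
        }

        static void feed(Consumer<byte[], byte[]> restoreConsumer,
                         TopicPartition partition,
                         List<ConsumerRecord<byte[], byte[]>> records,
                         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords,
                         StandbyUpdater task) {
            List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records);
            if (remaining != null) {
                // the task is saturated: stop fetching and keep the leftovers
                restoreConsumer.pause(singleton(partition));
                standbyRecords.put(partition, remaining);
            } else {
                // fully absorbed: safe to fetch from this partition again
                restoreConsumer.resume(singleton(partition));
            }
        }
    }
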
http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 1d0a969..84b59e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -38,9 +38,9 @@ import java.io.IOException;
 import java.nio.channels.FileLock;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -94,7 +94,7 @@ public class ProcessorStateManagerTest {
         }
 
         @Override
-        public synchronized void assign(List<TopicPartition> partitions) {
+        public synchronized void assign(Collection<TopicPartition> partitions) {
             int numPartitions = partitions.size();
             if (numPartitions > 1)
                 throw new IllegalArgumentException("RestoreConsumer: more than one partition specified");
@@ -102,7 +102,7 @@ public class ProcessorStateManagerTest {
             if (numPartitions == 1) {
                 if (assignedPartition != null)
                     throw new IllegalStateException("RestoreConsumer: partition already assigned");
-                assignedPartition = partitions.get(0);
+                assignedPartition = partitions.iterator().next();
 
                 // set the beginning offset to 0
                // NOTE: it is the user's responsibility to set the initial log-end offset (LEO).
@@ -154,8 +154,8 @@ public class ProcessorStateManagerTest {
         }
 
         @Override
-        public synchronized void seekToBeginning(TopicPartition... partitions) {
-            if (partitions.length != 1)
+        public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
+            if (partitions.size() != 1)
                 throw new IllegalStateException("RestoreConsumer: exactly one partition must be specified");
 
             for (TopicPartition partition : partitions) {
@@ -168,8 +168,8 @@ public class ProcessorStateManagerTest {
         }
 
         @Override
-        public synchronized void seekToEnd(TopicPartition... partitions) {
-            if (partitions.length != 1)
+        public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
+            if (partitions.size() != 1)
                 throw new IllegalStateException("RestoreConsumer: exactly one partition must be specified");
 
             for (TopicPartition partition : partitions) {

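The test double above shows the mechanical consequences for mocks: Collection has no positional get, so partitions.get(0) becomes partitions.iterator().next(), and the varargs length check becomes size(). A minimal sketch of that single-partition guard (the class and method names are illustrative):

    import java.util.Collection;
    import org.apache.kafka.common.TopicPartition;

    class SinglePartitionGuard {
        // With a Collection there is no get(0); take the sole element via the iterator.
        static TopicPartition onlyPartition(Collection<TopicPartition> partitions) {
            if (partitions.size() != 1)
                throw new IllegalStateException("exactly one partition must be specified");
            return partitions.iterator().next();
        }
    }
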
http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index ea24441..e7fb9a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -181,7 +182,7 @@ public class StandbyTaskTest {
                 if (offset >= 0) {
                     restoreStateConsumer.seek(partition, offset);
                 } else {
-                    restoreStateConsumer.seekToBeginning(partition);
+                    restoreStateConsumer.seekToBeginning(singleton(partition));
                 }
             }
 
@@ -246,7 +247,7 @@ public class StandbyTaskTest {
                 if (offset >= 0) {
                     restoreStateConsumer.seek(partition, offset);
                 } else {
-                    restoreStateConsumer.seekToBeginning(partition);
+                    restoreStateConsumer.seekToBeginning(singleton(partition));
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index 1abf88d..d7b0139 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -200,7 +200,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
         List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
         consumer.assign(partitions);
-        consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
+        consumer.seekToBeginning(partitions);
 
         final int recordsGenerated = allData.size() * maxRecordsPerKey;
         int recordsProcessed = 0;

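A pleasant side effect of the Collection signature: callers already holding a List no longer need the toArray conversion, since List implements Collection. A small sketch of the driver's assign-and-rewind step under stated assumptions (illustrative class and method names):

    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class AssignAndRewindSketch {
        // The old API forced partitions.toArray(new TopicPartition[partitions.size()])
        // for seekToBeginning; now the List is passed to both calls as-is.
        static void assignAndRewind(KafkaConsumer<byte[], byte[]> consumer,
                                    List<TopicPartition> partitions) {
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
        }
    }
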
http://git-wip-us.apache.org/repos/asf/kafka/blob/4332175c/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index a2948a2..4ddbc2a 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -317,12 +318,12 @@ public class ProcessorTopologyTestDriver {
     protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, String... storeNames) {
         MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) {
             @Override
-            public synchronized void seekToEnd(TopicPartition... partitions) {
+            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
                 // do nothing ...
             }
 
             @Override
-            public synchronized void seekToBeginning(TopicPartition... partitions) {
+            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
                 // do nothing ...
             }
 

