kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2683: ensure wakeup exceptions raised to user
Date Wed, 28 Oct 2015 00:34:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 13c3e049f -> 1ac2640f8


KAFKA-2683: ensure wakeup exceptions raised to user

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava, Guozhang Wang

Closes #366 from hachikuji/KAFKA-2683


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ac2640f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ac2640f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ac2640f

Branch: refs/heads/trunk
Commit: 1ac2640f8095262f423c770060b737f81652e211
Parents: 13c3e04
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Oct 27 17:39:19 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 27 17:39:19 2015 -0700

----------------------------------------------------------------------
 .../consumer/ConsumerWakeupException.java       | 20 ---------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 26 ++++++++++----------
 .../kafka/clients/consumer/MockConsumer.java    |  3 ++-
 .../consumer/internals/AbstractCoordinator.java | 26 +++++++++++---------
 .../consumer/internals/ConsumerCoordinator.java | 10 +++++---
 .../internals/ConsumerNetworkClient.java        | 10 ++++----
 .../kafka/common/errors/WakeupException.java    | 26 ++++++++++++++++++++
 .../internals/ConsumerNetworkClientTest.java    |  4 +--
 .../kafka/copycat/runtime/WorkerSinkTask.java   |  3 ++-
 .../runtime/distributed/DistributedHerder.java  |  6 ++---
 .../runtime/distributed/WorkerGroupMember.java  |  2 +-
 .../kafka/copycat/util/KafkaBasedLog.java       | 10 ++++----
 12 files changed, 81 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
deleted file mode 100644
index 35f1ec9..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import org.apache.kafka.common.KafkaException;
-
-public class ConsumerWakeupException extends KafkaException {
-    private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 06a9239..7aef8a3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -313,9 +313,9 @@ import java.util.regex.Pattern;
  *
  * <p>
  * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
- * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
- * blocking on the operation. This can be used to shutdown the consumer from another thread. The following
- * snippet shows the typical pattern:
+ * interrupt an active operation. In this case, a {@link org.apache.kafka.common.errors.WakeupException} will be
+ * thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread.
+ * The following snippet shows the typical pattern:
  *
  * <pre>
  * public class KafkaConsumerRunner implements Runnable {
@@ -329,7 +329,7 @@ import java.util.regex.Pattern;
  *                 ConsumerRecords records = consumer.poll(10000);
  *                 // Handle new records
  *             }
- *         } catch (ConsumerWakeupException e) {
+ *         } catch (WakeupException e) {
  *             // Ignore exception if closing
  *             if (!closed.get()) throw e;
  *         } finally {
@@ -778,7 +778,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             offset reset policy has been configured.
      * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
      *         the defaultResetPolicy is NONE
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
@@ -818,7 +818,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The fetched records (may be empty)
      * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
      *         the defaultResetPolicy is NONE
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
@@ -858,7 +858,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
      *
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     @Override
     public void commitSync() {
@@ -881,7 +881,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * encountered (in which case it is thrown to the caller).
      *
      * @param offsets A map of offsets by partition with associated metadata
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@@ -1006,7 +1006,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The offset
      * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
      *             available.
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     public long position(TopicPartition partition) {
         acquire();
@@ -1033,7 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param partition The partition to check
      * @return The last committed offset and metadata or null if there was no prior commit
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
@@ -1071,7 +1071,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *
      * @param topic The topic to get partition metadata for
      * @return The list of partitions
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
@@ -1094,7 +1094,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * server.
      *
      * @return The map of topics and its partitions
-     * @throws org.apache.kafka.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called before or while this function is called
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
@@ -1158,7 +1158,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     /**
      * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
-     * The thread which is blocking in an operation will throw {@link ConsumerWakeupException}.
+     * The thread which is blocking in an operation will throw {@link WakeupException}.
      */
     @Override
     public void wakeup() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index ed1c1e2..25c0c2c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -135,7 +136,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
         if (wakeup.get()) {
             wakeup.set(false);
-            throw new ConsumerWakeupException();
+            throw new WakeupException();
         }
 
         if (exception != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 549c8de..8d5ee16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.GroupMetadataRequest;
 import org.apache.kafka.common.requests.GroupMetadataResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -181,8 +180,12 @@ public abstract class AbstractCoordinator {
             RequestFuture<Void> future = sendGroupMetadataRequest();
             client.poll(future, requestTimeoutMs);
 
-            if (future.failed())
-                client.awaitMetadataUpdate();
+            if (future.failed()) {
+                if (future.isRetriable())
+                    client.awaitMetadataUpdate();
+                else
+                    throw future.exception();
+            }
         }
     }
 
@@ -417,12 +420,8 @@ public abstract class AbstractCoordinator {
                            RequestFuture<ByteBuffer> future) {
             short errorCode = syncResponse.errorCode();
             if (errorCode == Errors.NONE.code()) {
-                try {
-                    future.complete(syncResponse.memberAssignment());
-                    sensors.syncLatency.record(response.requestLatencyMs());
-                } catch (SchemaException e) {
-                    future.raise(e);
-                }
+                future.complete(syncResponse.memberAssignment());
+                sensors.syncLatency.record(response.requestLatencyMs());
             } else {
                 AbstractCoordinator.this.rejoinNeeded = true;
                 future.raise(Errors.forCode(errorCode));
@@ -588,8 +587,13 @@ public abstract class AbstractCoordinator {
                 return;
             }
 
-            R response = parse(clientResponse);
-            handle(response, future);
+            try {
+                R response = parse(clientResponse);
+                handle(response, future);
+            } catch (RuntimeException e) {
+                if (!future.isDone())
+                    future.raise(e);
+            }
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 20d1564..641939a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -15,7 +15,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
@@ -180,6 +180,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
         try {
             Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsAssigned(assigned);
+        } catch (WakeupException e) {
+            throw e;
         } catch (Exception e) {
             log.error("User provided listener " + listener.getClass().getName()
                     + " failed on partition assignment: ", e);
@@ -234,6 +236,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
         try {
             Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsRevoked(revoked);
+        } catch (WakeupException e) {
+            throw e;
         } catch (Exception e) {
             log.error("User provided listener " + listener.getClass().getName()
                     + " failed on partition revocation: ", e);
@@ -302,7 +306,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
             try {
                 maybeAutoCommitOffsetsSync();
                 return;
-            } catch (ConsumerWakeupException e) {
+            } catch (WakeupException e) {
                 // ignore wakeups while closing to ensure we have a chance to commit
                 continue;
             }
@@ -368,7 +372,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
         if (autoCommitEnabled) {
             try {
                 commitOffsetsSync(subscriptions.allConsumed());
-            } catch (ConsumerWakeupException e) {
+            } catch (WakeupException e) {
                 // rethrow wakeups since they are triggered by the user
                 throw e;
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index e3a2514..4757fc4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -17,7 +17,7 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -147,7 +147,7 @@ public class ConsumerNetworkClient implements Closeable {
     /**
      * Block indefinitely until the given request future has finished.
      * @param future The request future to await.
-     * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public void poll(RequestFuture<?> future) {
         while (!future.isDone())
@@ -159,7 +159,7 @@ public class ConsumerNetworkClient implements Closeable {
      * @param future The request future to wait for
      * @param timeout The maximum duration (in ms) to wait for the request
      * @return true if the future is done, false otherwise
-     * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public boolean poll(RequestFuture<?> future, long timeout) {
         long now = time.milliseconds();
@@ -175,7 +175,7 @@ public class ConsumerNetworkClient implements Closeable {
      * Poll for any network IO. All send requests will either be transmitted on the network
      * or failed when this call completes.
      * @param timeout The maximum time to wait for an IO event.
-     * @throws ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public void poll(long timeout) {
         poll(timeout, time.milliseconds());
@@ -298,7 +298,7 @@ public class ConsumerNetworkClient implements Closeable {
         if (wakeup.get()) {
             failUnsentRequests();
             wakeup.set(false);
-            throw new ConsumerWakeupException();
+            throw new WakeupException();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
new file mode 100644
index 0000000..a2e718d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/WakeupException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Exception used to indicate preemption of a blocking operation by an external thread.
+ * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer#wakeup}
+ * can be used to break out of an active {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)},
+ * which would raise an instance of this exception.
+ */
+public class WakeupException extends KafkaException {
+    private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 6a42058..1692010 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -15,7 +15,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -96,7 +96,7 @@ public class ConsumerNetworkClientTest {
         try {
             consumerClient.poll(0);
             fail();
-        } catch (ConsumerWakeupException e) {
+        } catch (WakeupException e) {
         }
 
         client.respond(heartbeatResponse(Errors.NONE.code()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index edb415a..3c5cd13 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.utils.Utils;
@@ -112,7 +113,7 @@ class WorkerSinkTask implements WorkerTask {
             ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
             log.trace("{} polling returned {} messages", id, msgs.count());
             deliverMessages(msgs);
-        } catch (ConsumerWakeupException we) {
+        } catch (WakeupException we) {
             log.trace("{} consumer woken up", id);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index bf3229d..17bf7b7 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.copycat.runtime.distributed;
 
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.connector.ConnectorContext;
@@ -159,7 +159,7 @@ public class DistributedHerder implements Herder, Runnable {
             member.ensureActive();
             // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
             if (!handleRebalanceCompleted()) return;
-        } catch (ConsumerWakeupException e) {
+        } catch (WakeupException e) {
             // May be due to a request from another thread, or might be stopping. If the latter, we need to check the
             // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
             // unless we're in the group.
@@ -217,7 +217,7 @@ public class DistributedHerder implements Herder, Runnable {
             member.poll(Long.MAX_VALUE);
             // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
             if (!handleRebalanceCompleted()) return;
-        } catch (ConsumerWakeupException e) { // FIXME should not be ConsumerWakeupException
+        } catch (WakeupException e) { // FIXME should not be WakeupException
             // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
index f8cabaa..03960cf 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
@@ -147,7 +147,7 @@ public class WorkerGroupMember {
     }
 
     /**
-     * Interrupt any running poll() calls, causing a ConsumerWakeupException to be thrown in the thread invoking that method.
+     * Interrupt any running poll() calls, causing a WakeupException to be thrown in the thread invoking that method.
      */
     public void wakeup() {
         this.client.wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ac2640f/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
index 5e860d9..f5e72d3 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -232,7 +232,7 @@ public class KafkaBasedLog<K, V> {
             ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
             for (ConsumerRecord<K, V> record : records)
                 consumedCallback.onCompletion(null, record);
-        } catch (ConsumerWakeupException e) {
+        } catch (WakeupException e) {
             // Expected on get() or stop(). The calling code should handle this
             throw e;
         } catch (KafkaException e) {
@@ -257,7 +257,7 @@ public class KafkaBasedLog<K, V> {
         try {
             poll(0);
         } finally {
-            // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
+            // If there is an exception, even a possibly expected one like WakeupException, we need to make sure
             // the consumers position is reset or it'll get into an inconsistent state.
             for (TopicPartition tp : assignment) {
                 long startOffset = offsets.get(tp);
@@ -300,7 +300,7 @@ public class KafkaBasedLog<K, V> {
                     if (numCallbacks > 0) {
                         try {
                             readToLogEnd();
-                        } catch (ConsumerWakeupException e) {
+                        } catch (WakeupException e) {
                             // Either received another get() call and need to retry reading to end of log or stop() was
                             // called. Both are handled by restarting this loop.
                             continue;
@@ -318,7 +318,7 @@ public class KafkaBasedLog<K, V> {
 
                     try {
                         poll(Integer.MAX_VALUE);
-                    } catch (ConsumerWakeupException e) {
+                    } catch (WakeupException e) {
                         // See previous comment, both possible causes of this wakeup are handled by starting this loop again
                         continue;
                     }


Mime
View raw message