kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2752: Add VerifiableSource/Sink connectors and rolling bounce Copycat system tests.
Date Tue, 10 Nov 2015 22:48:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 64a1bfeb9 -> 8db55618d


KAFKA-2752: Add VerifiableSource/Sink connectors and rolling bounce Copycat system tests.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ben Stopford, Geoff Anderson, Guozhang Wang

Closes #432 from ewencp/kafka-2752-copycat-clean-bounce-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8db55618
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8db55618
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8db55618

Branch: refs/heads/trunk
Commit: 8db55618d5d5d5de97feab2bf8da4dc45387a76a
Parents: 64a1bfe
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Nov 10 14:54:15 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Nov 10 14:54:15 2015 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   2 +-
 build.gradle                                    |  65 +++++++-
 checkstyle/import-control.xml                   |   4 +
 .../kafka/common/utils/ThroughputThrottler.java | 141 +++++++++++++++++
 .../kafka/connect/source/SourceRecord.java      |   5 +
 .../kafka/connect/runtime/WorkerSinkTask.java   |   7 +
 .../connect/runtime/WorkerSinkTaskThread.java   |   6 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |   4 +-
 .../kafka/connect/runtime/rest/RestServer.java  |   4 +
 .../rest/resources/ConnectorsResource.java      |   8 +-
 .../kafka/connect/util/KafkaBasedLog.java       |   6 +
 .../connect/tools/VerifiableSinkConnector.java  |  64 ++++++++
 .../kafka/connect/tools/VerifiableSinkTask.java | 110 +++++++++++++
 .../tools/VerifiableSourceConnector.java        |  64 ++++++++
 .../connect/tools/VerifiableSourceTask.java     | 128 +++++++++++++++
 settings.gradle                                 |   2 +-
 tests/kafkatest/services/connect.py             | 154 +++++++++++++++----
 tests/kafkatest/services/console_consumer.py    |   9 +-
 tests/kafkatest/services/kafka/kafka.py         |   7 +-
 .../services/kafka/templates/log4j.properties   |  87 +++++++++++
 .../services/templates/connect_log4j.properties |  30 ++++
 .../kafkatest/tests/connect_distributed_test.py | 133 ++++++++++++++--
 .../apache/kafka/tools/ProducerPerformance.java |   1 +
 .../apache/kafka/tools/ThroughputThrottler.java | 117 --------------
 .../apache/kafka/tools/VerifiableProducer.java  |   1 +
 25 files changed, 984 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b18a9cf..9962e37 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -72,7 +72,7 @@ do
   CLASSPATH=$CLASSPATH:$dir/*
 done
 
-for cc_pkg in "api" "runtime" "file" "json"
+for cc_pkg in "api" "runtime" "file" "json" "tools"
 do
   for file in $base_dir/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 7f21a00..70fdbcd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools']
 def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
@@ -350,6 +350,8 @@ project(':core') {
     from(project(':connect:json').configurations.runtime) { into("libs/") }
     from(project(':connect:file').jar) { into("libs/") }
     from(project(':connect:file').configurations.runtime) { into("libs/") }
+    from(project(':connect:tools').jar) { into("libs/") }
+    from(project(':connect:tools').configurations.runtime) { into("libs/") }
   }
 
   jar {
@@ -887,3 +889,64 @@ project(':connect:file') {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
+
+project(':connect:tools') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "connect-tools"
+
+  dependencies {
+    compile project(':connect:api')
+    compile "$slf4japi"
+    compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
+
+    testCompile "$junit"
+    testCompile "$easymock"
+    testCompile "$powermock"
+    testCompile "$powermock_easymock"
+    testRuntime "$slf4jlog4j"
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/connect/*"
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom(testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 908fd35..16a3700 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -205,6 +205,10 @@
       <allow pkg="org.powermock" />
     </subpackage>
 
+    <subpackage name="tools">
+      <allow pkg="org.apache.kafka.connect" />
+      <allow pkg="com.fasterxml.jackson" />
+    </subpackage>
   </subpackage>
 
 </import-control>

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
new file mode 100644
index 0000000..1c63ffb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ThroughputThrottler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+
+/**
+ * This class helps producers throttle throughput.
+ *
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
+ *
+ * To use, do this between successive send attempts:
+ * <pre>
+ *     {@code
+ *      if (throttler.shouldThrottle(...)) {
+ *          throttler.throttle();
+ *      }
+ *     }
+ * </pre>
+ *
+ * Note that this can be used to throttle message throughput or data throughput.
+ */
+public class ThroughputThrottler {
+
+    private static final long NS_PER_MS = 1000000L;
+    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
+    long sleepTimeNs;
+    long sleepDeficitNs = 0;
+    long targetThroughput = -1;
+    long startMs;
+    private boolean wakeup = false;
+
+    /**
+     * @param targetThroughput Can be messages/sec or bytes/sec
+     * @param startMs          When the very first message is sent
+     */
+    public ThroughputThrottler(long targetThroughput, long startMs) {
+        this.startMs = startMs;
+        this.targetThroughput = targetThroughput;
+        this.sleepTimeNs = targetThroughput > 0 ?
+                           NS_PER_SEC / targetThroughput :
+                           Long.MAX_VALUE;
+    }
+
+    /**
+     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
+     *                    messages produced so far if you want to throttle message throughput.
+     * @param sendStartMs timestamp of the most recently sent message
+     * @return true if the average throughput between startMs and sendStartMs exceeds targetThroughput
+     */
+    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
+        if (this.targetThroughput < 0) {
+            // No throttling in this case
+            return false;
+        }
+
+        float elapsedSec = (sendStartMs - startMs) / 1000.f;
+        return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput;
+    }
+
+    /**
+     * Occasionally blocks for small amounts of time to achieve targetThroughput.
+     *
+     * Note that if targetThroughput is 0, this blocks until {@link #wakeup()} is called or the thread is interrupted.
+     */
+    public void throttle() {
+        if (targetThroughput == 0) {
+            try {
+                synchronized (this) {
+                    while (!wakeup) {
+                        this.wait();
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the interrupt status for the caller
+            }
+            return;
+        }
+
+        // throttle throughput by sleeping, on average,
+        // (1 / this.throughput) seconds between "things sent"
+        sleepDeficitNs += sleepTimeNs;
+
+        // If enough sleep deficit has accumulated, sleep a little
+        if (sleepDeficitNs >= MIN_SLEEP_NS) {
+            long sleepStartNs = System.nanoTime();
+            try {
+                synchronized (this) {
+                    // wait() may return early or spuriously, so keep waiting until the whole
+                    // deficit has been slept off or wakeup() has been called
+                    long remaining = sleepDeficitNs;
+                    while (!wakeup && remaining > 0) {
+                        long sleepMs = remaining / NS_PER_MS;
+                        long sleepNs = remaining - sleepMs * NS_PER_MS;
+                        this.wait(sleepMs, (int) sleepNs);
+                        remaining = sleepDeficitNs - (System.nanoTime() - sleepStartNs);
+                    }
+                    // consume the wakeup request so the next throttle() call sleeps again
+                    wakeup = false;
+                }
+                sleepDeficitNs = 0;
+            } catch (InterruptedException e) {
+                // If sleep is cut short, reduce deficit by the amount of time we actually spent sleeping
+                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
+                if (sleepElapsedNs <= sleepDeficitNs) {
+                    sleepDeficitNs -= sleepElapsedNs;
+                }
+                Thread.currentThread().interrupt(); // restore the interrupt status for the caller
+            }
+        }
+    }
+
+    /**
+     * Wake up the throttler if it's sleeping.
+     */
+    public void wakeup() {
+        synchronized (this) {
+            wakeup = true;
+            this.notifyAll();
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
index 1890062..b2b29bf 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -57,6 +57,11 @@ public class SourceRecord extends ConnectRecord {
     }
 
     public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+                        String topic, Schema keySchema, Object key, Schema valueSchema, Object value) {
+        this(sourcePartition, sourceOffset, topic, null, keySchema, key, valueSchema, value);
+    }
+
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                         String topic, Integer partition,
                         Schema keySchema, Object key, Schema valueSchema, Object value) {
         super(topic, partition, keySchema, key, valueSchema, value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index e0a3e04..686e564 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -230,6 +230,13 @@ class WorkerSinkTask implements WorkerTask {
         return workerConfig;
     }
 
+    @Override
+    public String toString() {
+        return "WorkerSinkTask{" +
+                "id=" + id +
+                '}';
+    }
+
     private KafkaConsumer<byte[], byte[]> createConsumer() {
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and through to the task

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
index e776f08..b65efa8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThread.java
@@ -80,7 +80,7 @@ class WorkerSinkTaskThread extends ShutdownableThread {
         long commitTimeout = commitStarted + task.workerConfig().getLong(
                 WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         if (committing && now >= commitTimeout) {
-            log.warn("Commit of {} offsets timed out", this);
+            log.warn("Commit of {} offsets timed out", task);
             commitFailures++;
             committing = false;
         }
@@ -98,11 +98,11 @@ class WorkerSinkTaskThread extends ShutdownableThread {
                         seqno, commitSeqno);
             } else {
                 if (error != null) {
-                    log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
+                    log.error("Commit of {} offsets threw an unexpected exception: ", task, error);
                     commitFailures++;
                 } else {
                     log.debug("Finished {} offset commit successfully in {} ms",
-                            this, task.time().milliseconds() - commitStarted);
+                            task, task.time().milliseconds() - commitStarted);
                     commitFailures = 0;
                 }
                 committing = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6cf1dd7..5d0b7e7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -178,6 +178,8 @@ class WorkerSourceTask implements WorkerTask {
     public boolean commitOffsets() {
         long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
 
+        log.debug("{} Committing offsets", this);
+
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
@@ -259,7 +261,7 @@ class WorkerSourceTask implements WorkerTask {
         }
 
         finishSuccessfulFlush();
-        log.debug("Finished {} commitOffsets successfully in {} ms",
+        log.info("Finished {} commitOffsets successfully in {} ms",
                 this, time.milliseconds() - started);
         return true;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 96346ad..a544fb0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -133,6 +133,8 @@ public class RestServer {
     }
 
     public void stop() {
+        log.info("Stopping REST server");
+
         try {
             jettyServer.stop();
             jettyServer.join();
@@ -141,6 +143,8 @@ public class RestServer {
         } finally {
             jettyServer.destroy();
         }
+
+        log.info("REST server stopped");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index cea4360..c95b723 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -27,6 +27,8 @@ import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
@@ -51,6 +53,8 @@ import java.util.concurrent.TimeoutException;
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
 public class ConnectorsResource {
+    private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
+
     // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full
     // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but
     // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
@@ -159,7 +163,9 @@ public class ConnectorsResource {
         } catch (ExecutionException e) {
             if (e.getCause() instanceof NotLeaderException) {
                 NotLeaderException notLeaderError = (NotLeaderException) e.getCause();
-                return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType));
+                String forwardUrl = RestServer.urlJoin(notLeaderError.leaderUrl(), path);
+                log.debug("Forwarding request to leader: {} {} {}", forwardUrl, method, body);
+                return translator.translate(RestServer.httpRequest(forwardUrl, method, body, resultType));
             }
 
             throw e.getCause();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 3b37076..c82645c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -189,6 +189,7 @@ public class KafkaBasedLog<K, V> {
      * @param callback the callback to invoke once the end of the log has been reached.
      */
     public void readToEnd(Callback<Void> callback) {
+        log.trace("Starting read to end log for topic {}", topic);
         producer.flush();
         synchronized (this) {
             readLogEndOffsetCallbacks.add(callback);
@@ -286,6 +287,10 @@ public class KafkaBasedLog<K, V> {
 
 
     private class WorkThread extends Thread {
+        public WorkThread() {
+            super("KafkaBasedLog Work Thread - " + topic);
+        }
+
         @Override
         public void run() {
             try {
@@ -300,6 +305,7 @@ public class KafkaBasedLog<K, V> {
                     if (numCallbacks > 0) {
                         try {
                             readToLogEnd();
+                            log.trace("Finished read to end log for topic {}", topic);
                         } catch (WakeupException e) {
                             // Either received another get() call and need to retry reading to end of log or stop() was
                             // called. Both are handled by restarting this loop.

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
new file mode 100644
index 0000000..0ab64fd
--- /dev/null
+++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sink connector whose tasks are {@link VerifiableSinkTask}s; each task gets the connector config plus its own id.
+ */
+public class VerifiableSinkConnector extends SinkConnector {
+    private Map<String, String> config; // connector properties as passed to start()
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        this.config = props;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VerifiableSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Map<String, String> props = new HashMap<>(config); // copy so tasks do not share a map
+            props.put(VerifiableSinkTask.ID_CONFIG, Integer.toString(i)); // task index as its id
+            configs.add(props);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
new file mode 100644
index 0000000..16464a1
--- /dev/null
+++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Counterpart to {@link VerifiableSourceTask} that consumes records and logs information about each to stdout. This
+ * allows validation of processing of messages by sink tasks on distributed workers even in the face of worker restarts
+ * and failures. This task relies on the offset management provided by the Kafka Connect framework and therefore can detect
+ * bugs in the framework's offset management implementation.
+ */
+public class VerifiableSinkTask extends SinkTask {
+    public static final String NAME_CONFIG = "name";
+    public static final String ID_CONFIG = "id";
+
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    private String name; // Connector name
+    private int id; // Task ID
+
+    private final ArrayList<Map<String, Object>> unflushed = new ArrayList<>(); // records put() since the last flush()
+
+    @Override
+    public String version() {
+        return new VerifiableSinkConnector().version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        try {
+            name = props.get(NAME_CONFIG);
+            id = Integer.parseInt(props.get(ID_CONFIG));
+        } catch (NumberFormatException e) {
+            throw new ConnectException("Invalid VerifiableSinkTask configuration", e);
+        }
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> records) {
+        long nowMs = System.currentTimeMillis();
+        for (SinkRecord record : records) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("name", name);
+            data.put("task", record.key()); // VerifiableSourceTask's input task (source partition)
+            data.put("sinkTask", id);
+            data.put("topic", record.topic());
+            data.put("time_ms", nowMs);
+            data.put("seqno", record.value());
+            data.put("offset", record.kafkaOffset());
+            System.out.println(toJson(data));
+            unflushed.add(data);
+        }
+    }
+
+    @Override
+    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        long nowMs = System.currentTimeMillis();
+        // Re-emit every record received since the last flush, now marked as flushed
+        for (Map<String, Object> data : unflushed) {
+            data.put("time_ms", nowMs);
+            data.put("flushed", true);
+            System.out.println(toJson(data));
+        }
+        unflushed.clear();
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to clean up
+    }
+
+    /**
+     * Serializes a record description to JSON, falling back to an error string if serialization fails.
+     */
+    private static String toJson(Map<String, Object> data) {
+        try {
+            return JSON_SERDE.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            return "Bad data can't be written as json: " + e.getMessage();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
new file mode 100644
index 0000000..5f9afd5
--- /dev/null
+++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @see VerifiableSourceTask
+ */
+public class VerifiableSourceConnector extends SourceConnector {
+    private Map<String, String> config;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        this.config = props;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VerifiableSourceTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
+        for (Integer i = 0; i < maxTasks; i++) {
+            Map<String, String> props = new HashMap<>(config);
+            props.put(VerifiableSourceTask.ID_CONFIG, i.toString());
+            configs.add(props);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
new file mode 100644
index 0000000..6fee2c4
--- /dev/null
+++ b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.ThroughputThrottler;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The
+ * tasks print metadata in the form of JSON to stdout for each message generated, making externally visible which
+ * messages have been sent. Each message is also assigned a unique, increasing seqno that is passed to Kafka Connect; when
+ * tasks are started on new nodes, this seqno is used to resume where the task previously left off, allowing for
+ * testing of distributed Kafka Connect.
+ *
+ * If logging is left enabled, log output on stdout can be easily ignored by checking whether a given line is valid JSON.
+ */
+public class VerifiableSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class);
+
+    public static final String NAME_CONFIG = "name";
+    public static final String ID_CONFIG = "id";
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String THROUGHPUT_CONFIG = "throughput";
+
+    private static final String ID_FIELD = "id";
+    private static final String SEQNO_FIELD = "seqno";
+
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    private String name; // Connector name
+    private int id; // Task ID
+    private String topic;
+    private Map<String, Integer> partition;
+    private long startingSeqno;
+    private long seqno;
+    private ThroughputThrottler throttler;
+
+    @Override
+    public String version() {
+        return new VerifiableSourceConnector().version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        final long throughput;
+        try {
+            name = props.get(NAME_CONFIG);
+            id = Integer.parseInt(props.get(ID_CONFIG));
+            topic = props.get(TOPIC_CONFIG);
+            throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
+        } catch (NumberFormatException e) {
+            throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
+        }
+
+        partition = Collections.singletonMap(ID_FIELD, id);
+        Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
+        if (previousOffset != null)
+            seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
+        else
+            seqno = 0;
+        startingSeqno = seqno;
+        throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+
+        log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        long sendStartMs = System.currentTimeMillis();
+        if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs))
+            throttler.throttle();
+
+        long nowMs = System.currentTimeMillis();
+
+        Map<String, Object> data = new HashMap<>();
+        data.put("name", name);
+        data.put("task", id);
+        data.put("topic", this.topic);
+        data.put("time_ms", nowMs);
+        data.put("seqno", seqno);
+        String dataJson;
+        try {
+            dataJson = JSON_SERDE.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            dataJson = "Bad data can't be written as json: " + e.getMessage();
+        }
+        System.out.println(dataJson);
+
+        Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
+        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
+        List<SourceRecord> result = Arrays.asList(srcRecord);
+        seqno++;
+        return result;
+    }
+
+    @Override
+    public void stop() {
+        throttler.wakeup();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 3d69fac..2728b5b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,4 +15,4 @@
 
 apply from: file('scala.gradle')
 include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
+        'connect:api', 'connect:runtime', 'connect:json', 'connect:file', 'connect:tools'

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index fbac565..de593ea 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -18,14 +18,30 @@ from ducktape.utils.util import wait_until
 from ducktape.errors import DucktapeError
 
 from kafkatest.services.kafka.directory import kafka_dir
-import signal, random, requests
+import signal, random, requests, os.path, json
 
 class ConnectServiceBase(Service):
     """Base class for Kafka Connect services providing some common settings and functionality"""
 
+    PERSISTENT_ROOT = "/mnt/connect"
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect.properties")
+    # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
+    # so they can be used for other output, e.g. verifiable source & sink.
+    LOG_FILE = os.path.join(PERSISTENT_ROOT, "connect.log")
+    STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "connect.stdout")
+    STDERR_FILE = os.path.join(PERSISTENT_ROOT, "connect.stderr")
+    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "connect-log4j.properties")
+    PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
+
     logs = {
-        "kafka_log": {
-            "path": "/mnt/connect.log",
+        "connect_log": {
+            "path": LOG_FILE,
+            "collect_default": True},
+        "connect_stdout": {
+            "path": STDOUT_FILE,
+            "collect_default": False},
+        "connect_stderr": {
+            "path": STDERR_FILE,
             "collect_default": True},
     }
 
@@ -37,7 +53,7 @@ class ConnectServiceBase(Service):
     def pids(self, node):
         """Return process ids for Kafka Connect processes."""
         try:
-            return [pid for pid in node.account.ssh_capture("cat /mnt/connect.pid", callback=int)]
+            return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
         except:
             return []
 
@@ -52,33 +68,31 @@ class ConnectServiceBase(Service):
         self.connector_config_templates = connector_config_templates
 
     def stop_node(self, node, clean_shutdown=True):
+        self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Kafka Connect on " + str(node.account))
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
 
         for pid in pids:
-            node.account.signal(pid, sig, allow_fail=False)
-        for pid in pids:
-            wait_until(lambda: not node.account.alive(pid), timeout_sec=10, err_msg="Kafka Connect standalone process took too long to exit")
+            node.account.signal(pid, sig, allow_fail=True)
+        if clean_shutdown:
+            for pid in pids:
+                wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Kafka Connect process on " + str(node.account) + " took too long to exit")
 
-        node.account.ssh("rm -f /mnt/connect.pid", allow_fail=False)
+        node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
 
     def restart(self):
         # We don't want to do any clean up here, just restart the process.
         for node in self.nodes:
+            self.logger.info("Restarting Kafka Connect on " + str(node.account))
             self.stop_node(node)
             self.start_node(node)
 
     def clean_node(self, node):
-        if len(self.pids(node)) > 0:
-            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
-                             (self.__class__.__name__, node.account))
-        for pid in self.pids(node):
-            node.account.signal(pid, signal.SIGKILL, allow_fail=False)
-
-        node.account.ssh("rm -rf /mnt/connect.pid /mnt/connect.log /mnt/connect.properties  " + " ".join(self.config_filenames() + self.files), allow_fail=False)
+        node.account.kill_process("connect", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf " + " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE] + self.config_filenames() + self.files), allow_fail=False)
 
     def config_filenames(self):
-        return ["/mnt/connect-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates or [])]
+        return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties") for idx, template in enumerate(self.connector_config_templates or [])]
 
 
     def list_connectors(self, node=None):
@@ -112,6 +126,7 @@ class ConnectServiceBase(Service):
 
         meth = getattr(requests, method.lower())
         url = self._base_url(node) + path
+        self.logger.debug("Kafka Connect REST request: %s %s %s %s", node.account.hostname, url, method, body)
         resp = meth(url, json=body)
         self.logger.debug("%s %s response: %d", url, method, resp.status_code)
         if resp.status_code > 400:
@@ -137,19 +152,23 @@ class ConnectStandaloneService(ConnectServiceBase):
         return self.nodes[0]
 
     def start_node(self, node):
-        node.account.create_file("/mnt/connect.properties", self.config_template_func(node))
+        node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+
+        node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
+        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
         remote_connector_configs = []
         for idx, template in enumerate(self.connector_config_templates):
-            target_file = "/mnt/connect-connector-" + str(idx) + ".properties"
+            target_file = os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + str(idx) + ".properties")
             node.account.create_file(target_file, template)
             remote_connector_configs.append(target_file)
 
-        self.logger.info("Starting Kafka Connect standalone process")
-        with node.account.monitor_log("/mnt/connect.log") as monitor:
-            node.account.ssh("/opt/%s/bin/connect-standalone.sh /mnt/connect.properties " % kafka_dir(node) +
+        self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
+        with node.account.monitor_log(self.LOG_FILE) as monitor:
+            node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE +
+                             "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) +
                              " ".join(remote_connector_configs) +
-                             " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > /mnt/connect.pid")
-            monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup")
+                             (" & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)))
+            monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -164,16 +183,20 @@ class ConnectDistributedService(ConnectServiceBase):
         self.configs_topic = configs_topic
 
     def start_node(self, node):
-        node.account.create_file("/mnt/connect.properties", self.config_template_func(node))
+        node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+
+        node.account.create_file(self.CONFIG_FILE, self.config_template_func(node))
+        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('connect_log4j.properties', log_file=self.LOG_FILE))
         if self.connector_config_templates:
             raise DucktapeError("Config files are not valid in distributed mode, submit connectors via the REST API")
 
-        self.logger.info("Starting Kafka Connect distributed process")
-        with node.account.monitor_log("/mnt/connect.log") as monitor:
-            cmd = "/opt/%s/bin/connect-distributed.sh /mnt/connect.properties " % kafka_dir(node)
-            cmd += " 1>> /mnt/connect.log 2>> /mnt/connect.log & echo $! > /mnt/connect.pid"
+        self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
+        with node.account.monitor_log(self.LOG_FILE) as monitor:
+            cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+            cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
+            cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
             node.account.ssh(cmd)
-            monitor.wait_until('Kafka Connect started', timeout_sec=10, err_msg="Never saw message indicating Kafka Connect finished startup")
+            monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -188,4 +211,75 @@ class ConnectRestError(RuntimeError):
         self.url = url
 
     def __unicode__(self):
-        return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message
\ No newline at end of file
+        return "Kafka Connect REST call failed: returned " + self.status + " for " + self.url + ". Response: " + self.message
+
+
+
+class VerifiableConnector(object):
+    def messages(self):
+        """
+        Collect and parse the stdout logs from Kafka Connect nodes. Return a list containing all parsed JSON messages
+        generated by this connector (source or sink).
+        """
+        self.logger.info("Collecting messages from log of %s %s", type(self).__name__, self.name)
+        records = []
+        for node in self.cc.nodes:
+            for line in node.account.ssh_capture('cat ' + self.cc.STDOUT_FILE):
+                try:
+                    data = json.loads(line)
+                except ValueError:
+                    self.logger.debug("Ignoring unparseable line: %s", line)
+                    continue
+                # Keep only records whose name matches ours, so multiple verifiable connectors can run at once
+                if data['name'] != self.name: continue
+                data['node'] = node
+                records.append(data)
+        return records
+
+    def stop(self):
+        self.logger.info("Destroying connector %s %s", type(self).__name__, self.name)
+        self.cc.delete_connector(self.name)
+
+class VerifiableSource(VerifiableConnector):
+    """
+    Helper class for running a verifiable source connector on a Kafka Connect cluster and analyzing the output.
+    """
+
+    def __init__(self, cc, name="verifiable-source", tasks=1, topic="verifiable", throughput=1000):
+        self.cc = cc
+        self.logger = self.cc.logger
+        self.name = name
+        self.tasks = tasks
+        self.topic = topic
+        self.throughput = throughput
+
+    def start(self):
+        self.logger.info("Creating connector VerifiableSourceConnector %s", self.name)
+        self.cc.create_connector({
+            'name': self.name,
+            'connector.class': 'org.apache.kafka.connect.tools.VerifiableSourceConnector',
+            'tasks.max': self.tasks,
+            'topic': self.topic,
+            'throughput': self.throughput
+        })
+
+class VerifiableSink(VerifiableConnector):
+    """
+    Helper class for running a verifiable sink connector on a Kafka Connect cluster and analyzing the output.
+    """
+
+    def __init__(self, cc, name="verifiable-sink", tasks=1, topics=["verifiable"]):
+        self.cc = cc
+        self.logger = self.cc.logger
+        self.name = name
+        self.tasks = tasks
+        self.topics = topics
+
+    def start(self):
+        self.logger.info("Creating connector VerifiableSinkConnector %s", self.name)
+        self.cc.create_connector({
+            'name': self.name,
+            'connector.class': 'org.apache.kafka.connect.tools.VerifiableSinkConnector',
+            'tasks.max': self.tasks,
+            'topics': ",".join(self.topics)
+        })

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 18021d9..84d358d 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -100,7 +100,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
     }
 
     def __init__(self, context, num_nodes, kafka, topic, new_consumer=False, message_validator=None,
-                 from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
+                 from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer",
+                 print_key=False, jmx_object_names=None, jmx_attributes=[]):
         """
         Args:
             context:                    standard context
@@ -114,6 +115,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
                                         successively consumed messages exceeds this timeout. Setting this and
                                         waiting for the consumer to stop is a pretty good way to consume all messages
                                         in a topic.
+            print_key                   if True, print each message's key in addition to its value
         """
         JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
         BackgroundThreadService.__init__(self, context, num_nodes)
@@ -131,7 +133,7 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         self.message_validator = message_validator
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
         self.client_id = client_id
-
+        self.print_key = print_key
 
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file appropriate for the given node."""
@@ -184,6 +186,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
             if node.version > LATEST_0_8_2:
                 cmd += " --timeout-ms %s" % self.consumer_timeout_ms
 
+        if self.print_key:
+            cmd += " --property print.key=true"
+
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 27530f0..a7a1581 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -29,10 +29,13 @@ import re
 import signal
 import subprocess
 import time
-
+import os.path
 
 class KafkaService(JmxMixin, Service):
 
+    PERSISTENT_ROOT = "/mnt"
+    LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties")
+
     logs = {
         "kafka_log": {
             "path": "/mnt/kafka.log",
@@ -104,6 +107,7 @@ class KafkaService(JmxMixin, Service):
 
     def start_cmd(self, node):
         cmd = "export JMX_PORT=%d; " % self.jmx_port
+        cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
         cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
         cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
         cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
@@ -114,6 +118,7 @@ class KafkaService(JmxMixin, Service):
         self.logger.info("kafka.properties:")
         self.logger.info(prop_file)
         node.account.create_file("/mnt/kafka.properties", prop_file)
+        node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('log4j.properties'))
 
         self.security_config.setup_node(node)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tests/kafkatest/services/kafka/templates/log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/log4j.properties b/tests/kafkatest/services/kafka/templates/log4j.properties
new file mode 100644
index 0000000..bf816e7
--- /dev/null
+++ b/tests/kafkatest/services/kafka/templates/log4j.properties
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout 
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
+log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
+
+#Change this to debug to get the actual audit log for authorizer.
+log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
+log4j.additivity.kafka.authorizer.logger=false
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tests/kafkatest/services/templates/connect_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/connect_log4j.properties b/tests/kafkatest/services/templates/connect_log4j.properties
new file mode 100644
index 0000000..d62a93d
--- /dev/null
+++ b/tests/kafkatest/services/templates/connect_log4j.properties
@@ -0,0 +1,30 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+# Define the root logger and attach the FILE appender
+log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File={{ log_file }}
+log4j.appender.FILE.ImmediateFlush=true
+log4j.appender.FILE.Threshold=debug
+log4j.appender.FILE.Append=true
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient=ERROR

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tests/kafkatest/tests/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py
index 55901c2..4689f36 100644
--- a/tests/kafkatest/tests/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect_distributed_test.py
@@ -14,11 +14,13 @@
 # limitations under the License.
 
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.connect import ConnectDistributedService
+from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink
+from kafkatest.services.console_consumer import ConsoleConsumer
 from ducktape.utils.util import wait_until
-import hashlib, subprocess, json, itertools
+import hashlib, subprocess, json, itertools, time
+from collections import Counter
 
-class ConnectDistributedFileTest(KafkaTest):
+class ConnectDistributedTest(KafkaTest):
     """
     Simple test of Kafka Connect in distributed mode, producing data from files on one cluster and consuming it on
     another, validating the total output is identical to the input.
@@ -41,18 +43,21 @@ class ConnectDistributedFileTest(KafkaTest):
     SCHEMA = { "type": "string", "optional": False }
 
     def __init__(self, test_context):
-        super(ConnectDistributedFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+        super(ConnectDistributedTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
             'test' : { 'partitions': 1, 'replication-factor': 1 }
         })
 
-        self.cc = ConnectDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc = ConnectDistributedService(test_context, 3, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE])
+        self.cc.log_level = "DEBUG"
+        self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
+        self.schemas = True
 
-    def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True):
-        assert converter != None, "converter type must be set"
-        # Template parameters
-        self.key_converter = converter
-        self.value_converter = converter
-        self.schemas = schemas
+    def test_file_source_and_sink(self):
+        """
+        Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
+        correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
+        """
 
         self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 
@@ -68,7 +73,7 @@ class ConnectDistributedFileTest(KafkaTest):
         # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile
         for node in self.cc.nodes:
             node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=70, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
 
         # Restarting both should result in them picking up where they left off,
         # only processing new data.
@@ -76,19 +81,113 @@ class ConnectDistributedFileTest(KafkaTest):
 
         for node in self.cc.nodes:
             node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file")
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=70, err_msg="Sink output file never converged to the same state as the input file")
 
-    def validate_output(self, input):
+
+    def test_clean_bounce(self):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces.
+        """
+        num_tasks = 3
+
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, tasks=num_tasks)
+        self.source.start()
+        self.sink = VerifiableSink(self.cc, tasks=num_tasks)
+        self.sink.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:
+                started = time.time()
+                self.logger.info("Cleanly bouncing Kafka Connect on " + str(node.account))
+                self.cc.stop_node(node)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
+
+        self.source.stop()
+        self.sink.stop()
+        self.cc.stop()
+
+        # Validate at least once delivery of everything that was reported as written since we should have flushed and
+        # cleanly exited. Currently this only tests at least once delivery because the sink task may not have consumed
+        # all the messages generated by the source task. This needs to be done per-task since seqnos are not unique across
+        # tasks.
+        src_msgs = self.source.messages()
+        sink_msgs = self.sink.messages()
+        success = True
+        errors = []
+        for task in range(num_tasks):
+            src_seqnos = [msg['seqno'] for msg in src_msgs if msg['task'] == task]
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
+            # bouncing should commit on rebalance.
+            src_seqno_max = max(src_seqnos)
+            self.logger.debug("Max source seqno: %d", src_seqno_max)
+            src_seqno_counts = Counter(src_seqnos)
+            missing_src_seqnos = sorted(set(range(src_seqno_max)).difference(set(src_seqnos)))
+            duplicate_src_seqnos = sorted([seqno for seqno,count in src_seqno_counts.iteritems() if count > 1])
+
+            if missing_src_seqnos:
+                self.logger.error("Missing source sequence numbers for task " + str(task))
+                errors.append("Found missing source sequence numbers for task %d: %s" % (task, missing_src_seqnos))
+                success = False
+            if duplicate_src_seqnos:
+                self.logger.error("Duplicate source sequence numbers for task " + str(task))
+                errors.append("Found duplicate source sequence numbers for task %d: %s" % (task, duplicate_src_seqnos))
+                success = False
+
+            sink_seqnos = [msg['seqno'] for msg in sink_msgs if msg['task'] == task and 'flushed' in msg]
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because
+            # clean bouncing should commit on rebalance.
+            sink_seqno_max = max(sink_seqnos)
+            self.logger.debug("Max sink seqno: %d", sink_seqno_max)
+            sink_seqno_counts = Counter(sink_seqnos)
+            missing_sink_seqnos = sorted(set(range(sink_seqno_max)).difference(set(sink_seqnos)))
+            duplicate_sink_seqnos = sorted([seqno for seqno,count in sink_seqno_counts.iteritems() if count > 1])
+
+            if missing_sink_seqnos:
+                self.logger.error("Missing sink sequence numbers for task " + str(task))
+                errors.append("Found missing sink sequence numbers for task %d: %s" % (task, missing_sink_seqnos))
+                success = False
+            if duplicate_sink_seqnos:
+               self.logger.error("Duplicate sink sequence numbers for task " + str(task))
+               errors.append("Found duplicate sink sequence numbers for task %d: %s" % (task, duplicate_sink_seqnos))
+               success = False
+
+
+            if sink_seqno_max > src_seqno_max:
+                self.logger.error("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d", task, sink_seqno_max, src_seqno_max)
+                errors.append("Found sink sequence number greater than any generated sink sequence number for task %d: %d > %d" % (task, sink_seqno_max, src_seqno_max))
+                success = False
+
+            if src_seqno_max < 1000 or sink_seqno_max < 1000:
+                errors.append("Not enough messages were processed: source:%d sink:%d" % (src_seqno_max, sink_seqno_max))
+                success = False
+
+        if not success:
+            self.mark_for_collect(self.cc)
+            # Also collect the data in the topic to aid in debugging
+            consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True)
+            consumer_validator.run()
+            self.mark_for_collect(consumer_validator, "consumer_stdout")
+        assert success, "Found validation errors:\n" + "\n  ".join(errors)
+
+
+    def _validate_file_output(self, input):
         input_set = set(input)
         # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled.
         # Between the first and second rounds, we might even end up with half the data on each node.
         output_set = set(itertools.chain(*[
-            [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
+            [line.strip() for line in self._file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes
         ]))
         return input_set == output_set
 
-
-    def file_contents(self, node, file):
+    def _file_contents(self, node, file):
         try:
             # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of
             # immediately

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 3a06862..2a7f7b1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -24,6 +24,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 
 import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.utils.ThroughputThrottler;
 
 public class ProducerPerformance {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
deleted file mode 100644
index d8deb22..0000000
--- a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.tools;
-
-
-/**
- * This class helps producers throttle throughput.
- *
- * If targetThroughput >= 0, the resulting average throughput will be approximately
- * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
- * no throttling will occur.
- *
- * To use, do this between successive send attempts:
- * <pre>
- *     {@code
- *      if (throttler.shouldThrottle(...)) {
- *          throttler.throttle();
- *      }
- *     }
- * </pre>
- *
- * Note that this can be used to throttle message throughput or data throughput.
- */
-public class ThroughputThrottler {
-
-    private static final long NS_PER_MS = 1000000L;
-    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
-    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
-    long sleepTimeNs;
-    long sleepDeficitNs = 0;
-    long targetThroughput = -1;
-    long startMs;
-
-    /**
-     * @param targetThroughput Can be messages/sec or bytes/sec
-     * @param startMs          When the very first message is sent
-     */
-    public ThroughputThrottler(long targetThroughput, long startMs) {
-        this.startMs = startMs;
-        this.targetThroughput = targetThroughput;
-        this.sleepTimeNs = targetThroughput > 0 ?
-                           NS_PER_SEC / targetThroughput :
-                           Long.MAX_VALUE;
-    }
-
-    /**
-     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
-     *                    messages produced so far if you want to throttle message throughput.
-     * @param sendStartMs timestamp of the most recently sent message
-     * @return
-     */
-    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
-        if (this.targetThroughput < 0) {
-            // No throttling in this case
-            return false;
-        }
-
-        float elapsedMs = (sendStartMs - startMs) / 1000.f;
-        return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
-    }
-
-    /**
-     * Occasionally blocks for small amounts of time to achieve targetThroughput.
-     *
-     * Note that if targetThroughput is 0, this will block extremely aggressively.
-     */
-    public void throttle() {
-        if (targetThroughput == 0) {
-            try {
-                Thread.sleep(Long.MAX_VALUE);
-            } catch (InterruptedException e) {
-                // do nothing
-            }
-            return;
-        }
-
-        // throttle throughput by sleeping, on average,
-        // (1 / this.throughput) seconds between "things sent"
-        sleepDeficitNs += sleepTimeNs;
-
-        // If enough sleep deficit has accumulated, sleep a little
-        if (sleepDeficitNs >= MIN_SLEEP_NS) {
-            long sleepMs = sleepDeficitNs / 1000000;
-            long sleepNs = sleepDeficitNs - sleepMs * 1000000;
-
-            long sleepStartNs = System.nanoTime();
-            try {
-                Thread.sleep(sleepMs, (int) sleepNs);
-                sleepDeficitNs = 0;
-            } catch (InterruptedException e) {
-                // If sleep is cut short, reduce deficit by the amount of
-                // time we actually spent sleeping
-                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
-                if (sleepElapsedNs <= sleepDeficitNs) {
-                    sleepDeficitNs -= sleepElapsedNs;
-                }
-            }
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/8db55618/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 0cd90c0..e8bd330 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -41,6 +41,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.ThroughputThrottler;
 
 /**
  * Primarily intended for use with system testing, this producer prints metadata


Mime
View raw message