kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2807: Fix Kafka Connect packaging and move VerifiableSource/Sink into runtime jar.
Date Thu, 12 Nov 2015 19:13:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ab5ac264a -> 1408c670e


KAFKA-2807: Fix Kafka Connect packaging and move VerifiableSource/Sink into runtime jar.

Gradle does not handle subprojects with the same name (top-level tools vs
connect/tools) properly, making the dependency impossible to express correctly
since we need to move the ThroughputThrottler class into the top level tools
project. Moving the current set of tools into the runtime jar works fine since
they are only used for system tests at the moment.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Gwen Shapira

Closes #512 from ewencp/kafka-2807-redux


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1408c670
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1408c670
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1408c670

Branch: refs/heads/trunk
Commit: 1408c670ea576a430287182e9aaa26793917129a
Parents: ab5ac26
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Thu Nov 12 11:11:56 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Nov 12 11:11:56 2015 -0800

----------------------------------------------------------------------
 build.gradle                                    | 101 +++------------
 checkstyle/import-control.xml                   |   1 +
 .../connect/tools/VerifiableSinkConnector.java  |  64 ++++++++++
 .../kafka/connect/tools/VerifiableSinkTask.java | 109 ++++++++++++++++
 .../tools/VerifiableSourceConnector.java        |  64 ++++++++++
 .../connect/tools/VerifiableSourceTask.java     | 128 +++++++++++++++++++
 .../connect/tools/VerifiableSinkConnector.java  |  64 ----------
 .../kafka/connect/tools/VerifiableSinkTask.java | 109 ----------------
 .../tools/VerifiableSourceConnector.java        |  64 ----------
 .../connect/tools/VerifiableSourceTask.java     | 128 -------------------
 settings.gradle                                 |   2 +-
 .../apache/kafka/tools/ThroughputThrottler.java |   2 +-
 12 files changed, 387 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 0ee6c41..29e351f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -230,7 +230,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
   }
 }
 
-def connectPkgs = ['connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
 def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
@@ -321,7 +321,7 @@ project(':core') {
     standardOutput = new File('docs/kafka_config.html').newOutputStream()
   }
 
-  task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect-runtime:genConnectConfigDocs'], type: Tar) {
+  task siteDocsTar(dependsOn: ['genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', ':connect:runtime:genConnectConfigDocs'], type: Tar) {
     classifier = 'site-docs'
     compression = Compression.GZIP
     from project.file("../docs")
@@ -342,16 +342,14 @@ project(':core') {
     from(project.siteDocsTar) { into("site-docs/") }
     from(project(':tools').jar) { into("libs/") }
     from(project(':tools').configurations.runtime) { into("libs/") }
-    from(project(':connect-api').jar) { into("libs/") }
-    from(project(':connect-api').configurations.runtime) { into("libs/") }
-    from(project(':connect-runtime').jar) { into("libs/") }
-    from(project(':connect-runtime').configurations.runtime) { into("libs/") }
-    from(project(':connect-json').jar) { into("libs/") }
-    from(project(':connect-json').configurations.runtime) { into("libs/") }
-    from(project(':connect-file').jar) { into("libs/") }
-    from(project(':connect-file').configurations.runtime) { into("libs/") }
-    from(project(':connect-tools').jar) { into("libs/") }
-    from(project(':connect-tools').configurations.runtime) { into("libs/") }
+    from(project(':connect:api').jar) { into("libs/") }
+    from(project(':connect:api').configurations.runtime) { into("libs/") }
+    from(project(':connect:runtime').jar) { into("libs/") }
+    from(project(':connect:runtime').configurations.runtime) { into("libs/") }
+    from(project(':connect:json').jar) { into("libs/") }
+    from(project(':connect:json').configurations.runtime) { into("libs/") }
+    from(project(':connect:file').jar) { into("libs/") }
+    from(project(':connect:file').configurations.runtime) { into("libs/") }
   }
 
   jar {
@@ -638,7 +636,7 @@ project(':log4j-appender') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':connect-api') {
+project(':connect:api') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-api"
 
@@ -695,12 +693,12 @@ project(':connect-api') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':connect-json') {
+project(':connect:json') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-json"
 
   dependencies {
-    compile project(':connect-api')
+    compile project(':connect:api')
     compile "$slf4japi"
     compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
 
@@ -756,13 +754,14 @@ project(':connect-json') {
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 }
 
-project(':connect-runtime') {
+project(':connect:runtime') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-runtime"
 
   dependencies {
-    compile project(':connect-api')
+    compile project(':connect:api')
     compile project(':clients')
+    compile project(':tools')
     compile "$slf4japi"
 
     compile "org.eclipse.jetty:jetty-server:$jetty_version"
@@ -776,7 +775,7 @@ project(':connect-runtime') {
     testCompile "$powermock_easymock"
     testCompile project(':clients').sourceSets.test.output
     testRuntime "$slf4jlog4j"
-    testRuntime project(":connect-json")
+    testRuntime project(":connect:json")
   }
 
   task testJar(type: Jar) {
@@ -830,75 +829,13 @@ project(':connect-runtime') {
   }
 }
 
-project(':connect-file') {
+project(':connect:file') {
   apply plugin: 'checkstyle'
   archivesBaseName = "connect-file"
 
   dependencies {
-    compile project(':connect-api')
-    compile "$slf4japi"
-
-    testCompile "$junit"
-    testCompile "$easymock"
-    testCompile "$powermock"
-    testCompile "$powermock_easymock"
-    testRuntime "$slf4jlog4j"
-  }
-
-  task testJar(type: Jar) {
-    classifier = 'test'
-    from sourceSets.test.output
-  }
-
-  test {
-    testLogging {
-      events "passed", "skipped", "failed"
-      exceptionFormat = 'full'
-    }
-  }
-
-  javadoc {
-    include "**/org/apache/kafka/connect/*"
-  }
-
-  tasks.create(name: "copyDependantLibs", type: Copy) {
-    from (configurations.testRuntime) {
-      include('slf4j-log4j12*')
-    }
-    from (configurations.runtime) {
-      exclude('kafka-clients*')
-      exclude('connect-*')
-    }
-    into "$buildDir/dependant-libs"
-  }
-
-  jar {
-    dependsOn copyDependantLibs
-  }
-
-  artifacts {
-    archives testJar
-  }
-
-  configurations {
-    archives.extendsFrom(testCompile)
-  }
-
-  checkstyle {
-    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
-  }
-  test.dependsOn('checkstyleMain', 'checkstyleTest')
-}
-
-project(':connect-tools') {
-  apply plugin: 'checkstyle'
-  archivesBaseName = "connect-tools"
-
-  dependencies {
-    compile project(':connect-api')
-    compile project(':tools')
+    compile project(':connect:api')
     compile "$slf4japi"
-    compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
 
     testCompile "$junit"
     testCompile "$easymock"

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 16a3700..204bc60 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -207,6 +207,7 @@
 
     <subpackage name="tools">
       <allow pkg="org.apache.kafka.connect" />
+      <allow pkg="org.apache.kafka.tools" />
       <allow pkg="com.fasterxml.jackson" />
     </subpackage>
   </subpackage>

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
new file mode 100644
index 0000000..0ab64fd
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @see VerifiableSinkTask
+ */
+public class VerifiableSinkConnector extends SourceConnector {
+    private Map<String, String> config;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        this.config = props;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VerifiableSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
+        for (Integer i = 0; i < maxTasks; i++) {
+            Map<String, String> props = new HashMap<>(config);
+            props.put(VerifiableSinkTask.ID_CONFIG, i.toString());
+            configs.add(props);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
new file mode 100644
index 0000000..2333452
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Counterpart to {@link VerifiableSourceTask} that consumes records and logs information about each to stdout. This
+ * allows validation of processing of messages by sink tasks on distributed workers even in the face of worker restarts
+ * and failures. This task relies on the offset management provided by the Kafka Connect framework and therefore can detect
+ * bugs in its implementation.
+ */
+public class VerifiableSinkTask extends SinkTask {
+    public static final String NAME_CONFIG = "name";
+    public static final String ID_CONFIG = "id";
+
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    private String name; // Connector name
+    private int id; // Task ID
+
+    private ArrayList<Map<String, Object>> unflushed = new ArrayList<>();
+
+    @Override
+    public String version() {
+        return new VerifiableSinkConnector().version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        try {
+            name = props.get(NAME_CONFIG);
+            id = Integer.parseInt(props.get(ID_CONFIG));
+        } catch (NumberFormatException e) {
+            throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
+        }
+    }
+
+    @Override
+    public void put(Collection<SinkRecord> records) {
+        long nowMs = System.currentTimeMillis();
+        for (SinkRecord record : records) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("name", name);
+            data.put("task", record.key()); // VerifiableSourceTask's input task (source partition)
+            data.put("sinkTask", id);
+            data.put("topic", record.topic());
+            data.put("time_ms", nowMs);
+            data.put("seqno", record.value());
+            data.put("offset", record.kafkaOffset());
+            String dataJson;
+            try {
+                dataJson = JSON_SERDE.writeValueAsString(data);
+            } catch (JsonProcessingException e) {
+                dataJson = "Bad data can't be written as json: " + e.getMessage();
+            }
+            System.out.println(dataJson);
+            unflushed.add(data);
+        }
+    }
+
+    @Override
+    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        long nowMs = System.currentTimeMillis();
+        for (Map<String, Object> data : unflushed) {
+            data.put("time_ms", nowMs);
+            data.put("flushed", true);
+            String dataJson;
+            try {
+                dataJson = JSON_SERDE.writeValueAsString(data);
+            } catch (JsonProcessingException e) {
+                dataJson = "Bad data can't be written as json: " + e.getMessage();
+            }
+            System.out.println(dataJson);
+        }
+        unflushed.clear();
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
new file mode 100644
index 0000000..5f9afd5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @see VerifiableSourceTask
+ */
+public class VerifiableSourceConnector extends SourceConnector {
+    private Map<String, String> config;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        this.config = props;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return VerifiableSourceTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
+        for (Integer i = 0; i < maxTasks; i++) {
+            Map<String, String> props = new HashMap<>(config);
+            props.put(VerifiableSourceTask.ID_CONFIG, i.toString());
+            configs.add(props);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
new file mode 100644
index 0000000..6dcfdc4
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.tools.ThroughputThrottler;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The
+ * tasks print metadata in the form of JSON to stdout for each message generated, making externally visible which
+ * messages have been sent. Each message is also assigned a unique, increasing seqno that is passed to Kafka Connect; when
+ * tasks are started on new nodes, this seqno is used to resume where the task previously left off, allowing for
+ * testing of distributed Kafka Connect.
+ *
+ * If logging is left enabled, log output on stdout can be easily ignored by checking whether a given line is valid JSON.
+ */
+public class VerifiableSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class);
+
+    public static final String NAME_CONFIG = "name";
+    public static final String ID_CONFIG = "id";
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String THROUGHPUT_CONFIG = "throughput";
+
+    private static final String ID_FIELD = "id";
+    private static final String SEQNO_FIELD = "seqno";
+
+    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
+
+    private String name; // Connector name
+    private int id; // Task ID
+    private String topic;
+    private Map<String, Integer> partition;
+    private long startingSeqno;
+    private long seqno;
+    private ThroughputThrottler throttler;
+
+    @Override
+    public String version() {
+        return new VerifiableSourceConnector().version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        final long throughput;
+        try {
+            name = props.get(NAME_CONFIG);
+            id = Integer.parseInt(props.get(ID_CONFIG));
+            topic = props.get(TOPIC_CONFIG);
+            throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
+        } catch (NumberFormatException e) {
+            throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
+        }
+
+        partition = Collections.singletonMap(ID_FIELD, id);
+        Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
+        if (previousOffset != null)
+            seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
+        else
+            seqno = 0;
+        startingSeqno = seqno;
+        throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+
+        log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        long sendStartMs = System.currentTimeMillis();
+        if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs))
+            throttler.throttle();
+
+        long nowMs = System.currentTimeMillis();
+
+        Map<String, Object> data = new HashMap<>();
+        data.put("name", name);
+        data.put("task", id);
+        data.put("topic", this.topic);
+        data.put("time_ms", nowMs);
+        data.put("seqno", seqno);
+        String dataJson;
+        try {
+            dataJson = JSON_SERDE.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            dataJson = "Bad data can't be written as json: " + e.getMessage();
+        }
+        System.out.println(dataJson);
+
+        Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
+        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
+        List<SourceRecord> result = Arrays.asList(srcRecord);
+        seqno++;
+        return result;
+    }
+
+    @Override
+    public void stop() {
+        throttler.wakeup();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
deleted file mode 100644
index 0ab64fd..0000000
--- a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.tools;
-
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.source.SourceConnector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @see VerifiableSinkTask
- */
-public class VerifiableSinkConnector extends SourceConnector {
-    private Map<String, String> config;
-
-    @Override
-    public String version() {
-        return AppInfoParser.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        this.config = props;
-    }
-
-    @Override
-    public Class<? extends Task> taskClass() {
-        return VerifiableSinkTask.class;
-    }
-
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        ArrayList<Map<String, String>> configs = new ArrayList<>();
-        for (Integer i = 0; i < maxTasks; i++) {
-            Map<String, String> props = new HashMap<>(config);
-            props.put(VerifiableSinkTask.ID_CONFIG, i.toString());
-            configs.add(props);
-        }
-        return configs;
-    }
-
-    @Override
-    public void stop() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
deleted file mode 100644
index 2333452..0000000
--- a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.tools;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Counterpart to {@link VerifiableSourceTask} that consumes records and logs information about each to stdout. This
- * allows validation of processing of messages by sink tasks on distributed workers even in the face of worker restarts
- * and failures. This task relies on the offset management provided by the Kafka Connect framework and therefore can detect
- * bugs in its implementation.
- */
-public class VerifiableSinkTask extends SinkTask {
-    public static final String NAME_CONFIG = "name";
-    public static final String ID_CONFIG = "id";
-
-    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
-
-    private String name; // Connector name
-    private int id; // Task ID
-
-    private ArrayList<Map<String, Object>> unflushed = new ArrayList<>();
-
-    @Override
-    public String version() {
-        return new VerifiableSinkConnector().version();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        try {
-            name = props.get(NAME_CONFIG);
-            id = Integer.parseInt(props.get(ID_CONFIG));
-        } catch (NumberFormatException e) {
-            throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
-        }
-    }
-
-    @Override
-    public void put(Collection<SinkRecord> records) {
-        long nowMs = System.currentTimeMillis();
-        for (SinkRecord record : records) {
-            Map<String, Object> data = new HashMap<>();
-            data.put("name", name);
-            data.put("task", record.key()); // VerifiableSourceTask's input task (source partition)
-            data.put("sinkTask", id);
-            data.put("topic", record.topic());
-            data.put("time_ms", nowMs);
-            data.put("seqno", record.value());
-            data.put("offset", record.kafkaOffset());
-            String dataJson;
-            try {
-                dataJson = JSON_SERDE.writeValueAsString(data);
-            } catch (JsonProcessingException e) {
-                dataJson = "Bad data can't be written as json: " + e.getMessage();
-            }
-            System.out.println(dataJson);
-            unflushed.add(data);
-        }
-    }
-
-    @Override
-    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        long nowMs = System.currentTimeMillis();
-        for (Map<String, Object> data : unflushed) {
-            data.put("time_ms", nowMs);
-            data.put("flushed", true);
-            String dataJson;
-            try {
-                dataJson = JSON_SERDE.writeValueAsString(data);
-            } catch (JsonProcessingException e) {
-                dataJson = "Bad data can't be written as json: " + e.getMessage();
-            }
-            System.out.println(dataJson);
-        }
-        unflushed.clear();
-    }
-
-    @Override
-    public void stop() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
deleted file mode 100644
index 5f9afd5..0000000
--- a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.tools;
-
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.source.SourceConnector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @see VerifiableSourceTask
- */
-public class VerifiableSourceConnector extends SourceConnector {
-    private Map<String, String> config;
-
-    @Override
-    public String version() {
-        return AppInfoParser.getVersion();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        this.config = props;
-    }
-
-    @Override
-    public Class<? extends Task> taskClass() {
-        return VerifiableSourceTask.class;
-    }
-
-    @Override
-    public List<Map<String, String>> taskConfigs(int maxTasks) {
-        ArrayList<Map<String, String>> configs = new ArrayList<>();
-        for (Integer i = 0; i < maxTasks; i++) {
-            Map<String, String> props = new HashMap<>(config);
-            props.put(VerifiableSourceTask.ID_CONFIG, i.toString());
-            configs.add(props);
-        }
-        return configs;
-    }
-
-    @Override
-    public void stop() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java b/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
deleted file mode 100644
index a85a0e9..0000000
--- a/connect/tools/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.tools;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.source.SourceRecord;
-import org.apache.kafka.connect.source.SourceTask;
-import org.apache.kafka.tools.ThroughputThrottler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A connector primarily intended for system tests. The connector simply generates as many tasks as requested. The
- * tasks print metadata in the form of JSON to stdout for each message generated, making externally visible which
- * messages have been sent. Each message is also assigned a unique, increasing seqno that is passed to Kafka Connect; when
- * tasks are started on new nodes, this seqno is used to resume where the task previously left off, allowing for
- * testing of distributed Kafka Connect.
- *
- * If logging is left enabled, log output on stdout can be easily ignored by checking whether a given line is valid JSON.
- */
-public class VerifiableSourceTask extends SourceTask {
-    private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class);
-
-    public static final String NAME_CONFIG = "name";
-    public static final String ID_CONFIG = "id";
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String THROUGHPUT_CONFIG = "throughput";
-
-    private static final String ID_FIELD = "id";
-    private static final String SEQNO_FIELD = "seqno";
-
-    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
-
-    private String name; // Connector name
-    private int id; // Task ID
-    private String topic;
-    private Map<String, Integer> partition;
-    private long startingSeqno;
-    private long seqno;
-    private ThroughputThrottler throttler;
-
-    @Override
-    public String version() {
-        return new VerifiableSourceConnector().version();
-    }
-
-    @Override
-    public void start(Map<String, String> props) {
-        final long throughput;
-        try {
-            name = props.get(NAME_CONFIG);
-            id = Integer.parseInt(props.get(ID_CONFIG));
-            topic = props.get(TOPIC_CONFIG);
-            throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
-        } catch (NumberFormatException e) {
-            throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
-        }
-
-        partition = Collections.singletonMap(ID_FIELD, id);
-        Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
-        if (previousOffset != null)
-            seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
-        else
-            seqno = 0;
-        startingSeqno = seqno;
-        throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
-
-        log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno);
-    }
-
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        long sendStartMs = System.currentTimeMillis();
-        if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs))
-            throttler.throttle();
-
-        long nowMs = System.currentTimeMillis();
-
-        Map<String, Object> data = new HashMap<>();
-        data.put("name", name);
-        data.put("task", id);
-        data.put("topic", this.topic);
-        data.put("time_ms", nowMs);
-        data.put("seqno", seqno);
-        String dataJson;
-        try {
-            dataJson = JSON_SERDE.writeValueAsString(data);
-        } catch (JsonProcessingException e) {
-            dataJson = "Bad data can't be written as json: " + e.getMessage();
-        }
-        System.out.println(dataJson);
-
-        Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
-        SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, Schema.INT64_SCHEMA, seqno);
-        List<SourceRecord> result = Arrays.asList(srcRecord);
-        seqno++;
-        return result;
-    }
-
-    @Override
-    public void stop() {
-        throttler.wakeup();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index d1543c3..3d69fac 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,4 +15,4 @@
 
 apply from: file('scala.gradle')
 include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
-        'connect-api', 'connect-runtime', 'connect-json', 'connect-file', 'connect-tools'
+        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'

http://git-wip-us.apache.org/repos/asf/kafka/blob/1408c670/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
index a3bcd2f..a58a535 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- **/
+ */
 
 package org.apache.kafka.tools;
 


Mime
View raw message