kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3920: Add Schema source connector to Kafka Connect
Date Fri, 08 Jul 2016 17:57:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 730bf9a37 -> 542350d61


KAFKA-3920: Add Schema source connector to Kafka Connect

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1574 from Ishiihara/schema-source


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/542350d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/542350d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/542350d6

Branch: refs/heads/trunk
Commit: 542350d616e4638958d133d1d5599acf8546ed06
Parents: 730bf9a
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Fri Jul 8 10:56:58 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Jul 8 10:56:58 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   |   6 +-
 .../connect/tools/SchemaSourceConnector.java    |  68 ++++++++
 .../kafka/connect/tools/SchemaSourceTask.java   | 174 +++++++++++++++++++
 .../resources/ConnectorPluginsResourceTest.java |   2 +
 4 files changed, 248 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/542350d6/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 1130268..44da042 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.tools.MockConnector;
 import org.apache.kafka.connect.tools.MockSinkConnector;
 import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.tools.SchemaSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -91,8 +92,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener,
Con
     private static final Object LOCK = new Object();
     private Thread classPathTraverser;
     private static final List<Class<? extends Connector>> EXCLUDES = Arrays.asList(
-            VerifiableSourceConnector.class, VerifiableSinkConnector.class,
-            MockConnector.class, MockSourceConnector.class, MockSinkConnector.class);
+        VerifiableSourceConnector.class, VerifiableSinkConnector.class,
+        MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
+        SchemaSourceConnector.class);
 
     public AbstractHerder(Worker worker,
                           String workerId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/542350d6/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
new file mode 100644
index 0000000..249ed71
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tooling source connector that generates records with a fixed Struct schema via
+ * {@link SchemaSourceTask}. Excluded from the REST connector-plugin listing (see
+ * AbstractHerder EXCLUDES in this commit) — intended for system tests, not production.
+ */
+public class SchemaSourceConnector extends SourceConnector {
+
+    // Raw connector config captured in start(); copied into every task config.
+    private Map<String, String> config;
+
+    @Override
+    public String version() {
+        // Reports the Kafka build version rather than a connector-specific version.
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        this.config = props;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return SchemaSourceTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // Always creates exactly maxTasks task configs; each task receives the full
+        // connector config plus a unique numeric id under SchemaSourceTask.ID_CONFIG.
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
+        // NOTE(review): boxed Integer loop counter autoboxes on every iteration;
+        // an int with String.valueOf(i) would be the idiomatic form.
+        for (Integer i = 0; i < maxTasks; i++) {
+            Map<String, String> props = new HashMap<>(config);
+            props.put(SchemaSourceTask.ID_CONFIG, i.toString());
+            configs.add(props);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+        // No resources held at the connector level; tasks are stopped by the framework.
+    }
+
+    @Override
+    public ConfigDef config() {
+        // Empty ConfigDef: no keys are declared, so settings pass through unvalidated.
+        return new ConfigDef();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/542350d6/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
new file mode 100644
index 0000000..23f8d2f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.tools.ThroughputThrottler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tooling source task that emits Struct records with a fixed schema, paired with
+ * {@link SchemaSourceConnector}. Resumes its sequence number from Connect offset
+ * storage, and with multiple.schema=true alternates between two schema versions
+ * to exercise schema-evolution code paths.
+ */
+public class SchemaSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class);
+
+    // Configuration keys; all values arrive as strings in start(props).
+    public static final String NAME_CONFIG = "name";
+    public static final String ID_CONFIG = "id";
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String NUM_MSGS_CONFIG = "num.messages";
+    public static final String THROUGHPUT_CONFIG = "throughput";
+    public static final String MULTIPLE_SCHEMA_CONFIG = "multiple.schema";
+    public static final String PARTITION_COUNT_CONFIG = "partition.count";
+
+    // Keys used in the source-partition and source-offset maps.
+    private static final String ID_FIELD = "id";
+    private static final String SEQNO_FIELD = "seqno";
+    // Rate-limits production to THROUGHPUT_CONFIG messages/sec; created in start().
+    private ThroughputThrottler throttler;
+
+    private String name; // Connector name
+    private int id; // Task ID
+    private String topic;
+    private Map<String, Integer> partition; // Source partition map: {"id": <task id>}
+    private long startingSeqno; // Seqno restored from offset storage (0 when none)
+    private long seqno; // Next sequence number to emit; persisted as the source offset
+    private long count; // Records produced by this task instance (not persisted)
+    private long maxNumMsgs; // Stop producing after this many records
+    private boolean multipleSchema; // Alternate between the two schema versions per record
+    private int partitionCount; // Modulus for the "partitioning" field value
+
+    // Version 1 of the test record schema.
+    private static Schema valueSchema = SchemaBuilder.struct().version(1).name("record")
+        .field("boolean", Schema.BOOLEAN_SCHEMA)
+        .field("int", Schema.INT32_SCHEMA)
+        .field("long", Schema.INT64_SCHEMA)
+        .field("float", Schema.FLOAT32_SCHEMA)
+        .field("double", Schema.FLOAT64_SCHEMA)
+        .field("partitioning", Schema.INT32_SCHEMA)
+        .field("id", Schema.INT32_SCHEMA)
+        .field("seqno", Schema.INT64_SCHEMA)
+        .build();
+
+    // Version 2: identical to version 1 plus a "string" field with default "abc",
+    // i.e. a backward-compatible schema evolution.
+    private static Schema valueSchema2 = SchemaBuilder.struct().version(2).name("record")
+        .field("boolean", Schema.BOOLEAN_SCHEMA)
+        .field("int", Schema.INT32_SCHEMA)
+        .field("long", Schema.INT64_SCHEMA)
+        .field("float", Schema.FLOAT32_SCHEMA)
+        .field("double", Schema.FLOAT64_SCHEMA)
+        .field("partitioning", Schema.INT32_SCHEMA)
+        .field("string", SchemaBuilder.string().defaultValue("abc").build())
+        .field("id", Schema.INT32_SCHEMA)
+        .field("seqno", Schema.INT64_SCHEMA)
+        .build();
+
+    // NOTE(review): missing @Override annotation. Delegates to the connector's
+    // version (the Kafka build version via AppInfoParser).
+    public String version() {
+        return new SchemaSourceConnector().version();
+    }
+
+    /**
+     * Parses the task configuration, restores the sequence number from offset
+     * storage (source partition keyed by task id), and initializes the throttler.
+     *
+     * @throws ConnectException if any numeric config value fails to parse
+     */
+    @Override
+    public void start(Map<String, String> props) {
+        final long throughput;
+        try {
+            name = props.get(NAME_CONFIG);
+            id = Integer.parseInt(props.get(ID_CONFIG));
+            topic = props.get(TOPIC_CONFIG);
+            maxNumMsgs = Long.parseLong(props.get(NUM_MSGS_CONFIG));
+            multipleSchema = Boolean.parseBoolean(props.get(MULTIPLE_SCHEMA_CONFIG));
+            partitionCount = Integer.parseInt(props.containsKey(PARTITION_COUNT_CONFIG) ?
props.get(PARTITION_COUNT_CONFIG) : "1");
+            throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG));
+        } catch (NumberFormatException e) {
+            throw new ConnectException("Invalid SchemaSourceTask configuration", e);
+        }
+
+        throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+        partition = Collections.singletonMap(ID_FIELD, id);
+        // Resume one past the last committed seqno for this task, if an offset exists.
+        Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
+        if (previousOffset != null) {
+            seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
+        } else {
+            seqno = 0;
+        }
+        startingSeqno = seqno;
+        count = 0;
+        log.info("Started SchemaSourceTask {}-{} producing to topic {} resuming from seqno
{}", name, id, topic, startingSeqno);
+    }
+
+    /**
+     * Emits one record per call until maxNumMsgs records have been produced, then
+     * blocks indefinitely in wait() (see note below). The record key is always the
+     * constant string "key"; when multipleSchema is set, even counts use schema
+     * version 1 and odd counts use version 2.
+     */
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        if (count < maxNumMsgs) {
+            long sendStartMs = System.currentTimeMillis();
+            if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs)) {
+                throttler.throttle();
+            }
+
+            Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
+            int partitionVal = (int) (seqno % partitionCount);
+            final Struct data;
+            final SourceRecord srcRecord;
+            if (!multipleSchema || count % 2 == 0) {
+                data = new Struct(valueSchema)
+                    .put("boolean", true)
+                    .put("int", 12)
+                    .put("long", 12L)
+                    .put("float", 12.2f)
+                    .put("double", 12.2)
+                    .put("partitioning", partitionVal)
+                    .put("id", id)
+                    .put("seqno", seqno);
+
+                srcRecord = new SourceRecord(partition, ccOffset, topic, id, Schema.STRING_SCHEMA,
"key", valueSchema, data);
+            } else {
+                data = new Struct(valueSchema2)
+                    .put("boolean", true)
+                    .put("int", 12)
+                    .put("long", 12L)
+                    .put("float", 12.2f)
+                    .put("double", 12.2)
+                    .put("partitioning", partitionVal)
+                    .put("string", "def")
+                    .put("id", id)
+                    .put("seqno", seqno);
+
+                srcRecord = new SourceRecord(partition, ccOffset, topic, id, Schema.STRING_SCHEMA,
"key", valueSchema2, data);
+            }
+
+            // Progress marker on stdout as JSON — presumably consumed by system-test
+            // scripts; verify before replacing with the logger.
+            System.out.println("{\"task\": " + id + ", \"seqno\": " + seqno + "}");
+            List<SourceRecord> result = Arrays.asList(srcRecord);
+            seqno++;
+            count++;
+            return result;
+        } else {
+            // NOTE(review): once the quota is reached the task parks here and nothing in
+            // this class ever notifies it — stop() only wakes the throttler. Presumably
+            // the framework interrupts the task thread on shutdown; confirm, otherwise
+            // stop() should also notify this monitor.
+            synchronized (this) {
+                this.wait();
+            }
+            return new ArrayList<>();
+        }
+    }
+
+    @Override
+    public void stop() {
+        // Wakes a poll() sleeping inside throttler.throttle(); does not wake a poll()
+        // blocked in the wait() above.
+        throttler.wakeup();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/542350d6/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index c7f532b..e8ee93d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.tools.MockConnector;
 import org.apache.kafka.connect.tools.MockSinkConnector;
 import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.tools.SchemaSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.easymock.EasyMock;
@@ -171,6 +172,7 @@ public class ConnectorPluginsResourceTest {
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName())));
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName())));
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class.getCanonicalName())));
         assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
     }
 


Mime
View raw message