kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-3421: Connect developer guide update and several fixes
Date Fri, 13 May 2016 01:16:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 056a78dff -> c6e9717df


KAFKA-3421: Connect developer guide update and several fixes

This is a follow-up to KAFKA-3421 that updates the Connect developer guide to include configuration
validation. It also includes a couple of minor fixes.

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1366 from Ishiihara/connect-dev-doc

(cherry picked from commit 527b98d82f5142ab6a5efc26e84f6b0a21aec062)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6e9717d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6e9717d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6e9717d

Branch: refs/heads/0.10.0
Commit: c6e9717dfd3b9a9c16c32cb7ac64abc7ce3ebe6c
Parents: 056a78d
Author: Liquan Pei <liquanpei@gmail.com>
Authored: Thu May 12 18:14:37 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu May 12 18:15:43 2016 -0700

----------------------------------------------------------------------
 config/connect-distributed.properties | 14 ++++++--
 docs/connect.html                     | 54 +++++++++++++++++++++---------
 2 files changed, 50 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e9717d/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index b25339f..931b853 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -18,6 +18,7 @@
 # These are defaults. This file just demonstrates how to override some settings.
 bootstrap.servers=localhost:9092
 
+# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
 group.id=connect-cluster
 
 # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
@@ -36,8 +37,15 @@ internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
+# Topic to use for storing offsets. This topic should have many partitions and be replicated.
 offset.storage.topic=connect-offsets
-# Flush much faster than normal, which is useful for testing/debugging
-offset.flush.interval.ms=10000
+
+# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
+# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
 config.storage.topic=connect-configs
-status.storage.topic=connect-status
\ No newline at end of file
+
+# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
+status.storage.topic=connect-status
+
+# Flush much faster than normal, which is useful for testing/debugging
+offset.flush.interval.ms=10000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e9717d/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index a362dde..4ba406e 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -53,15 +53,17 @@ Distributed mode handles automatic balancing of work, allows you to scale up (or
 &gt; bin/connect-distributed.sh config/connect-distributed.properties
 </pre>
 
-The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets. In particular, the following configuration parameters are critical to set before starting your cluster:
+The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offsets, configs and statuses in order to achieve the desired number of partitions and replication factors. If the topics are not yet created when starting Kafka Connect, the topics will be auto-created with a default number of partitions and replication factor, which may not be best suited for their usage.
 
+In particular, the following configuration parameters are critical to set before starting your cluster:
 <ul>
     <li><code>group.id</code> (default <code>connect-cluster</code>) - unique name for the cluster, used in forming the Connect cluster group; note that this <b>must not conflict</b> with consumer group IDs</li>
-    <li><code>config.storage.topic</code> (default <code>connect-configs</code>) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic</li>
-    <li><code>offset.storage.topic</code> (default <code>connect-offsets</code>) - topic to use for ; this topic should have many partitions and be replicated</li>
+    <li><code>config.storage.topic</code> (default <code>connect-configs</code>) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.</li>
+    <li><code>offset.storage.topic</code> (default <code>connect-offsets</code>) - topic to use for storing offsets; this topic should have many partitions and be replicated</li>
+    <li><code>status.storage.topic</code> (default <code>connect-status</code>) - topic to use for storing statuses; this topic can have multiple partitions and should be replicated</li>
 </ul>
 
-Note that in distributed mode the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors.
+Note that in distributed mode the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors.
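For instance, creating a connector in distributed mode boils down to an HTTP POST against a worker's REST interface. A rough Java sketch, assuming the worker's REST listener is on the default port 8083 and using purely illustrative values for the connector name, file, and topic:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CreateConnectorExample {
    public static void main(String[] args) throws Exception {
        // Connector name and settings below are illustrative values only.
        String payload = "{\"name\": \"local-file-source\", \"config\": {"
                + "\"connector.class\": \"org.apache.kafka.connect.file.FileStreamSourceConnector\", "
                + "\"tasks.max\": \"1\", "
                + "\"file\": \"/tmp/test.txt\", "
                + "\"topic\": \"connect-test\"}}";

        // POST /connectors on a Connect worker; 8083 is the default REST port.
        URL url = new URL("http://localhost:8083/connectors");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setDoOutput(true);
        try (OutputStream out = connection.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + connection.getResponseCode());
    }
}

The same interface supports GET /connectors to list connectors and DELETE /connectors/local-file-source to remove the example again.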

 
 
 <h4><a id="connect_configuring" href="#connect_configuring">Configuring Connectors</a></h4>
@@ -158,7 +160,7 @@ The easiest method to fill in is <code>getTaskClass()</code>, which defines the
 
 <pre>
 @Override
-public Class<? extends Task> getTaskClass() {
+public Class&lt;? extends Task&gt; getTaskClass() {
     return FileStreamSourceTask.class;
 }
 </pre>
@@ -179,7 +181,7 @@ public void stop() {
 }
 </pre>
 
-Finally, the real core of the implementation is in <code>getTaskConfigs()</code>. In this case we're only
+Finally, the real core of the implementation is in <code>getTaskConfigs()</code>. In this case we are only
 handling a single file, so even though we may be permitted to generate more tasks as per the
 <code>maxTasks</code> argument, we return a list with only one entry:
 
@@ -225,7 +227,7 @@ public class FileStreamSourceTask extends SourceTask&lt;Object, Object&gt; {
 
     @Override
     public synchronized void stop() {
-        stream.close()
+        stream.close();
     }
 </pre>
 
@@ -241,8 +243,8 @@ public List&lt;SourceRecord&gt; poll() throws InterruptedException {
         while (streamValid(stream) &amp;&amp; records.isEmpty()) {
             LineAndOffset line = readToNextLine(stream);
             if (line != null) {
-                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
-                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
+                Map&lt;String, Object&gt; sourcePartition = Collections.singletonMap("filename", filename);
+                Map&lt;String, Object&gt; sourceOffset = Collections.singletonMap("position", streamOffset);
                 records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
             } else {
                 Thread.sleep(1);
@@ -267,11 +269,13 @@ The previous section described how to implement a simple <code>SourceTask</code>
 
 <pre>
 public abstract class SinkTask implements Task {
-public void initialize(SinkTaskContext context) { ... }
-
-public abstract void put(Collection&lt;SinkRecord&gt; records);
+    public void initialize(SinkTaskContext context) {
+        this.context = context;
+    }
 
-public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
+    public abstract void put(Collection&lt;SinkRecord&gt; records);
+     
+    public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
 </pre>
 
 The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering will be useful so an entire batch of records can be sent at once, reducing the overhead of inserting events into the downstream data store. The <code>SinkRecords</code> contain essentially the same information as <code>SourceRecords</code>: Kafka topic, partition, offset and the event key and value.
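To illustrate that batching pattern, here is a minimal sketch of a buffering sink task. The class name and the sendBatch() helper are invented for the example, and it follows the put()/flush() signatures shown in the excerpt above (check the SinkTask Javadoc for the exact parameter types in your release):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sink task that buffers records and writes them in batches.
public class BufferingSinkTask extends SinkTask {
    private final List<SinkRecord> buffer = new ArrayList<>();

    @Override
    public String version() {
        return "0.1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // Read connection settings for the downstream system from props.
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Only accumulate here; the data does not have to reach the destination
        // system before put() returns.
        buffer.addAll(records);
    }

    // flush() signature follows the excerpt above; consult the SinkTask Javadoc
    // for the exact parameter types in your release.
    @Override
    public void flush(Map<TopicPartition, Long> offsets) {
        // Deliver everything buffered so far as one batch, so the offsets about
        // to be committed are safe.
        sendBatch(buffer);
        buffer.clear();
    }

    @Override
    public void stop() {
        buffer.clear();
    }

    private void sendBatch(List<SinkRecord> batch) {
        // Hypothetical delivery call to the destination system.
    }
}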
@@ -305,8 +309,8 @@ Kafka Connect is intended to define bulk data copying jobs, such as copying an e
 Source connectors need to monitor the source system for changes, e.g. table additions/deletions in a database. When they pick up changes, they should notify the framework via the <code>ConnectorContext</code> object that reconfiguration is necessary. For example, in a <code>SourceConnector</code>:
 
 <pre>
-if (inputsChanged())
-    this.context.requestTaskReconfiguration();
+    if (inputsChanged())
+        this.context.requestTaskReconfiguration();
 </pre>
 
 The framework will promptly request new configuration information and update the tasks, allowing them to gracefully commit their progress before reconfiguring them. Note that in the <code>SourceConnector</code> this monitoring is currently left up to the connector implementation. If an extra thread is required to perform this monitoring, the connector must allocate it itself.
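A rough sketch of such a monitoring thread, with an invented class name and a placeholder inputsChanged() check standing in for whatever source-system polling the connector actually needs:

import org.apache.kafka.connect.connector.ConnectorContext;

// Hypothetical monitoring thread; a SourceConnector would create it in start()
// and shut it down in stop().
public class SourceMonitorThread extends Thread {
    private final ConnectorContext context;
    private volatile boolean running = true;

    public SourceMonitorThread(ConnectorContext context) {
        this.context = context;
    }

    @Override
    public void run() {
        while (running) {
            // Placeholder for whatever source-system check is needed,
            // e.g. listing tables in a database.
            if (inputsChanged())
                context.requestTaskReconfiguration();
            try {
                Thread.sleep(10000L); // poll the source system periodically
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void shutdown() {
        running = false;
        interrupt();
    }

    private boolean inputsChanged() {
        return false; // placeholder check
    }
}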
@@ -315,6 +319,26 @@ Ideally this code for monitoring changes would be isolated to the <code>Connecto
 
 <code>SinkConnectors</code> usually only have to handle the addition of streams, which may translate to new entries in their outputs (e.g., a new database table). The framework manages any changes to the Kafka input, such as when the set of input topics changes because of a regex subscription. <code>SinkTasks</code> should expect new input streams, which may require creating new resources in the downstream system, such as a new table in a database. The trickiest situation to handle in these cases may be conflicts between multiple <code>SinkTasks</code> seeing a new input stream for the first time and simultaneously trying to create the new resource. <code>SinkConnectors</code>, on the other hand, will generally require no special code for handling a dynamic set of streams.
 
+<h4><a id="connect_configs" href="#connect_configs">Connect Configuration Validation</a></h4>
+
+Kafka Connect allows you to validate connector configurations before submitting a connector to be executed and can provide feedback about errors and recommended values. To take advantage of this, connector developers need to provide an implementation of <code>config()</code> to expose the configuration definition to the framework.
+
+The following code in <code>FileStreamSourceConnector</code> defines the configuration and exposes it to the framework.
+
+<pre>
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+</pre>
+
+The <code>ConfigDef</code> class is used for specifying the set of expected configurations. For each configuration, you can specify the name, the type, the default value, the documentation, the group information, the order in the group, the width of the configuration value and the name suitable for display in the UI. In addition, you can provide special validation logic used for single configuration validation by implementing the <code>Validator</code> interface. Moreover, there may be dependencies between configurations; for example, the valid values and visibility of a configuration may change according to the values of other configurations. To handle this, <code>ConfigDef</code> allows you to specify the dependents of a configuration and to provide an implementation of <code>Recommender</code> to get valid values and set visibility of a configuration given the current configuration values.
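As an illustration, a hedged sketch of a richer definition; the "mode" and "batch.size" settings, the group name and the recommender logic are all invented for the example, and the exact define() overloads should be checked against the ConfigDef Javadoc:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Recommender;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

public class ExampleConnectorConfig {
    // Recommender that narrows the valid values of "batch.size" and only shows it
    // when "mode" is set to "batching".
    private static final Recommender BATCH_SIZE_RECOMMENDER = new Recommender() {
        @Override
        public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
            return Arrays.<Object>asList(100, 500, 1000);
        }

        @Override
        public boolean visible(String name, Map<String, Object> parsedConfig) {
            return "batching".equals(parsedConfig.get("mode"));
        }
    };

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("mode", Type.STRING, "simple", ConfigDef.ValidString.in("simple", "batching"),
                Importance.HIGH, "How records are delivered.",
                "Delivery", 1, Width.SHORT, "Delivery mode",
                Arrays.asList("batch.size"), null)
        .define("batch.size", Type.INT, 500, null,
                Importance.MEDIUM, "Number of records per delivery attempt.",
                "Delivery", 2, Width.SHORT, "Batch size",
                Collections.<String>emptyList(), BATCH_SIZE_RECOMMENDER);
}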
+
+Also, the <code>validate()</code> method in <code>Connector</code> provides a default validation implementation which returns a list of allowed configurations together with configuration errors and recommended values for each configuration. However, it does not use the recommended values for configuration validation. You may provide an override of the default implementation for customized configuration validation, which may use the recommended values.
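A hedged sketch of such an override, extending the file source connector purely for illustration; the extra rule is invented, and it assumes validate() takes the proposed configuration map and returns a Config wrapping ConfigValue objects, as described in the Connector Javadoc:

import java.util.Map;

import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Config;
import org.apache.kafka.connect.file.FileStreamSourceConnector;

// Hypothetical subclass used only to show the shape of a customized validate().
public class ValidatingFileSourceConnector extends FileStreamSourceConnector {
    @Override
    public Config validate(Map<String, String> connectorConfigs) {
        // Start from the default per-configuration validation driven by config().
        Config result = super.validate(connectorConfigs);

        // Invented cross-cutting rule: require an explicit source file rather than
        // letting the connector fall back to standard input.
        if (!connectorConfigs.containsKey("file")) {
            for (ConfigValue value : result.configValues()) {
                if ("file".equals(value.name()))
                    value.addErrorMessage("A source file must be set for this deployment.");
            }
        }
        return result;
    }
}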
+
 <h4><a id="connect_schemas" href="#connect_schemas">Working with Schemas</a></h4>
 
 The FileStream connectors are good examples because they are simple, but they also have trivially structured data -- each line is just a string. Almost all practical connectors will need schemas with more complex data formats.
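For reference, structured data in Connect is described with the Schema and Struct classes; a small sketch with arbitrary field names and values:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class SchemaExample {
    public static void main(String[] args) {
        // Describe a record with two fields.
        Schema valueSchema = SchemaBuilder.struct().name("com.example.User")
                .field("name", Schema.STRING_SCHEMA)
                .field("age", Schema.INT32_SCHEMA)
                .build();

        // Build a value that conforms to the schema.
        Struct value = new Struct(valueSchema)
                .put("name", "Alice")
                .put("age", 42);

        System.out.println(value);
    }
}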

