kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: fix indention in <pre> tags
Date Thu, 16 Feb 2017 01:02:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk abe5e0e48 -> af182480d


MINOR: fix indention in <pre> tags

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2553 from mjsax/hotfixDocs2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/af182480
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/af182480
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/af182480

Branch: refs/heads/trunk
Commit: af182480d2f31f8ef44764fc8cfa250ec76a878f
Parents: abe5e0e
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Wed Feb 15 17:02:15 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 15 17:02:15 2017 -0800

----------------------------------------------------------------------
 docs/streams.html | 392 ++++++++++++++++++++++++-------------------------
 1 file changed, 196 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/af182480/docs/streams.html
----------------------------------------------------------------------
diff --git a/docs/streams.html b/docs/streams.html
index d9afee8..6461f86 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -306,60 +306,60 @@
         The following example <code>Processor</code> implementation defines a simple word-count algorithm:
         </p>
 
-        <pre>
-            public class MyProcessor implements Processor&lt;String, String&gt; {
-                private ProcessorContext context;
-                private KeyValueStore&lt;String, Long&gt; kvStore;
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public void init(ProcessorContext context) {
-                    // keep the processor context locally because we need it in punctuate() and commit()
-                    this.context = context;
-
-                    // call this processor's punctuate() method every 1000 milliseconds.
-                    this.context.schedule(1000);
-
-                    // retrieve the key-value store named "Counts"
-                    this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
-                }
-
-                @Override
-                public void process(String dummy, String line) {
-                    String[] words = line.toLowerCase().split(" ");
-
-                    for (String word : words) {
-                        Long oldValue = this.kvStore.get(word);
-
-                        if (oldValue == null) {
-                            this.kvStore.put(word, 1L);
-                        } else {
-                            this.kvStore.put(word, oldValue + 1L);
-                        }
-                    }
-                }
-
-                @Override
-                public void punctuate(long timestamp) {
-                    KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
-
-                    while (iter.hasNext()) {
-                        KeyValue&lt;String, Long&gt; entry = iter.next();
-                        context.forward(entry.key, entry.value.toString());
-                    }
-
-                    iter.close();
-                    // commit the current processing progress
-                    context.commit();
-                }
-
-                @Override
-                public void close() {
-                    // close the key-value store
-                    this.kvStore.close();
-                }
-            };
-        </pre>
+<pre>
+public class MyProcessor implements Processor&lt;String, String&gt; {
+    private ProcessorContext context;
+    private KeyValueStore&lt;String, Long&gt; kvStore;
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void init(ProcessorContext context) {
+        // keep the processor context locally because we need it in punctuate() and commit()
+        this.context = context;
+
+        // call this processor's punctuate() method every 1000 milliseconds.
+        this.context.schedule(1000);
+
+        // retrieve the key-value store named "Counts"
+        this.kvStore = (KeyValueStore&lt;String, Long&gt;) context.getStateStore("Counts");
+    }
+
+    @Override
+    public void process(String dummy, String line) {
+        String[] words = line.toLowerCase().split(" ");
+
+        for (String word : words) {
+            Long oldValue = this.kvStore.get(word);
+
+            if (oldValue == null) {
+                this.kvStore.put(word, 1L);
+            } else {
+                this.kvStore.put(word, oldValue + 1L);
+            }
+        }
+    }
+
+    @Override
+    public void punctuate(long timestamp) {
+        KeyValueIterator&lt;String, Long&gt; iter = this.kvStore.all();
+
+        while (iter.hasNext()) {
+            KeyValue&lt;String, Long&gt; entry = iter.next();
+            context.forward(entry.key, entry.value.toString());
+        }
+
+        iter.close();
+        // commit the current processing progress
+        context.commit();
+    }
+
+    @Override
+    public void close() {
+        // close the key-value store
+        this.kvStore.close();
+    }
+};
+</pre>
 
         <p>
         In the above implementation, the following actions are performed:
@@ -379,31 +379,31 @@
         by connecting these processors together:
         </p>
 
-        <pre>
-            TopologyBuilder builder = new TopologyBuilder();
+<pre>
+TopologyBuilder builder = new TopologyBuilder();
 
-            builder.addSource("SOURCE", "src-topic")
-                // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream
processor
-                .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
+builder.addSource("SOURCE", "src-topic")
+    // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
+    .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
 
-                // add "PROCESS2" node which takes "PROCESS1" as its upstream processor
-                .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")
+    // add "PROCESS2" node which takes "PROCESS1" as its upstream processor
+    .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")
 
-                // add "PROCESS3" node which takes "PROCESS1" as its upstream processor
-                .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")
+    // add "PROCESS3" node which takes "PROCESS1" as its upstream processor
+    .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")
 
-                // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
-                // as output and the "PROCESS1" node as its upstream processor
-                .addSink("SINK1", "sink-topic1", "PROCESS1")
+    // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
+    // as output and the "PROCESS1" node as its upstream processor
+    .addSink("SINK1", "sink-topic1", "PROCESS1")
 
-                // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
-                // as output and the "PROCESS2" node as its upstream processor
-                .addSink("SINK2", "sink-topic2", "PROCESS2")
+    // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
+    // as output and the "PROCESS2" node as its upstream processor
+    .addSink("SINK2", "sink-topic2", "PROCESS2")
 
-                // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
-                // as output and the "PROCESS3" node as its upstream processor
-                .addSink("SINK3", "sink-topic3", "PROCESS3");
-        </pre>
+    // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
+    // as output and the "PROCESS3" node as its upstream processor
+    .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
 
         There are several steps in the above code to build the topology, and here is a quick walk through:
 
@@ -423,13 +423,13 @@
         In the following example, a persistent key-value store named “Counts” with key type <code>String</code> and value type <code>Long</code> is created.
         </p>
 
-        <pre>
-            StateStoreSupplier countStore = Stores.create("Counts")
-              .withKeys(Serdes.String())
-              .withValues(Serdes.Long())
-              .persistent()
-              .build();
-        </pre>
+<pre>
+StateStoreSupplier countStore = Stores.create("Counts")
+    .withKeys(Serdes.String())
+    .withValues(Serdes.Long())
+    .persistent()
+    .build();
+</pre>
 
         <p>
         To take advantage of these state stores, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
@@ -437,24 +437,24 @@
         state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
         </p>
 
-        <pre>
-            TopologyBuilder builder = new TopologyBuilder();
+<pre>
+TopologyBuilder builder = new TopologyBuilder();
 
-            builder.addSource("SOURCE", "src-topic")
+builder.addSource("SOURCE", "src-topic")
 
-                .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
-                // add the created state store "COUNTS" associated with processor "PROCESS1"
-                .addStateStore(countStore, "PROCESS1")
-                .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that
can generate MyProcessor3 */, "PROCESS1")
-                .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that
can generate MyProcessor3 */, "PROCESS1")
+    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
+    // add the created state store "COUNTS" associated with processor "PROCESS1"
+    .addStateStore(countStore, "PROCESS1")
+    .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate
MyProcessor3 */, "PROCESS1")
+    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate
MyProcessor3 */, "PROCESS1")
 
-                // connect the state store "COUNTS" with processor "PROCESS2"
-                .connectProcessorAndStateStores("PROCESS2", "COUNTS")
+    // connect the state store "COUNTS" with processor "PROCESS2"
+    .connectProcessorAndStateStores("PROCESS2", "COUNTS")
 
-                .addSink("SINK1", "sink-topic1", "PROCESS1")
-                .addSink("SINK2", "sink-topic2", "PROCESS2")
-                .addSink("SINK3", "sink-topic3", "PROCESS3");
-        </pre>
+    .addSink("SINK1", "sink-topic1", "PROCESS1")
+    .addSink("SINK2", "sink-topic2", "PROCESS2")
+    .addSink("SINK3", "sink-topic3", "PROCESS3");
+</pre>
 
         In the next section we present another way to build the processor topology: the Kafka Streams DSL.
         <br>
@@ -511,9 +511,9 @@
 
         To illustrate the difference between KStreams and KTables/GlobalKTables, let’s imagine the following two data records are being sent to the stream:
 
-        <pre>
-            ("alice", 1) --> ("alice", 3)
-        </pre>
+<pre>
+("alice", 1) --> ("alice", 3)
+</pre>
 
         If these records were a KStream and the stream processing application were to sum the values, it would return <code>4</code>. If these records were a KTable or GlobalKTable, the return would be <code>3</code>, since the last record would be considered as an update.
 
@@ -525,13 +525,13 @@
         from a single topic).
         </p>
 
-        <pre>
-            KStreamBuilder builder = new KStreamBuilder();
+<pre>
+KStreamBuilder builder = new KStreamBuilder();
 
-            KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
-            KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
-            GlobalKTable&lt;String, GenericRecord&gt; source3 = builder.globalTable("topic4", "globalStoreName");
-        </pre>
+KStream&lt;String, GenericRecord&gt; source1 = builder.stream("topic1", "topic2");
+KTable&lt;String, GenericRecord&gt; source2 = builder.table("topic3", "stateStoreName");
+GlobalKTable&lt;String, GenericRecord&gt; source3 = builder.globalTable("topic4", "globalStoreName");
+</pre>
 
         <h4><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing
a stream</a></h4>
         A stream processor may need to divide data records into time buckets, i.e. to <b>window</b>
the stream by time. This is usually needed for join and aggregation operations, etc. Kafka
Streams currently defines the following types of windows:
@@ -605,10 +605,10 @@
 
         </p>
 
-        <pre>
-            // written in Java 8+, using lambda expressions
-            KStream&lt;String, GenericRecord&gt; mapped = source1.mapValues(record -> record.get("category"));
-        </pre>
+<pre>
+// written in Java 8+, using lambda expressions
+KStream&lt;String, GenericRecord&gt; mapped = source1.mapValues(record -> record.get("category"));
+</pre>
 
         <p>
         Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
@@ -619,19 +619,19 @@
         based on them.
         </p>
 
-        <pre>
-            // written in Java 8+, using lambda expressions
-            KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
-                () -> 0L,  // initial value
-                (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
-                TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
-                Serdes.Long() // serde for aggregated value
-            );
+<pre>
+// written in Java 8+, using lambda expressions
+KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
+    () -> 0L,  // initial value
+    (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
+    TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
+    Serdes.Long() // serde for aggregated value
+);
 
-            KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
-                (record1, record2) -> record1.get("user") + "-" + record2.get("region")
-            );
-        </pre>
+KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
+    (record1, record2) -> record1.get("user") + "-" + record2.get("region")
+);
+</pre>
 
         <h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back
to Kafka</a></h4>
 
@@ -640,21 +640,21 @@
         <code>KStream.to</code> and <code>KTable.to</code>.
         </p>
 
-        <pre>
-            joined.to("topic4");
-        </pre>
+<pre>
+joined.to("topic4");
+</pre>
 
         If your application needs to continue reading and processing the records after they have been materialized
         to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
         Kafka Streams provides a convenience method called <code>through</code>:
 
-        <pre>
-            // equivalent to
-            //
-            // joined.to("topic4");
-            // materialized = builder.stream("topic4");
-            KStream&lt;String, String&gt; materialized = joined.through("topic4");
-        </pre>
+<pre>
+// equivalent to
+//
+// joined.to("topic4");
+// materialized = builder.stream("topic4");
+KStream&lt;String, String&gt; materialized = joined.through("topic4");
+</pre>
         <br>
 
         <h3><a id="streams_execute" href="#streams_execute">Application Configuration
and Execution</a></h3>
@@ -670,21 +670,21 @@
         set the necessary parameters, and construct a <code>StreamsConfig</code> instance from the <code>Properties</code> instance.
         </p>
 
-        <pre>
-            import java.util.Properties;
-            import org.apache.kafka.streams.StreamsConfig;
+<pre>
+import java.util.Properties;
+import org.apache.kafka.streams.StreamsConfig;
 
-            Properties settings = new Properties();
-            // Set a few key parameters
-            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
-            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
-            settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
-            // Any further settings
-            settings.put(... , ...);
+Properties settings = new Properties();
+// Set a few key parameters
+settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
+settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
+settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
+// Any further settings
+settings.put(... , ...);
 
-            // Create an instance of StreamsConfig from the Properties instance
-            StreamsConfig config = new StreamsConfig(settings);
-        </pre>
+// Create an instance of StreamsConfig from the Properties instance
+StreamsConfig config = new StreamsConfig(settings);
+</pre>
 
         <p>
         Apart from Kafka Streams' own configuration parameters you can also specify parameters for the Kafka consumers and producers that are used internally,
@@ -694,24 +694,24 @@
         If you want to set different values for consumer and producer for such a parameter, you can prefix the parameter name with <code>consumer.</code> or <code>producer.</code>:
         </p>
 
-        <pre>
-            Properties settings = new Properties();
-            // Example of a "normal" setting for Kafka Streams
-            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
+<pre>
+Properties settings = new Properties();
+// Example of a "normal" setting for Kafka Streams
+settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
 
-            // Customize the Kafka consumer settings
-            settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
+// Customize the Kafka consumer settings
+settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
 
-            // Customize a common client setting for both consumer and producer
-            settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
+// Customize a common client setting for both consumer and producer
+settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
 
-            // Customize different values for consumer and producer
-            settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
-            settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
-            // Alternatively, you can use
-            settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
-            settings.put(StreamsConfig.producerPrefix(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);
-        </pre>
+// Customize different values for consumer and producer
+settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
+settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
+// Alternatively, you can use
+settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
+settings.put(StreamsConfig.producerPrefix(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);
+</pre>
 
         <p>
         You can call Kafka Streams from anywhere in your application code.
@@ -724,68 +724,68 @@
         that is used to define a topology; the second argument is an instance of <code>StreamsConfig</code> mentioned above.
         </p>
 
-        <pre>
-            import org.apache.kafka.streams.KafkaStreams;
-            import org.apache.kafka.streams.StreamsConfig;
-            import org.apache.kafka.streams.kstream.KStreamBuilder;
-            import org.apache.kafka.streams.processor.TopologyBuilder;
+<pre>
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.TopologyBuilder;
 
-            // Use the builders to define the actual processing topology, e.g. to specify
-            // from which input topics to read, which stream operations (filter, map, etc.)
-            // should be called, and so on.
+// Use the builders to define the actual processing topology, e.g. to specify
+// from which input topics to read, which stream operations (filter, map, etc.)
+// should be called, and so on.
 
-            KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
-            //
-            // OR
-            //
-            TopologyBuilder builder = ...; // when using the Processor API
+KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
+//
+// OR
+//
+TopologyBuilder builder = ...; // when using the Processor API
 
-            // Use the configuration to tell your application where the Kafka cluster is,
-            // which serializers/deserializers to use by default, to specify security settings,
-            // and so on.
-            StreamsConfig config = ...;
+// Use the configuration to tell your application where the Kafka cluster is,
+// which serializers/deserializers to use by default, to specify security settings,
+// and so on.
+StreamsConfig config = ...;
 
-            KafkaStreams streams = new KafkaStreams(builder, config);
-        </pre>
+KafkaStreams streams = new KafkaStreams(builder, config);
+</pre>
 
         <p>
         At this point, internal structures have been initialized, but the processing is not started yet. You have to explicitly start the Kafka Streams thread by calling the <code>start()</code> method:
         </p>
 
-        <pre>
-            // Start the Kafka Streams instance
-            streams.start();
-        </pre>
+<pre>
+// Start the Kafka Streams instance
+streams.start();
+</pre>
 
         <p>
         To catch any unexpected exceptions, you may set a <code>java.lang.Thread.UncaughtExceptionHandler</code> before you start the application. This handler is called whenever a stream thread is terminated by an unexpected exception:
         </p>
 
-        <pre>
-            streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-                public void uncaughtException(Thread t, Throwable e) {
-                    // here you should examine the exception and perform an appropriate action!
-                }
-            });
-        </pre>
+<pre>
+streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+    public void uncaughtException(Thread t, Throwable e) {
+        // here you should examine the exception and perform an appropriate action!
+    }
+});
+</pre>
 
         <p>
         To stop the application instance call the <code>close()</code> method:
         </p>
 
-        <pre>
-            // Stop the Kafka Streams instance
-            streams.close();
-        </pre>
+<pre>
+// Stop the Kafka Streams instance
+streams.close();
+</pre>
 
         Now it's time to execute your application that uses the Kafka Streams library, which can be run just like any other Java application – there is no special magic or requirement on the side of Kafka Streams.
         For example, you can package your Java application as a fat jar file and then start the application via:
 
-        <pre>
-            # Start the application in class `com.example.MyStreamsApp`
-            # from the fat jar named `path-to-app-fatjar.jar`.
-            $ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
-        </pre>
+<pre>
+# Start the application in class `com.example.MyStreamsApp`
+# from the fat jar named `path-to-app-fatjar.jar`.
+$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
+</pre>
 
         <p>
         When the application instance starts running, the defined processor topology will be initialized as one or more stream tasks that can be executed in parallel by the stream threads within the instance.

