kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 2.8 updated: MINOR: update the branch(split) doc and java doc and tests (#11195)
Date Tue, 10 Aug 2021 20:44:53 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new de416f6  MINOR: update the branch(split) doc and java doc and tests (#11195)
de416f6 is described below

commit de416f6b1c1b115dd4875a500f5bd7cb0c34fb00
Author: Luke Chen <showuon@gmail.com>
AuthorDate: Wed Aug 11 04:37:59 2021 +0800

    MINOR: update the branch(split) doc and java doc and tests (#11195)
    
    Reviewers: Ivan Ponomarev <iponomarev@mail.ru>, Matthias J. Sax <matthias@confluent.io>
---
 docs/streams/developer-guide/dsl-api.html          | 11 ++++---
 .../org/apache/kafka/streams/kstream/Branched.java |  1 +
 .../kafka/streams/kstream/BranchedKStream.java     |  7 +++--
 .../org/apache/kafka/streams/kstream/KStream.java  | 16 ++++++----
 .../kstream/internals/KStreamSplitTest.java        | 36 +++++++++++++---------
 5 files changed, 42 insertions(+), 29 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 84fe4e7ff..04c63ed 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -362,15 +362,16 @@
                     <tbody valign="top">
                     <tr class="row-even"><td><p class="first"><strong>Branch</strong></p>
                         <ul class="last simple">
-                            <li>KStream &rarr; KStream[]</li>
+                            <li>KStream &rarr; BranchedKStream</li>
                         </ul>
                     </td>
                         <td><p class="first">Branch (or split) a <code class="docutils
literal"><span class="pre">KStream</span></code> based on the supplied
predicates into one or more <code class="docutils literal"><span class="pre">KStream</span></code>
instances.
-                            (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split">details</a>)</p>
+                            (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#split()">details</a>)</p>
                             <p>Predicates are evaluated in order.  A record is placed
to one and only one output stream on the first match:
-                                if the n-th predicate evaluates to true, the record is placed
to n-th stream.  If no predicate matches, the
-                                the record is dropped.</p>
+                                if the n-th predicate evaluates to true, the record is placed
to n-th stream. If a record does not match any predicates,
+                                it will be routed to the default branch, or dropped if no
default branch is created.</p>
                             <p>Branching is useful, for example, to route records to
different downstream topics.</p>
+<<<<<<< HEAD
                             <pre class="brush: java;">
 KStream<String, Long> stream = ...;
 Map<String, KStream<String, Long>> branches =
@@ -379,7 +380,7 @@ Map<String, KStream<String, Long>> branches =
              Branched.as("A"))
         .branch((key, value) -> key.startsWith("B"),  /* second predicate */
              Branched.as("B"))
-.defaultBranch(Branched.as("C"))
+        .defaultBranch(Branched.as("C"))              /* default branch */
 );
 
 // KStream branches.get("Branch-A") contains all records whose keys start with "A"
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
index 713a510..3e85a73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
@@ -143,6 +143,7 @@ public class Branched<K, V> implements NamedOperation<Branched<K,
V>> {
      */
     @Override
     public Branched<K, V> withName(final String name) {
+        Objects.requireNonNull(name, "name cannot be null");
         return new Branched<>(name, chainFunction, chainConsumer);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
index 8bde42b..dfa7328 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
@@ -22,7 +22,7 @@ import java.util.Map;
  * Branches the records in the original stream based on the predicates supplied for the branch
definitions.
  * <p>
  * Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or
- * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against
the predicates
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against
the {@code predicate}
  * supplied via {@link Branched} parameters, and is routed to the first branch for which
its respective predicate
  * evaluates to {@code true}. If a record does not match any predicates, it will be routed
to the default branch,
  * or dropped if no default branch is created.
@@ -50,7 +50,7 @@ import java.util.Map;
  *     {@link Branched} parameter, its value is appended to the prefix to form the {@code
Map} key
  *     <li>If a name is not provided for the branch, then the key defaults to {@code
prefix + position} of the branch
  *     as a decimal number, starting from {@code "1"}
- *     <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()}
call, then the key defaults
+ *     <li>If a name is not provided for the {@link BranchedKStream#defaultBranch()},
then the key defaults
  *     to {@code prefix + "0"}
  * </ul>
  * The values of the respective {@code Map<Stream, KStream<K, V>>} entries are
formed as following:
@@ -69,7 +69,8 @@ import java.util.Map;
  *     .branch(predicate1, Branched.as("bar"))                    // "foo-bar"
  *     .branch(predicate2, Branched.withConsumer(ks->ks.to("A"))  // no entry: a Consumer
is provided
  *     .branch(predicate3, Branched.withFunction(ks->null))       // no entry: chain function
returns null
- *     .branch(predicate4)                                        // "foo-4": name defaults
to the branch position
+ *     .branch(predicate4, Branched.withFunction(ks->ks))         // "foo-4": chain function
returns non-null value
+ *     .branch(predicate5)                                        // "foo-5": name defaults
to the branch position
  *     .defaultBranch()                                           // "foo-0": "0" is the
default name for the default branch
  * }</pre>
  *
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 0586efc..95a5260 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -782,18 +782,22 @@ public interface KStream<K, V> {
     KStream<K, V>[] branch(final Named named, final Predicate<? super K, ? super
V>... predicates);
 
     /**
-     * Split this stream. {@link BranchedKStream} can be used for routing the records to
different branches depending
-     * on evaluation against the supplied predicates.
-     * Stream branching is a stateless record-by-record operation.
+     * Split this stream into different branches. The returned {@link BranchedKStream} instance
can be used for routing
+     * the records to different branches depending on evaluation against the supplied predicates.
+     * <p>
+     *     Note: Stream branching is a stateless record-by-record operation.
+     *     Please check {@link BranchedKStream} for detailed description and usage example
      *
      * @return {@link BranchedKStream} that provides methods for routing the records to different
branches.
      */
     BranchedKStream<K, V> split();
 
     /**
-     * Split this stream. {@link BranchedKStream} can be used for routing the records to
different branches depending
-     * on evaluation against the supplied predicates.
-     * Stream branching is a stateless record-by-record operation.
+     * Split this stream into different branches. The returned {@link BranchedKStream} instance
can be used for routing
+     * the records to different branches depending on evaluation against the supplied predicates.
+     * <p>
+     *     Note: Stream branching is a stateless record-by-record operation.
+     *     Please check {@link BranchedKStream} for detailed description and usage example
      *
      * @param named  a {@link Named} config used to name the processor in the topology and
also to set the name prefix
      *               for the resulting branches (see {@link BranchedKStream})
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
index 4b2e684..29eaf1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSplitTest.java
@@ -50,6 +50,7 @@ public class KStreamSplitTest {
     private final Predicate<Integer, String> isMultipleOfThree = (key, value) ->
(key % 3) == 0;
     private final Predicate<Integer, String> isMultipleOfFive = (key, value) ->
(key % 5) == 0;
     private final Predicate<Integer, String> isMultipleOfSeven = (key, value) ->
(key % 7) == 0;
+    private final Predicate<Integer, String> isNegative = (key, value) -> key <
0;
     private final KStream<Integer, String> source = builder.stream(topicName, Consumed.with(Serdes.Integer(),
Serdes.String()));
 
     @Test
@@ -68,14 +69,14 @@ public class KStreamSplitTest {
             final TestOutputTopic<Integer, String> x2 = driver.createOutputTopic("x2",
new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic<Integer, String> x3 = driver.createOutputTopic("x3",
new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic<Integer, String> x5 = driver.createOutputTopic("x5",
new IntegerDeserializer(), new StringDeserializer());
-            assertEquals(Arrays.asList("V2", "V4", "V6"), x2.readValuesToList());
+            assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), x2.readValuesToList());
             assertEquals(Arrays.asList("V3"), x3.readValuesToList());
             assertEquals(Arrays.asList("V5"), x5.readValuesToList());
         });
     }
 
     private void withDriver(final Consumer<TopologyTestDriver> test) {
-        final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
+        final int[] expectedKeys = new int[]{-1, 0, 1, 2, 3, 4, 5, 6, 7};
         final Topology topology = builder.build();
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
             final TestInputTopic<Integer, String> inputTopic = driver.createInputTopic(topicName,
new IntegerSerializer(), new StringSerializer());
@@ -104,25 +105,29 @@ public class KStreamSplitTest {
                         // "foo-bar"
                         .branch(isEven, Branched.as("bar"))
                         // no entry: a Consumer is provided
-                        .branch(isMultipleOfThree, Branched.withConsumer(ks -> {
-                        }))
+                        .branch(isMultipleOfThree, Branched.withConsumer(ks -> { }))
                         // no entry: chain function returns null
                         .branch(isMultipleOfFive, Branched.withFunction(ks -> null))
-                        // "foo-4": name defaults to the branch position
+                        // "foo-4": chain function returns non-null value
+                        .branch(isNegative, Branched.withFunction(ks -> ks))
+                        // "foo-5": name defaults to the branch position
                         .branch(isMultipleOfSeven)
                         // "foo-0": "0" is the default name for the default branch
                         .defaultBranch();
-        assertEquals(3, branches.size());
-        branches.get("foo-bar").to("foo-bar");
-        branches.get("foo-4").to("foo-4");
-        branches.get("foo-0").to("foo-0");
+        assertEquals(4, branches.size());
+        // direct the branched streams into different topics named with branch name
+        for (final Map.Entry<String, KStream<Integer, String>> branch: branches.entrySet())
{
+            branch.getValue().to(branch.getKey());
+        }
         builder.build();
 
         withDriver(driver -> {
             final TestOutputTopic<Integer, String> even = driver.createOutputTopic("foo-bar",
new IntegerDeserializer(), new StringDeserializer());
-            final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-4",
new IntegerDeserializer(), new StringDeserializer());
+            final TestOutputTopic<Integer, String> negative = driver.createOutputTopic("foo-4",
new IntegerDeserializer(), new StringDeserializer());
+            final TestOutputTopic<Integer, String> x7 = driver.createOutputTopic("foo-5",
new IntegerDeserializer(), new StringDeserializer());
             final TestOutputTopic<Integer, String> defaultBranch = driver.createOutputTopic("foo-0",
new IntegerDeserializer(), new StringDeserializer());
-            assertEquals(Arrays.asList("V2", "V4", "V6"), even.readValuesToList());
+            assertEquals(Arrays.asList("V0", "V2", "V4", "V6"), even.readValuesToList());
+            assertEquals(Arrays.asList("V-1"), negative.readValuesToList());
             assertEquals(Arrays.asList("V7"), x7.readValuesToList());
             assertEquals(Arrays.asList("V1"), defaultBranch.readValuesToList());
         });
@@ -130,14 +135,15 @@ public class KStreamSplitTest {
 
     @Test
     public void testBranchingWithNoTerminalOperation() {
+        final String outputTopicName = "output";
         source.split()
-                .branch(isEven, Branched.withConsumer(ks -> ks.to("output")))
-                .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to("output")));
+                .branch(isEven, Branched.withConsumer(ks -> ks.to(outputTopicName)))
+                .branch(isMultipleOfFive, Branched.withConsumer(ks -> ks.to(outputTopicName)));
         builder.build();
         withDriver(driver -> {
             final TestOutputTopic<Integer, String> outputTopic =
-                    driver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer());
-            assertEquals(Arrays.asList("V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
+                    driver.createOutputTopic(outputTopicName, new IntegerDeserializer(),
new StringDeserializer());
+            assertEquals(Arrays.asList("V0", "V2", "V4", "V5", "V6"), outputTopic.readValuesToList());
         });
     }
 }

Mime
View raw message