kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-5037 Follow-up: move Scala test to Java (#5399)
Date Fri, 20 Jul 2018 17:38:01 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 75825ca  KAFKA-5037 Follow-up: move Scala test to Java (#5399)
75825ca is described below

commit 75825caee4eba332808fe9bb8575920b01d99af4
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Jul 20 10:37:56 2018 -0700

    KAFKA-5037 Follow-up: move Scala test to Java (#5399)
    
    Reviewers: Ted Yu <yuzhihong@gmail.com>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../StreamTableJoinIntegrationTest.java            | 30 ++++++++
 ...JoinWithIncompleteMetadataIntegrationTest.scala | 88 ----------------------
 2 files changed, 30 insertions(+), 88 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 5fc768d..61bbb8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.streams.KafkaStreamsWrapper;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -31,6 +34,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 
 /**
  * Tests all available joins of Kafka Streams DSL.
@@ -57,6 +62,31 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
     }
 
     @Test
+    public void testShouldAutoShutdownOnIncompleteMetadata() throws InterruptedException {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-incomplete");
+
+        final KStream<Long, String> notExistStream = builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
+
+        final KTable<Long, String> aggregatedTable = notExistStream.leftJoin(rightTable, valueJoiner)
+                .groupBy((key, value) -> key)
+                .reduce((value1, value2) -> value1 + value2);
+
+        // Write the (continuously updating) results to the output topic.
+        aggregatedTable.toStream().to(OUTPUT_TOPIC);
+
+        final KafkaStreamsWrapper streams = new KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG);
+        final IntegrationTestUtils.StateListenerStub listener = new IntegrationTestUtils.StateListenerStub();
+        streams.setStreamThreadStateListener(listener);
+        streams.start();
+
+        TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN");
+
+        streams.close();
+        assertEquals(listener.runningToRevokedSeen(), true);
+        assertEquals(listener.revokedToPendingShutdownSeen(), true);
+    }
+
+    @Test
     public void testInner() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
 
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
deleted file mode 100644
index 3bf5977..0000000
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import java.util.Properties
-
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization._
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.streams._
-import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
-import org.apache.kafka.streams.processor.internals.StreamThread
-import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.test.TestUtils
-import org.junit.Assert._
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Test suite that verifies the shutdown of StreamThread when metadata is incomplete during stream-table joins in Kafka Streams
- * <p>
- */
-class StreamToTableJoinWithIncompleteMetadataIntegrationTest extends StreamToTableJoinScalaIntegrationTestBase {
-
-  @Test def testShouldAutoShutdownOnIncompleteMetadata(): Unit = {
-
-    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced,
-    // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will
-    // get these instances automatically
-    import Serdes._
-
-    val streamsConfiguration: Properties = getStreamsConfiguration()
-
-    val builder = new StreamsBuilder()
-
-    val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic)
-
-    val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic + "1")
-
-    // Compute the total per region by summing the individual click counts per region.
-    val clicksPerRegion: KTable[String, Long] =
-      userClicksStream
-
-      // Join the stream against the table.
-        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
-
-        // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
-        .map((_, regionWithClicks) => regionWithClicks)
-
-        // Compute the total per region by summing the individual click counts per region.
-        .groupByKey
-        .reduce(_ + _)
-
-    // Write the (continuously updating) results to the output topic.
-    clicksPerRegion.toStream.to(outputTopic)
-
-    val streams: KafkaStreamsWrapper = new KafkaStreamsWrapper(builder.build(), streamsConfiguration)
-    val listener = new IntegrationTestUtils.StateListenerStub()
-    streams.setStreamThreadStateListener(listener)
-    streams.start()
-
-    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
-      produceNConsume(userClicksTopic, userRegionsTopic, outputTopic, false)
-    while (!listener.revokedToPendingShutdownSeen()) {
-      Thread.sleep(3)
-    }
-    streams.close()
-    assertEquals(listener.runningToRevokedSeen(), true)
-    assertEquals(listener.revokedToPendingShutdownSeen(), true)
-  }
-}


Mime
View raw message