kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/3] kafka git commit: KAFKA-4144: Allow per stream/table timestamp extractor
Date Sat, 13 May 2017 04:39:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a1c8e7d94 -> 9198467eb
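
For context on the test changes below: KAFKA-4144 allows the timestamp extractor to be set per stream/table, and as part of that the global StreamsConfig keys gain a DEFAULT_ prefix (TIMESTAMP_EXTRACTOR_CLASS_CONFIG becomes DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, and likewise for the key/value serde keys). A minimal sketch of a configuration using the renamed keys; the application id, bootstrap servers, and the choice of WallclockTimestampExtractor are illustrative placeholders, not part of this commit:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    // Sketch only: the renamed "default" config keys that the tests below switch to.
    public class DefaultConfigKeysSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
            final StreamsConfig config = new StreamsConfig(props);
            System.out.println("built " + config.getClass().getSimpleName() + " with default extractor/serde keys");
        }
    }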


http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index e9c4ef9..3add508 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -126,7 +126,7 @@ public class StreamTaskTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         });
     }
@@ -365,8 +365,8 @@ public class StreamTaskTest {
 
         task.close();
 
-        task  = new StreamTask(taskId00, applicationId, partitions,
-                                                     topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
+        task  = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
+                topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
         final int offset = 20;
         task.addRecords(partition1, Collections.singletonList(
                new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -445,8 +445,13 @@ public class StreamTaskTest {
                 return true;
             }
         };
+        Map<String, SourceNode> sourceByTopics =  new HashMap() { {
+                put(partition1.topic(), source1);
+                put(partition2.topic(), source2);
+            }
+        };
         final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-                                                                 Collections.<String, SourceNode>emptyMap(),
+                                                                 sourceByTopics,
                                                                  Collections.<String, SinkNode>emptyMap(),
                                                                  Collections.<StateStore>singletonList(inMemoryStore),
                                                                  Collections.singletonMap(storeName, changelogTopic),
@@ -583,8 +588,8 @@ public class StreamTaskTest {
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
 
-        return new StreamTask(taskId00, applicationId, partitions,
-                              topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
+        return new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
+                topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
     }
 
     private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) {
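
The sourceByTopics map added in the StreamTaskTest hunk above is built with an anonymous-subclass ("double brace") initializer. As a reference point, a self-contained sketch of the equivalent construction with plain puts; it uses String values in place of the test's internal SourceNode objects, so the types here are illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-alone sketch (not Kafka code): map topic names to per-topic handlers,
    // equivalent to the double-brace initializer used in StreamTaskTest above.
    public class SourceByTopicSketch {
        public static void main(final String[] args) {
            final Map<String, String> sourceByTopics = new HashMap<>();
            sourceByTopics.put("topic1", "source1"); // in the test: partition1.topic() -> source1
            sourceByTopics.put("topic2", "source2"); // in the test: partition2.topic() -> source2
            System.out.println(sourceByTopics);
        }
    }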

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b44260..7abe4dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -47,6 +47,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.Arrays;
@@ -140,7 +141,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
@@ -344,6 +345,7 @@ public class StreamThreadTest {
                 .persistent()
                 .build()
         );
+        builder.addSource("source", TOPIC);
         final StreamsConfig config = new StreamsConfig(configProps());
         final MockClientSupplier mockClientSupplier = new MockClientSupplier();
        mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
@@ -683,7 +685,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInjectSharedProducerForAllTasksUsingClientSupplierWhenEosDisabled() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final StreamsConfig config = new StreamsConfig(configProps());
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         final StreamThread thread = new StreamThread(
@@ -717,7 +719,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final Properties properties = configProps();
         properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig config = new StreamsConfig(properties);
@@ -756,7 +758,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseAllTaskProducers() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final Properties properties = configProps();
         properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig config = new StreamsConfig(properties);
@@ -790,7 +792,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseThreadProducer() {
-        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final StreamsConfig config = new StreamsConfig(configProps());
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         final StreamThread thread = new StreamThread(
@@ -993,6 +995,13 @@ public class StreamThreadTest {
             }
         });
 
+        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
+        Field updatedTopicsField  = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        updatedTopicsField.setAccessible(true);
+        Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates);
+        updatedTopics.add(t1.topic());
+        builder.updateSubscriptions(subscriptionUpdates, null);
+
         // should create task for id 0_0 with a single partition
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
@@ -1002,6 +1011,8 @@ public class StreamThreadTest {
 
         // update assignment for the task 0_0 so it now has 2 partitions
         task00Partitions.add(new TopicPartition("t2", 0));
+        updatedTopics.add("t2");
+
         thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
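
The block added near the end of the hunk above seeds the assignor's subscription state by reflecting into a private Set-valued field. A self-contained sketch of that reflection pattern, with a placeholder Holder class standing in for the Streams-internal SubscriptionUpdates:

    import java.lang.reflect.Field;
    import java.util.HashSet;
    import java.util.Set;

    // Self-contained sketch (not Kafka code): read a private Set field via reflection,
    // make it accessible, and mutate it in place, as the test above does.
    public class ReflectionSketch {
        static class Holder {
            private final Set<String> updatedTopics = new HashSet<>();
            Set<String> topics() { return updatedTopics; }
        }

        public static void main(final String[] args) throws Exception {
            final Holder holder = new Holder();
            final Field field = Holder.class.getDeclaredField("updatedTopics");
            field.setAccessible(true);           // bypass the private modifier, as the test does
            @SuppressWarnings("unchecked")
            final Set<String> topics = (Set<String>) field.get(holder);
            topics.add("t1");                    // the mutation is visible through the holder
            System.out.println(holder.topics()); // prints [t1]
        }
    }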
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 9a60197..b4598fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -221,9 +221,9 @@ public class KeyValueStoreTestDriver<K, V> {
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
+        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass());
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
 
         context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index dec3718..f8b17b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -68,11 +68,12 @@ public class StreamThreadStateStoreProviderTest {
     private StateDirectory stateDirectory;
     private File stateDir;
     private boolean storesAvailable;
+    private final String topicName = "topic";
 
     @Before
     public void before() throws IOException {
         final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("the-source", "the-source");
+        builder.addSource("the-source", topicName);
         builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
         builder.addStateStore(Stores.create("kv-store")
                                   .withStringKeys()
@@ -188,7 +189,7 @@ public class StreamThreadStateStoreProviderTest {
                                          final ProcessorTopology topology,
                                          final TaskId taskId) {
         return new StreamTask(taskId, applicationId, Collections
-                .singletonList(new TopicPartition("topic", taskId.partition)), topology,
+                .singletonList(new TopicPartition(topicName, taskId.partition)), topology,
                               clientSupplier.consumer,
                              new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000),
                              streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), new NoOpRecordCollector()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 88d1ccc..c04b3d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -55,8 +55,8 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility");
         streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         final int timeout = 6000;
        streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 1e97e11..5d46ce0 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -92,7 +92,7 @@ import java.io.IOException;
  * StringDeserializer strDeserializer = new StringDeserializer();
  * Properties props = new Properties();
  * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- * props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
+ * props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
  * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
  * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
  * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 1e5c1f8..30ec90a 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -39,8 +39,8 @@ public class StreamsTestUtils {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

