kafka-commits mailing list archives

From mj...@apache.org
Subject [kafka] branch 0.10.1 updated: MINOR: Caching layer should forward record timestamp (#5423) (#5426)
Date Wed, 01 Aug 2018 22:21:42 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.10.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.1 by this push:
     new cb5c33d  MINOR: Caching layer should forward record timestamp (#5423) (#5426)
cb5c33d is described below

commit cb5c33d55e7f80ddb9933fb411f5cd2da1377f4f
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Thu Jul 26 13:10:24 2018 -0700

    MINOR: Caching layer should forward record timestamp (#5423) (#5426)
    
    Reviewer: Guozhang Wang <guozhang@confluent.io>
---
 .../state/internals/CachingWindowStore.java        |  2 +-
 .../KStreamAggregationIntegrationTest.java         | 79 ++++++++++++++--------
 .../integration/utils/IntegrationTestUtils.java    | 78 +++++++++++++++++++++
 3 files changed, 130 insertions(+), 29 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 304a206..8d631e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -145,7 +145,7 @@ class CachingWindowStore<K, V> implements WindowStore<K, V>, CachedStateStore<Wi
     public synchronized void put(final K key, final V value, final long timestamp) {
         final byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
-                                                      timestamp, context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(name, binaryKey, entry);
     }
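
For context on the one-line fix above: `put(key, value, timestamp)` receives the timestamp that is encoded into the binary window key, while `context.timestamp()` is the timestamp of the record currently being processed. The cache entry's timestamp is what the caching layer attaches to the record it forwards downstream when the cache flushes, so the entry must carry the record timestamp, not the key timestamp. A minimal, self-contained Java sketch of the distinction (Context and Entry are hypothetical stand-ins, not Kafka's ProcessorContext and LRUCacheEntry):

    // Hypothetical sketch of why the fix matters; all names are illustrative.
    public class TimestampForwardingSketch {

        // Stand-in for ProcessorContext: knows the current record's timestamp.
        static final class Context {
            private final long recordTimestamp;
            Context(final long recordTimestamp) { this.recordTimestamp = recordTimestamp; }
            long timestamp() { return recordTimestamp; }
        }

        // Stand-in for LRUCacheEntry: its timestamp is attached to the record
        // that the caching layer forwards downstream on flush.
        static final class Entry {
            final long timestamp;
            Entry(final long timestamp) { this.timestamp = timestamp; }
        }

        public static void main(final String[] args) {
            final Context context = new Context(1533163302000L); // current record's timestamp
            final long keyTimestamp = 1533163260000L;            // timestamp baked into the window key

            final Entry beforeFix = new Entry(keyTimestamp);       // old behavior: key timestamp leaked downstream
            final Entry afterFix = new Entry(context.timestamp()); // fixed: record timestamp is forwarded

            System.out.println("before fix forwards " + beforeFix.timestamp);
            System.out.println("after fix forwards  " + afterFix.timestamp);
        }
    }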
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 383a793..8f0b63a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -185,6 +185,15 @@ public class KStreamAggregationIntegrationTest {
         return keyComparison;
     }
 
+    private static <K extends Comparable, V extends Comparable> int compareIgnoreTimestamp(final KeyValue<K, KeyValue<V, Long>> o1,
+                                                                                           final KeyValue<K, KeyValue<V, Long>> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.key.compareTo(o2.value.key);
+        }
+        return keyComparison;
+    }
+
     @Test
     public void shouldReduceWindowed() throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -310,18 +319,18 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+        final List<KeyValue<String, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
             new StringDeserializer(),
             new IntegerDeserializer(),
             15);
 
-        final Comparator<KeyValue<String, Integer>>
+        final Comparator<KeyValue<String, KeyValue<Integer, Long>>>
             comparator =
-            new Comparator<KeyValue<String, Integer>>() {
+            new Comparator<KeyValue<String, KeyValue<Integer, Long>>>() {
                 @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<String, KeyValue<Integer, Long>> o1,
+                                   final KeyValue<String, KeyValue<Integer, Long>> o2) {
+                    return KStreamAggregationIntegrationTest.compareIgnoreTimestamp(o1, o2);
                 }
             };
 
@@ -332,21 +341,21 @@ public class KStreamAggregationIntegrationTest {
 
         assertThat(windowedMessages, is(
             Arrays.asList(
-                new KeyValue<>("A@" + firstWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 2),
-                new KeyValue<>("B@" + firstWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 2),
-                new KeyValue<>("C@" + firstWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 2),
-                new KeyValue<>("D@" + firstWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 2),
-                new KeyValue<>("E@" + firstWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 2)
+                new KeyValue<>("A@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("B@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("C@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("D@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("E@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(2, secondTimestamp))
             )));
     }
 
@@ -462,16 +471,13 @@ public class KStreamAggregationIntegrationTest {
         kafkaStreams.start();
     }
 
-
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                            keyDeserializer,
-                                                        final Deserializer<V>
-                                                            valueDeserializer,
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
                                                         final int numMessages)
         throws InterruptedException {
+
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
@@ -481,7 +487,24 @@ public class KStreamAggregationIntegrationTest {
             outputTopic,
             numMessages,
             60 * 1000);
+    }
 
+    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+                                                                                     final Deserializer<V> valueDeserializer,
+                                                                                     final int numMessages)
+        throws InterruptedException {
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
     }
 
 }
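
With this change the test receives each output record as a nested pair, (windowedKey, (aggregate, recordTimestamp)), and orders the list with compareIgnoreTimestamp so the assertion order does not depend on timestamp values. A standalone sketch of that sorting step (the sample records are made up; KeyValue is org.apache.kafka.streams.KeyValue):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.kafka.streams.KeyValue;

    public class CompareIgnoreTimestampSketch {
        public static void main(final String[] args) {
            // Illustrative records: (windowedKey, (aggregateValue, recordTimestamp)).
            final List<KeyValue<String, KeyValue<Integer, Long>>> received = new ArrayList<>();
            received.add(new KeyValue<>("B@0", KeyValue.pair(1, 100L)));
            received.add(new KeyValue<>("A@0", KeyValue.pair(2, 200L)));
            received.add(new KeyValue<>("A@0", KeyValue.pair(1, 100L)));

            // Order by windowed key, then by aggregate value; the timestamp
            // (value.value) is deliberately ignored, mirroring compareIgnoreTimestamp.
            Collections.sort(received, new Comparator<KeyValue<String, KeyValue<Integer, Long>>>() {
                @Override
                public int compare(final KeyValue<String, KeyValue<Integer, Long>> o1,
                                   final KeyValue<String, KeyValue<Integer, Long>> o2) {
                    final int keyComparison = o1.key.compareTo(o2.key);
                    return keyComparison != 0 ? keyComparison : o1.value.key.compareTo(o2.value.key);
                }
            });

            for (final KeyValue<String, KeyValue<Integer, Long>> kv : received) {
                System.out.println(kv.key + " -> " + kv.value);
            }
        }
    }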
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 117e6ff..91a6ba2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.integration.utils;
 
 import kafka.utils.Time;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -213,6 +215,37 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    /**
+     * Wait until enough data (key-value records) has been consumed.
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
+     * @param expectedNumRecords Minimum number of expected records
+     * @param waitTime           Upper bound on wait time, in milliseconds
+     * @return All the records consumed
+     * @throws AssertionError       if the given wait time elapses before the expected number of records arrives
+     */
+    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                               final String topic,
+                                                                                                               final int expectedNumRecords,
+                                                                                                               final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, KeyValue<V, Long>>> readData =
+                        readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                 final String topic,
                                                                 final int expectedNumRecords) throws InterruptedException {
@@ -253,4 +286,49 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    /**
+     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+     * are already configured in the consumer).
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumer       Kafka consumer
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
+                                                                                          final Consumer<K, V> consumer,
+                                                                                          final long waitTime,
+                                                                                          final int maxMessages) {
+        final List<KeyValue<K, KeyValue<V, Long>>> consumedValues;
+        consumer.subscribe(Collections.singletonList(topic));
+        final int pollIntervalMs = 100;
+        consumedValues = new ArrayList<>();
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime &&
+            continueConsuming(consumedValues.size(), maxMessages)) {
+            totalPollTimeMs += pollIntervalMs;
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+
+            for (final ConsumerRecord<K, V> record : records) {
+                consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+            }
+        }
+        return consumedValues;
+    }
+
+    /**
+     * Sets up a {@link KafkaConsumer} from a copy of the given configuration that has
+     * {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} set to "earliest" and {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
+     * set to "true" to prevent missing events as well as repeat consumption.
+     * @param consumerConfig Consumer configuration
+     * @return Consumer
+     */
+    private static <K, V> KafkaConsumer<K, V> createConsumer(final Properties consumerConfig) {
+        final Properties filtered = new Properties();
+        filtered.putAll(consumerConfig);
+        filtered.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        return new KafkaConsumer<>(filtered);
+    }
 }
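
For reference, a hedged sketch of how a test could call the new helper; the bootstrap address, group id, topic name, and Long-valued records are illustrative and not taken from this commit (assumes the usual imports from org.apache.kafka.clients.consumer, org.apache.kafka.common.serialization, and org.apache.kafka.streams):

    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-sketch-group");  // illustrative
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

    // Blocks until at least 15 records arrive, or fails after 60 seconds; each
    // element is (key, (value, record timestamp)). createConsumer() above copies
    // these properties and adds auto.offset.reset=earliest and enable.auto.commit=true.
    final List<KeyValue<String, KeyValue<Long, Long>>> records =
        IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
            consumerProperties, "output-topic", 15, 60 * 1000);

    for (final KeyValue<String, KeyValue<Long, Long>> record : records) {
        System.out.println(record.key + " @ " + record.value.value + " -> " + record.value.key);
    }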

