kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [kafka] branch 1.1 updated: MINOR: Caching layer should forward record timestamp (#5423) (#5426)
Date: Thu, 26 Jul 2018 20:10:33 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 3b6e0f3  MINOR: Caching layer should forward record timestamp (#5423) (#5426)
3b6e0f3 is described below

commit 3b6e0f3380f790e2705311a04550568ff2777c99
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Thu Jul 26 13:10:24 2018 -0700

    MINOR: Caching layer should forward record timestamp (#5423) (#5426)
    
    Reviewer: Guozhang Wang <guozhang@confluent.io>
---
 .../state/internals/CachingSessionStore.java       |   2 +-
 .../state/internals/CachingWindowStore.java        |   2 +-
 .../KStreamAggregationIntegrationTest.java         | 154 ++++++++++++++-------
 .../integration/utils/IntegrationTestUtils.java    |  62 +++++++++
 4 files changed, 165 insertions(+), 55 deletions(-)
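
For readers skimming the patch: before this change, with caching enabled, CachingSessionStore stamped its cache entries with the window end (key.window().end()) and CachingWindowStore with the windowed key's timestamp, so ProcessorContext#timestamp() in a downstream processor could report the wrong value until the cache flushed. Below is a minimal sketch (not part of the commit) of a topology that observes the forwarded timestamp; the topic names and window size are hypothetical, and a Java 8 lambda is used for brevity where the test code further down uses anonymous classes.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class TimestampForwardingSketch {

        public static StreamsBuilder build() {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(500L))
                .count()
                .toStream()
                .transform(() -> new Transformer<Windowed<String>, Long, KeyValue<String, Long>>() {
                    private ProcessorContext context;

                    @Override
                    public void init(final ProcessorContext context) {
                        this.context = context;
                    }

                    @Override
                    public KeyValue<String, Long> transform(final Windowed<String> key, final Long count) {
                        // After the fix, this is the timestamp of the input record that
                        // triggered the update, even while the aggregate sits in the cache.
                        return KeyValue.pair(key.key(), context.timestamp());
                    }

                    @Override
                    public KeyValue<String, Long> punctuate(final long timestamp) {
                        return null; // deprecated in 1.1; unused here
                    }

                    @Override
                    public void close() { }
                })
                .to("output", Produced.with(Serdes.String(), Serdes.Long()));
            return builder;
        }
    }
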

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 022f6f3..cb0cb25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -143,7 +143,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i
         validateStoreOpen();
         final Bytes binaryKey = SessionKeySerde.bytesToBinary(key);
         final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      key.window().end(), context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index a78978b..99c3e7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -152,7 +152,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         
         final Bytes keyBytes = WindowStoreUtils.toBinaryKey(key, timestamp, 0, bytesSerdes);
         final LRUCacheEntry entry = new LRUCacheEntry(value, true, context.offset(),
-                                                      timestamp, context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(name, cacheFunction.cacheKey(keyBytes), entry);
     }
 
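For orientation, here is the call both stores now share, annotated; the argument roles are inferred from the hunks above, and LRUCacheEntry is internal to org.apache.kafka.streams.state.internals, so this excerpt is illustrative rather than compilable on its own:

    final LRUCacheEntry entry = new LRUCacheEntry(
        value,                // serialized aggregate value
        true,                 // dirty flag: entry still has to be flushed downstream
        context.offset(),     // offset of the input record that triggered the update
        context.timestamp(),  // record timestamp, previously the window end / windowed-key timestamp
        context.partition(),  // partition of the triggering input record
        context.topic());     // topic of the triggering input record
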
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4527c19..3afded5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -45,11 +45,16 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
+import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
@@ -196,6 +201,15 @@ public class KStreamAggregationIntegrationTest {
         return keyComparison;
     }
 
+    private static <K extends Comparable, V extends Comparable> int compareIgnoreTimestamp(final KeyValue<K, KeyValue<V, Long>> o1,
+                                                                                           final KeyValue<K, KeyValue<V, Long>> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.key.compareTo(o2.value.key);
+        }
+        return keyComparison;
+    }
+
     @Test
     public void shouldReduceWindowed() throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -325,18 +339,18 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+        final List<KeyValue<String, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
             new StringDeserializer(),
             new IntegerDeserializer(),
             15);
 
-        final Comparator<KeyValue<String, Integer>>
+        final Comparator<KeyValue<String, KeyValue<Integer, Long>>>
             comparator =
-            new Comparator<KeyValue<String, Integer>>() {
+            new Comparator<KeyValue<String, KeyValue<Integer, Long>>>() {
                 @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<String, KeyValue<Integer, Long>> o1,
+                                   final KeyValue<String, KeyValue<Integer, Long>> o2) {
+                    return KStreamAggregationIntegrationTest.compareIgnoreTimestamp(o1, o2);
                 }
             };
 
@@ -347,21 +361,21 @@ public class KStreamAggregationIntegrationTest {
 
         assertThat(windowedMessages, is(
             Arrays.asList(
-                new KeyValue<>("A@" + firstWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 2),
-                new KeyValue<>("B@" + firstWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 2),
-                new KeyValue<>("C@" + firstWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 2),
-                new KeyValue<>("D@" + firstWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 2),
-                new KeyValue<>("E@" + firstWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 2)
+                new KeyValue<>("A@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("B@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("C@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("D@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("E@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(2, secondTimestamp))
             )));
     }
 
@@ -400,8 +414,9 @@ public class KStreamAggregationIntegrationTest {
     public void shouldCount() throws Exception {
         produceMessages(mockTime.milliseconds());
 
-        groupedStream.count("count-by-key")
-            .to(Serdes.String(), Serdes.Long(), outputTopic);
+        groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-by-key"))
+                .toStream()
+                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         shouldCountHelper();
     }
@@ -524,30 +539,51 @@ public class KStreamAggregationIntegrationTest {
                         new Properties()),
                 t4);
 
-        final Map<Windowed<String>, Long> results = new HashMap<>();
+        final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(11);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                 .count(SessionWindows.with(sessionGap).until(maintainMillis))
                 .toStream()
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                .transform(new TransformerSupplier<Windowed<String>, Long, KeyValue<Object, Object>>() {
                     @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.put(key, value);
-                        latch.countDown();
+                    public Transformer<Windowed<String>, Long, KeyValue<Object, Object>> get() {
+                        return new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
+                            private ProcessorContext context;
+
+                            @Override
+                            public void init(final ProcessorContext context) {
+                                this.context = context;
+                            }
+
+                            @Override
+                            public KeyValue<Object, Object> transform(final Windowed<String> key, final Long value) {
+                                results.put(key, KeyValue.pair(value, context.timestamp()));
+                                latch.countDown();
+                                return null;
+                            }
+
+                            @Override
+                            public KeyValue<Object, Object> punctuate(final long timestamp) {
+                                return null;
+                            }
+
+                            @Override
+                            public void close() {}
+                        };
                     }
                 });
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L, t4)));
+        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair(2L, t2)));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L, t4)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
     }
 
     @SuppressWarnings("deprecation")
@@ -617,19 +653,20 @@ public class KStreamAggregationIntegrationTest {
         final String userSessionsStore = "UserSessionsStore";
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+                .windowedBy(SessionWindows.with(sessionGap).until(maintainMillis))
                 .reduce(new Reducer<String>() {
                     @Override
                     public String apply(final String value1, final String value2) {
                         return value1 + ":" + value2;
                     }
-                }, SessionWindows.with(sessionGap).until(maintainMillis), userSessionsStore)
-                .foreach(new ForeachAction<Windowed<String>, String>() {
-                    @Override
-                    public void apply(final Windowed<String> key, final String value) {
-                        results.put(key, value);
-                        latch.countDown();
-                    }
-                });
+                }, Materialized.<String, String, SessionStore<Bytes, byte[]>>as(userSessionsStore).withValueSerde(Serdes.String()))
+            .foreach(new ForeachAction<Windowed<String>, String>() {
+                @Override
+                public void apply(final Windowed<String> key, final String value) {
+                    results.put(key, value);
+                    latch.countDown();
+                }
+            });
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
@@ -650,10 +687,8 @@ public class KStreamAggregationIntegrationTest {
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
         assertFalse(bob.hasNext());
-
     }
 
-
     private void produceMessages(final long timestamp) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             streamOneInput,
@@ -671,7 +706,6 @@ public class KStreamAggregationIntegrationTest {
             timestamp);
     }
 
-
     private void createTopics() throws InterruptedException {
         streamOneInput = "stream-one-" + testNo;
         outputTopic = "output-" + testNo;
@@ -685,16 +719,13 @@ public class KStreamAggregationIntegrationTest {
         kafkaStreams.start();
     }
 
-
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                            keyDeserializer,
-                                                        final Deserializer<V>
-                                                            valueDeserializer,
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
                                                         final int numMessages)
         throws InterruptedException {
+
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
@@ -704,7 +735,24 @@ public class KStreamAggregationIntegrationTest {
             outputTopic,
             numMessages,
             60 * 1000);
+    }
 
+    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+                                                                                     final Deserializer<V> valueDeserializer,
+                                                                                     final int numMessages)
+        throws InterruptedException {
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
     }
 
 }
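
Condensed, the assertion pattern the updated windowed tests follow is: produce with a known timestamp, then check that every consumed record carries it. The sketch below assumes a single input batch produced at firstBatchTimestamp and an illustrative record count of 5; the real test above uses two batches and compareIgnoreTimestamp for ordering, and assertThat/equalTo are the Hamcrest static imports already used by the test class.

    final long firstBatchTimestamp = mockTime.milliseconds();
    produceMessages(firstBatchTimestamp);
    startStreams();

    final List<KeyValue<String, KeyValue<Integer, Long>>> windowedMessages =
        receiveMessagesWithTimestamp(new StringDeserializer(), new IntegerDeserializer(), 5);

    for (final KeyValue<String, KeyValue<Integer, Long>> message : windowedMessages) {
        // message.value.value is the output record's timestamp; after this fix it
        // equals the input timestamp instead of the window end.
        assertThat(message.value.value, equalTo(firstBatchTimestamp));
    }
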
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 304a3e5..10d4083 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -217,6 +217,37 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    /**
+     * Wait until enough data (key-value records) has been consumed.
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
+     * @param expectedNumRecords Minimum number of expected records
+     * @param waitTime           Upper bound on wait time in milliseconds
+     * @return All the records consumed, each value paired with its record timestamp
+     * @throws AssertionError       if the given wait time elapses before enough records are received
+     */
+    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                               final String topic,
+                                                                                                               final int expectedNumRecords,
+                                                                                                               final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, KeyValue<V, Long>>> readData =
+                        readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                 final String topic,
                                                                 final int expectedNumRecords) throws InterruptedException {
@@ -382,6 +413,37 @@ public class IntegrationTestUtils {
         return consumedValues;
     }
 
+    /**
+     * Returns up to `maxMessages` from the given topic by reading via the provided
+     * consumer (this method subscribes the consumer to the topic).
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumer       Kafka consumer
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
+                                                                                          final Consumer<K, V> consumer,
+                                                                                          final long waitTime,
+                                                                                          final int maxMessages) {
+        final List<KeyValue<K, KeyValue<V, Long>>> consumedValues;
+        consumer.subscribe(Collections.singletonList(topic));
+        final int pollIntervalMs = 100;
+        consumedValues = new ArrayList<>();
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime &&
+            continueConsuming(consumedValues.size(), maxMessages)) {
+            totalPollTimeMs += pollIntervalMs;
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+
+            for (final ConsumerRecord<K, V> record : records) {
+                consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+            }
+        }
+        return consumedValues;
+    }
+
     private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) {
         return maxMessages <= 0 || messagesConsumed < maxMessages;
     }
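
Finally, a hypothetical call site for the new public helper, assuming an EmbeddedKafkaCluster field named CLUSTER and an output topic as in the test class above; the group id, topic name, and expected count are illustrative:

    final Properties consumerProperties = new Properties();
    consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "timestamp-check");
    consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());

    // Each returned element is (key, (value, record timestamp)), so a test can assert
    // directly on the timestamp that the caching layer forwarded to the output topic.
    final List<KeyValue<String, KeyValue<Integer, Long>>> records =
        IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
            consumerProperties, "output", 15, 60 * 1000);
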

