kafka-commits mailing list archives

From: mj...@apache.org
Subject: [kafka] branch 0.10.2 updated: MINOR: Caching layer should forward record timestamp (#5423) (#5426)
Date: Wed, 01 Aug 2018 20:54:07 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.10.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.2 by this push:
     new 0ec25eb  MINOR: Caching layer should forward record timestamp (#5423) (#5426)
0ec25eb is described below

commit 0ec25ebdf19c9adcec3b8c9e00b8af4da55c49d9
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Thu Jul 26 13:10:24 2018 -0700

    MINOR: Caching layer should forward record timestamp (#5423) (#5426)
    
    Reviewer: Guozhang Wang <guozhang@confluent.io>
---
 .../state/internals/CachingSessionStore.java       |   2 +-
 .../state/internals/CachingWindowStore.java        |   2 +-
 .../KStreamAggregationIntegrationTest.java         | 131 ++++++++++++++-------
 .../integration/utils/IntegrationTestUtils.java    |  79 +++++++++++++
 4 files changed, 169 insertions(+), 45 deletions(-)
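
The substance of the fix is the same one-line change in both caching stores: the LRUCacheEntry that records a dirty cache write used to be stamped with the session-window end (or, in the window store, the caller-supplied timestamp), and is now stamped with the record timestamp taken from the ProcessorContext, so the timestamp forwarded downstream when the cache flushes matches the input record. Condensed from the CachingSessionStore hunk below (the window store hunk makes the identical substitution):

    // Inside CachingSessionStore.put(...), condensed from the diff below.
    final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic);
    final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
                                                  context.timestamp(),   // was: key.window().end()
                                                  context.partition(), context.topic());
    cache.put(cacheName, binaryKey.get(), entry);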

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 80160b0..92ee3e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -83,7 +83,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractWrappedState
         validateStoreOpen();
         final Bytes binaryKey = SessionKeySerde.toBinary(key, serdes.keySerializer(), topic);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
-                                                      key.window().end(), context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(cacheName, binaryKey.get(), entry);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 37ce336..28dee12 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -138,7 +138,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractWrappedStateSto
         validateStoreOpen();
         final byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, 0, serdes);
         final LRUCacheEntry entry = new LRUCacheEntry(serdes.rawValue(value), true, context.offset(),
-                                                      timestamp, context.partition(), context.topic());
+                                                      context.timestamp(), context.partition(), context.topic());
         cache.put(name, binaryKey, entry);
     }
 
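The effect is observable from any plain consumer reading a Streams output topic: after this fix, each output record's timestamp equals the timestamp of the input record that triggered the aggregate update, rather than a window boundary. A minimal sketch of checking that, assuming consumerProperties is configured as in the tests below and "output" is the output topic (both names illustrative, as is the Long value type of the count):

    // Print each output record together with its timestamp; imports from
    // org.apache.kafka.clients.consumer and java.util are elided.
    try (final KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProperties)) {
        consumer.subscribe(Collections.singletonList("output"));
        for (final ConsumerRecord<String, Long> record : consumer.poll(1000)) {
            // With the fix, record.timestamp() is the input record's timestamp.
            System.out.printf("%s -> %s @ %d%n", record.key(), record.value(), record.timestamp());
        }
    }
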
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 13124f1..5b36020 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -38,8 +38,11 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -185,6 +188,15 @@ public class KStreamAggregationIntegrationTest {
         return keyComparison;
     }
 
+    private static <K extends Comparable, V extends Comparable> int compareIgnoreTimestamp(final KeyValue<K, KeyValue<V, Long>> o1,
+                                                                                           final KeyValue<K, KeyValue<V, Long>> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.key.compareTo(o2.value.key);
+        }
+        return keyComparison;
+    }
+
     @Test
     public void shouldReduceWindowed() throws Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -310,18 +322,18 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+        final List<KeyValue<String, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
             new StringDeserializer(),
             new IntegerDeserializer(),
             15);
 
-        final Comparator<KeyValue<String, Integer>>
+        final Comparator<KeyValue<String, KeyValue<Integer, Long>>>
             comparator =
-            new Comparator<KeyValue<String, Integer>>() {
+            new Comparator<KeyValue<String, KeyValue<Integer, Long>>>() {
                 @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<String, KeyValue<Integer, Long>> o1,
+                                   final KeyValue<String, KeyValue<Integer, Long>> o2) {
+                    return KStreamAggregationIntegrationTest.compareIgnoreTimestamp(o1, o2);
                 }
             };
 
@@ -332,21 +344,21 @@ public class KStreamAggregationIntegrationTest {
 
         assertThat(windowedMessages, is(
             Arrays.asList(
-                new KeyValue<>("A@" + firstWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 2),
-                new KeyValue<>("B@" + firstWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 2),
-                new KeyValue<>("C@" + firstWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 2),
-                new KeyValue<>("D@" + firstWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 2),
-                new KeyValue<>("E@" + firstWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 2)
+                new KeyValue<>("A@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("A@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("B@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("B@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("C@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("C@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("D@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("D@" + secondWindow, KeyValue.pair(2, secondTimestamp)),
+                new KeyValue<>("E@" + firstWindow, KeyValue.pair(1, firstTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(1, secondTimestamp)),
+                new KeyValue<>("E@" + secondWindow, KeyValue.pair(2, secondTimestamp))
             )));
     }
 
@@ -491,29 +503,50 @@ public class KStreamAggregationIntegrationTest {
                         new Properties()),
                 t4);
 
-        final Map<Windowed<String>, Long> results = new HashMap<>();
+        final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(11);
         builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
                 .groupByKey(Serdes.String(), Serdes.String())
                 .count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
                 .toStream()
-                .foreach(new ForeachAction<Windowed<String>, Long>() {
+                .transform(new TransformerSupplier<Windowed<String>, Long, KeyValue<Object, Object>>() {
                     @Override
-                    public void apply(final Windowed<String> key, final Long value) {
-                        results.put(key, value);
-                        latch.countDown();
+                    public Transformer<Windowed<String>, Long, KeyValue<Object, Object>> get() {
+                        return new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>() {
+                            private ProcessorContext context;
+
+                            @Override
+                            public void init(final ProcessorContext context) {
+                                this.context = context;
+                            }
+
+                            @Override
+                            public KeyValue<Object, Object> transform(final Windowed<String> key, final Long value) {
+                                results.put(key, KeyValue.pair(value, context.timestamp()));
+                                latch.countDown();
+                                return null;
+                            }
+
+                            @Override
+                            public KeyValue<Object, Object> punctuate(final long timestamp) {
+                                return null;
+                            }
+
+                            @Override
+                            public void close() {}
+                        };
                     }
                 });
 
         startStreams();
         latch.await(30, TimeUnit.SECONDS);
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(1L));
-        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(2L));
-        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1)));
+        assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L, t4)));
+        assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair(2L, t2)));
+        assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L, t4)));
+        assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3)));
     }
 
     @Test
@@ -588,6 +621,7 @@ public class KStreamAggregationIntegrationTest {
                         return value1 + ":" + value2;
                     }
                 }, SessionWindows.with(sessionGap).until(maintainMillis), userSessionsStore)
+                .toStream()
                 .foreach(new ForeachAction<Windowed<String>, String>() {
                     @Override
                    public void apply(final Windowed<String> key, final String value) {
@@ -615,10 +649,8 @@ public class KStreamAggregationIntegrationTest {
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
         assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
         assertFalse(bob.hasNext());
-
     }
 
-
     private void produceMessages(final long timestamp)
         throws ExecutionException, InterruptedException {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
@@ -637,7 +669,6 @@ public class KStreamAggregationIntegrationTest {
             timestamp);
     }
 
-
     private void createTopics() throws InterruptedException {
         streamOneInput = "stream-one-" + testNo;
         outputTopic = "output-" + testNo;
@@ -652,16 +683,13 @@ public class KStreamAggregationIntegrationTest {
         kafkaStreams.start();
     }
 
-
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
-                                                            keyDeserializer,
-                                                        final Deserializer<V>
-                                                            valueDeserializer,
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                        final Deserializer<V> valueDeserializer,
                                                         final int numMessages)
         throws InterruptedException {
+
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
@@ -671,7 +699,24 @@ public class KStreamAggregationIntegrationTest {
             outputTopic,
             numMessages,
             60 * 1000);
+    }
+
+    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+                                                                                     final Deserializer<V> valueDeserializer,
+                                                                                     final int numMessages)
+        throws InterruptedException {
 
+        final Properties consumerProperties = new Properties();
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+            consumerProperties,
+            outputTopic,
+            numMessages,
+            60 * 1000);
     }
 
 }
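
One detail of the test rework worth noting: ForeachAction never sees the ProcessorContext, so the session-window count test above drops down to the Transformer API to capture each record's timestamp. The same pattern as a standalone sketch against the 0.10.2 Transformer interface (the class name is illustrative; the test inlines it anonymously and stores each (value, timestamp) pair instead of forwarding it downstream):

    // Pass-through transformer that tags every value with the record timestamp
    // taken from the ProcessorContext.
    class TimestampTagger<K, V> implements Transformer<K, V, KeyValue<K, KeyValue<V, Long>>> {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<K, KeyValue<V, Long>> transform(final K key, final V value) {
            return KeyValue.pair(key, KeyValue.pair(value, context.timestamp()));
        }

        @Override
        public KeyValue<K, KeyValue<V, Long>> punctuate(final long timestamp) {
            return null; // no scheduled output
        }

        @Override
        public void close() {}
    }
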
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 38ef64d..c857bdd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -21,6 +21,8 @@ import kafka.api.PartitionStateInfo;
 import kafka.api.Request;
 import kafka.server.KafkaServer;
 import kafka.server.MetadataCache;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -222,6 +224,37 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    /**
+     * Wait until enough data (key-value records) has been consumed.
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Topic to consume from
+     * @param expectedNumRecords Minimum number of expected records
+     * @param waitTime           Upper bound in waiting time in milliseconds
+     * @return All the records consumed, or null if no records are consumed
+     * @throws AssertionError       if the given wait time elapses
+     */
+    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                               final String topic,
+                                                                                                               final int expectedNumRecords,
+                                                                                                               final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, KeyValue<V, Long>>> readData =
+                        readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
+                    accumData.addAll(readData);
+                    return accumData.size() >= expectedNumRecords;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedNumRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
                                                                 final String topic,
                                                                 final int expectedNumRecords) throws InterruptedException {
@@ -299,4 +332,50 @@ public class IntegrationTestUtils {
        }, timeout, "metatadata for topic=" + topic + " partition=" + partition + " not propogated to all brokers");
 
     }
+
+    /**
+     * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from
+     * are already configured in the consumer).
+     *
+     * @param topic          Kafka topic to read messages from
+     * @param consumer       Kafka consumer
+     * @param waitTime       Maximum wait time in milliseconds
+     * @param maxMessages    Maximum number of messages to read via the consumer
+     * @return The KeyValue elements retrieved via the consumer
+     */
+    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
+                                                                                          final Consumer<K, V> consumer,
+                                                                                          final long waitTime,
+                                                                                          final int maxMessages) {
+        final List<KeyValue<K, KeyValue<V, Long>>> consumedValues;
+        consumer.subscribe(Collections.singletonList(topic));
+        final int pollIntervalMs = 100;
+        consumedValues = new ArrayList<>();
+        int totalPollTimeMs = 0;
+        while (totalPollTimeMs < waitTime &&
+            continueConsuming(consumedValues.size(), maxMessages)) {
+            totalPollTimeMs += pollIntervalMs;
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+
+            for (final ConsumerRecord<K, V> record : records) {
+                consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+            }
+        }
+        return consumedValues;
+    }
+
+    /**
+     * Sets up a {@link KafkaConsumer} from a copy of the given configuration that has
+     * {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} set to "earliest" and {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG}
+     * set to "true" to prevent missing events as well as repeat consumption.
+     * @param consumerConfig Consumer configuration
+     * @return Consumer
+     */
+    private static <K, V> KafkaConsumer<K, V> createConsumer(final Properties consumerConfig) {
+        final Properties filtered = new Properties();
+        filtered.putAll(consumerConfig);
+        filtered.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        return new KafkaConsumer<>(filtered);
+    }
 }

