kafka-commits mailing list archives

From: mj...@apache.org
Subject: [kafka] branch 2.1 updated: KAFKA-7895: Revert suppress changelog bugfix for 2.1 (#7373)
Date: Fri, 27 Sep 2019 20:09:06 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new d90c5ca  KAFKA-7895: Revert suppress changelog bugfix for 2.1 (#7373)
d90c5ca is described below

commit d90c5ca32e3582cb13a08671f9382e050b22fabd
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Sep 27 15:08:45 2019 -0500

    KAFKA-7895: Revert suppress changelog bugfix for 2.1 (#7373)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../InMemoryTimeOrderedKeyValueBuffer.java         |  57 +++-----
 .../SuppressionDurabilityIntegrationTest.java      |  60 ---------
 .../internals/TimeOrderedKeyValueBufferTest.java   | 145 ++++-----------------
 3 files changed, 46 insertions(+), 216 deletions(-)
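
For context: the 2.1 changelog format this revert reinstates carries no "v" version
header; each changelog value is simply the 8-byte buffer time followed by the raw
inner value. Below is a minimal sketch of that layout, with hypothetical helper names
(the real logic lives in InMemoryTimeOrderedKeyValueBuffer's logValue() and its
restore callback, shown in the diff that follows):

    import java.nio.ByteBuffer;

    // Sketch (not the actual Streams code) of the headerless 2.1 changelog
    // value layout: [8-byte buffer time][raw value bytes]
    public final class BufferChangelogFormatSketch {

        // Write side (cf. logValue): prepend the buffer time to the inner value.
        static byte[] encode(final long bufferTime, final byte[] innerValue) {
            return ByteBuffer.allocate(Long.BYTES + innerValue.length)
                             .putLong(bufferTime)
                             .put(innerValue)
                             .array();
        }

        // Restore side: read the buffer time from the first 8 bytes.
        static long decodeBufferTime(final byte[] changelogValue) {
            return ByteBuffer.wrap(changelogValue).getLong();
        }

        // Restore side: the remaining bytes are the inner value.
        static byte[] decodeValue(final byte[] changelogValue) {
            final ByteBuffer wrap = ByteBuffer.wrap(changelogValue);
            wrap.getLong(); // skip the 8-byte buffer time
            final byte[] value = new byte[changelogValue.length - Long.BYTES];
            wrap.get(value);
            return value;
        }
    }

Because no version header is written, restore no longer inspects record headers at
all, which is why shouldRestoreNewFormat and shouldNotRestoreUnrecognizedVersionRecord
are replaced by shouldIgnoreHeadersOnRestore in the test changes below.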

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 82d07a8..42abd70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -17,9 +17,6 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.serialization.Serde;
@@ -50,8 +47,6 @@ import static java.util.Objects.requireNonNull;
 public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
     private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
-    private static final RecordHeaders V_1_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
 
     private final Map<Bytes, BufferKey> index = new HashMap<>();
     private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>();
@@ -89,7 +84,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
          * As of 2.1, there's no way for users to directly interact with the buffer,
          * so this method is implemented solely to be called by Streams (which
          * it will do based on the {@code cache.max.bytes.buffering} config.
-         *
+         * <p>
          * It's currently a no-op.
          */
         @Override
@@ -101,7 +96,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
          * As of 2.1, there's no way for users to directly interact with the buffer,
          * so this method is implemented solely to be called by Streams (which
          * it will do based on the {@code cache.max.bytes.buffering} config.
-         *
+         * <p>
          * It's currently a no-op.
          */
         @Override
@@ -259,23 +254,23 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     }
 
     private void logValue(final Bytes key, final BufferKey bufferKey, final ContextualRecord value) {
-        final byte[] serializedContextualRecord = value.serialize();
+        final byte[] innerValue = value.value();
 
         final int sizeOfBufferTime = Long.BYTES;
-        final int sizeOfContextualRecord = serializedContextualRecord.length;
+        final int sizeOfContextualRecord = innerValue.length;
 
         final byte[] timeAndContextualRecord = ByteBuffer.wrap(new byte[sizeOfBufferTime + sizeOfContextualRecord])
                                                          .putLong(bufferKey.time)
-                                                         .put(serializedContextualRecord)
+                                                         .put(innerValue)
                                                          .array();
 
         collector.send(
             changelogTopic,
             key,
             timeAndContextualRecord,
-            V_1_CHANGELOG_HEADERS,
-            partition,
             null,
+            partition,
+            value.recordContext().timestamp(),
             KEY_SERIALIZER,
             VALUE_SERIALIZER
         );
@@ -323,32 +318,20 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                 final long time = timeAndValue.getLong();
                 final byte[] value = new byte[record.value().length - 8];
                 timeAndValue.get(value);
-                if (record.headers().lastHeader("v") == null) {
-                    cleanPut(
-                        time,
-                        key,
-                        new ContextualRecord(
-                            value,
-                            new ProcessorRecordContext(
-                                record.timestamp(),
-                                record.offset(),
-                                record.partition(),
-                                record.topic(),
-                                record.headers()
-                            )
+                cleanPut(
+                    time,
+                    key,
+                    new ContextualRecord(
+                        value,
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            null
                         )
-                    );
-                } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(value));
-
-                    cleanPut(
-                        time,
-                        key,
-                        contextualRecord
-                    );
-                } else {
-                    throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
-                }
+                    )
+                );
             }
             if (record.partition() != partition) {
                 throw new IllegalStateException(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index fa49386..9b8982b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -38,9 +37,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.ClassRule;
@@ -49,8 +45,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.HashSet;
@@ -60,7 +54,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static java.lang.Long.MAX_VALUE;
 import static java.time.Duration.ofMillis;
@@ -77,7 +70,6 @@ import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecord
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
 
 @RunWith(Parameterized.class)
 @Category(IntegrationTest.class)
@@ -132,16 +124,11 @@ public class SuppressionDurabilityIntegrationTest {
         final AtomicInteger eventCount = new AtomicInteger(0);
         suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());
 
-        // expect all post-suppress records to keep the right input topic
-        final MetadataValidator metadataValidator = new MetadataValidator(input);
-
         suppressedCounts
-            .transform(metadataValidator)
             .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
 
         valueCounts
             .toStream()
-            .transform(metadataValidator)
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
         final Properties streamsConfig = mkProperties(mkMap(
@@ -237,59 +224,12 @@ public class SuppressionDurabilityIntegrationTest {
                 )
             );
 
-            metadataValidator.raiseExceptionIfAny();
-
         } finally {
             driver.close();
             cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
-    private static final class MetadataValidator implements TransformerSupplier<String, Long, KeyValue<String, Long>> {
-        private static final Logger LOG = LoggerFactory.getLogger(MetadataValidator.class);
-        private final AtomicReference<Throwable> firstException = new AtomicReference<>();
-        private final String topic;
-
-        MetadataValidator(final String topic) {
-            this.topic = topic;
-        }
-
-        @Override
-        public Transformer<String, Long, KeyValue<String, Long>> get() {
-            return new Transformer<String, Long, KeyValue<String, Long>>() {
-                private ProcessorContext context;
-
-                @Override
-                public void init(final ProcessorContext context) {
-                    this.context = context;
-                }
-
-                @Override
-                public KeyValue<String, Long> transform(final String key, final Long value) {
-                    try {
-                        assertThat(context.topic(), equalTo(topic));
-                    } catch (final Throwable e) {
-                        firstException.compareAndSet(null, e);
-                        LOG.error("Validation Failed", e);
-                    }
-                    return new KeyValue<>(key, value);
-                }
-
-                @Override
-                public void close() {
-
-                }
-            };
-        }
-
-        void raiseExceptionIfAny() {
-            final Throwable exception = firstException.get();
-            if (exception != null) {
-                throw new AssertionError("Got an exception during run", exception);
-            }
-        }
-    }
-
     private static void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
         final Properties properties = mkProperties(
             mkMap(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 6ae36d4..8253728 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -53,6 +53,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
@@ -258,8 +259,10 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
-        putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
-        putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
+        final String value1 = "2093j";
+        final String value2 = "3gon4i";
+        putRecord(buffer, context, 2L, 0L, "asdf", value1);
+        putRecord(buffer, context, 1L, 1L, "zxcv", value2);
         putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
 
         // replace "deleteme" with a tombstone
@@ -272,20 +275,21 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         // which we can't compare for equality using ProducerRecord.
         // As a workaround, I'm deserializing them and shoving them in a KeyValue, just for ease of testing.
 
-        final List<ProducerRecord<String, KeyValue<Long, ContextualRecord>>> collected =
+        final List<ProducerRecord<String, KeyValue<Long, String>>> collected =
             ((MockRecordCollector) context.recordCollector())
                 .collected()
                 .stream()
                 .map(pr -> {
-                    final KeyValue<Long, ContextualRecord> niceValue;
+                    final KeyValue<Long, String> niceValue;
                     if (pr.value() == null) {
                         niceValue = null;
                     } else {
                         final byte[] timestampAndValue = pr.value();
                         final ByteBuffer wrap = ByteBuffer.wrap(timestampAndValue);
                         final long timestamp = wrap.getLong();
-                        final ContextualRecord contextualRecord = ContextualRecord.deserialize(wrap);
-                        niceValue = new KeyValue<>(timestamp, contextualRecord);
+                        final byte[] value = new byte[pr.value().length - Long.BYTES];
+                        wrap.get(value);
+                        niceValue = new KeyValue<>(timestamp, new String(value, UTF_8));
                     }
 
                     return new ProducerRecord<>(pr.topic(),
@@ -303,21 +307,21 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  null,
                                  "deleteme",
                                  null,
-                                 new RecordHeaders()
+                                 null
             ),
             new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
                                  0,
-                                 null,
+                                 1L,
                                  "zxcv",
-                                 new KeyValue<>(1L, getRecord("3gon4i", 1)),
-                                 V_1_CHANGELOG_HEADERS
+                                 new KeyValue<>(1L, value2),
+                                 null
             ),
             new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
                                  0,
-                                 null,
+                                 0L,
                                  "asdf",
-                                 new KeyValue<>(2L, getRecord("2093j", 0)),
-                                 V_1_CHANGELOG_HEADERS
+                                 new KeyValue<>(2L, value1),
+                                 null
             )
         )));
 
@@ -406,118 +410,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
             new Eviction<>(
                 "zxcv",
                 "3o4im",
-                new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new RecordHeaders())),
+                new ProcessorRecordContext(2L, 2, 0, "changelog-topic", null)),
             new Eviction<>(
                 "asdf",
                 "qwer",
-                new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new RecordHeaders()))
+                new ProcessorRecordContext(1L, 1, 0, "changelog-topic", null))
         )));
 
         cleanup(context, buffer);
     }
 
     @Test
-    public void shouldRestoreNewFormat() {
-        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init(context, buffer);
-
-        final RecordBatchingStateRestoreCallback stateRestoreCallback =
-            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
-
-        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
-
-        final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
-
-        final byte[] todeleteValue = getRecord("doomed", 0).serialize();
-        final byte[] asdfValue = getRecord("qwer", 1).serialize();
-        final byte[] zxcvValue = getRecord("3o4im", 2).serialize();
-        stateRestoreCallback.restoreBatch(asList(
-            new ConsumerRecord<>("changelog-topic",
-                                 0,
-                                 0,
-                                 999,
-                                 TimestampType.CREATE_TIME,
-                                 -1L,
-                                 -1,
-                                 -1,
-                                 "todelete".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
-                                 v1FlagHeaders),
-            new ConsumerRecord<>("changelog-topic",
-                                 0,
-                                 1,
-                                 9999,
-                                 TimestampType.CREATE_TIME,
-                                 -1L,
-                                 -1,
-                                 -1,
-                                 "asdf".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + asdfValue.length).putLong(2L).put(asdfValue).array(),
-                                 v1FlagHeaders),
-            new ConsumerRecord<>("changelog-topic",
-                                 0,
-                                 2,
-                                 99,
-                                 TimestampType.CREATE_TIME,
-                                 -1L,
-                                 -1,
-                                 -1,
-                                 "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue.length).putLong(1L).put(zxcvValue).array(),
-                                 v1FlagHeaders)
-        ));
-
-        assertThat(buffer.numRecords(), is(3));
-        assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(130L));
-
-        stateRestoreCallback.restoreBatch(singletonList(
-            new ConsumerRecord<>("changelog-topic",
-                                 0,
-                                 3,
-                                 3,
-                                 TimestampType.CREATE_TIME,
-                                 -1L,
-                                 -1,
-                                 -1,
-                                 "todelete".getBytes(UTF_8),
-                                 null)
-        ));
-
-        assertThat(buffer.numRecords(), is(2));
-        assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(83L));
-
-        // flush the buffer into a list in buffer order so we can make assertions about the contents.
-
-        final List<Eviction<String, String>> evicted = new LinkedList<>();
-        buffer.evictWhile(() -> true, evicted::add);
-
-        // Several things to note:
-        // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
-        // * The record timestamps are properly restored, and not conflated with the record's buffer time.
-        // * The keys and values are properly restored
-        // * The record topic is set to the original input topic, *not* the changelog topic
-        // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
-
-
-        assertThat(evicted, is(asList(
-            new Eviction<>(
-                "zxcv",
-                "3o4im",
-                getContext(2L)),
-            new Eviction<>(
-                "asdf",
-                "qwer",
-                getContext(1L)
-            ))));
-
-        cleanup(context, buffer);
-    }
-
-    @Test
-    public void shouldNotRestoreUnrecognizedVersionRecord() {
+    public void shouldIgnoreHeadersOnRestore() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
@@ -544,9 +448,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                      ByteBuffer.allocate(Long.BYTES + todeleteValue.length).putLong(0L).put(todeleteValue).array(),
                                      unknownFlagHeaders)
             ));
-            fail("expected an exception");
-        } catch (final IllegalArgumentException expected) {
-            // nothing to do.
+
+            final List<Eviction<String, String>> evicted = new LinkedList<>();
+            buffer.evictWhile(() -> buffer.numRecords() > 0, evicted::add);
+            assertThat(evicted.size(), is(1));
+            final Eviction<String, String> eviction = evicted.get(0);
+            assertThat(eviction.recordContext().headers(), nullValue());
         } finally {
             cleanup(context, buffer);
         }

