kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-6190: Use consumer.position() instead of record.offset() to advance in GlobalKTable restoration to avoid transactional control messages
Date Thu, 09 Nov 2017 23:33:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 c8246cfaa -> 1a5a547bb


KAFKA-6190: Use consumer.position() instead of record.offset() to advance in GlobalKTable
restoration to avoid transactional control messages

Calculate offset using consumer.position() in GlobalStateManagerImp#restoreState

Author: Alex Good <alexjsgood@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>,
Guozhang Wang <wangguoz@gmail.com>

Closes #4197 from alexjg/0.11.0

(cherry picked from commit 1321d89484a9a0657620b20c08ce96ee43d8a691)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a5a547b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a5a547b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a5a547b

Branch: refs/heads/1.0
Commit: 1a5a547bb249eec2c24905dcaea72426ec346506
Parents: c8246cf
Author: Alex Good <alexjsgood@gmail.com>
Authored: Thu Nov 9 15:30:53 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Nov 9 15:32:52 2017 -0800

----------------------------------------------------------------------
 .../internals/GlobalStateManagerImpl.java       |  2 +-
 .../GlobalKTableIntegrationTest.java            | 58 ++++++++++++++++++--
 .../integration/utils/IntegrationTestUtils.java | 41 +++++++++++++-
 3 files changed, 94 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a5a547b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 10a0775..6052f96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -189,10 +189,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                 final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                 final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                 for (ConsumerRecord<byte[], byte[]> record : records) {
-                    offset = record.offset() + 1;
                     if (record.key() != null) {
                         restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                     }
+                    offset = consumer.position(topicPartition);
                 }
                 stateRestoreAdapter.restoreAll(restoreRecords);
                 stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a5a547b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 0bdd3a3..ba8841a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -51,16 +53,23 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final Properties BROKER_CONFIG;
+    static {
+        BROKER_CONFIG = new Properties();
+        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
+        BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
+    }
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS);
+            new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
@@ -225,6 +234,37 @@ public class GlobalKTableIntegrationTest {
         }, 30000L, "waiting for final values");
     }
 
+    @Test
+    public void shouldRestoreTransactionalMessages() throws Exception {
+        produceInitialGlobalTableValues(true);
+        startStreams();
+
+        final Map<Long, String> expected = new HashMap<>();
+        expected.put(1L, "A");
+        expected.put(2L, "B");
+        expected.put(3L, "C");
+        expected.put(4L, "D");
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                ReadOnlyKeyValueStore<Long, String> store = null;
+                try {
+                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long,
String>keyValueStore());
+                } catch (InvalidStateStoreException ex) {
+                    return false;
+                }
+                Map<Long, String> result = new HashMap<>();
+                Iterator<KeyValue<Long, String>> it = store.all();
+                while (it.hasNext()) {
+                    KeyValue<Long, String> kv = it.next();
+                    result.put(kv.key, kv.value);
+                }
+                return result.equals(expected);
+            }
+        }, 30000L, "waiting for initial values");
+        System.out.println("no failed test");
+    }
 
     private void createTopics() throws InterruptedException {
         inputStream = "input-stream-" + testNo;
@@ -256,7 +296,16 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
-    private void produceInitialGlobalTableValues() throws Exception {
+    private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException,
InterruptedException {
+        produceInitialGlobalTableValues(false);
+    }
+
+    private void produceInitialGlobalTableValues(final boolean enableTransactions) throws
java.util.concurrent.ExecutionException, InterruptedException {
+        Properties properties = new Properties();
+        if (enableTransactions) {
+            properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
+            properties.put(ProducerConfig.RETRIES_CONFIG, 1);
+        }
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 globalOne,
                 Arrays.asList(
@@ -268,8 +317,9 @@ public class GlobalKTableIntegrationTest {
                         CLUSTER.bootstrapServers(),
                         LongSerializer.class,
                         StringSerializer.class,
-                        new Properties()),
-                mockTime);
+                        properties),
+                mockTime,
+                enableTransactions);
     }
 
     private void produceGlobalTableValues() throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a5a547b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index c0eeab3..f50417d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -86,11 +86,26 @@ public class IntegrationTestUtils {
     public static <K, V> void produceKeyValuesSynchronously(
         final String topic, final Collection<KeyValue<K, V>> records, final Properties
producerConfig, final Time time)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig,
time, false);
+    }
+
+    /**
+     * @param topic               Kafka topic to write the data records to
+     * @param records             Data records to write to Kafka
+     * @param producerConfig      Kafka producer configuration
+     * @param enableTransactions  Send messages in a transaction
+     * @param <K>                 Key type of the data records
+     * @param <V>                 Value type of the data records
+     */
+    public static <K, V> void produceKeyValuesSynchronously(
+        final String topic, final Collection<KeyValue<K, V>> records, final Properties
producerConfig, final Time time, final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         for (final KeyValue<K, V> record : records) {
             produceKeyValuesSynchronouslyWithTimestamp(topic,
                 Collections.singleton(record),
                 producerConfig,
-                time.milliseconds());
+                time.milliseconds(),
+                enableTransactions);
             time.sleep(1L);
         }
     }
@@ -100,12 +115,28 @@ public class IntegrationTestUtils {
                                                                          final Properties
producerConfig,
                                                                          final Long timestamp)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig,
timestamp, false);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String
topic,
+                                                                         final Collection<KeyValue<K,
V>> records,
+                                                                         final Properties
producerConfig,
+                                                                         final Long timestamp,
+                                                                         final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         try (Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+            if (enableTransactions) {
+                producer.initTransactions();
+                producer.beginTransaction();
+            }
             for (final KeyValue<K, V> record : records) {
                 final Future<RecordMetadata> f = producer.send(
                     new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
                 f.get();
             }
+            if (enableTransactions) {
+                producer.commitTransaction();
+            }
             producer.flush();
         }
     }
@@ -113,12 +144,18 @@ public class IntegrationTestUtils {
     public static <V> void produceValuesSynchronously(
         final String topic, final Collection<V> records, final Properties producerConfig,
final Time time)
         throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceValuesSynchronously(topic, records, producerConfig, time,
false);
+    }
+
+    public static <V> void produceValuesSynchronously(
+        final String topic, final Collection<V> records, final Properties producerConfig,
final Time time, final boolean enableTransactions)
+        throws ExecutionException, InterruptedException {
         final Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>();
         for (final V value : records) {
             final KeyValue<Object, V> kv = new KeyValue<>(null, value);
             keyedRecords.add(kv);
         }
-        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time);
+        produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
     }
 
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final
Properties consumerConfig,


Mime
View raw message