kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4060; Follow-up: Throw exceptions when internal topics to create already exist with unexpected number of partitions
Date Wed, 18 Jan 2017 23:54:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4a6f2c6cc -> 8e2cbae8b


KAFKA-4060; Follow-up: Throw exceptions when internal topics to create already exist with unexpected number of partitions

Re-branched from trunk and applied the changes to the new branch to simplify the commit log.

Author: Hojjat Jafarpour <hojjat@Hojjat-Jafarpours-MBP.local>

Reviewers: Ismael Juma, Damian Guy, Eno Thereska, Guozhang Wang

Closes #2389 from hjafarpour/KAFKA-4060-Remove-ZkClient-dependency-in-Kafka-Streams-followup-from-trunk

Address Ismael's comments upon merging
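
In practical terms, a required internal topic that already exists with an unexpected partition count now fails the application with a StreamsException (whose message points at kafka.tools.StreamsResetter) instead of being silently deleted and recreated. One plausible way to observe that failure from the application side is sketched below; the application id, bootstrap server, topology, and handler are hypothetical and not part of this commit:

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    // Hypothetical application-side sketch: with this commit, a mismatched internal topic
    // makes the stream threads fail instead of being deleted and recreated behind the scenes.
    public class FailFastSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // hypothetical
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical

            final KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");                    // hypothetical topology

            final KafkaStreams streams = new KafkaStreams(builder, props);
            streams.setUncaughtExceptionHandler((thread, throwable) ->
                    // The exception message asks the user to run the reset tool before restarting.
                    System.err.println(thread.getName() + " failed: " + throwable.getMessage()));
            streams.start();
        }
    }

The application's internal topics would then need to be cleaned up with the reset tool named in the exception message before restarting.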


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e2cbae8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e2cbae8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e2cbae8

Branch: refs/heads/trunk
Commit: 8e2cbae8b12cd71f99f13953b744163f68021323
Parents: 4a6f2c6
Author: Hojjat Jafarpour <hojjat@Hojjat-Jafarpours-MBP.local>
Authored: Wed Jan 18 15:53:17 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jan 18 15:54:15 2017 -0800

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         |  28 ++---
 .../processor/internals/StreamsKafkaClient.java | 104 ++++++----------
 .../internals/InternalTopicManagerTest.java     | 124 +++++++++++++++++++
 .../internals/StreamPartitionAssignorTest.java  |  25 ----
 4 files changed, 174 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e2cbae8/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 8bb5a3d..133375f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -48,9 +48,13 @@ public class InternalTopicManager {
     }
 
     /**
-     * Prepares the set of given internal topics. If the topic with the correct number of partitions exists ignores it. For the ones with different number of
-     * partitions delete them and create new ones with correct number of partitons along with the non existing topics.
+     * Prepares a given internal topic.
+     * If the topic does not exist creates a new topic.
+     * If the topic with the correct number of partitions exists ignores it.
+     * If the topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
+     *
      * @param topic
+     * @param numPartitions
      */
     public void makeReady(final InternalTopicConfig topic, int numPartitions) {
 
@@ -58,11 +62,9 @@ public class InternalTopicManager {
         topics.put(topic, numPartitions);
         for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
             try {
-                Collection<MetadataResponse.TopicMetadata> topicMetadatas = streamsKafkaClient.fetchTopicMetadata();
-                Map<InternalTopicConfig, Integer> topicsToBeDeleted = getTopicsToBeDeleted(topics, topicMetadatas);
-                Map<InternalTopicConfig, Integer> topicsToBeCreated = filterExistingTopics(topics, topicMetadatas);
-                topicsToBeCreated.putAll(topicsToBeDeleted);
-                streamsKafkaClient.deleteTopics(topicsToBeDeleted);
+                Collection<MetadataResponse.TopicMetadata> topicsMetadata = streamsKafkaClient.fetchTopicsMetadata();
+                validateTopicPartitons(topics, topicsMetadata);
+                Map<InternalTopicConfig, Integer> topicsToBeCreated = filterExistingTopics(topics, topicsMetadata);
                 streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention);
                 return;
             } catch (StreamsException ex) {
@@ -99,24 +101,22 @@ public class InternalTopicManager {
         return nonExistingTopics;
     }
 
+
     /**
-     * Return the topics that exist but have different partiton number to be deleted.
+     * Make sure the existing topics have correct number of partitions.
+     *
      * @param topicsPartitionsMap
      * @param topicsMetadata
-     * @return
      */
-    private Map<InternalTopicConfig, Integer> getTopicsToBeDeleted(final Map<InternalTopicConfig, Integer> topicsPartitionsMap, Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
+    private void validateTopicPartitons(final Map<InternalTopicConfig, Integer> topicsPartitionsMap, Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
         Map<String, Integer> existingTopicNamesPartitions = getExistingTopicNamesPartitions(topicsMetadata);
-        Map<InternalTopicConfig, Integer> deleteTopics = new HashMap<>();
-        // Add the topics that don't exist to the nonExistingTopics.
         for (InternalTopicConfig topic: topicsPartitionsMap.keySet()) {
             if (existingTopicNamesPartitions.get(topic.name()) != null) {
                 if (existingTopicNamesPartitions.get(topic.name()) != topicsPartitionsMap.get(topic)) {
-                    deleteTopics.put(topic, topicsPartitionsMap.get(topic));
+                    throw new StreamsException("Internal topic with invalid partitons. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.");
                 }
             }
         }
-        return deleteTopics;
     }
 
     private Map<String, Integer> getExistingTopicNamesPartitions(Collection<MetadataResponse.TopicMetadata> topicsMetadata) {

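The net effect of the InternalTopicManager change above: missing topics are still created, topics that already match the requested partition count are left alone, and a partition-count mismatch now aborts with a StreamsException rather than triggering a delete-and-recreate. A simplified, self-contained sketch of that decision, working on plain topic-name maps rather than the InternalTopicConfig keys used in the diff:

    import java.util.Map;

    import org.apache.kafka.streams.errors.StreamsException;

    // Simplified illustration of the validation introduced above; the real logic lives in
    // InternalTopicManager.validateTopicPartitons(...).
    final class PartitionValidationSketch {

        static void validate(final Map<String, Integer> requiredPartitions,
                             final Map<String, Integer> existingPartitions) {
            for (final Map.Entry<String, Integer> required : requiredPartitions.entrySet()) {
                final Integer existing = existingPartitions.get(required.getKey());
                if (existing == null) {
                    continue; // the topic does not exist yet; it will be created afterwards
                }
                if (!existing.equals(required.getValue())) {
                    // Fail fast instead of deleting and recreating the topic as before this change.
                    throw new StreamsException("Internal topic " + required.getKey() + " has "
                            + existing + " partitions, expected " + required.getValue()
                            + ". Use 'kafka.tools.StreamsResetter' to clean up before restarting.");
                }
            }
        }

        private PartitionValidationSketch() { }
    }

Note that the sketch compares the boxed partition counts with equals(); the hunk above compares two Integer values with !=, which only behaves like a value comparison for counts small enough to hit the Integer cache.
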
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e2cbae8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index f33a4e4..8ea570f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -33,11 +33,8 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.DeleteTopicsRequest;
-import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -45,14 +42,13 @@ import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.Properties;
+import java.util.HashMap;
 import java.util.Collection;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 public class StreamsKafkaClient {
@@ -125,10 +121,13 @@ public class StreamsKafkaClient {
 
             topicRequestDetails.put(internalTopicConfig.name(), topicDetails);
         }
-        final CreateTopicsRequest.Builder createTopicsRequest =
-                new CreateTopicsRequest.Builder(topicRequestDetails,
-                        streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
-        final ClientResponse clientResponse = sendRequest(createTopicsRequest);
+
+        final ClientRequest clientRequest = kafkaClient.newClientRequest(getBrokerId(), new CreateTopicsRequest.Builder(topicRequestDetails, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG)), Time.SYSTEM.milliseconds(), true, null);
+        final ClientResponse clientResponse = sendRequest(clientRequest);
+
+        if (!clientResponse.hasResponse()) {
+            throw new StreamsException("Empty response for client request.");
+        }
         if (!(clientResponse.responseBody() instanceof CreateTopicsResponse)) {
             throw new StreamsException("Inconsistent response type for internal topic creation request. Expected CreateTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
         }
@@ -146,51 +145,8 @@ public class StreamsKafkaClient {
         }
     }
 
-    /**
-     * Delets a set of topics.
-     *
-     * @param topics
-     */
-    public void deleteTopics(final Map<InternalTopicConfig, Integer> topics) {
-
-        final Set<String> topicNames = new HashSet<>();
-        for (InternalTopicConfig internalTopicConfig: topics.keySet()) {
-            topicNames.add(internalTopicConfig.name());
-        }
-        deleteTopics(topicNames);
-    }
-
-    /**
-     * Delete a set of topics in one request.
-     *
-     * @param topics
-     */
-    private void deleteTopics(final Set<String> topics) {
-
-        final DeleteTopicsRequest.Builder deleteTopicsRequest =
-                new DeleteTopicsRequest.Builder(topics, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
-        final ClientResponse clientResponse = sendRequest(deleteTopicsRequest);
-        if (!(clientResponse.responseBody() instanceof DeleteTopicsResponse)) {
-            throw new StreamsException("Inconsistent response type for internal topic deletion request. Expected DeleteTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
-        }
-        final DeleteTopicsResponse deleteTopicsResponse = (DeleteTopicsResponse) clientResponse.responseBody();
-        for (Map.Entry<String, Errors> entry : deleteTopicsResponse.errors().entrySet()) {
-            if (entry.getValue() != Errors.NONE) {
-                throw new StreamsException("Could not delete topic: " + entry.getKey() + " due to " + entry.getValue().message());
-            }
-        }
-
-    }
-
-    /**
-     * Send a request to kafka broker of this client. Keep polling until the corresponding response is received.
-     *
-     * @param request
-     */
-    private ClientResponse sendRequest(final AbstractRequest.Builder<?> request) {
-
+    private String getBrokerId() {
         String brokerId = null;
-
         final Metadata metadata = new Metadata(streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
         metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
@@ -211,12 +167,11 @@ public class StreamsKafkaClient {
         if (brokerId == null) {
             throw new StreamsException("Could not find any available broker.");
         }
+        return brokerId;
+    }
 
-        final ClientRequest clientRequest = kafkaClient.newClientRequest(
-                brokerId, request, Time.SYSTEM.milliseconds(), true, null);
-
+    private ClientResponse sendRequest(final ClientRequest clientRequest) {
         kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
-
         final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
         // Poll for the response.
         while (Time.SYSTEM.milliseconds() < responseTimeout) {
@@ -229,24 +184,27 @@ public class StreamsKafkaClient {
                 if (response.requestHeader().correlationId() == clientRequest.correlationId()) {
                     return response;
                 } else {
-                    throw new StreamsException("Inconsistent response received from broker " + brokerId +
-                            ", expected correlation id " + clientRequest.correlationId() + ", but received " +
+                    throw new StreamsException("Inconsistent response received from the broker " + clientRequest.destination() + ", expected correlation id " + clientRequest.correlationId() + ", but received " +
                             response.requestHeader().correlationId());
                 }
             }
         }
         throw new StreamsException("Failed to get response from broker within timeout");
+
     }
 
 
-    /**
-     * Get the metadata for a topic.
+     /**
+     * Fetch the metadata for a topic.
      * @param topic
      * @return
      */
-    public MetadataResponse.TopicMetadata getTopicMetadata(final String topic) {
-
-        final ClientResponse clientResponse = sendRequest(MetadataRequest.Builder.allTopics());
+    public MetadataResponse.TopicMetadata fetchTopicMetadata(final String topic) {
+        final ClientRequest clientRequest = kafkaClient.newClientRequest(getBrokerId(), new MetadataRequest.Builder(Arrays.asList(topic)), Time.SYSTEM.milliseconds(), true, null);
+        final ClientResponse clientResponse = sendRequest(clientRequest);
+        if (!clientResponse.hasResponse()) {
+            throw new StreamsException("Empty response for client request.");
+        }
         if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
             throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
         }
@@ -260,8 +218,18 @@ public class StreamsKafkaClient {
     }
 
 
-    public Collection<MetadataResponse.TopicMetadata> fetchTopicMetadata() {
-        final ClientResponse clientResponse = sendRequest(MetadataRequest.Builder.allTopics());
+    /**
+     * Fetch the metadata for all topics
+     *
+     * @return
+     */
+    public Collection<MetadataResponse.TopicMetadata> fetchTopicsMetadata() {
+
+        final ClientRequest clientRequest = kafkaClient.newClientRequest(getBrokerId(), new MetadataRequest.Builder(null), Time.SYSTEM.milliseconds(), true, null);
+        final ClientResponse clientResponse = sendRequest(clientRequest);
+        if (!clientResponse.hasResponse()) {
+            throw new StreamsException("Empty response for client request.");
+        }
         if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
             throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
         }

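The StreamsKafkaClient change above splits the old sendRequest(AbstractRequest.Builder) helper into getBrokerId() plus sendRequest(ClientRequest), so each caller now builds its own ClientRequest and validates the response. A condensed sketch of that shared pattern, assumed to sit inside StreamsKafkaClient next to the methods shown in the diff (illustrative, not the exact production code):

    // Condensed sketch of the request/response pattern shared by createTopics() and
    // fetchTopicsMetadata() after this change. Relies on the surrounding StreamsKafkaClient
    // members from the diff above: kafkaClient, getBrokerId() and sendRequest().
    public MetadataResponse fetchAllTopicsMetadataSketch() {
        final ClientRequest clientRequest = kafkaClient.newClientRequest(
                getBrokerId(),                     // a live broker resolved from bootstrap metadata
                new MetadataRequest.Builder(null), // a null topic list requests all topics here
                Time.SYSTEM.milliseconds(),
                true,                              // expect a response
                null);                             // no completion callback; sendRequest() polls
        final ClientResponse clientResponse = sendRequest(clientRequest);

        // Every call site now guards against a missing or mismatched response the same way.
        if (!clientResponse.hasResponse()) {
            throw new StreamsException("Empty response for client request.");
        }
        if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
            throw new StreamsException("Expected MetadataResponse but received "
                    + clientResponse.responseBody().getClass().getName());
        }
        return (MetadataResponse) clientResponse.responseBody();
    }
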
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e2cbae8/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
new file mode 100644
index 0000000..f828099
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
+
+public class InternalTopicManagerTest {
+
+    private String userEndPoint = "localhost:2171";
+    StreamsConfig config;
+    MockStreamKafkaClient streamsKafkaClient;
+
+    @Before
+    public void init() {
+        config = new StreamsConfig(configProps());
+        streamsKafkaClient = new MockStreamKafkaClient(config);
+    }
+
+    @Test
+    public void shouldCreateRequiredTopics() throws Exception {
+
+        streamsKafkaClient.setReturnCorrectTopic(true);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        internalTopicManager.makeReady(new InternalTopicConfig("test_topic", Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1);
+    }
+
+    @Test
+    public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception {
+
+        streamsKafkaClient.setReturnCorrectTopic(true);
+        InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
+        boolean exceptionWasThrown = false;
+        try {
+            internalTopicManager.makeReady(new InternalTopicConfig("test_topic", Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2);
+        } catch (StreamsException e) {
+            exceptionWasThrown = true;
+        }
+        Assert.assertTrue(exceptionWasThrown);
+    }
+
+    private Properties configProps() {
+        return new Properties() {
+            {
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "Internal-Topic-ManagerTest");
+                setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
+                setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+            }
+        };
+    }
+
+    private class MockStreamKafkaClient extends StreamsKafkaClient {
+        public MockStreamKafkaClient(final StreamsConfig streamsConfig) {
+            super(streamsConfig);
+        }
+
+        public boolean isReturnCorrectTopic() {
+            return returnCorrectTopic;
+        }
+
+        public void setReturnCorrectTopic(boolean returnCorrectTopic) {
+            this.returnCorrectTopic = returnCorrectTopic;
+        }
+
+        boolean returnCorrectTopic = false;
+
+
+        @Override
+        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+
+        }
+
+        @Override
+        public MetadataResponse.TopicMetadata fetchTopicMetadata(final String topic) {
+
+            if (returnCorrectTopic) {
+                MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, null, new ArrayList<Node>(), new ArrayList<Node>());
+                MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Arrays.asList(partitionMetadata));
+                return topicMetadata;
+            }
+            return null;
+        }
+
+        @Override
+        public Collection<MetadataResponse.TopicMetadata> fetchTopicsMetadata() {
+            if (returnCorrectTopic) {
+                return Arrays.asList(fetchTopicMetadata("test_topic"));
+            }
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e2cbae8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 5aa40c8..c212f14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -1062,28 +1061,4 @@ public class StreamPartitionAssignorTest {
         return info;
     }
 
-    private class MockInternalTopicManager extends InternalTopicManager {
-
-        Map<String, Integer> readyTopics = new HashMap<>();
-        MockConsumer<byte[], byte[]> restoreConsumer;
-
-        MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[], byte[]> restoreConsumer) {
-            super(new StreamsKafkaClient(streamsConfig), 0, 0);
-
-            this.restoreConsumer = restoreConsumer;
-        }
-
-        @Override
-        public void makeReady(InternalTopicConfig topic, int numPartitions) {
-            readyTopics.put(topic.name(), numPartitions);
-
-            List<PartitionInfo> partitions = new ArrayList<>();
-            for (int i = 0; i < numPartitions; i++) {
-                partitions.add(new PartitionInfo(topic.name(), i, null, null, null));
-            }
-
-            restoreConsumer.updatePartitions(topic.name(), partitions);
-        }
-    }
-
 }

