kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4826: Fix some findbugs warnings in Kafka Streams
Date Sun, 05 Mar 2017 18:01:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b7378d567 -> 580bebe09


KAFKA-4826: Fix some findbugs warnings in Kafka Streams

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2623 from cmccabe/KAFKA-4826


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/580bebe0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/580bebe0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/580bebe0

Branch: refs/heads/trunk
Commit: 580bebe09798b8f0062a2a1566a1d5836ecad4bb
Parents: b7378d5
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Sun Mar 5 10:01:30 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Mar 5 10:01:30 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/kstream/Window.java  |  4 +++-
 .../kstream/internals/KStreamWindowAggregate.java      |  6 +++---
 .../processor/internals/InternalTopicManager.java      | 10 ++++++----
 .../processor/internals/StoreChangelogReader.java      |  6 ++++--
 .../processor/internals/StreamsKafkaClient.java        |  6 ++++--
 .../processor/internals/assignment/ClientState.java    |  9 +++++----
 .../streams/state/internals/OffsetCheckpoint.java      |  3 ++-
 .../kafka/streams/state/internals/RocksDBStore.java    | 13 ++++++++++---
 .../org/apache/kafka/streams/kstream/WindowTest.java   |  5 +++++
 9 files changed, 42 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index 2365f0e..c342112 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -86,7 +86,9 @@ public abstract class Window {
         if (obj == this) {
             return true;
         }
-
+        if (obj == null) {
+            return false;
+        }
         if (getClass() != obj.getClass()) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index e2ba512..9730511 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -116,11 +116,11 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
             }
 
             // create the new window for the rest of unmatched window that do not exist yet
-            for (long windowStartMs : matchedWindows.keySet()) {
+            for (Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
                 T oldAgg = initializer.apply();
                 T newAgg = aggregator.apply(key, value, oldAgg);
-                windowStore.put(key, newAgg, windowStartMs);
-                tupleForwarder.maybeForward(new Windowed<>(key, matchedWindows.get(windowStartMs)), newAgg, oldAgg);
+                windowStore.put(key, newAgg, entry.getKey());
+                tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 8218819..d8575e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -94,15 +94,17 @@ public class InternalTopicManager {
     private Map<InternalTopicConfig, Integer> validateTopicPartitions(final Map<InternalTopicConfig, Integer> topicsPartitionsMap,
                                                                       final Map<String, Integer> existingTopicNamesPartitions) {
         final Map<InternalTopicConfig, Integer> topicsToBeCreated = new HashMap<>();
-        for (InternalTopicConfig topic: topicsPartitionsMap.keySet()) {
+        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsPartitionsMap.entrySet()) {
+            InternalTopicConfig topic = entry.getKey();
+            Integer partition = entry.getValue();
             if (existingTopicNamesPartitions.containsKey(topic.name())) {
-                if (!existingTopicNamesPartitions.get(topic.name()).equals(topicsPartitionsMap.get(topic))) {
+                if (!existingTopicNamesPartitions.get(topic.name()).equals(partition)) {
                    throw new StreamsException("Existing internal topic " + topic.name() + " has invalid partitions." +
-                            " Expected: " + topicsPartitionsMap.get(topic) + " Actual: " + existingTopicNamesPartitions.get(topic.name()) +
+                            " Expected: " + partition + " Actual: " + existingTopicNamesPartitions.get(topic.name()) +
                             ". Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.");
                 }
             } else {
-                topicsToBeCreated.put(topic, topicsPartitionsMap.get(topic));
+                topicsToBeCreated.put(topic, partition);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 85104fa..f95ea4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -102,9 +102,11 @@ public class StoreChangelogReader implements ChangelogReader {
 
             // remove any partitions where we already have all of the data
             final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
-            for (final TopicPartition topicPartition : endOffsets.keySet()) {
+            for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
+                TopicPartition topicPartition = entry.getKey();
+                Long offset = entry.getValue();
                 final StateRestorer restorer = stateRestorers.get(topicPartition);
-                if (restorer.checkpoint() >= endOffsets.get(topicPartition)) {
+                if (restorer.checkpoint() >= offset) {
                     restorer.setRestoredOffset(restorer.checkpoint());
                 } else {
                     needsRestoring.put(topicPartition, restorer);

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 4ca0b4d..c493d74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -144,14 +144,16 @@ public class StreamsKafkaClient {
                              final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
 
        final Map<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new HashMap<>();
-        for (InternalTopicConfig internalTopicConfig:topicsMap.keySet()) {
+        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsMap.entrySet()) {
+            InternalTopicConfig internalTopicConfig = entry.getKey();
+            Integer partitions = entry.getValue();
             final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
             final Map<String, String> topicConfig = new HashMap<>();
             for (String key : topicProperties.stringPropertyNames()) {
                 topicConfig.put(key, topicProperties.getProperty(key));
             }
             final CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(
-                topicsMap.get(internalTopicConfig),
+                partitions,
                 (short) replicationFactor,
                 topicConfig);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index d5f8ccf..99bd29e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -117,11 +117,12 @@ public class ClientState<T> {
         final double otherLoad = (double) other.assignedTaskCount() / other.capacity;
         final double thisLoad = (double) assignedTaskCount() / capacity;
 
-        if (thisLoad == otherLoad) {
+        if (thisLoad < otherLoad)
+            return true;
+        else if (thisLoad > otherLoad)
+            return false;
+        else
             return capacity > other.capacity;
-        }
-
-        return thisLoad < otherLoad;
     }
 
     Set<T> previousStandbyTasks() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 57c7a85..b676421 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -28,6 +28,7 @@ import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -168,7 +169,7 @@ public class OffsetCheckpoint {
      * @throws IOException if there is any IO exception during delete
      */
     public void delete() throws IOException {
-        file.delete();
+        Files.delete(file.toPath());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index db4a03f..823ad47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -43,6 +44,8 @@ import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -141,7 +144,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.dbDir = new File(new File(context.stateDir(), parentDir), this.name);
-        this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+        try {
+            this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+        } catch (IOException e) {
+            throw new StreamsException(e);
+        }
     }
 
     public void init(ProcessorContext context, StateStore root) {
@@ -160,10 +167,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         open = true;
     }
 
-    private RocksDB openDB(File dir, Options options, int ttl) {
+    private RocksDB openDB(File dir, Options options, int ttl) throws IOException {
         try {
             if (ttl == TTL_NOT_USED) {
-                dir.getParentFile().mkdirs();
+                Files.createDirectories(dir.getParentFile().toPath());
                 return RocksDB.open(options, dir.getAbsolutePath());
             } else {
                throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/580bebe0/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java
index e7a579e..4b3c84d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowTest.java
@@ -67,6 +67,11 @@ public class WindowTest {
     }
 
     @Test
+    public void shouldNotBeEqualIfNull() {
+        assertNotEquals(window, null);
+    }
+
+    @Test
     public void shouldNotBeEqualIfStartOrEndIsDifferent() {
         assertNotEquals(window, new TestWindow(0, window.endMs));
         assertNotEquals(window, new TestWindow(7, window.endMs));


Mime
View raw message