kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: KafkaAdminClient Java 8 code cleanup (#5594)
Date Thu, 13 Sep 2018 04:13:15 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 398d2ab  MINOR: KafkaAdminClient Java 8 code cleanup (#5594)
398d2ab is described below

commit 398d2ab244a40e7f975faa1ed60c0e0b14cf4674
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Thu Sep 13 06:13:03 2018 +0200

    MINOR: KafkaAdminClient Java 8 code cleanup (#5594)
    
    Use lambdas and diamond operator whenever possible.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 73 ++++++++++------------
 1 file changed, 34 insertions(+), 39 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5759d63..7abe7ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -379,7 +379,7 @@ public class KafkaAdminClient extends AdminClient {
         String clientId = generateClientId(config);
 
         try {
-            metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(),
time);
+            metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
             LogContext logContext = createLogContext(clientId);
             AdminMetadataManager metadataManager = new AdminMetadataManager(logContext,
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
@@ -897,7 +897,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
                 Call call = calls.remove(0);
                 int timeoutMs = calcTimeoutMsRemainingAsInt(now, call.deadlineMs);
-                AbstractRequest.Builder<?> requestBuilder = null;
+                AbstractRequest.Builder<?> requestBuilder;
                 try {
                     requestBuilder = call.createRequest(timeoutMs);
                 } catch (Throwable throwable) {
@@ -1201,7 +1201,7 @@ public class KafkaAdminClient extends AdminClient {
                     // Since this only requests node information, it's safe to pass true
                     // for allowAutoTopicCreation (and it simplifies communication with
                     // older brokers)
-                    return new MetadataRequest.Builder(Collections.<String>emptyList(),
true);
+                    return new MetadataRequest.Builder(Collections.emptyList(), true);
                 }
 
                 @Override
@@ -1248,7 +1248,7 @@ public class KafkaAdminClient extends AdminClient {
                     newTopic.name() + "' cannot be represented in a request."));
                 topicFutures.put(newTopic.name(), future);
             } else if (!topicFutures.containsKey(newTopic.name())) {
-                topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
+                topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
                 topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
             }
         }
@@ -1304,7 +1304,7 @@ public class KafkaAdminClient extends AdminClient {
         if (!topicsMap.isEmpty()) {
             runnable.call(call, now);
         }
-        return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+        return new CreateTopicsResult(new HashMap<>(topicFutures));
     }
 
     @Override
@@ -1319,7 +1319,7 @@ public class KafkaAdminClient extends AdminClient {
                     topicName + "' cannot be represented in a request."));
                 topicFutures.put(topicName, future);
             } else if (!topicFutures.containsKey(topicName)) {
-                topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
                 validTopicNames.add(topicName);
             }
         }
@@ -1375,7 +1375,7 @@ public class KafkaAdminClient extends AdminClient {
         if (!validTopicNames.isEmpty()) {
             runnable.call(call, now);
         }
-        return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
+        return new DeleteTopicsResult(new HashMap<>(topicFutures));
     }
 
     @Override
@@ -1417,12 +1417,12 @@ public class KafkaAdminClient extends AdminClient {
         final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
             if (topicNameIsUnrepresentable(topicName)) {
-                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
                 future.completeExceptionally(new InvalidTopicException("The given topic name
'" +
                     topicName + "' cannot be represented in a request."));
                 topicFutures.put(topicName, future);
             } else if (!topicFutures.containsKey(topicName)) {
-                topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
                 topicNamesList.add(topicName);
             }
         }
@@ -1467,12 +1467,7 @@ public class KafkaAdminClient extends AdminClient {
                             Arrays.asList(partitionInfo.inSyncReplicas()));
                         partitions.add(topicPartitionInfo);
                     }
-                    Collections.sort(partitions, new Comparator<TopicPartitionInfo>()
{
-                        @Override
-                        public int compare(TopicPartitionInfo tp1, TopicPartitionInfo tp2)
{
-                            return Integer.compare(tp1.partition(), tp2.partition());
-                        }
-                    });
+                    partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
                     TopicDescription topicDescription = new TopicDescription(topicName, isInternal,
partitions);
                     future.complete(topicDescription);
                 }
@@ -1501,7 +1496,7 @@ public class KafkaAdminClient extends AdminClient {
         if (!topicNamesList.isEmpty()) {
             runnable.call(call, now);
         }
-        return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
+        return new DescribeTopicsResult(new HashMap<>(topicFutures));
     }
 
     @Override
@@ -1517,7 +1512,7 @@ public class KafkaAdminClient extends AdminClient {
             AbstractRequest.Builder createRequest(int timeoutMs) {
                 // Since this only requests node information, it's safe to pass true for
allowAutoTopicCreation (and it
                 // simplifies communication with older brokers)
-                return new MetadataRequest.Builder(Collections.<String>emptyList(),
true);
+                return new MetadataRequest.Builder(Collections.emptyList(), true);
             }
 
             @Override
@@ -1627,7 +1622,7 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new CreateAclsResult(new HashMap<AclBinding, KafkaFuture<Void>>(futures));
+        return new CreateAclsResult(new HashMap<>(futures));
     }
 
     @Override
@@ -1638,7 +1633,7 @@ public class KafkaAdminClient extends AdminClient {
         for (AclBindingFilter filter : filters) {
             if (futures.get(filter) == null) {
                 filterList.add(filter);
-                futures.put(filter, new KafkaFutureImpl<FilterResults>());
+                futures.put(filter, new KafkaFutureImpl<>());
             }
         }
         runnable.call(new Call("deleteAcls", calcDeadlineMs(now, options.timeoutMs()),
@@ -1679,7 +1674,7 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new DeleteAclsResult(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures));
+        return new DeleteAclsResult(new HashMap<>(futures));
     }
 
     @Override
@@ -1899,7 +1894,7 @@ public class KafkaAdminClient extends AdminClient {
         final Map<TopicPartitionReplica, KafkaFutureImpl<Void>> futures = new
HashMap<>(replicaAssignment.size());
 
         for (TopicPartitionReplica replica : replicaAssignment.keySet())
-            futures.put(replica, new KafkaFutureImpl<Void>());
+            futures.put(replica, new KafkaFutureImpl<>());
 
         Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker =
new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet())
{
@@ -1908,7 +1903,7 @@ public class KafkaAdminClient extends AdminClient {
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
             if (!replicaAssignmentByBroker.containsKey(brokerId))
-                replicaAssignmentByBroker.put(brokerId, new HashMap<TopicPartition, String>());
+                replicaAssignmentByBroker.put(brokerId, new HashMap<>());
             replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
         }
 
@@ -1950,7 +1945,7 @@ public class KafkaAdminClient extends AdminClient {
             }, now);
         }
 
-        return new AlterReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<Void>>(futures));
+        return new AlterReplicaLogDirsResult(new HashMap<>(futures));
     }
 
     @Override
@@ -1958,7 +1953,7 @@ public class KafkaAdminClient extends AdminClient {
         final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>>
futures = new HashMap<>(brokers.size());
 
         for (Integer brokerId: brokers) {
-            futures.put(brokerId, new KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>());
+            futures.put(brokerId, new KafkaFutureImpl<>());
         }
 
         final long now = time.milliseconds();
@@ -1990,7 +1985,7 @@ public class KafkaAdminClient extends AdminClient {
             }, now);
         }
 
-        return new DescribeLogDirsResult(new HashMap<Integer, KafkaFuture<Map<String,
DescribeLogDirsResponse.LogDirInfo>>>(futures));
+        return new DescribeLogDirsResult(new HashMap<>(futures));
     }
 
     @Override
@@ -1998,14 +1993,14 @@ public class KafkaAdminClient extends AdminClient {
         final Map<TopicPartitionReplica, KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>>
futures = new HashMap<>(replicas.size());
 
         for (TopicPartitionReplica replica : replicas) {
-            futures.put(replica, new KafkaFutureImpl<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>());
+            futures.put(replica, new KafkaFutureImpl<>());
         }
 
         Map<Integer, Set<TopicPartition>> partitionsByBroker = new HashMap<>();
 
         for (TopicPartitionReplica replica: replicas) {
             if (!partitionsByBroker.containsKey(replica.brokerId()))
-                partitionsByBroker.put(replica.brokerId(), new HashSet<TopicPartition>());
+                partitionsByBroker.put(replica.brokerId(), new HashSet<>());
             partitionsByBroker.get(replica.brokerId()).add(new TopicPartition(replica.topic(),
replica.partition()));
         }
 
@@ -2074,7 +2069,7 @@ public class KafkaAdminClient extends AdminClient {
             }, now);
         }
 
-        return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
+        return new DescribeReplicaLogDirsResult(new HashMap<>(futures));
     }
 
     @Override
@@ -2082,7 +2077,7 @@ public class KafkaAdminClient extends AdminClient {
                                                    final CreatePartitionsOptions options)
{
         final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
         for (String topic : newPartitions.keySet()) {
-            futures.put(topic, new KafkaFutureImpl<Void>());
+            futures.put(topic, new KafkaFutureImpl<>());
         }
         final Map<String, NewPartitions> requestMap = new HashMap<>(newPartitions);
 
@@ -2121,7 +2116,7 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures));
+        return new CreatePartitionsResult(new HashMap<>(futures));
     }
 
     @Override
@@ -2133,7 +2128,7 @@ public class KafkaAdminClient extends AdminClient {
 
         final Map<TopicPartition, KafkaFutureImpl<DeletedRecords>> futures =
new HashMap<>(recordsToDelete.size());
         for (TopicPartition topicPartition: recordsToDelete.keySet()) {
-            futures.put(topicPartition, new KafkaFutureImpl<DeletedRecords>());
+            futures.put(topicPartition, new KafkaFutureImpl<>());
         }
 
         // preparing topics list for asking metadata about them
@@ -2180,7 +2175,7 @@ public class KafkaAdminClient extends AdminClient {
                         Node node = cluster.leaderFor(entry.getKey());
                         if (node != null) {
                             if (!leaders.containsKey(node))
-                                leaders.put(node, new HashMap<TopicPartition, Long>());
+                                leaders.put(node, new HashMap<>());
                             leaders.get(node).put(entry.getKey(), entry.getValue().beforeOffset());
                         } else {
                             KafkaFutureImpl<DeletedRecords> future = futures.get(entry.getKey());
@@ -2231,7 +2226,7 @@ public class KafkaAdminClient extends AdminClient {
             }
         }, nowMetadata);
 
-        return new DeleteRecordsResult(new HashMap<TopicPartition, KafkaFuture<DeletedRecords>>(futures));
+        return new DeleteRecordsResult(new HashMap<>(futures));
     }
 
     @Override
@@ -2373,7 +2368,7 @@ public class KafkaAdminClient extends AdminClient {
                         groupId + "' cannot be represented in a request."));
                 futures.put(groupId, future);
             } else if (!futures.containsKey(groupId)) {
-                futures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>());
+                futures.put(groupId, new KafkaFutureImpl<>());
             }
         }
 
@@ -2469,7 +2464,7 @@ public class KafkaAdminClient extends AdminClient {
             }, startFindCoordinatorMs);
         }
 
-        return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
+        return new DescribeConsumerGroupsResult(new HashMap<>(futures));
     }
 
     private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?>
future) {
@@ -2518,12 +2513,12 @@ public class KafkaAdminClient extends AdminClient {
 
         private synchronized void tryComplete() {
             if (remaining.isEmpty()) {
-                ArrayList<Object> results = new ArrayList<Object>(listings.values());
+                ArrayList<Object> results = new ArrayList<>(listings.values());
                 results.addAll(errors);
                 future.complete(results);
             }
         }
-    };
+    }
 
     @Override
     public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options)
{
@@ -2687,7 +2682,7 @@ public class KafkaAdminClient extends AdminClient {
                         groupId + "' cannot be represented in a request."));
                 futures.put(groupId, future);
             } else if (!futures.containsKey(groupId)) {
-                futures.put(groupId, new KafkaFutureImpl<Void>());
+                futures.put(groupId, new KafkaFutureImpl<>());
             }
         }
 
@@ -2755,7 +2750,7 @@ public class KafkaAdminClient extends AdminClient {
             }, startFindCoordinatorMs);
         }
 
-        return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures));
+        return new DeleteConsumerGroupsResult(new HashMap<>(futures));
     }
 
     @Override


Mime
View raw message