kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: MINOR: Switch anonymous classes to lambda expressions in tools module
Date Fri, 21 Dec 2018 08:51:26 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 85906d3  MINOR: Switch anonymous classes to lambda expressions in tools module
85906d3 is described below

commit 85906d3d2ba49e593623de1484a9946224ce71cc
Author: Srinivas Reddy <srinivas96alluri@gmail.com>
AuthorDate: Fri Dec 21 14:20:57 2018 +0530

    MINOR: Switch anonymous classes to lambda expressions in tools module
    
    Switch to lambda expressions wherever possible, instead of the old
    anonymous-class style, in the tools module
    
    Author: Srinivas Reddy <srinivas96alluri@gmail.com>
    Author: Srinivas Reddy <mrsrinivas@users.noreply.github.com>
    
    Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #6013 from mrsrinivas/tools-switch-to-java8
---
 .../kafka/tools/ClientCompatibilityTest.java       | 109 +++++++++------------
 .../java/org/apache/kafka/tools/ToolsUtils.java    |   8 +-
 .../kafka/tools/TransactionalMessageCopier.java    |  19 ++--
 .../org/apache/kafka/tools/VerifiableConsumer.java |   7 +-
 .../kafka/tools/VerifiableLog4jAppender.java       |  11 +--
 .../org/apache/kafka/tools/VerifiableProducer.java |  23 ++---
 .../java/org/apache/kafka/trogdor/agent/Agent.java |  19 ++--
 .../apache/kafka/trogdor/agent/WorkerManager.java  |  25 +++--
 .../kafka/trogdor/coordinator/Coordinator.java     |  19 ++--
 .../apache/kafka/trogdor/rest/JsonRestServer.java  |  27 +++--
 .../trogdor/workload/ConnectionStressSpec.java     |   9 +-
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |   9 +-
 .../kafka/trogdor/workload/RoundTripWorker.java    |  19 ++--
 .../trogdor/workload/RoundTripWorkloadSpec.java    |   9 +-
 .../org/apache/kafka/trogdor/agent/AgentTest.java  |   4 +-
 .../apache/kafka/trogdor/common/ExpectedTasks.java | 107 ++++++++++----------
 .../kafka/trogdor/common/MiniTrogdorCluster.java   |  35 +++----
 .../kafka/trogdor/coordinator/CoordinatorTest.java |  11 +--
 .../kafka/trogdor/task/SampleTaskWorker.java       |  10 +-
 19 files changed, 196 insertions(+), 284 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 6182744..5b7e228 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -261,65 +261,68 @@ public class ClientCompatibilityTest {
                     nodes.size(), testConfig.numClusterNodes);
             }
             tryFeature("createTopics", testConfig.createTopicsSupported,
-                new Invoker() {
-                    @Override
-                    public void invoke() throws Throwable {
-                        try {
-                            client.createTopics(Collections.singleton(
-                                new NewTopic("newtopic", 1, (short) 1))).all().get();
-                        } catch (ExecutionException e) {
-                            throw e.getCause();
-                        }
+                () -> {
+                    try {
+                        client.createTopics(Collections.singleton(
+                            new NewTopic("newtopic", 1, (short) 1))).all().get();
+                    } catch (ExecutionException e) {
+                        throw e.getCause();
                     }
                 },
-                new ResultTester() {
-                    @Override
-                    public void test() throws Throwable {
-                        while (true) {
-                            try {
-                                client.describeTopics(Collections.singleton("newtopic")).all().get();
-                                break;
-                            } catch (ExecutionException e) {
-                                if (e.getCause() instanceof UnknownTopicOrPartitionException)
-                                    continue;
-                                throw e;
-                            }
-                        }
-                    }
-                });
+                () ->  createTopicsResultTest(client, Collections.singleton("newtopic"))
+            );
+
             while (true) {
                 Collection<TopicListing> listings = client.listTopics().listings().get();
                 if (!testConfig.createTopicsSupported)
                     break;
-                boolean foundNewTopic = false;
-                for (TopicListing listing : listings) {
-                    if (listing.name().equals("newtopic")) {
-                        if (listing.isInternal())
-                            throw new KafkaException("Did not expect newtopic to be an internal topic.");
-                        foundNewTopic = true;
-                    }
-                }
-                if (foundNewTopic)
+
+                if (topicExists(listings, "newtopic"))
                     break;
+
                 Thread.sleep(1);
                 log.info("Did not see newtopic.  Retrying listTopics...");
             }
+
             tryFeature("describeAclsSupported", testConfig.describeAclsSupported,
-                new Invoker() {
-                    @Override
-                    public void invoke() throws Throwable {
-                        try {
-                            client.describeAcls(AclBindingFilter.ANY).values().get();
-                        } catch (ExecutionException e) {
-                            if (e.getCause() instanceof SecurityDisabledException)
-                                return;
-                            throw e.getCause();
-                        }
+                () -> {
+                    try {
+                        client.describeAcls(AclBindingFilter.ANY).values().get();
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof SecurityDisabledException)
+                            return;
+                        throw e.getCause();
                     }
                 });
         }
     }
 
+    private void createTopicsResultTest(AdminClient client, Collection<String> topics)
+            throws InterruptedException, ExecutionException {
+        while (true) {
+            try {
+                client.describeTopics(topics).all().get();
+                break;
+            } catch (ExecutionException e) {
+                if (e.getCause() instanceof UnknownTopicOrPartitionException)
+                    continue;
+                throw e;
+            }
+        }
+    }
+
+    private boolean topicExists(Collection<TopicListing> listings, String topicName) {
+        boolean foundTopic = false;
+        for (TopicListing listing : listings) {
+            if (listing.name().equals(topicName)) {
+                if (listing.isInternal())
+                    throw new KafkaException(String.format("Did not expect %s to be an internal topic.", topicName));
+                foundTopic = true;
+            }
+        }
+        return foundTopic;
+    }
+
     private static class OffsetsForTime {
         Map<TopicPartition, OffsetAndTimestamp> result;
 
@@ -384,18 +387,8 @@ public class ClientCompatibilityTest {
             }
             final OffsetsForTime offsetsForTime = new OffsetsForTime();
             tryFeature("offsetsForTimes", testConfig.offsetsForTimesSupported,
-                    new Invoker() {
-                        @Override
-                        public void invoke() {
-                            offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch);
-                        }
-                    },
-                    new ResultTester() {
-                        @Override
-                        public void test() {
-                            log.info("offsetsForTime = {}", offsetsForTime.result);
-                        }
-                    });
+                () -> offsetsForTime.result = consumer.offsetsForTimes(timestampsToSearch),
+                () -> log.info("offsetsForTime = {}", offsetsForTime.result));
             // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
             // should work.
             consumer.beginningOffsets(timestampsToSearch.keySet());
@@ -486,11 +479,7 @@ public class ClientCompatibilityTest {
     }
 
     private void tryFeature(String featureName, boolean supported, Invoker invoker) throws Throwable {
-        tryFeature(featureName, supported, invoker, new ResultTester() {
-                @Override
-                public void test() {
-                }
-            });
+        tryFeature(featureName, supported, invoker, () -> { });
     }
 
     private void tryFeature(String featureName, boolean supported, Invoker invoker, ResultTester resultTester)
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index 0e5d130..3a80b58 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -19,7 +19,6 @@ package org.apache.kafka.tools;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 
-import java.util.Comparator;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -32,12 +31,7 @@ public class ToolsUtils {
     public static void printMetrics(Map<MetricName, ? extends Metric> metrics) {
         if (metrics != null && !metrics.isEmpty()) {
             int maxLengthOfDisplayName = 0;
-            TreeMap<String, Object> sortedMetrics = new TreeMap<>(new Comparator<String>() {
-                @Override
-                public int compare(String o1, String o2) {
-                    return o1.compareTo(o2);
-                }
-            });
+            TreeMap<String, Object> sortedMetrics = new TreeMap<>();
             for (Metric metric : metrics.values()) {
                 MetricName mName = metric.metricName();
                 String mergedName = mName.group() + ":" + mName.name() + ":" + mName.tags();
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 27e7c7f..a0ac1f1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -263,18 +263,15 @@ public class TransactionalMessageCopier {
         final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
         final AtomicLong remainingMessages = new AtomicLong(maxMessages);
         final AtomicLong numMessagesProcessed = new AtomicLong(0);
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                isShuttingDown.set(true);
-                // Flush any remaining messages
-                producer.close();
-                synchronized (consumer) {
-                    consumer.close();
-                }
-                System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isShuttingDown.set(true);
+            // Flush any remaining messages
+            producer.close();
+            synchronized (consumer) {
+                consumer.close();
             }
-        });
+            System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId));
+        }));
 
         try {
             Random random = new Random();
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 58f3471..1297841 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -620,12 +620,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
 
         try {
             final VerifiableConsumer consumer = createFromArgs(parser, args);
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    consumer.close();
-                }
-            });
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.close()));
 
             consumer.run();
         } catch (ArgumentParserException e) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
index 9d23bf3..12aa4f4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -241,13 +241,10 @@ public class VerifiableLog4jAppender {
         final VerifiableLog4jAppender appender = createFromArgs(args);
         boolean infinite = appender.maxMessages < 0;
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                // Trigger main thread to stop producing messages
-                appender.stopLogging = true;
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            // Trigger main thread to stop producing messages
+            appender.stopLogging = true;
+        }));
 
         long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
         for (long i = 0; i < maxMessages; i++) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index f0a991f..3e6f3f1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -517,22 +517,19 @@ public class VerifiableProducer implements AutoCloseable {
             final long startMs = System.currentTimeMillis();
             ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
 
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    // Trigger main thread to stop producing messages
-                    producer.stopProducing = true;
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                // Trigger main thread to stop producing messages
+                producer.stopProducing = true;
 
-                    // Flush any remaining messages
-                    producer.close();
+                // Flush any remaining messages
+                producer.close();
 
-                    // Print a summary
-                    long stopMs = System.currentTimeMillis();
-                    double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
+                // Print a summary
+                long stopMs = System.currentTimeMillis();
+                double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
 
-                    producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
-                }
-            });
+                producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
+            }));
 
             producer.run(throttler);
         } catch (ArgumentParserException e) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 20d34b7..c76ef26 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -147,18 +147,15 @@ public final class Agent {
         log.info("Starting agent process.");
         final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource);
         restServer.start(resource);
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                log.warn("Running agent shutdown hook.");
-                try {
-                    agent.beginShutdown();
-                    agent.waitForShutdown();
-                } catch (Exception e) {
-                    log.error("Got exception while running agent shutdown hook.", e);
-                }
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            log.warn("Running agent shutdown hook.");
+            try {
+                agent.beginShutdown();
+                agent.waitForShutdown();
+            } catch (Exception e) {
+                log.error("Got exception while running agent shutdown hook.", e);
             }
-        });
+        }));
         agent.waitForShutdown();
     }
 };
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 59d34c9..ef02716 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -317,21 +317,18 @@ public final class WorkerManager {
                 return;
             }
             KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
-            haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
-                @Override
-                public Void apply(String errorString) {
-                    if (errorString == null)
-                        errorString = "";
-                    if (errorString.isEmpty()) {
-                        log.info("{}: Worker {} is halting.", nodeName, worker);
-                    } else {
-                        log.info("{}: Worker {} is halting with error {}",
-                            nodeName, worker, errorString);
-                    }
-                    stateChangeExecutor.submit(
-                        new HandleWorkerHalting(worker, errorString, false));
-                    return null;
+            haltFuture.thenApply((KafkaFuture.BaseFunction<String, Void>) errorString -> {
+                if (errorString == null)
+                    errorString = "";
+                if (errorString.isEmpty()) {
+                    log.info("{}: Worker {} is halting.", nodeName, worker);
+                } else {
+                    log.info("{}: Worker {} is halting with error {}",
+                        nodeName, worker, errorString);
                 }
+                stateChangeExecutor.submit(
+                    new HandleWorkerHalting(worker, errorString, false));
+                return null;
             });
             try {
                 worker.taskWorker.start(platform, worker.status, haltFuture);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index cd3da90..a41a6f2 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -162,18 +162,15 @@ public final class Coordinator {
         final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
             restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
         restServer.start(resource);
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                log.warn("Running coordinator shutdown hook.");
-                try {
-                    coordinator.beginShutdown(false);
-                    coordinator.waitForShutdown();
-                } catch (Exception e) {
-                    log.error("Got exception while running coordinator shutdown hook.", e);
-                }
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            log.warn("Running coordinator shutdown hook.");
+            try {
+                coordinator.beginShutdown(false);
+                coordinator.waitForShutdown();
+            } catch (Exception e) {
+                log.error("Got exception while running coordinator shutdown hook.", e);
             }
-        });
+        }));
         coordinator.waitForShutdown();
     }
 };
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
index ee8643b..196ec82 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -132,22 +132,19 @@ public class JsonRestServer {
      */
     public void beginShutdown() {
         if (!shutdownExecutor.isShutdown()) {
-            shutdownExecutor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    try {
-                        log.info("Stopping REST server");
-                        jettyServer.stop();
-                        jettyServer.join();
-                        log.info("REST server stopped");
-                    } catch (Exception e) {
-                        log.error("Unable to stop REST server", e);
-                    } finally {
-                        jettyServer.destroy();
-                    }
-                    shutdownExecutor.shutdown();
-                    return null;
+            shutdownExecutor.submit((Callable<Void>) () -> {
+                try {
+                    log.info("Stopping REST server");
+                    jettyServer.stop();
+                    jettyServer.join();
+                    log.info("REST server stopped");
+                } catch (Exception e) {
+                    log.error("Unable to stop REST server", e);
+                } finally {
+                    jettyServer.destroy();
                 }
+                shutdownExecutor.shutdown();
+                return null;
             });
         }
     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
index c22396f..6141d30 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
@@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.Topology;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
@@ -28,7 +27,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 
 /**
@@ -98,12 +96,7 @@ public class ConnectionStressSpec extends TaskSpec {
     }
 
     public TaskController newController(String id) {
-        return new TaskController() {
-            @Override
-            public Set<String> targetNodes(Topology topology) {
-                return new TreeSet<>(clientNodes);
-            }
-        };
+        return topology -> new TreeSet<>(clientNodes);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index d15172f..34b5393 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.Topology;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
@@ -27,7 +26,6 @@ import org.apache.kafka.trogdor.task.TaskWorker;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * The specification for a benchmark that produces messages to a set of topics.
@@ -170,12 +168,7 @@ public class ProduceBenchSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return new TaskController() {
-            @Override
-            public Set<String> targetNodes(Topology topology) {
-                return Collections.singleton(producerNode);
-            }
-        };
+        return topology -> Collections.singleton(producerNode);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 669fafc..b22292a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -25,11 +25,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -248,16 +246,13 @@ public class RoundTripWorker implements TaskWorker {
                     ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(partition.topic(),
                         partition.partition(), KEY_GENERATOR.generate(messageIndex),
                         spec.valueGenerator().generate(messageIndex));
-                    producer.send(record, new Callback() {
-                        @Override
-                        public void onCompletion(RecordMetadata metadata, Exception exception) {
-                            if (exception == null) {
-                                unackedSends.countDown();
-                            } else {
-                                log.info("{}: Got exception when sending message {}: {}",
-                                    id, messageIndex, exception.getMessage());
-                                toSendTracker.addFailed(messageIndex);
-                            }
+                    producer.send(record, (metadata, exception) -> {
+                        if (exception == null) {
+                            unackedSends.countDown();
+                        } else {
+                            log.info("{}: Got exception when sending message {}: {}",
+                                id, messageIndex, exception.getMessage());
+                            toSendTracker.addFailed(messageIndex);
                         }
                     });
                 }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 9522e0a..42e09ee 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -19,14 +19,12 @@ package org.apache.kafka.trogdor.workload;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.Topology;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * The specification for a workload that sends messages to a broker and then
@@ -124,12 +122,7 @@ public class RoundTripWorkloadSpec extends TaskSpec {
 
     @Override
     public TaskController newController(String id) {
-        return new TaskController() {
-            @Override
-            public Set<String> targetNodes(Topology topology) {
-                return Collections.singleton(clientNode);
-            }
-        };
+        return topology -> Collections.singleton(clientNode);
     }
 
     @Override
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 158e690..f0ea475 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -304,7 +304,7 @@ public class AgentTest {
         try (MockKibosh mockKibosh = new MockKibosh()) {
             Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
             FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
-                Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/foo", 123);
+                Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/foo", 123);
             client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
@@ -314,7 +314,7 @@ public class AgentTest {
             Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
                 new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
             FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
-                Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/bar", 456);
+                Collections.singleton("myAgent"), mockKibosh.tempDir.getPath(), "/bar", 456);
             client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
             new ExpectedTasks().
                 addTask(new ExpectedTaskBuilder("foo").
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index c092c92..3eb781c 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -19,7 +19,6 @@ package org.apache.kafka.trogdor.common;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
@@ -142,71 +141,65 @@ public class ExpectedTasks {
     }
 
     public ExpectedTasks waitFor(final CoordinatorClient client) throws InterruptedException {
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                TasksResponse tasks = null;
-                try {
-                    tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
-                } catch (Exception e) {
-                    log.info("Unable to get coordinator tasks", e);
-                    throw new RuntimeException(e);
-                }
-                StringBuilder errors = new StringBuilder();
-                for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
-                    String id = entry.getKey();
-                    ExpectedTask task = entry.getValue();
-                    String differences = task.compare(tasks.tasks().get(id));
-                    if (differences != null) {
-                        errors.append(differences);
-                    }
-                }
-                String errorString = errors.toString();
-                if (!errorString.isEmpty()) {
-                    log.info("EXPECTED TASKS: {}", JsonUtil.toJsonString(expected));
-                    log.info("ACTUAL TASKS  : {}", JsonUtil.toJsonString(tasks.tasks()));
-                    log.info(errorString);
-                    return false;
+        TestUtils.waitForCondition(() -> {
+            TasksResponse tasks = null;
+            try {
+                tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
+            } catch (Exception e) {
+                log.info("Unable to get coordinator tasks", e);
+                throw new RuntimeException(e);
+            }
+            StringBuilder errors = new StringBuilder();
+            for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
+                String id = entry.getKey();
+                ExpectedTask task = entry.getValue();
+                String differences = task.compare(tasks.tasks().get(id));
+                if (differences != null) {
+                    errors.append(differences);
                 }
-                return true;
             }
+            String errorString = errors.toString();
+            if (!errorString.isEmpty()) {
+                log.info("EXPECTED TASKS: {}", JsonUtil.toJsonString(expected));
+                log.info("ACTUAL TASKS  : {}", JsonUtil.toJsonString(tasks.tasks()));
+                log.info(errorString);
+                return false;
+            }
+            return true;
         }, "Timed out waiting for expected tasks " + JsonUtil.toJsonString(expected));
         return this;
     }
 
     public ExpectedTasks waitFor(final AgentClient client) throws InterruptedException {
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                AgentStatusResponse status = null;
-                try {
-                    status = client.status();
-                } catch (Exception e) {
-                    log.info("Unable to get agent status", e);
-                    throw new RuntimeException(e);
-                }
-                StringBuilder errors = new StringBuilder();
-                HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
-                for (WorkerState state : status.workers().values()) {
-                    taskIdToWorkerState.put(state.taskId(), state);
-                }
-                for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
-                    String id = entry.getKey();
-                    ExpectedTask worker = entry.getValue();
-                    String differences = worker.compare(taskIdToWorkerState.get(id));
-                    if (differences != null) {
-                        errors.append(differences);
-                    }
-                }
-                String errorString = errors.toString();
-                if (!errorString.isEmpty()) {
-                    log.info("EXPECTED WORKERS: {}", JsonUtil.toJsonString(expected));
-                    log.info("ACTUAL WORKERS  : {}", JsonUtil.toJsonString(status.workers()));
-                    log.info(errorString);
-                    return false;
+        TestUtils.waitForCondition(() -> {
+            AgentStatusResponse status = null;
+            try {
+                status = client.status();
+            } catch (Exception e) {
+                log.info("Unable to get agent status", e);
+                throw new RuntimeException(e);
+            }
+            StringBuilder errors = new StringBuilder();
+            HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
+            for (WorkerState state : status.workers().values()) {
+                taskIdToWorkerState.put(state.taskId(), state);
+            }
+            for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
+                String id = entry.getKey();
+                ExpectedTask worker = entry.getValue();
+                String differences = worker.compare(taskIdToWorkerState.get(id));
+                if (differences != null) {
+                    errors.append(differences);
                 }
-                return true;
             }
+            String errorString = errors.toString();
+            if (!errorString.isEmpty()) {
+                log.info("EXPECTED WORKERS: {}", JsonUtil.toJsonString(expected));
+                log.info("ACTUAL WORKERS  : {}", JsonUtil.toJsonString(status.workers()));
+                log.info(errorString);
+                return false;
+            }
+            return true;
         }, "Timed out waiting for expected workers " + JsonUtil.toJsonString(expected));
         return this;
     }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index 46315c2..9edffaa 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -172,27 +172,24 @@ public class MiniTrogdorCluster implements AutoCloseable {
                 ThreadUtils.createThreadFactory("MiniTrogdorClusterStartupThread%d", false));
             final AtomicReference<Exception> failure = new AtomicReference<Exception>(null);
             for (final Map.Entry<String, NodeData> entry : nodes.entrySet()) {
-                executor.submit(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        String nodeName = entry.getKey();
-                        try {
-                            NodeData node = entry.getValue();
-                            node.platform = new BasicPlatform(nodeName, topology, scheduler, commandRunner);
-                            if (node.agentRestResource != null) {
-                                node.agent = new Agent(node.platform, scheduler, node.agentRestServer,
-                                    node.agentRestResource);
-                            }
-                            if (node.coordinatorRestResource != null) {
-                                node.coordinator = new Coordinator(node.platform, scheduler,
-                                    node.coordinatorRestServer, node.coordinatorRestResource, 0);
-                            }
-                        } catch (Exception e) {
-                            log.error("Unable to initialize {}", nodeName, e);
-                            failure.compareAndSet(null, e);
+                executor.submit((Callable<Void>) () -> {
+                    String nodeName = entry.getKey();
+                    try {
+                        NodeData node = entry.getValue();
+                        node.platform = new BasicPlatform(nodeName, topology, scheduler, commandRunner);
+                        if (node.agentRestResource != null) {
+                            node.agent = new Agent(node.platform, scheduler, node.agentRestServer,
+                                node.agentRestResource);
                         }
-                        return null;
+                        if (node.coordinatorRestResource != null) {
+                            node.coordinator = new Coordinator(node.platform, scheduler,
+                                node.coordinatorRestServer, node.coordinatorRestResource, 0);
+                        }
+                    } catch (Exception e) {
+                        log.error("Unable to initialize {}", nodeName, e);
+                        failure.compareAndSet(null, e);
                     }
+                    return null;
                 });
             }
             executor.shutdown();
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 0207104..db1afac 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.MockScheduler;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Scheduler;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.common.CapturingCommandRunner;
@@ -318,12 +317,8 @@ public class CoordinatorTest {
 
         public ExpectedLines waitFor(final String nodeName,
                 final CapturingCommandRunner runner) throws InterruptedException {
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return linesMatch(nodeName, runner.lines(nodeName));
-                }
-            }, "failed to find the expected lines " + this.toString());
+            TestUtils.waitForCondition(() -> linesMatch(nodeName, runner.lines(nodeName)),
+                "failed to find the expected lines " + this.toString());
             return this;
         }
 
@@ -473,7 +468,7 @@ public class CoordinatorTest {
             assertEquals(0, coordinatorClient.tasks(
                 new TasksRequest(null, 10, 0, 10, 0, Optional.empty())).tasks().size());
             TasksResponse resp1 = coordinatorClient.tasks(
-                new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0, Optional.empty()));
+                new TasksRequest(Arrays.asList("foo", "baz"), 0, 0, 0, 0, Optional.empty()));
             assertTrue(resp1.tasks().containsKey("foo"));
             assertFalse(resp1.tasks().containsKey("bar"));
             assertEquals(1, resp1.tasks().size());
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
index ade055d..404817a 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -53,12 +52,9 @@ public class SampleTaskWorker implements TaskWorker {
         if (exitMs == null) {
             exitMs = Long.MAX_VALUE;
         }
-        this.future = platform.scheduler().schedule(executor, new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                haltFuture.complete(spec.error());
-                return null;
-            }
+        this.future = platform.scheduler().schedule(executor, () -> {
+            haltFuture.complete(spec.error());
+            return null;
         }, exitMs);
     }
 


Mime
View raw message