kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [4/4] kafka git commit: MINOR: A bunch of clean-ups related to usage of unused variables
Date Tue, 25 Oct 2016 02:17:47 GMT
MINOR: A bunch of clean-ups related to usage of unused variables

There should be only one case where these clean-ups have a functional impact: repeated identical logs were replaced with a single log for the stale controller epoch case.

The rest should just make the code easier to read and make it a bit less wasteful. I did this exercise because unused variables sometimes mask bugs.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1985 from ijuma/remove-unused


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0926738
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0926738
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0926738

Branch: refs/heads/trunk
Commit: d092673838173d9dedbf5acf3f4e2cd8c736294f
Parents: 1fc450f
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Oct 25 02:55:55 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Oct 25 02:55:55 2016 +0100

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  2 -
 .../consumer/internals/ConsumerCoordinator.java |  2 -
 .../consumer/internals/SubscriptionState.java   |  7 --
 .../kafka/clients/producer/KafkaProducer.java   |  1 -
 .../clients/producer/internals/Sender.java      | 10 +--
 .../apache/kafka/common/network/MultiSend.java  |  6 +-
 .../common/network/PlaintextChannelBuilder.java |  4 +-
 .../kafka/common/network/SslChannelBuilder.java |  4 +-
 .../kafka/common/network/SslTransportLayer.java |  5 +-
 .../authenticator/SaslServerAuthenticator.java  |  5 +-
 .../apache/kafka/clients/NetworkClientTest.java |  3 +-
 .../producer/internals/BufferPoolTest.java      |  2 +-
 .../clients/producer/internals/SenderTest.java  |  3 -
 .../apache/kafka/connect/data/Timestamp.java    |  4 --
 .../runtime/distributed/WorkerCoordinator.java  |  5 +-
 .../runtime/rest/entities/ConnectorInfo.java    |  9 ---
 .../src/main/scala/kafka/admin/AclCommand.scala |  1 -
 .../src/main/scala/kafka/admin/AdminUtils.scala |  6 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  | 10 +--
 .../kafka/admin/ConsumerGroupCommand.scala      |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala |  2 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |  4 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  4 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  |  1 -
 .../kafka/api/ControlledShutdownResponse.scala  |  2 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |  7 +-
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |  4 +-
 .../scala/kafka/api/OffsetCommitResponse.scala  |  2 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |  5 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |  5 +-
 .../main/scala/kafka/client/ClientUtils.scala   |  6 +-
 .../scala/kafka/cluster/BrokerEndPoint.scala    |  2 +-
 .../main/scala/kafka/cluster/Partition.scala    |  3 +-
 core/src/main/scala/kafka/cluster/Replica.scala |  7 +-
 .../ZkNodeChangeNotificationListener.scala      |  2 +-
 .../kafka/consumer/PartitionAssignor.scala      |  4 +-
 .../main/scala/kafka/consumer/TopicCount.scala  |  4 +-
 .../main/scala/kafka/consumer/TopicFilter.scala |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala   | 24 +++----
 .../kafka/controller/KafkaController.scala      | 74 +++++++++-----------
 .../controller/PartitionStateMachine.scala      | 12 ++--
 .../kafka/controller/ReplicaStateMachine.scala  |  2 +-
 .../kafka/controller/TopicDeletionManager.scala |  4 +-
 .../coordinator/GroupMetadataManager.scala      |  2 +-
 core/src/main/scala/kafka/log/Log.scala         |  4 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  2 +-
 .../scala/kafka/log/LogCleanerManager.scala     | 10 ++-
 .../kafka/message/ByteBufferMessageSet.scala    |  2 +-
 .../scala/kafka/metrics/KafkaMetricsGroup.scala | 19 ++---
 .../scala/kafka/network/BlockingChannel.scala   |  2 +-
 .../kafka/producer/DefaultPartitioner.scala     |  2 -
 .../main/scala/kafka/producer/Producer.scala    |  2 +-
 .../scala/kafka/security/auth/Resource.scala    |  2 +-
 .../security/auth/SimpleAclAuthorizer.scala     |  4 +-
 .../kafka/server/AbstractFetcherManager.scala   |  2 +-
 .../main/scala/kafka/server/AdminManager.scala  |  2 +-
 .../kafka/server/BrokerMetadataCheckpoint.scala |  2 +-
 .../scala/kafka/server/ClientQuotaManager.scala |  8 +--
 .../main/scala/kafka/server/DelayedFetch.scala  |  4 +-
 .../kafka/server/DynamicConfigManager.scala     |  3 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  2 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |  1 -
 .../scala/kafka/server/OffsetCheckpoint.scala   |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     | 21 +++---
 .../kafka/server/ZookeeperLeaderElector.scala   |  2 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |  6 +-
 .../scala/kafka/tools/ConsumerPerformance.scala |  2 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 10 ++-
 .../scala/kafka/tools/EndToEndLatency.scala     |  3 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    | 10 +--
 .../scala/kafka/tools/ReplayLogProducer.scala   |  5 +-
 .../kafka/tools/ReplicaVerificationTool.scala   | 21 +++---
 core/src/main/scala/kafka/utils/CoreUtils.scala |  4 +-
 core/src/main/scala/kafka/utils/FileLock.scala  |  4 +-
 .../src/main/scala/kafka/utils/Mx4jLoader.scala |  6 +-
 .../scala/kafka/utils/ReplicationUtils.scala    |  4 +-
 .../kafka/utils/VerifiableProperties.scala      |  4 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 30 ++++----
 .../integration/kafka/api/AdminClientTest.scala |  1 -
 .../kafka/api/AuthorizerIntegrationTest.scala   | 15 ++--
 .../kafka/api/BaseConsumerTest.scala            |  2 +-
 .../kafka/api/BaseProducerSendTest.scala        | 21 +++---
 .../kafka/api/EndToEndAuthorizationTest.scala   |  2 +-
 .../kafka/api/FixedPortTestUtils.scala          |  2 +-
 .../kafka/api/IntegrationTestHarness.scala      |  4 +-
 .../kafka/api/PlaintextConsumerTest.scala       | 13 ++--
 .../kafka/api/PlaintextProducerSendTest.scala   |  4 +-
 .../kafka/api/ProducerBounceTest.scala          |  4 +-
 .../scala/kafka/security/minikdc/MiniKdc.scala  |  5 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |  2 +-
 .../test/scala/other/kafka/StressTestLog.scala  |  3 +-
 .../scala/other/kafka/TestCrcPerformance.scala  | 14 ++--
 .../scala/other/kafka/TestKafkaAppender.scala   |  9 +--
 .../other/kafka/TestLinearWriteSpeed.scala      |  3 +-
 .../scala/other/kafka/TestOffsetManager.scala   |  6 +-
 .../other/kafka/TestPurgatoryPerformance.scala  |  2 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  6 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |  4 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |  7 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala |  4 +-
 .../admin/ReassignPartitionsCommandTest.scala   |  2 +-
 .../scala/unit/kafka/api/ApiUtilsTest.scala     | 12 ++--
 .../api/RequestResponseSerializationTest.scala  | 36 ----------
 .../scala/unit/kafka/common/ConfigTest.scala    |  8 +--
 .../scala/unit/kafka/common/TopicTest.scala     |  6 +-
 .../kafka/consumer/ConsumerIteratorTest.scala   | 13 ++--
 .../kafka/consumer/PartitionAssignorTest.scala  | 26 +++----
 .../ZookeeperConsumerConnectorTest.scala        | 19 +++--
 .../controller/ControllerFailoverTest.scala     |  4 +-
 .../coordinator/GroupMetadataManagerTest.scala  |  4 +-
 .../kafka/coordinator/GroupMetadataTest.scala   |  2 +-
 .../kafka/integration/AutoOffsetResetTest.scala |  6 +-
 .../unit/kafka/integration/FetcherTest.scala    |  9 +--
 .../kafka/integration/PrimitiveApiTest.scala    |  6 +-
 .../ZookeeperConsumerConnectorTest.scala        |  4 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |  9 ++-
 .../log/LogCleanerLagIntegrationTest.scala      |  6 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |  9 ---
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  5 +-
 .../scala/unit/kafka/log/LogConfigTest.scala    |  6 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   | 24 +++----
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  6 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 62 ++++++++--------
 .../scala/unit/kafka/log/OffsetIndexTest.scala  |  2 +-
 .../scala/unit/kafka/log/OffsetMapTest.scala    |  3 +-
 .../message/ByteBufferMessageSetTest.scala      |  4 +-
 .../kafka/message/MessageCompressionTest.scala  |  5 +-
 .../unit/kafka/network/SocketServerTest.scala   |  6 +-
 .../unit/kafka/producer/AsyncProducerTest.scala | 20 +++---
 .../unit/kafka/producer/ProducerTest.scala      | 66 ++++++++---------
 .../unit/kafka/producer/SyncProducerTest.scala  |  6 +-
 .../kafka/security/auth/OperationTest.scala     |  2 +-
 .../security/auth/PermissionTypeTest.scala      |  2 +-
 .../kafka/security/auth/ResourceTypeTest.scala  |  2 +-
 .../security/auth/SimpleAclAuthorizerTest.scala |  2 +-
 .../security/auth/ZkAuthorizationTest.scala     |  8 +--
 .../kafka/server/ClientQuotaManagerTest.scala   |  4 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |  8 +--
 .../unit/kafka/server/ISRExpirationTest.scala   | 25 +++----
 .../unit/kafka/server/KafkaConfigTest.scala     | 13 ++--
 .../scala/unit/kafka/server/LogOffsetTest.scala |  8 +--
 .../unit/kafka/server/MetadataCacheTest.scala   |  2 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  1 -
 .../kafka/server/ReplicationQuotasTest.scala    |  4 +-
 .../server/SaslApiVersionsRequestTest.scala     |  2 +-
 .../server/ServerGenerateBrokerIdTest.scala     |  2 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |  8 +--
 .../unit/kafka/server/ServerStartupTest.scala   |  2 +-
 .../unit/kafka/tools/ConsoleProducerTest.scala  |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  8 +--
 .../kafka/utils/timer/TimerTaskListTest.scala   |  4 +-
 .../scala/unit/kafka/zk/ZKEphemeralTest.scala   |  7 +-
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   | 50 ++++---------
 .../scala/unit/kafka/zk/ZkFourLetterWords.scala |  2 +-
 .../internals/RocksDBKeyValueStoreTest.java     |  2 +-
 155 files changed, 477 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 73543ad..59319ef 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -749,7 +749,6 @@ public abstract class AbstractCoordinator implements Closeable {
     }
 
     private class GroupCoordinatorMetrics {
-        public final Metrics metrics;
         public final String metricGrpName;
 
         public final Sensor heartbeatLatency;
@@ -757,7 +756,6 @@ public abstract class AbstractCoordinator implements Closeable {
         public final Sensor syncLatency;
 
         public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
-            this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             this.heartbeatLatency = metrics.sensor("heartbeat-latency");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bd95409..a8d94fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,13 +757,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     private class ConsumerCoordinatorMetrics {
-        public final Metrics metrics;
         public final String metricGrpName;
 
         public final Sensor commitLatency;
 
         public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
-            this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             this.commitLatency = metrics.sensor("commit-latency");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6dc2060..003d1a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -65,9 +65,6 @@ public class SubscriptionState {
     /* the list of topics the user has requested */
     private Set<String> subscription;
 
-    /* the list of partitions the user has requested */
-    private Set<TopicPartition> userAssignment;
-
     /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
     private final Set<String> groupSubscription;
 
@@ -86,7 +83,6 @@ public class SubscriptionState {
     public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = Collections.emptySet();
-        this.userAssignment = Collections.emptySet();
         this.assignment = new PartitionStates<>();
         this.groupSubscription = new HashSet<>();
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
@@ -160,8 +156,6 @@ public class SubscriptionState {
         setSubscriptionType(SubscriptionType.USER_ASSIGNED);
 
         if (!this.assignment.partitionSet().equals(partitions)) {
-            this.userAssignment = partitions;
-
             Map<TopicPartition, TopicPartitionState> partitionToState = new HashMap<>();
             for (TopicPartition partition : partitions) {
                 TopicPartitionState state = assignment.stateValue(partition);
@@ -218,7 +212,6 @@ public class SubscriptionState {
 
     public void unsubscribe() {
         this.subscription = Collections.emptySet();
-        this.userAssignment = Collections.emptySet();
         this.assignment.clear();
         this.subscribedPattern = null;
         this.subscriptionType = SubscriptionType.NONE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3632384..489c762 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -318,7 +318,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getInt(ProducerConfig.RETRIES_CONFIG),
                     this.metrics,
                     new SystemTime(),
-                    clientId,
                     this.requestTimeoutMs);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8fc7f2c..c71bb67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -92,9 +92,6 @@ public class Sender implements Runnable {
     /* metrics */
     private final SenderMetrics sensors;
 
-    /* param clientId of the client */
-    private String clientId;
-
     /* the max time to wait for the server to respond to the request*/
     private final int requestTimeout;
 
@@ -107,7 +104,6 @@ public class Sender implements Runnable {
                   int retries,
                   Metrics metrics,
                   Time time,
-                  String clientId,
                   int requestTimeout) {
         this.client = client;
         this.accumulator = accumulator;
@@ -118,7 +114,6 @@ public class Sender implements Runnable {
         this.acks = acks;
         this.retries = retries;
         this.time = time;
-        this.clientId = clientId;
         this.sensors = new SenderMetrics(metrics);
         this.requestTimeout = requestTimeout;
     }
@@ -281,8 +276,7 @@ public class Sender implements Runnable {
                     completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
                 }
                 this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs());
-                this.sensors.recordThrottleTime(response.request().request().destination(),
-                                                produceResponse.getThrottleTime());
+                this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
             } else {
                 // this is the acks = 0 case, just complete all requests
                 for (RecordBatch batch : batches.values())
@@ -564,7 +558,7 @@ public class Sender implements Runnable {
             }
         }
 
-        public void recordThrottleTime(String node, long throttleTimeMs) {
+        public void recordThrottleTime(long throttleTimeMs) {
             this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
index 0e14a39..11f5e07 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
@@ -34,7 +34,6 @@ public class MultiSend implements Send {
     private static final Logger log = LoggerFactory.getLogger(MultiSend.class);
     private String dest;
     private long totalWritten = 0;
-    private List<Send> sends;
     private Iterator<Send> sendsIterator;
     private Send current;
     private boolean doneSends = false;
@@ -42,7 +41,6 @@ public class MultiSend implements Send {
 
     public MultiSend(String dest, List<Send> sends) {
         this.dest = dest;
-        this.sends = sends;
         this.sendsIterator = sends.iterator();
         nextSendOrDone();
         for (Send send: sends)
@@ -76,7 +74,7 @@ public class MultiSend implements Send {
             throw new KafkaException("This operation cannot be completed on a complete request.");
 
         int totalWrittenPerCall = 0;
-        boolean sendComplete = false;
+        boolean sendComplete;
         do {
             long written = current.writeTo(channel);
             totalWritten += written;
@@ -97,4 +95,4 @@ public class MultiSend implements Send {
         else
             doneSends = true;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index f0af935..c573672 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -36,17 +36,15 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
     }
 
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
-        KafkaChannel channel = null;
         try {
             PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
         } catch (Exception e) {
             log.warn("Failed to create channel due to ", e);
             throw new KafkaException(e);
         }
-        return channel;
     }
 
     public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index b546174..1d612bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -46,17 +46,15 @@ public class SslChannelBuilder implements ChannelBuilder {
     }
 
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
-        KafkaChannel channel = null;
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
             throw new KafkaException(e);
         }
-        return channel;
     }
 
     public void close()  {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index f8926f3..7ce59f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -397,12 +397,11 @@ public class SslTransportLayer implements TransportLayer {
     private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
         log.trace("SSLHandshake handshakeUnwrap {}", channelId);
         SSLEngineResult result;
-        boolean cont = false;
-        int read = 0;
         if (doRead)  {
-            read = socketChannel.read(netReadBuffer);
+            int read = socketChannel.read(netReadBuffer);
             if (read == -1) throw new EOFException("EOF during handshake.");
         }
+        boolean cont;
         do {
             //prepare the buffer with the incoming data
             netReadBuffer.flip();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index e1074a1..206fe39 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -63,7 +63,6 @@ import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractRequestResponse;
-import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -310,7 +309,7 @@ public class SaslServerAuthenticator implements Authenticator {
                 LOG.debug("Handle Kafka request {}", apiKey);
                 switch (apiKey) {
                     case API_VERSIONS:
-                        handleApiVersionsRequest(requestHeader, (ApiVersionsRequest) request);
+                        handleApiVersionsRequest(requestHeader);
                         break;
                     case SASL_HANDSHAKE:
                         clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
@@ -361,7 +360,7 @@ public class SaslServerAuthenticator implements Authenticator {
         }
     }
 
-    private void handleApiVersionsRequest(RequestHeader requestHeader, ApiVersionsRequest versionRequest) throws IOException, UnsupportedSaslMechanismException {
+    private void handleApiVersionsRequest(RequestHeader requestHeader) throws IOException, UnsupportedSaslMechanismException {
         sendKafkaResponse(requestHeader, ApiVersionsResponse.apiVersionsResponse());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index b556240..d305e8e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -153,14 +153,13 @@ public class NetworkClientTest {
 
     @Test
     public void testLeastLoadedNode() {
-        Node leastNode = null;
         client.ready(node, time.milliseconds());
         awaitReady(client, node);
         client.poll(1, time.milliseconds());
         assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
         
         // leastloadednode should be our single node
-        leastNode = client.leastLoadedNode(time.milliseconds());
+        Node leastNode = client.leastLoadedNode(time.milliseconds());
         assertEquals("There should be one leastloadednode", leastNode.id(), node.id());
         
         // sleep for longer than reconnect backoff

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
index 48682b1..3756d8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
@@ -88,7 +88,7 @@ public class BufferPoolTest {
         ByteBuffer buffer = pool.allocate(1024, maxBlockTimeMs);
         assertEquals(1024, buffer.limit());
         pool.deallocate(buffer);
-        buffer = pool.allocate(1025, maxBlockTimeMs);
+        pool.allocate(1025, maxBlockTimeMs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b7f9e74..b7645dd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -83,7 +83,6 @@ public class SenderTest {
                             MAX_RETRIES,
                             metrics,
                             time,
-                            CLIENT_ID,
                             REQUEST_TIMEOUT);
 
         metadata.update(cluster, time.milliseconds());
@@ -143,7 +142,6 @@ public class SenderTest {
                                        maxRetries,
                                        m,
                                        time,
-                                       "clientId",
                                        REQUEST_TIMEOUT);
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
@@ -196,7 +194,6 @@ public class SenderTest {
                 maxRetries,
                 m,
                 time,
-                "clientId",
                 REQUEST_TIMEOUT);
 
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
index c447f6d..cd7ed4a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.data;
 
 import org.apache.kafka.connect.errors.DataException;
 
-import java.util.TimeZone;
-
 /**
  * <p>
  *     A timestamp representing an absolute time, without timezone information. The corresponding Java type is a
@@ -30,8 +28,6 @@ import java.util.TimeZone;
 public class Timestamp {
     public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp";
 
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
     /**
      * Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder you can override additional schema settings such
      * as required/optional, default value, and documentation.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 8a065f1..88a0a8d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -52,7 +52,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     private final String restUrl;
     private final ConfigBackingStore configStorage;
     private ConnectProtocol.Assignment assignmentSnapshot;
-    private final WorkerCoordinatorMetrics sensors;
     private ClusterConfigState configSnapshot;
     private final WorkerRebalanceListener listener;
     private LeaderState leaderState;
@@ -86,7 +85,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         this.restUrl = restUrl;
         this.configStorage = configStorage;
         this.assignmentSnapshot = null;
-        this.sensors = new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
+        new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.listener = listener;
         this.rejoinRequested = false;
     }
@@ -306,11 +305,9 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     private class WorkerCoordinatorMetrics {
-        public final Metrics metrics;
         public final String metricGrpName;
 
         public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
-            this.metrics = metrics;
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             Measurable numConnectors = new Measurable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 9567ef9..3faff65 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -22,8 +22,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -74,11 +72,4 @@ public class ConnectorInfo {
         return Objects.hash(name, config, tasks);
     }
 
-
-    private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.connect.util.ConnectorTaskId> tasks) {
-        List<ConnectorTaskId> jsonTasks = new ArrayList<>();
-        for (ConnectorTaskId task : tasks)
-            jsonTasks.add(task);
-        return jsonTasks;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index a098535..58c966d 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -84,7 +84,6 @@ object AclCommand {
         CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
 
       for ((resource, acls) <- resourceToAcl) {
-        val acls = resourceToAcl(resource)
         println(s"Adding ACLs for resource `${resource}`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
         authorizer.addAcls(acls, resource)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index aa38f69..d3ce217 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -320,7 +320,7 @@ object AdminUtils extends Logging with AdminUtilities {
         try {
           zkUtils.createPersistentPath(getDeleteTopicPath(topic))
         } catch {
-          case e1: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+          case _: ZkNodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
             "topic %s is already marked for deletion".format(topic))
           case e2: Throwable => throw new AdminOperationException(e2)
         }
@@ -471,7 +471,7 @@ object AdminUtils extends Logging with AdminUtilities {
       }
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
-      case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
+      case _: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
       case e2: Throwable => throw new AdminOperationException(e2.toString)
     }
   }
@@ -593,7 +593,7 @@ object AdminUtils extends Logging with AdminUtilities {
             case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}")
           }
 
-        case o => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
+        case _ => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}")
       }
     }
     props

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 20048ec..34df6b0 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -99,7 +99,7 @@ object ConfigCommand extends Config {
   private def parseBroker(broker: String): Int = {
     try broker.toInt
     catch {
-      case e: NumberFormatException =>
+      case _: NumberFormatException =>
         throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
     }
   }
@@ -190,20 +190,20 @@ object ConfigCommand extends Config {
           val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
                                    .map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
           child match {
-            case Some (s) =>
+            case Some(s) =>
                 rootEntities.flatMap(rootEntity =>
                   ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
             case None => rootEntities
           }
-        case (rootName, Some(childEntity)) =>
+        case (_, Some(childEntity)) =>
           childEntity.sanitizedName match {
-            case Some(subName) => Seq(this)
+            case Some(_) => Seq(this)
             case None =>
                 zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
                        .map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
 
           }
-        case (rootName, None) =>
+        case (_, None) =>
           Seq(this)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 354e6a2..6300d76 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -253,7 +253,7 @@ object ConsumerGroupCommand extends Logging {
       }
 
       assignmentRows ++= groupConsumerIds.sortBy(- consumerTopicPartitions.get(_).size).flatMap { consumerId =>
-        topicsByConsumerId(consumerId).flatMap { topic =>
+        topicsByConsumerId(consumerId).flatMap { _ =>
           // since consumers with no topic partitions are processed here, we pass empty for topic partitions and offsets
           // since consumer id is repeated in client id, leave host and client id empty
           collectConsumerAssignment(group, None, Array[TopicAndPartition](), Map[TopicAndPartition, Option[Long]](), Some(consumerId), None, None)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d194eca..81014b1 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -109,7 +109,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       zkUtils.createPersistentPath(zkPath, jsonData)
       info("Created preferred replica election path with %s".format(jsonData))
     } catch {
-      case nee: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         val partitionsUndergoingPreferredReplicaElection =
           PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(zkUtils.readData(zkPath)._1)
         throw new AdminOperationException("Preferred replica leader election currently in progress for " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index a037fd4..709b365 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -218,7 +218,7 @@ object ReassignPartitionsCommand extends Logging {
                                             partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
     val newReplicas = partitionsToBeReassigned(topicAndPartition)
     partitionsBeingReassigned.get(topicAndPartition) match {
-      case Some(partition) => ReassignmentInProgress
+      case Some(_) => ReassignmentInProgress
       case None =>
         // check if the current replica assignment matches the expected one after reassignment
         val assignedReplicas = zkUtils.getReplicasForPartition(topicAndPartition.topic, topicAndPartition.partition)
@@ -394,7 +394,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, proposedAssignment: Map[TopicA
         true
       }
     } catch {
-      case ze: ZkNodeExistsException =>
+      case _: ZkNodeExistsException =>
         val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
         throw new AdminCommandFailedException("Partition reassignment currently in " +
         "progress for %s. Aborting operation".format(partitionsBeingReassigned))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index a35a989..2fcc2ce 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -176,11 +176,11 @@ object TopicCommand extends Logging {
           println("Note: This will have no impact if delete.topic.enable is not set to true.")
         }
       } catch {
-        case e: ZkNodeExistsException =>
+        case _: ZkNodeExistsException =>
           println("Topic %s is already marked for deletion.".format(topic))
         case e: AdminOperationException =>
           throw e
-        case e: Throwable =>
+        case _: Throwable =>
           throw new AdminOperationException("Error while deleting topic %s".format(topic))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a87e5b7..9ffee86 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -123,7 +123,6 @@ object ZkSecurityMigrator extends Logging {
 }
 
 class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
-  private val workQueue = new LinkedBlockingQueue[Runnable]
   private val futures = new Queue[Future[String]]
 
   private def setAcl(path: String, setPromise: Promise[String]) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 02eeae1..1ba5cfa 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -30,7 +30,7 @@ object ControlledShutdownResponse {
     val numEntries = buffer.getInt
 
     var partitionsRemaining = Set[TopicAndPartition]()
-    for (i<- 0 until numEntries){
+    for (_ <- 0 until numEntries){
       val topic = readShortString(buffer)
       val partition = buffer.getInt
       partitionsRemaining += new TopicAndPartition(topic, partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 3c380c9..57e99c1 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -69,7 +69,7 @@ object FetchRequest {
     val groupedByTopic = requestInfo.groupBy { case (tp, _) => tp.topic }.map { case (topic, values) =>
       topic -> random.shuffle(values)
     }
-    random.shuffle(groupedByTopic.toSeq).flatMap { case (topic, partitions) =>
+    random.shuffle(groupedByTopic.toSeq).flatMap { case (_, partitions) =>
       partitions.map { case (tp, fetchInfo) => tp -> fetchInfo }
     }
   }
@@ -196,9 +196,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val fetchResponsePartitionData = requestInfo.map {
-      case (topicAndPartition, data) =>
-        (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
+    val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, _) =>
+      (topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
     }
     val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index e5813a5..9123788 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -46,10 +46,10 @@ object PartitionStateInfo {
     val leader = buffer.getInt
     val leaderEpoch = buffer.getInt
     val isrSize = buffer.getInt
-    val isr = for(i <- 0 until isrSize) yield buffer.getInt
+    val isr = for (_ <- 0 until isrSize) yield buffer.getInt
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
+    val replicas = for (_ <- 0 until replicationFactor) yield buffer.getInt
     PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
                        replicas.toSet)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index d4f6158..94223c7 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -48,7 +48,7 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 
-  def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != Errors.NONE.code }
+  def hasError = commitStatus.values.exists(_ != Errors.NONE.code)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index b15cf5a..416dd73 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -114,8 +114,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val partitionOffsetResponseMap = requestInfo.map {
-      case (topicAndPartition, partitionOffsetRequest) =>
+    val partitionOffsetResponseMap = requestInfo.map { case (topicAndPartition, _) =>
         (topicAndPartition, PartitionOffsetsResponse(Errors.forException(e).code, Nil))
     }
     val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
@@ -133,4 +132,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
       offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index aad2fa5..3ca7bd7 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -133,9 +133,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
         requestChannel.closeConnection(request.processor, request)
     }
     else {
-      val producerResponseStatus = data.map {
-        case (topicAndPartition, data) =>
-          (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
+      val producerResponseStatus = data.map { case (topicAndPartition, _) =>
+        (topicAndPartition, ProducerResponseStatus(Errors.forException(e).code, -1l, Message.NoTimestamp))
       }
       val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fda8b0b..8893697 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -121,7 +121,7 @@ object ClientUtils extends Logging{
            debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
            true
          } catch {
-           case e: Exception =>
+           case _: Exception =>
              if (channel != null) channel.disconnect()
              channel = null
              info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
@@ -164,7 +164,7 @@ object ClientUtils extends Logging{
            }
          }
          catch {
-           case ioe: IOException =>
+           case _: IOException =>
              info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
              queryChannel.disconnect()
          }
@@ -187,7 +187,7 @@ object ClientUtils extends Logging{
            queryChannel.disconnect()
          }
          catch {
-           case ioe: IOException => // offsets manager may have moved
+           case _: IOException => // offsets manager may have moved
              info("Error while connecting to %s.".format(connectString))
              if (offsetManagerChannel != null) offsetManagerChannel.disconnect()
              Thread.sleep(retryBackOffMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
index 91823f0..847e959 100644
--- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
@@ -32,7 +32,7 @@ object BrokerEndPoint {
    */
   def parseHostPort(connectionString: String): Option[(String, Int)] = {
     connectionString match {
-      case uriParseExp(host, port) => try Some(host, port.toInt) catch { case e: NumberFormatException => None }
+      case uriParseExp(host, port) => try Some(host, port.toInt) catch { case _: NumberFormatException => None }
       case _ => None
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 28d3c8d..4d3fb56 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -415,11 +415,10 @@ class Partition(val topic: String,
      * is violated, that replica is considered to be out of sync
      *
      **/
-    val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
 
     val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
-    if(laggingReplicas.nonEmpty)
+    if (laggingReplicas.nonEmpty)
       debug("Lagging replicas for partition %s are %s".format(TopicAndPartition(topic, partitionId), laggingReplicas.map(_.brokerId).mkString(",")))
 
     laggingReplicas

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index dfb203a..13c1921 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -38,12 +38,7 @@ class Replica(val brokerId: Int,
   val topic = partition.topic
   val partitionId = partition.partitionId
 
-  def isLocal: Boolean = {
-    log match {
-      case Some(l) => true
-      case None => false
-    }
-  }
+  def isLocal: Boolean = log.isDefined
 
   private[this] val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index 580ae33..ef8190c 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -91,7 +91,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
           val changeId = changeNumber(notification)
           if (changeId > lastExecutedChange) {
             val changeZnode = seqNodeRoot + "/" + notification
-            val (data, stat) = zkUtils.readDataMaybeNull(changeZnode)
+            val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
             data.map(notificationHandler.processNotification(_)).getOrElse {
               logger.warn(s"read null data from $changeZnode when processing notification $notification")
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 96fe690..900a4b6 100755
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -72,7 +72,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
 
-    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
     val partitionAssignment =
       new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
 
@@ -131,7 +131,7 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
 class RangeAssignor() extends PartitionAssignor with Logging {
 
   def assign(ctx: AssignmentContext) = {
-    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
+    val valueFactory = (_: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
     val partitionAssignment =
       new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
     for (topic <- ctx.myTopicThreadIds.keySet) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index 5706d3c..eb035f2 100755
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -140,8 +140,8 @@ private[kafka] class WildcardTopicCount(zkUtils: ZkUtils,
 
   def pattern: String = {
     topicFilter match {
-      case wl: Whitelist => TopicCount.whiteListPattern
-      case bl: Blacklist => TopicCount.blackListPattern
+      case _: Whitelist => TopicCount.whiteListPattern
+      case _: Blacklist => TopicCount.blackListPattern
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 1ab4e5c..914e9b9 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -34,7 +34,7 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
     Pattern.compile(regex)
   }
   catch {
-    case e: PatternSyntaxException =>
+    case _: PatternSyntaxException =>
       throw new RuntimeException(regex + " is an invalid regex.")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index f776578..81b6264 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -322,8 +322,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def commitOffsets(isAutoCommit: Boolean) {
 
     val offsetsToCommit =
-      immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
-        partitionTopicInfos.map { case (partition, info) =>
+      immutable.Map(topicRegistry.values.flatMap { partitionTopicInfos =>
+        partitionTopicInfos.values.map { info =>
           TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
         }
       }.toSeq: _*)
@@ -442,7 +442,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             trace("Offset fetch response: %s.".format(offsetFetchResponse))
 
             val (leaderChanged, loadInProgress) =
-              offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
+              offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
                 (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP.code),
                  folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS.code))
               }
@@ -706,7 +706,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
         val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId)
         val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
-          valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))
+          valueFactory = Some((_: String) => new Pool[Int, PartitionTopicInfo]))
 
         // fetch current offsets for all topic-partitions
         val topicPartitions = partitionAssignment.keySet.toSeq
@@ -731,7 +731,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           if(reflectPartitionOwnershipDecision(partitionAssignment)) {
             allTopicsOwnedPartitionsCount = partitionAssignment.size
 
-            partitionAssignment.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
+            partitionAssignment.view.groupBy { case (topicPartition, _) => topicPartition.topic }
                                       .foreach { case (topic, partitionThreadPairs) =>
               newGauge("OwnedPartitionsCount",
                 new Gauge[Int] {
@@ -851,11 +851,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           successfullyOwnedPartitions ::= (topic, partition)
           true
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             // The node hasn't been deleted by the original owner. So wait a bit and retry.
             info("waiting for the partition ownership to be deleted: " + partition)
             false
-          case e2: Throwable => throw e2
         }
       }
       val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
@@ -918,19 +917,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       topicCount.getConsumerThreadIdsPerTopic
 
     val allQueuesAndStreams = topicCount match {
-      case wildTopicCount: WildcardTopicCount =>
+      case _: WildcardTopicCount =>
         /*
          * Wild-card consumption streams share the same queues, so we need to
          * duplicate the list for the subsequent zip operation.
          */
         (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
-      case statTopicCount: StaticTopicCount =>
+      case _: StaticTopicCount =>
         queuesAndStreams
     }
 
-    val topicThreadIds = consumerThreadIdsPerTopic.map {
-      case(topic, threadIds) =>
-        threadIds.map((topic, _))
+    val topicThreadIds = consumerThreadIdsPerTopic.map { case (topic, threadIds) =>
+      threadIds.map((topic, _))
     }.flatten
 
     require(topicThreadIds.size == allQueuesAndStreams.size,
@@ -988,7 +986,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         "message streams by filter at most once.")
 
     private val wildcardQueuesAndStreams = (1 to numStreams)
-      .map(e => {
+      .map(_ => {
         val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](queue,
                                           config.consumerTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 063ea6f..b7b4d71 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -77,25 +77,23 @@ class ControllerContext(val zkUtils: ZkUtils,
   def liveOrShuttingDownBrokers = liveBrokersUnderlying
 
   def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
-    partitionReplicaAssignment
-      .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
-      .map { case(topicAndPartition, replicas) => topicAndPartition }
-      .toSet
+    partitionReplicaAssignment.collect {
+      case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
+    }.toSet
   }
 
   def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
     brokerIds.flatMap { brokerId =>
-      partitionReplicaAssignment
-        .filter { case (topicAndPartition, replicas) => replicas.contains(brokerId) }
-        .map { case (topicAndPartition, replicas) =>
+      partitionReplicaAssignment.collect {
+        case (topicAndPartition, replicas) if replicas.contains(brokerId) =>
           new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId)
-        }
+      }
     }.toSet
   }
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
     partitionReplicaAssignment
-      .filter { case (topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+      .filter { case (topicAndPartition, _) => topicAndPartition.topic == topic }
       .flatMap { case (topicAndPartition, replicas) =>
         replicas.map { r =>
           new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
@@ -103,10 +101,8 @@ class ControllerContext(val zkUtils: ZkUtils,
       }.toSet
   }
 
-  def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
-    partitionReplicaAssignment
-      .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
-  }
+  def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] =
+    partitionReplicaAssignment.keySet.filter(topicAndPartition => topicAndPartition.topic == topic)
 
   def allLiveReplicas(): Set[PartitionAndReplica] = {
     replicasOnBrokers(liveBrokerIds)
@@ -144,7 +140,7 @@ object KafkaController extends Logging {
         case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
       }
     } catch {
-      case t: Throwable =>
+      case _: Throwable =>
         // It may be due to an incompatible controller register version
         warn("Failed to parse the controller info as json. "
           + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
@@ -433,7 +429,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
-      case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+      case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
     }
     partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
     // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
@@ -622,12 +618,11 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
     val newReplicas = reassignedPartitionContext.newReplicas
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
-    val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
     try {
       val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
       assignedReplicasOpt match {
         case Some(assignedReplicas) =>
-          if(assignedReplicas == newReplicas) {
+          if (assignedReplicas == newReplicas) {
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
           } else {
@@ -706,7 +701,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
         controllerContext.epoch = newControllerEpoch
       }
     } catch {
-      case nne: ZkNoNodeException =>
+      case _: ZkNoNodeException =>
         // if path doesn't exist, this is the first controller whose epoch should be 1
         // the following call can still fail if another controller gets elected between checking if the path exists and
         // trying to create the controller epoch path
@@ -715,7 +710,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
           controllerContext.epoch = KafkaController.InitialControllerEpoch
           controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
         } catch {
-          case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+          case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
             "Aborting controller startup procedure")
           case oe: Throwable => error("Error while incrementing controller epoch", oe)
         }
@@ -788,7 +783,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
 
   private def initializeTopicDeletion() {
     val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
-    val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
+    val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case (_, replicas) =>
       replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
     val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
@@ -989,7 +984,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
-      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
+      case _: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
       case e2: Throwable => throw new KafkaException(e2.toString)
     }
   }
@@ -1182,7 +1177,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       inLock(controllerContext.controllerLock) {
         preferredReplicasForTopicsByBrokers =
           controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
-            case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+            case (_, assignedReplicas) => assignedReplicas.head
           }
       }
       debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
@@ -1192,13 +1187,10 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
           var imbalanceRatio: Double = 0
           var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
           inLock(controllerContext.controllerLock) {
-            topicsNotInPreferredReplica =
-              topicAndPartitionsForBroker.filter {
-                case(topicPartition, replicas) => {
-                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
-                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
-                }
-              }
+            topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
+              controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
+                controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+            }
             debug("topics not in preferred replica " + topicsNotInPreferredReplica)
             val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
             val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
@@ -1208,18 +1200,16 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            topicsNotInPreferredReplica.foreach {
-              case(topicPartition, replicas) => {
-                inLock(controllerContext.controllerLock) {
-                  // do this check only if the broker is live and there are no partitions being reassigned currently
-                  // and preferred replica election is not in progress
-                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                      controllerContext.partitionsBeingReassigned.isEmpty &&
-                      controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
-                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
-                      controllerContext.allTopics.contains(topicPartition.topic)) {
-                    onPreferredReplicaElection(Set(topicPartition), true)
-                  }
+            topicsNotInPreferredReplica.keys.foreach { topicPartition =>
+              inLock(controllerContext.controllerLock) {
+                // do this check only if the broker is live and there are no partitions being reassigned currently
+                // and preferred replica election is not in progress
+                if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                    controllerContext.partitionsBeingReassigned.isEmpty &&
+                    controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
+                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+                    controllerContext.allTopics.contains(topicPartition.topic)) {
+                  onPreferredReplicaElection(Set(topicPartition), true)
                 }
               }
             }
@@ -1373,7 +1363,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
 
   private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
     val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
-    val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
+    val (jsonOpt, _) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
     if (jsonOpt.isDefined) {
       val json = Json.parseFull(jsonOpt.get)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 32bf4da..ee94b46 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -238,7 +238,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * zookeeper
    */
   private def initializePartitionState() {
-    for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
+    for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
       // check if leader and isr path exists for partition. If not, then it is in NEW state
       controllerContext.partitionLeadershipInfo.get(topicPartition) match {
         case Some(currentLeaderIsrAndEpoch) =>
@@ -297,7 +297,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
             topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
         } catch {
-          case e: ZkNodeExistsException =>
+          case _: ZkNodeExistsException =>
             // read the controller epoch
             val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
               topicAndPartition.partition).get
@@ -357,7 +357,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, replicas)
     } catch {
-      case lenne: LeaderElectionNotNeededException => // swallow
+      case _: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
       case sce: Throwable =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
@@ -430,8 +430,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
             info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
               deletedTopics, addedPartitionReplicaAssignment))
-            if(newTopics.nonEmpty)
-              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
+            if (newTopics.nonEmpty)
+              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
           } catch {
             case e: Throwable => error("Error while handling new topic", e )
           }
@@ -522,7 +522,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             if (partitionsToBeAdded.nonEmpty) {
               info("New partitions to be added %s".format(partitionsToBeAdded))
               controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
-              controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
+              controller.onNewPartitionCreation(partitionsToBeAdded.keySet)
             }
           }
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index d4e9bb4..03887ae 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -246,7 +246,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           // As an optimization, the controller removes dead replicas from the ISR
           val leaderAndIsrIsEmpty: Boolean =
             controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
-              case Some(currLeaderIsrAndControllerEpoch) =>
+              case Some(_) =>
                 controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                   case Some(updatedLeaderIsrAndControllerEpoch) =>
                     // send the shrunk ISR state change request to all the remaining alive replicas of the partition.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 98057dd..8e5f3a1 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -342,8 +342,8 @@ class TopicDeletionManager(controller: KafkaController,
    *@param replicasForTopicsToBeDeleted
    */
   private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
-    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
-      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
+    replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
+      val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
       val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
       val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
       val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 79d4411..e0c8e65 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -265,7 +265,7 @@ class GroupMetadataManager(val brokerId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
     // first filter out partitions with offset metadata size exceeding limit
-    val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
+    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f29cde7..6b57696 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -290,7 +290,7 @@ class Log(val dir: File,
         try {
           curr.recover(config.maxMessageSize)
         } catch {
-          case e: InvalidOffsetException =>
+          case _: InvalidOffsetException =>
             val startOffset = curr.baseOffset
             warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                  "creating an empty one with starting offset " + startOffset)
@@ -627,7 +627,7 @@ class Log(val dir: File,
       val fetchDataInfo = read(offset, 1)
       fetchDataInfo.fetchOffsetMetadata
     } catch {
-      case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+      case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bb8a89a..34b0dbf 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -240,7 +240,7 @@ class LogCleaner(val config: CleanerConfig,
             recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
             endOffset = nextDirtyOffset
           } catch {
-            case pe: LogCleaningAbortedException => // task can be aborted, let it go.
+            case _: LogCleaningAbortedException => // task can be aborted, let it go.
           } finally {
             cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0926738/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b3e6e72..b808348 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -112,12 +112,10 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     */
   def deletableLogs(): Iterable[(TopicAndPartition, Log)] = {
     inLock(lock) {
-      val toClean = logs.filterNot {
-        case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
-      }.filter {
-        case (topicAndPartition, log) => isCompactAndDelete(log)
+      val toClean = logs.filter { case (topicAndPartition, log) =>
+        !inProgress.contains(topicAndPartition) && isCompactAndDelete(log)
       }
-      toClean.foreach{x => inProgress.put(x._1, LogCleaningInProgress)}
+      toClean.foreach { case (tp, _) => inProgress.put(tp, LogCleaningInProgress) }
       toClean
     }
 
@@ -317,4 +315,4 @@ private[log] object LogCleanerManager extends Logging {
 
     (firstDirtyOffset, firstUncleanableDirtyOffset)
   }
-}
\ No newline at end of file
+}


Mime
View raw message