kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: MINOR: Simplify log cleaner and fix compiler warnings
Date Wed, 04 Oct 2017 17:55:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9949e1ed1 -> 2a1b39ef1


MINOR: Simplify log cleaner and fix compiler warnings

- Simplify LogCleaner.cleanSegments and add comment regarding thread
unsafe usage of `LogSegment.append`. This was a result of investigating
KAFKA-4972.
- Fix compiler warnings (in some cases use the fully qualified name as a
workaround for deprecation warnings in import statements; see the sketch
after this list).
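
The fully-qualified-name workaround deserves a brief illustration. The sketch
below is minimal and self-contained; the class and member names are invented
for the example and are not part of Kafka. The idea: an import of a deprecated
type can itself produce a deprecation warning (as noted above), and
@SuppressWarnings cannot be attached to an import statement, so referencing the
type by its fully qualified name confines the warning to a member where the
annotation does apply.

    // FqnWorkaroundExample.java -- hypothetical names, not from the Kafka code base
    public class FqnWorkaroundExample {

        /** Stand-in for a deprecated type such as DefaultPrincipalBuilder. */
        @Deprecated
        public static class OldPrincipalBuilder { }

        // In real code the deprecated type lives in another package, so
        // `import some.pkg.OldPrincipalBuilder;` would itself trigger a deprecation
        // warning that no annotation can suppress. Referring to it by its fully
        // qualified name keeps the warning at this field, where @SuppressWarnings
        // applies. (The type is nested here only to keep the sketch compilable
        // as a single file.)
        @SuppressWarnings("deprecation")
        public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS =
                FqnWorkaroundExample.OldPrincipalBuilder.class.getName();

        public static void main(String[] args) {
            // Prints the name of the deprecated stand-in class
            System.out.println(DEFAULT_PRINCIPAL_BUILDER_CLASS);
        }
    }

The unchecked-cast warnings elsewhere in this commit are handled in the same
spirit: @SuppressWarnings("unchecked") is placed as close as possible to the
cast, either on the local variable declaration or on the narrow method that
performs it.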

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #4016 from ijuma/simplify-log-cleaner-and-fix-warnings


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a1b39ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a1b39ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a1b39ef

Branch: refs/heads/trunk
Commit: 2a1b39ef1b65a4de4f3813bc749458f9fd6a1e38
Parents: 9949e1e
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Wed Oct 4 18:55:46 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Oct 4 18:55:46 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/admin/AbstractOptions.java    |  1 +
 .../org/apache/kafka/common/KafkaFuture.java    |  6 +-
 .../kafka/common/config/AbstractConfig.java     |  2 +-
 .../apache/kafka/common/config/ConfigDef.java   |  1 +
 .../apache/kafka/common/config/SslConfigs.java  |  5 +-
 .../kafka/common/internals/KafkaFutureImpl.java |  2 +-
 .../kafka/common/network/ChannelBuilders.java   | 20 ++++---
 .../common/network/SaslChannelBuilder.java      |  1 +
 .../DefaultKafkaPrincipalBuilder.java           | 14 ++---
 .../authenticator/SaslServerAuthenticator.java  |  1 +
 .../org/apache/kafka/common/utils/Utils.java    |  1 +
 .../connect/runtime/MockConnectMetrics.java     |  1 +
 .../connect/runtime/WorkerSourceTaskTest.java   |  1 +
 core/src/main/scala/kafka/log/Log.scala         | 10 ++--
 core/src/main/scala/kafka/log/LogCleaner.scala  | 63 +++++++++++---------
 .../scala/kafka/tools/DumpLogSegments.scala     |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 11 ++--
 17 files changed, 78 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
index 5b13dea..d3085c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AbstractOptions.java
@@ -29,6 +29,7 @@ public abstract class AbstractOptions<T extends AbstractOptions> {
      * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
      * AdminClient should be used.
      */
+    @SuppressWarnings("unchecked")
     public T timeoutMs(Integer timeoutMs) {
         this.timeoutMs = timeoutMs;
         return (T) this;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index f6a0d51..23e2181 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -48,9 +48,9 @@ public abstract class KafkaFuture<T> implements Future<T> {
 
     private static class AllOfAdapter<R> extends BiConsumer<R, Throwable> {
         private int remainingResponses;
-        private KafkaFuture future;
+        private KafkaFuture<?> future;
 
-        public AllOfAdapter(int remainingResponses, KafkaFuture future) {
+        public AllOfAdapter(int remainingResponses, KafkaFuture<?> future) {
             this.remainingResponses = remainingResponses;
             this.future = future;
             maybeComplete();
@@ -91,7 +91,7 @@ public abstract class KafkaFuture<T> implements Future<T> {
      */
     public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
         KafkaFuture<Void> allOfFuture = new KafkaFutureImpl<>();
-        AllOfAdapter allOfWaiter = new AllOfAdapter(futures.length, allOfFuture);
+        AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, allOfFuture);
         for (KafkaFuture<?> future : futures) {
             future.addWaiter(allOfWaiter);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index d2b6d34..dc80d98 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -260,7 +260,7 @@ public class AbstractConfig {
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
-        return getConfiguredInstances(key, t, Collections.EMPTY_MAP);
+        return getConfiguredInstances(key, t, Collections.<String, Object>emptyMap());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 28fc801..7b9881f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -867,6 +867,7 @@ public class ConfigDef {
 
         @Override
         public void ensureValid(final String name, final Object value) {
+            @SuppressWarnings("unchecked")
             List<String> values = (List<String>) value;
             for (String string : values) {
                 validString.ensureValid(name, string);

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 5e8d304..042b051 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.config;
 
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
-import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
@@ -41,8 +40,10 @@ public class SslConfigs {
      * @deprecated As of 1.0.0. This field will be removed in a future major release. In recent versions,
      *   the config is optional and there is no default.
      */
+    // use FQN to avoid import deprecation warning
     @Deprecated
-    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = DefaultPrincipalBuilder.class.getName();
+    public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS =
+            org.apache.kafka.common.security.auth.DefaultPrincipalBuilder.class.getName();
 
     public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
     public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index 01355c6..cb97e87 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -142,7 +142,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
     @Override
     public <R> KafkaFuture<R> thenApply(Function<T, R> function) {
         KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
-        addWaiter(new Applicant(function, future));
+        addWaiter(new Applicant<>(function, future));
         return future;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 42723ff..74bd0a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -23,9 +23,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
-import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.utils.Utils;
@@ -122,13 +120,15 @@ public class ChannelBuilders {
             throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
     }
 
+    // Use FQN to avoid deprecated import warnings
     @SuppressWarnings("deprecation")
-    private static PrincipalBuilder createPrincipalBuilder(Class<?> principalBuilderClass, Map<String, ?> configs) {
-        PrincipalBuilder principalBuilder;
+    private static org.apache.kafka.common.security.auth.PrincipalBuilder createPrincipalBuilder(
+            Class<?> principalBuilderClass, Map<String, ?> configs) {
+        org.apache.kafka.common.security.auth.PrincipalBuilder principalBuilder;
         if (principalBuilderClass == null)
-            principalBuilder = new DefaultPrincipalBuilder();
+            principalBuilder = new org.apache.kafka.common.security.auth.DefaultPrincipalBuilder();
         else
-            principalBuilder = (PrincipalBuilder) Utils.newInstance(principalBuilderClass);
+            principalBuilder = (org.apache.kafka.common.security.auth.PrincipalBuilder) Utils.newInstance(principalBuilderClass);
         principalBuilder.configure(configs);
         return principalBuilder;
     }
@@ -145,13 +145,15 @@ public class ChannelBuilders {
             builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
         } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
             builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
-        } else if (PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
-            PrincipalBuilder oldPrincipalBuilder = createPrincipalBuilder(principalBuilderClass, configs);
+        } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
+            org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder =
+                    createPrincipalBuilder(principalBuilderClass, configs);
             builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer,
                     oldPrincipalBuilder, kerberosShortNamer);
         } else {
             throw new InvalidConfigurationException("Type " + principalBuilderClass.getName() + " is not " +
-                    "an instance of " + PrincipalBuilder.class.getName() + " or " + KafkaPrincipalBuilder.class.getName());
+                    "an instance of " + org.apache.kafka.common.security.auth.PrincipalBuilder.class.getName()
+ " or " +
+                    KafkaPrincipalBuilder.class.getName());
         }
 
         if (builder instanceof Configurable)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index e4eb791..72a2bc1 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -81,6 +81,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
             this.configs = configs;
             boolean hasKerberos;
             if (mode == Mode.SERVER) {
+                @SuppressWarnings("unchecked")
                 List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
                 hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index 1d0295a..30b0a3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.security.auth.AuthenticationContext;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
 import org.apache.kafka.common.security.auth.SslAuthenticationContext;
 import org.apache.kafka.common.security.kerberos.KerberosName;
@@ -47,27 +46,28 @@ import static java.util.Objects.requireNonNull;
  *
  * NOTE: This is an internal class and can change without notice. Unlike normal implementations
  * of {@link KafkaPrincipalBuilder}, there is no default no-arg constructor since this class
- * must adapt implementations of the older {@link PrincipalBuilder} interface.
+ * must adapt implementations of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder} interface.
  */
 public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Closeable {
+    // Use FQN to avoid import deprecation warnings
     @SuppressWarnings("deprecation")
-    private final PrincipalBuilder oldPrincipalBuilder;
+    private final org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder;
     private final Authenticator authenticator;
     private final TransportLayer transportLayer;
     private final KerberosShortNamer kerberosShortNamer;
 
     /**
-     * Construct a new instance which wraps an instance of the older {@link PrincipalBuilder}.
+     * Construct a new instance which wraps an instance of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder}.
      *
      * @param authenticator The authenticator in use
      * @param transportLayer The underlying transport layer
-     * @param oldPrincipalBuilder Instance of {@link PrincipalBuilder}
+     * @param oldPrincipalBuilder Instance of {@link org.apache.kafka.common.security.auth.PrincipalBuilder}
      * @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured
      */
     @SuppressWarnings("deprecation")
     public static DefaultKafkaPrincipalBuilder fromOldPrincipalBuilder(Authenticator authenticator,
                                                                        TransportLayer transportLayer,
-                                                                       PrincipalBuilder oldPrincipalBuilder,
+                                                                       org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder,
                                                                        KerberosShortNamer kerberosShortNamer) {
         return new DefaultKafkaPrincipalBuilder(
                 requireNonNull(authenticator),
@@ -79,7 +79,7 @@ public class DefaultKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Clos
     @SuppressWarnings("deprecation")
     private DefaultKafkaPrincipalBuilder(Authenticator authenticator,
                                          TransportLayer transportLayer,
-                                         PrincipalBuilder oldPrincipalBuilder,
+                                         org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder,
                                          KerberosShortNamer kerberosShortNamer) {
         this.authenticator = authenticator;
         this.transportLayer = transportLayer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 4e565d8..1a2aaf4 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -146,6 +146,7 @@ public class SaslServerAuthenticator implements Authenticator {
         this.transportLayer = transportLayer;
 
         this.configs = configs;
+        @SuppressWarnings("unchecked")
         List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
         if (enabledMechanisms == null || enabledMechanisms.isEmpty())
             throw new IllegalArgumentException("No SASL mechanisms are enabled");

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 82b12c3..2843f3e 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -327,6 +327,7 @@ public class Utils {
                 argTypes[i] = (Class<?>) params[2 * i];
                 args[i] = params[(2 * i) + 1];
             }
+            @SuppressWarnings("unchecked")
             Constructor<T> constructor = (Constructor<T>) c.getConstructor(argTypes);
             return constructor.newInstance(args);
         } catch (NoSuchMethodException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index 00be485..f0bb1e2e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -136,6 +136,7 @@ public class MockConnectMetrics extends ConnectMetrics {
          * @param metricName the name of the metric that was registered most recently
          * @return the current value of the metric
          */
+        @SuppressWarnings("deprecation")
         public double currentMetricValue(MetricName metricName) {
             KafkaMetric metric = metricsByName.get(metricName);
             return metric != null ? metric.value() : Double.NaN;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index d1e0290..928ccb9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -677,6 +677,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         return expectSendRecord(anyTimes, isRetry, false);
     }
 
+    @SuppressWarnings("unchecked")
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
         expectConvertKeyValue(anyTimes);
         expectApplyTransformationChain(anyTimes);

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 37137ec..dc47194 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -155,7 +155,7 @@ class Log(@volatile var dir: File,
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(time.milliseconds)
 
-  def initFileSize() : Int = {
+  def initFileSize: Int = {
     if (config.preallocate)
       config.segmentSize
     else
@@ -408,7 +408,7 @@ class Log(@volatile var dir: File,
                                       rollJitterMs = config.randomSegmentJitter,
                                       time = time,
                                       fileAlreadyExists = false,
-                                      initFileSize = this.initFileSize(),
+                                      initFileSize = this.initFileSize,
                                       preallocate = config.preallocate))
     } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
       recoverLog()
@@ -1575,7 +1575,7 @@ class Log(@volatile var dir: File,
    * @param oldSegments The old log segments to delete from the log
    * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
    */
-  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile : Boolean = false) {
+  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
     lock synchronized {
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
@@ -1584,9 +1584,9 @@ class Log(@volatile var dir: File,
       addSegment(newSegment)
 
       // delete the old files
-      for(seg <- oldSegments) {
+      for (seg <- oldSegments) {
         // remove the index entry
-        if(seg.baseOffset != newSegment.baseOffset)
+        if (seg.baseOffset != newSegment.baseOffset)
           segments.remove(seg.baseOffset)
         // delete segment
         asyncDeleteSegment(seg)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 61dd0fc..d45984b 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -415,42 +415,46 @@ private[log] class Cleaner(val id: Int,
                                  map: OffsetMap,
                                  deleteHorizonMs: Long,
                                  stats: CleanerStats) {
-    // create a new segment with the suffix .cleaned appended to both the log and index name
-    val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
-    logFile.delete()
-    val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
-    val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
-    val txnIndexFile = new File(segments.head.txnIndex.file.getPath + Log.CleanedFileSuffix)
-    indexFile.delete()
-    timeIndexFile.delete()
-    txnIndexFile.delete()
-
-    val startOffset = segments.head.baseOffset
-    val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate)
-    val index = new OffsetIndex(indexFile, startOffset, segments.head.index.maxIndexSize)
-    val timeIndex = new TimeIndex(timeIndexFile, startOffset, segments.head.timeIndex.maxIndexSize)
+
+    def deleteAndGetCleanedFile(file: File): File = {
+      val f = new File(file.getPath + Log.CleanedFileSuffix)
+      f.delete()
+      f
+    }
+
+    // create a new segment with a suffix appended to the name of the log and indexes
+    val firstSegment = segments.head
+    val logFile = deleteAndGetCleanedFile(firstSegment.log.file)
+    val indexFile = deleteAndGetCleanedFile(firstSegment.index.file)
+    val timeIndexFile = deleteAndGetCleanedFile(firstSegment.timeIndex.file)
+    val txnIndexFile = deleteAndGetCleanedFile(firstSegment.txnIndex.file)
+
+    val startOffset = firstSegment.baseOffset
+    val records = FileRecords.open(logFile, false, log.initFileSize, log.config.preallocate)
+    val index = new OffsetIndex(indexFile, startOffset, firstSegment.index.maxIndexSize)
+    val timeIndex = new TimeIndex(timeIndexFile, startOffset, firstSegment.timeIndex.maxIndexSize)
     val txnIndex = new TransactionIndex(startOffset, txnIndexFile)
-    val cleaned = new LogSegment(records, index, timeIndex, txnIndex, startOffset,
-      segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+    val cleaned = new LogSegment(records, index, timeIndex, txnIndex, startOffset, firstSegment.indexIntervalBytes,
+      log.config.randomSegmentJitter, time)
 
     try {
       // clean segments into the new destination segment
       val iter = segments.iterator
       var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
       while (currentSegmentOpt.isDefined) {
-        val oldSegmentOpt = currentSegmentOpt.get
+        val currentSegment = currentSegmentOpt.get
         val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
 
-        val startOffset = oldSegmentOpt.baseOffset
+        val startOffset = currentSegment.baseOffset
         val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
         val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
         val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(txnIndex))
 
-        val retainDeletes = oldSegmentOpt.lastModified > deleteHorizonMs
-        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
-          .format(startOffset, log.name, new Date(oldSegmentOpt.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicPartition, oldSegmentOpt, cleaned, map, retainDeletes, log.config.maxMessageSize, transactionMetadata,
-          log.activeProducersWithLastSequence, stats)
+        val retainDeletes = currentSegment.lastModified > deleteHorizonMs
+        info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new
Date(currentSegment.largestTimestamp)}) " +
+          s"into ${cleaned.baseOffset}, ${if(retainDeletes) "retaining" else "discarding"}
deletes.")
+        cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize,
+          transactionMetadata, log.activeProducersWithLastSequence, stats)
 
         currentSegmentOpt = nextSegmentOpt
       }
@@ -497,7 +501,7 @@ private[log] class Cleaner(val id: Int,
    * @param stats Collector for cleaning statistics
    */
   private[log] def cleanInto(topicPartition: TopicPartition,
-                             source: LogSegment,
+                             sourceRecords: FileRecords,
                              dest: LogSegment,
                              map: OffsetMap,
                              retainDeletes: Boolean,
@@ -528,18 +532,18 @@ private[log] class Cleaner(val id: Int,
           // The batch is only retained to preserve producer sequence information; the records can be removed
           false
         else
-          Cleaner.this.shouldRetainRecord(source, map, retainDeletes, batch, record, stats)
+          Cleaner.this.shouldRetainRecord(map, retainDeletes, batch, record, stats)
       }
     }
 
     var position = 0
-    while (position < source.log.sizeInBytes) {
+    while (position < sourceRecords.sizeInBytes) {
       checkDone(topicPartition)
       // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
       readBuffer.clear()
       writeBuffer.clear()
 
-      source.log.readInto(readBuffer, position)
+      sourceRecords.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
       val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
@@ -553,6 +557,8 @@ private[log] class Cleaner(val id: Int,
       if (outputBuffer.position() > 0) {
         outputBuffer.flip()
         val retained = MemoryRecords.readableRecords(outputBuffer)
+        // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
+        // after `Log.replaceSegments` (which acquires the lock) is called
         dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
           largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
@@ -580,8 +586,7 @@ private[log] class Cleaner(val id: Int,
     }
   }
 
-  private def shouldRetainRecord(source: kafka.log.LogSegment,
-                                 map: kafka.log.OffsetMap,
+  private def shouldRetainRecord(map: kafka.log.OffsetMap,
                                  retainDeletes: Boolean,
                                  batch: RecordBatch,
                                  record: Record,

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 8bf43b2..a8a2ba7 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -440,7 +440,7 @@ object DumpLogSegments {
     val shallowOffsetNotFound = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
 
     def recordMismatchTimeIndex(file: File, indexTimestamp: Long, logTimestamp: Long) {
-      var misMatchesSeq = misMatchesForTimeIndexFilesMap.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      val misMatchesSeq = misMatchesForTimeIndexFilesMap.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
       if (misMatchesSeq.isEmpty)
         misMatchesForTimeIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq)
       misMatchesSeq += ((indexTimestamp, logTimestamp))

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1b39ef/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 08da667..687307a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -57,7 +57,6 @@ import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.{Map, mutable}
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
@@ -1412,7 +1411,7 @@ object TestUtils extends Logging {
   // If true, this will return the value as a string. It is expected that the record in question should have been created
   // by the `producerRecordWithExpectedTransactionStatus` method.
   def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {
-    record.headers.headers(transactionStatusKey).headOption match {
+    record.headers.headers(transactionStatusKey).asScala.headOption match {
       case Some(header) =>
         assertEquals(s"Got ${asString(header.value)} but expected the value to indicate "
+
           s"committed status.", asString(committedValue), asString(header.value))
@@ -1434,7 +1433,7 @@ object TestUtils extends Logging {
       else
         abortedValue
     }
-    new ProducerRecord[Array[Byte], Array[Byte]](topic, null, key, value, List(header))
+    new ProducerRecord[Array[Byte], Array[Byte]](topic, null, key, value, Collections.singleton(header))
   }
 
   def producerRecordWithExpectedTransactionStatus(topic: String, key: String, value: String,
@@ -1445,7 +1444,7 @@ object TestUtils extends Logging {
   // Collect the current positions for all partition in the consumers current assignment.
   def consumerPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) : Map[TopicPartition, OffsetAndMetadata]  = {
     val offsetsToCommit = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
-    consumer.assignment.foreach{ topicPartition =>
+    consumer.assignment.asScala.foreach { topicPartition =>
       offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition)))
     }
     offsetsToCommit.toMap
@@ -1454,14 +1453,14 @@ object TestUtils extends Logging {
   def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
     TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50)
+      records ++= consumer.poll(50).asScala
       records.size >= numRecords
     }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
     records
   }
 
   def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = {
-    consumer.assignment.foreach { case(topicPartition) =>
+    consumer.assignment.asScala.foreach { case(topicPartition) =>
       val offset = consumer.committed(topicPartition)
       if (offset != null)
         consumer.seek(topicPartition, offset.offset)

