kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-4501; Java 9 compilation and runtime fixes
Date Sat, 19 Aug 2017 07:55:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3e22c1c04 -> ed96523a2


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/utils/ToolsUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 1be5a45..50e04f5 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -16,13 +16,9 @@
 */
 package kafka.utils
 
-import java.util
-import java.util.Comparator
-
 import joptsimple.OptionParser
 import org.apache.kafka.common.{Metric, MetricName}
 
-import scala.collection.immutable.ListMap
 import scala.collection.mutable
 
 object ToolsUtils {
@@ -32,9 +28,8 @@ object ToolsUtils {
       hostPort.split(",")
     else
       Array(hostPort)
-    val validHostPort = hostPorts.filter {
-      hostPortData =>
-        org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
+    val validHostPort = hostPorts.filter { hostPortData =>
+      org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
     }
     val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
     if(!isValid)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 012f254..90d7838 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -27,12 +27,12 @@ import kafka.log.LogConfig
 import kafka.server.{Defaults, KafkaConfig, KafkaServer}
 import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.KafkaFuture
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Rule, Test}
 import org.apache.kafka.common.requests.MetadataResponse
 import org.apache.kafka.common.resource.{Resource, ResourceType}
@@ -235,8 +235,8 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
     val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString)
     val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString)
     val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2)
-    var describeResult = client.describeConfigs(configResources.asJava)
-    var configs = describeResult.all.get
+    val describeResult = client.describeConfigs(configResources.asJava)
+    val configs = describeResult.all.get
 
     assertEquals(4, configs.size)
 
@@ -378,7 +378,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
         config.setProperty(KafkaConfig.SslTruststorePasswordProp, "some.invalid.pass")
     }
-    cfgs.foreach(_.putAll(serverConfig))
+    cfgs.foreach(_ ++= serverConfig)
     cfgs.map(KafkaConfig.fromProps)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 3c36bb0..e124468 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -19,7 +19,7 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-import kafka.utils.{Logging, ShutdownableThread, TestUtils}
+import kafka.utils.{ShutdownableThread, TestUtils}
 import kafka.server.KafkaConfig
 import org.junit.Assert._
 import org.junit.{Before, Test}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 6c61cd9..2cd0df2 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -25,6 +25,7 @@ import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import kafka.utils._
+import kafka.utils.Implicits._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, TopicPartition}
@@ -105,7 +106,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
-    cfgs.foreach(_.putAll(serverConfig))
+    cfgs.foreach(_ ++= serverConfig)
     cfgs.map(KafkaConfig.fromProps)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5e3c7ab..b8dc57b 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -20,6 +20,7 @@ package kafka.api
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import kafka.utils.TestUtils
+import kafka.utils.Implicits._
 import java.util.Properties
 
 import org.apache.kafka.clients.producer.KafkaProducer
@@ -54,7 +55,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value)
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}")
     }
-    cfgs.foreach(_.putAll(serverConfig))
+    cfgs.foreach(_ ++= serverConfig)
     cfgs.map(KafkaConfig.fromProps)
   }
 
@@ -65,10 +66,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
     super.setUp()
     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producerConfig.putAll(producerSecurityProps)
+    producerConfig ++= producerSecurityProps
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig.putAll(consumerSecurityProps)
+    consumerConfig ++= consumerSecurityProps
     for (_ <- 0 until producerCount)
       producers += createNewProducer
     for (_ <- 0 until consumerCount) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index aa92f40..a11972e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -20,6 +20,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils.Implicits._
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.junit.Assert._
@@ -123,7 +124,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
     producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
     producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
     val producerConfigWithCompression = new Properties()
-    producerConfigWithCompression.putAll(producerConfig)
+    producerConfigWithCompression ++= producerConfig
     producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
     val producers = List(
       TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)),

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index cc9ee3e..cbe882d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -19,6 +19,7 @@ package kafka.api
 import java.util.Properties
 
 import kafka.utils.TestUtils
+import kafka.utils.Implicits._
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.GroupAuthorizationException
@@ -58,7 +59,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
     val consumer1 = consumers.head
 
     val consumer2Config = new Properties
-    consumer2Config.putAll(consumerConfig)
+    consumer2Config ++= consumerConfig
     // consumer2 retrieves its credentials from the static JAAS configuration, so we test also this path
     consumer2Config.remove(SaslConfigs.SASL_JAAS_CONFIG)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 03afc9e..c2b5993 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -18,7 +18,7 @@ import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, Cluster
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
-import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions, KafkaAdminClient}
+import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
 import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 4d879d0..40ec293 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -26,6 +26,7 @@ import kafka.api.SaslSetup
 import kafka.coordinator.group.OffsetConfig
 import kafka.utils.JaasTestUtils.JaasSection
 import kafka.utils.TestUtils
+import kafka.utils.Implicits._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -84,7 +85,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
       props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
       props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
 
-      props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId"))
+      props ++= TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")
 
       // set listener-specific configs and set an invalid path for the global config to verify that the overrides work
       Seq(SecureInternal, SecureExternal).foreach { listenerName =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index 8a26cac..dd6c951 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -169,13 +169,12 @@ object ReplicationQuotasTestRig {
 
     def logOutput(config: ExperimentDef, replicas: Map[Int, Seq[Int]], newAssignment: Map[TopicAndPartition, Seq[Int]]): Unit = {
       val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName)
-      val existing = zkUtils.getReplicaAssignmentForTopics(newAssignment.map(_._1.topic).toSeq)
 
       //Long stats
       println("The replicas are " + replicas.toSeq.sortBy(_._1).map("\n" + _))
       println("This is the current replica assignment:\n" + actual.toSeq)
       println("proposed assignment is: \n" + newAssignment)
-      println("This is the assigment we eneded up with" + actual)
+      println("This is the assignment we ended up with" + actual)
 
       //Test Stats
       println(s"numBrokers: ${config.brokers}")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index e05f29d..16325ee 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -184,7 +184,7 @@ object TestLinearWriteSpeed {
     def write(): Int = {
       buffer.put(content)
       content.rewind()
-      content.limit
+      content.limit()
     }
     def close() {
       raf.close()
@@ -198,7 +198,7 @@ object TestLinearWriteSpeed {
     def write(): Int = {
       channel.write(content)
       content.rewind()
-      content.limit
+      content.limit()
     }
     def close() {
       raf.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 445ee09..af77c67 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -24,7 +24,7 @@ import kafka.server.{ConfigEntityName, QuotaId}
 import kafka.utils.{Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 
-import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
+import org.apache.kafka.common.security.scram.ScramCredentialUtils
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 09b1e75..c36400b 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-import java.util.{Collections, Properties}
+import java.util.Properties
 
 import org.junit.Assert._
 import kafka.common.MessageStreamsExistException
@@ -28,7 +28,6 @@ import kafka.serializer._
 import kafka.server._
 import kafka.utils.TestUtils._
 import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
 import org.apache.log4j.{Level, Logger}
 import org.junit.{Test, After, Before}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 9e7fb13..e7fbb83 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -422,12 +422,12 @@ class TransactionMarkerChannelManagerTest {
 
   @Test
   def shouldCreateMetricsOnStarting(): Unit = {
-    val metrics = Metrics.defaultRegistry.allMetrics
+    val metrics = Metrics.defaultRegistry.allMetrics.asScala
 
-    assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+    assertEquals(1, metrics
       .filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=UnknownDestinationQueueSize")
       .size)
-    assertEquals(1, Metrics.defaultRegistry.allMetrics.asScala
+    assertEquals(1, metrics
       .filterKeys(_.getMBeanName == "kafka.coordinator.transaction:type=TransactionMarkerChannelManager,name=LogAppendRetryQueueSize")
       .size)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index 34baf89..bff2700 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.{MockTime, Pool, TestUtils}
+import kafka.utils.Implicits._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 import org.junit.After
@@ -66,7 +67,7 @@ abstract class AbstractLogCleanerIntegrationTest {
     props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
     props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     props.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Long)
-    props.putAll(propertyOverrides)
+    props ++= propertyOverrides
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 5e22433..00f9dc9 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -104,7 +104,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit() + regularMessgeSet.buffer.limit())
       buffer.put(emptyMessageSet.buffer)
       buffer.put(regularMessgeSet.buffer)
       buffer.rewind
@@ -122,7 +122,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit() + regularMessgeSet.buffer.limit())
       buffer.put(emptyMessageSet.buffer)
       buffer.put(regularMessgeSet.buffer)
       buffer.rewind

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ad641c0..d141a26 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -38,7 +38,8 @@ import kafka.security.auth.{Acl, Authorizer, Resource}
 import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.utils.ZkUtils._
+import ZkUtils._
+import Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@@ -252,10 +253,10 @@ object TestUtils extends Logging {
     rack.foreach(props.put(KafkaConfig.RackProp, _))
 
     if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
-      props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
+      props ++= sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")
 
     if (protocolAndPorts.exists { case (protocol, _) => usesSaslAuthentication(protocol) })
-      props.putAll(JaasTestUtils.saslConfigs(saslProperties))
+      props ++= JaasTestUtils.saslConfigs(saslProperties)
 
     interBrokerSecurityProtocol.foreach { protocol =>
       props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name)
@@ -388,9 +389,9 @@ object TestUtils extends Logging {
    * Check that the buffer content from buffer.position() to buffer.limit() is equal
    */
   def checkEquals(b1: ByteBuffer, b2: ByteBuffer) {
-    assertEquals("Buffers should have equal length", b1.limit - b1.position, b2.limit - b2.position)
-    for(i <- 0 until b1.limit - b1.position)
-      assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b1.position + i))
+    assertEquals("Buffers should have equal length", b1.limit() - b1.position(), b2.limit() - b2.position())
+    for(i <- 0 until b1.limit() - b1.position())
+      assertEquals("byte " + i + " byte not equal.", b1.get(b1.position() + i), b2.get(b1.position() + i))
   }
 
   /**
@@ -484,8 +485,8 @@ object TestUtils extends Logging {
    */
   def hexString(buffer: ByteBuffer): String = {
     val builder = new StringBuilder("0x")
-    for(i <- 0 until buffer.limit)
-      builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position + i))))
+    for(i <- 0 until buffer.limit())
+      builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position() + i))))
     builder.toString
   }
 
@@ -503,7 +504,7 @@ object TestUtils extends Logging {
 
     //override any explicitly specified properties
     if (producerProps != null)
-      props.putAll(producerProps)
+      props ++= producerProps
 
     props.put("serializer.class", encoder)
     props.put("key.serializer.class", keyEncoder)
@@ -518,10 +519,10 @@ object TestUtils extends Logging {
                               saslProperties: Option[Properties]): Properties = {
     val props = new Properties
     if (usesSslTransportLayer(securityProtocol))
-      props.putAll(sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias))
+      props ++= sslConfigs(mode, securityProtocol == SecurityProtocol.SSL, trustStoreFile, certAlias)
 
     if (usesSaslAuthentication(securityProtocol))
-      props.putAll(JaasTestUtils.saslConfigs(saslProperties))
+      props ++= JaasTestUtils.saslConfigs(saslProperties)
     props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
     props
   }
@@ -572,7 +573,7 @@ object TestUtils extends Logging {
      * SSL client auth fails.
      */
     if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
-      producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties))
+      producerProps ++= producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
 
     new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
   }
@@ -633,7 +634,7 @@ object TestUtils extends Logging {
      * SSL client auth fails.
      */
     if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
-      consumerProps.putAll(consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties))
+      consumerProps ++= consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
 
     new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
   }
@@ -1208,7 +1209,7 @@ object TestUtils extends Logging {
 
   def copyOf(props: Properties): Properties = {
     val copy = new Properties()
-    copy.putAll(props)
+    copy ++= props
     copy
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 22a0a16..461767a 100755
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -28,7 +28,7 @@ import org.junit.Assert._
 import kafka.common.KafkaException
 import kafka.utils.CoreUtils.inLock
 import org.junit.Test
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Base64, Utils}
 
 class UtilsTest extends JUnitSuite {
 
@@ -157,13 +157,15 @@ class UtilsTest extends JUnitSuite {
   def testUrlSafeBase64EncodeUUID() {
 
     // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
-    val clusterId1 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
+    val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+      "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
     assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
     assertEquals(clusterId1.length, 22)
     assertTrue(clusterIdPattern.matcher(clusterId1).matches())
 
     // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
-    val clusterId2 = CoreUtils.urlSafeBase64EncodeNoPadding(CoreUtils.getBytesFromUuid(UUID.fromString("d418ec02-277e-4853-81e6-afe30259daec")))
+    val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+      "d418ec02-277e-4853-81e6-afe30259daec")))
     assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
     assertEquals(clusterId2.length, 22)
     assertTrue(clusterIdPattern.matcher(clusterId2).matches())


Mime
View raw message