kafka-commits mailing list archives

From ewe...@apache.org
Subject [1/3] kafka git commit: KAFKA-2929: Migrate duplicate error mapping functionality
Date Thu, 07 Jan 2016 05:25:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 991cafe4d -> a9ff3f2ec
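
The migration in this patch is mechanical: each constant on the deprecated
kafka.common.ErrorMapping object is replaced by the matching code on the shared
org.apache.kafka.common.protocol.Errors enum, and the old core exception classes give way
to their org.apache.kafka.common.errors equivalents. A minimal sketch of the new idiom
(an illustration, not part of the patch; names as in the 0.9-era client library):

    import org.apache.kafka.common.protocol.Errors

    // ErrorMapping.NoError                     -> Errors.NONE.code
    // ErrorMapping.UnknownTopicOrPartitionCode -> Errors.UNKNOWN_TOPIC_OR_PARTITION.code
    // ErrorMapping.LeaderNotAvailableCode      -> Errors.LEADER_NOT_AVAILABLE.code
    val code: Short = Errors.NONE.code          // the wire-format error code (0 = no error)
    assert(Errors.forCode(code) == Errors.NONE) // reverse lookup from a raw code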


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 5f0e650..dead0eb 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,14 +23,15 @@ import kafka.common._
 import kafka.message._
 import kafka.log._
 import kafka.utils._
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
 
 /**
- * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it 
+ * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it
  * from another thread and checks a few basic assertions until the user kills the process.
  */
 object StressTestLog {
   val running = new AtomicBoolean(true)
-  
+
   def main(args: Array[String]) {
     val dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir())
     val time = new MockTime
@@ -48,7 +49,7 @@ object StressTestLog {
     writer.start()
     val reader = new ReaderThread(log)
     reader.start()
-    
+
     Runtime.getRuntime().addShutdownHook(new Thread() {
       override def run() = {
         running.set(false)
@@ -57,13 +58,13 @@ object StressTestLog {
         CoreUtils.rm(dir)
       }
     })
-    
+
     while(running.get) {
       println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
       Thread.sleep(1000)
     }
   }
-  
+
   abstract class WorkerThread extends Thread {
     override def run() {
       try {
@@ -71,7 +72,7 @@ object StressTestLog {
         while(running.get)
           work()
       } catch {
-        case e: Exception => 
+        case e: Exception =>
           e.printStackTrace()
           running.set(false)
       }
@@ -79,7 +80,7 @@ object StressTestLog {
     }
     def work()
   }
-  
+
   class WriterThread(val log: Log) extends WorkerThread {
     @volatile var offset = 0
     override def work() {
@@ -90,7 +91,7 @@ object StressTestLog {
         Thread.sleep(500)
     }
   }
-  
+
   class ReaderThread(val log: Log) extends WorkerThread {
     @volatile var offset = 0
     override def work() {
@@ -109,4 +110,4 @@ object StressTestLog {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 86e6877..5c2f1ae 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -17,14 +17,13 @@
 
 package other.kafka
 
-import org.I0Itec.zkclient.ZkClient
 import kafka.api._
 import kafka.utils.{ZkUtils, ShutdownableThread}
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.Errors
 import scala.collection._
 import kafka.client.ClientUtils
 import joptsimple.OptionParser
-import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.network.BlockingChannel
 import scala.util.Random
 import java.io.IOException
@@ -90,7 +89,7 @@ object TestOffsetManager {
         numCommits.getAndIncrement
         commitTimer.time {
           val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload())
-          if (response.commitStatus.exists(_._2 != ErrorMapping.NoError)) numErrors.getAndIncrement
+          if (response.commitStatus.exists(_._2 != Errors.NONE.code)) numErrors.getAndIncrement
         }
         offset += 1
       }
@@ -153,7 +152,7 @@ object TestOffsetManager {
 
           fetchTimer.time {
             val response = OffsetFetchResponse.readFrom(channel.receive().payload())
-            if (response.requestInfo.exists(_._2.error != ErrorMapping.NoError)) {
+            if (response.requestInfo.exists(_._2.error != Errors.NONE.code)) {
               numErrors.getAndIncrement
             }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 47afad1..3aa971b 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -17,6 +17,7 @@
 package kafka.admin
 
 import junit.framework.Assert._
+import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Test
@@ -25,7 +26,7 @@ import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{Logging, ZkUtils, TestUtils}
-import kafka.common.{InvalidTopicException, TopicExistsException, TopicAndPartition}
+import kafka.common.{TopicExistsException, TopicAndPartition}
 import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
 import java.io.File
 import TestUtils._

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 51d6c91..cfbca00 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,8 +17,8 @@
 
 package kafka.api
 
-import kafka.cluster.{EndPoint, Broker}
-import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
+import kafka.cluster.{BrokerEndPoint, EndPoint, Broker}
+import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError}
 import kafka.common._
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.SystemTime
@@ -28,7 +28,7 @@ import kafka.common.TopicAndPartition
 
 import java.nio.ByteBuffer
 
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
@@ -141,7 +141,7 @@ object SerializationTestUtils {
 
   def createTestOffsetResponse: OffsetResponse = {
     new OffsetResponse(0, collection.immutable.Map(
-      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
+      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(Errors.NONE.code, Seq(1000l, 2000l, 3000l, 4000l)))
     )
   }
 
@@ -184,8 +184,8 @@ object SerializationTestUtils {
   }
 
   def createTestOffsetCommitResponse: OffsetCommitResponse = {
-    new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
-                                 TopicAndPartition(topic1, 1) -> ErrorMapping.NoError))
+    new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> Errors.NONE.code,
+                                 TopicAndPartition(topic1, 1) -> Errors.NONE.code))
   }
 
   def createTestOffsetFetchRequest: OffsetFetchRequest = {
@@ -197,8 +197,8 @@ object SerializationTestUtils {
 
   def createTestOffsetFetchResponse: OffsetFetchResponse = {
     new OffsetFetchResponse(collection.immutable.Map(
-      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError),
-      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
+      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", Errors.NONE.code),
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
     ))
   }
 
@@ -207,7 +207,7 @@ object SerializationTestUtils {
   }
 
   def createConsumerMetadataResponse: GroupCoordinatorResponse = {
-    GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+    GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), Errors.NONE.code, 0)
   }
 
   def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/common/TopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala
index 0482bf1..da6083a 100644
--- a/core/src/test/scala/unit/kafka/common/TopicTest.scala
+++ b/core/src/test/scala/unit/kafka/common/TopicTest.scala
@@ -42,7 +42,7 @@ class TopicTest {
         fail("Should throw InvalidTopicException.")
       }
       catch {
-        case e: InvalidTopicException => "This is good."
+        case e: org.apache.kafka.common.errors.InvalidTopicException => // This is good.
       }
     }
 

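Note the fully qualified exception in the new catch clause above: TopicTest lives in the
kafka.common package, where the deprecated InvalidTopicException would otherwise shadow the
client class. An import alias is an equivalent way to disambiguate; a small hypothetical
sketch (the alias name is chosen here, not taken from the patch):

    import org.apache.kafka.common.errors.{InvalidTopicException => ClientInvalidTopicException}

    try kafka.common.Topic.validate("bad/topic/name") // assuming validate rejects the slash
    catch {
      case _: ClientInvalidTopicException => // expected for an illegal topic name
    }
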
http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index a621efc..7a59eba 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -24,12 +24,11 @@ import kafka.admin.AdminUtils
 import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
 import kafka.client.ClientUtils
 import kafka.cluster.{Broker, BrokerEndPoint}
-import kafka.common.ErrorMapping
 import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.junit.Assert._
 import org.junit.{Test, After, Before}
 
@@ -89,8 +88,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
     val partitionMetadata = topicsMetadata.head.partitionsMetadata
@@ -110,10 +109,10 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // issue metadata request with empty list of topics
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
       2000, 0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
     assertEquals(2, topicsMetadata.size)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.last.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.last.partitionsMetadata.head.errorCode)
     val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
     val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
@@ -130,7 +129,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     val topic = "testAutoCreateTopic"
     var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
       2000,0).topicsMetadata
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals(Errors.LEADER_NOT_AVAILABLE.code, topicsMetadata.head.errorCode)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
     assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
@@ -142,8 +141,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // retry the metadata for the auto created topic
     topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
     val partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
@@ -160,9 +159,9 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
       2000,0).topicsMetadata
     assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
     assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals(Errors.LEADER_NOT_AVAILABLE.code, topicsMetadata.head.errorCode)
     assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
-    assertEquals("Expecting InvalidTopicCode for topic2 metadata", ErrorMapping.InvalidTopicCode,
topicsMetadata(1).errorCode)
+    assertEquals("Expecting InvalidTopicCode for topic2 metadata", Errors.INVALID_TOPIC_EXCEPTION.code,
topicsMetadata(1).errorCode)
 
     // wait for leader to be elected
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 0)
@@ -171,8 +170,8 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // retry the metadata for the first auto created topic
     topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
-    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
+    assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
     var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 816354f..46bfbed 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,6 +23,7 @@ import java.util.Properties
 import kafka.common._
 import kafka.server.OffsetCheckpoint
 import kafka.utils._
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -56,7 +57,7 @@ class LogManagerTest {
     CoreUtils.rm(logDir)
     logManager.logDirs.foreach(CoreUtils.rm(_))
   }
-  
+
   /**
   * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
    */
@@ -92,9 +93,9 @@ class LogManagerTest {
       offset = info.lastOffset
     }
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
-    
+
     log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
-    
+
     time.sleep(maxLogAgeMs + 1)
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
@@ -177,15 +178,15 @@ class LogManagerTest {
     time.sleep(logManager.InitialTaskDelayMs)
     assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
   }
-  
+
   /**
    * Test that new logs that are created are assigned to the least loaded log directory
    */
   @Test
   def testLeastLoadedAssignment() {
     // create a log manager with multiple data directories
-    val dirs = Array(TestUtils.tempDir(), 
-                     TestUtils.tempDir(), 
+    val dirs = Array(TestUtils.tempDir(),
+                     TestUtils.tempDir(),
                      TestUtils.tempDir())
     logManager.shutdown()
     logManager = createLogManager()
@@ -198,7 +199,7 @@ class LogManagerTest {
       assertTrue("Load should balance evenly", counts.max <= counts.min + 1)
     }
   }
-  
+
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
@@ -208,7 +209,7 @@ class LogManagerTest {
       createLogManager()
       fail("Should not be able to create a second log manager instance with the same data
directory")
     } catch {
-      case e: KafkaException => // this is good 
+      case e: KafkaException => // this is good
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f4427b9..47908e7 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -20,21 +20,21 @@ package kafka.log
 import java.io._
 import java.util.Properties
 import java.util.concurrent.atomic._
+import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, CorruptRecordException}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message._
-import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException}
 import kafka.utils._
 import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
-  
+
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val time = new MockTime(0)
   var config: KafkaConfig = null
-  val logConfig = LogConfig()  
+  val logConfig = LogConfig()
 
   @Before
   def setUp() {
@@ -46,7 +46,7 @@ class LogTest extends JUnitSuite {
   def tearDown() {
     CoreUtils.rm(tmpDir)
   }
-  
+
   def createEmptyLogs(dir: File, offsets: Int*) {
     for(offset <- offsets) {
       Log.logFilename(dir, offset).createNewFile()
@@ -347,9 +347,9 @@ class LogTest extends JUnitSuite {
 
     try {
       log.append(messageSet)
-      fail("message set should throw MessageSetSizeTooLargeException.")
+      fail("message set should throw RecordBatchTooLargeException.")
     } catch {
-      case e: MessageSetSizeTooLargeException => // this is good
+      case e: RecordBatchTooLargeException => // this is good
     }
   }
 
@@ -376,19 +376,19 @@ class LogTest extends JUnitSuite {
       log.append(messageSetWithUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: InvalidMessageException => // this is good
+      case e: CorruptRecordException => // this is good
     }
     try {
       log.append(messageSetWithOneUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: InvalidMessageException => // this is good
+      case e: CorruptRecordException => // this is good
     }
     try {
       log.append(messageSetWithCompressedUnkeyedMessage)
       fail("Compacted topics cannot accept a message without a key.")
     } catch {
-      case e: InvalidMessageException => // this is good
+      case e: CorruptRecordException => // this is good
     }
 
     // the following should succeed without any InvalidMessageException
@@ -419,7 +419,7 @@ class LogTest extends JUnitSuite {
       log.append(second)
       fail("Second message set should throw MessageSizeTooLargeException.")
     } catch {
-      case e: MessageSizeTooLargeException => // this is good
+      case e: RecordTooLargeException => // this is good
     }
   }
   /**
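
The LogTest hunks above apply the same mapping to exception types: kafka.common's
InvalidMessageException, MessageSetSizeTooLargeException and MessageSizeTooLargeException
become CorruptRecordException, RecordBatchTooLargeException and RecordTooLargeException
from org.apache.kafka.common.errors. A self-contained sketch of the renamed types as the
tests now see them (the describe helper is illustrative, not part of the patch):

    import org.apache.kafka.common.errors.{CorruptRecordException, RecordBatchTooLargeException, RecordTooLargeException}

    def describe(t: Throwable): String = t match {
      case _: CorruptRecordException       => "record failed validation (e.g. missing key on a compacted topic)"
      case _: RecordBatchTooLargeException => "message set larger than the configured segment size"
      case _: RecordTooLargeException      => "single message larger than the configured maximum"
      case other                           => other.toString
    }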

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index b54f30e..60d2588 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -19,6 +19,7 @@ package kafka.producer
 
 import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
+import org.apache.kafka.common.protocol.Errors
 import org.junit.Assert._
 import org.easymock.EasyMock
 import org.junit.Test
@@ -378,11 +379,11 @@ class AsyncProducerTest {
    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11)
    val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17)
     val response1 = ProducerResponse(0,
-      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort,
0L)),
-          (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
+      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION.code,
0L)),
+          (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE.code, 0L))))
    val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21)
     val response2 = ProducerResponse(0,
-      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
+      Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE.code, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     // don't care about config mock
     EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes()
@@ -463,11 +464,11 @@ class AsyncProducerTest {
     val broker1 = new BrokerEndPoint(brokerId, brokerHost, brokerPort)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
-  
+
   def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes)): _*)
   }
-  
+
  def messagesToSet(key: Array[Byte], messages: Seq[Array[Byte]]): ByteBufferMessageSet = {
    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(key = key, bytes = m)): _*)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 79b2603..f356a69 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -22,13 +22,14 @@ import java.util.Properties
 
 import kafka.admin.AdminUtils
 import kafka.api.FetchRequestBuilder
-import kafka.common.{ErrorMapping, FailedToSendMessageException}
+import kafka.common.FailedToSendMessageException
 import kafka.consumer.SimpleConsumer
 import kafka.message.Message
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.Errors
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -333,7 +334,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
       // create topic
       AdminUtils.createTopic(zkUtils, "new-topic", 2, 1)
       TestUtils.waitUntilTrue(() =>
-        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).errorCode != Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
         "Topic new-topic not created after timeout",
         waitTime = zookeeper.tickTime)
       TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "new-topic", 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 8e15ef8..6e7b964 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -23,12 +23,12 @@ import java.util.Properties
 import org.junit.Assert
 import kafka.admin.AdminUtils
 import kafka.api.ProducerResponseStatus
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.message._
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.junit.Test
 
 class SyncProducerTest extends KafkaServerTestHarness {
@@ -103,8 +103,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
    val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
    val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1))
 
-    Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
-    Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
+    Assert.assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
+    Assert.assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error)
     Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).offset)
 
    val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1
@@ -112,8 +112,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
    val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
    val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1))
 
-    Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
-    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test",
0)).error)
+    Assert.assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code))
+    Assert.assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("test", 0)).error)
     Assert.assertEquals(0, response2.status(TopicAndPartition("test", 0)).offset)
   }
 
@@ -162,7 +162,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     Assert.assertEquals(3, response.status.size)
     response.status.values.foreach {
       case ProducerResponseStatus(error, nextOffset) =>
-        Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, error)
+        Assert.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, error)
         Assert.assertEquals(-1L, nextOffset)
     }
 
@@ -178,13 +178,13 @@ class SyncProducerTest extends KafkaServerTestHarness {
     Assert.assertEquals(3, response2.status.size)
 
     // the first and last message should have been accepted by broker
-    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic1",
0)).error)
-    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic3",
0)).error)
+    Assert.assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic1", 0)).error)
+    Assert.assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("topic3", 0)).error)
     Assert.assertEquals(0, response2.status(TopicAndPartition("topic1", 0)).offset)
     Assert.assertEquals(0, response2.status(TopicAndPartition("topic3", 0)).offset)
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort,
+    Assert.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
                         response2.status(TopicAndPartition("topic2", 0)).error)
     Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).offset)
   }
@@ -250,6 +250,6 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val response = producer.send(TestUtils.produceRequest(topicName, 0,
       new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))
 
-    Assert.assertEquals(ErrorMapping.NotEnoughReplicasCode, response.status(TopicAndPartition(topicName, 0)).error)
+    Assert.assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, response.status(TopicAndPartition(topicName, 0)).error)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 1d741f2..704f776 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -21,17 +21,16 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 
 import scala.collection.JavaConverters._
-import kafka.api.{PartitionStateInfo, LeaderAndIsr}
+import kafka.api.LeaderAndIsr
 import org.apache.kafka.common.requests.{LeaderAndIsrResponse, LeaderAndIsrRequest, AbstractRequestResponse}
 import org.junit.Assert._
-import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
+import kafka.utils.{TestUtils, CoreUtils}
 import kafka.cluster.Broker
-import kafka.common.ErrorMapping
-import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrAndControllerEpoch}
+import kafka.controller.{ControllerChannelManager, ControllerContext}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.utils.SystemTime
 import org.junit.{Test, After, Before}
 
@@ -164,8 +163,8 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
 
   private def staleControllerEpochCallback(response: AbstractRequestResponse): Unit = {
     val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
-    staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
-      case ErrorMapping.StaleControllerEpochCode => true
+    staleControllerEpochDetected = Errors.forCode(leaderAndIsrResponse.errorCode) match {
+      case Errors.STALE_CONTROLLER_EPOCH => true
       case _ => false
     }
   }
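
The staleControllerEpochCallback change above shows the idiom that replaces comparisons
against raw short constants: convert the wire code back into the enum with Errors.forCode
and pattern match on enum members. A standalone sketch (the method name is illustrative):

    import org.apache.kafka.common.protocol.Errors

    def isStaleControllerEpoch(errorCode: Short): Boolean =
      Errors.forCode(errorCode) match {
        case Errors.STALE_CONTROLLER_EPOCH => true
        case _                             => false
      }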

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 3f89a8a..b6bc4fc 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.io.File
 import kafka.utils._
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import java.util.{Random, Properties}
@@ -28,13 +29,13 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.TopicAndPartition
 import org.junit.After
 import org.junit.Before
 import org.junit.Test
 
 class LogOffsetTest extends ZooKeeperTestHarness {
-  val random = new Random() 
+  val random = new Random()
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
@@ -67,7 +68,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val request = OffsetRequest(
       Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
     val offsetResponse = simpleConsumer.getOffsetsBefore(request)
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
                  offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ff3f2e/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 31f743b..1d5148b 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -19,10 +19,11 @@ package kafka.server
 
 import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
 import kafka.consumer.SimpleConsumer
-import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
+import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
 import kafka.utils._
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.Errors
 
 import org.junit.{After, Before, Test}
 
@@ -87,13 +88,13 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
    val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset = 42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
     val fetchRequest = OffsetFetchRequest(group, Seq(topicAndPartition))
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(topicAndPartition).get.error)
     assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
 
@@ -104,13 +105,13 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     )))
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
-    assertEquals(ErrorMapping.NoError, commitResponse1.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.NONE.code, commitResponse1.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
     val fetchRequest1 = OffsetFetchRequest(group, Seq(topicAndPartition))
     val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1)
 
-    assertEquals(ErrorMapping.NoError, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
     assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
 
@@ -143,10 +144,10 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       TopicAndPartition(topic2, 1) -> OffsetAndMetadata(offset=45L)
     ))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
-    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
-    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
-    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get)
-    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 1)).get)
+    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get)
+    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic2, 1)).get)
 
     val fetchRequest = OffsetFetchRequest(group, Seq(
       TopicAndPartition(topic1, 0),
@@ -159,19 +160,19 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     ))
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)

-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)

-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)

-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)

-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
+    assertEquals(Errors.NONE.code, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)

     assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
@@ -206,7 +207,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     )))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(topicAndPartition).get)
 
    val commitRequest1 = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
@@ -214,7 +215,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     )))
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
-    assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
+    assertEquals(Errors.OFFSET_METADATA_TOO_LARGE.code, commitResponse1.commitStatus.get(topicAndPartition).get)
   }
 
   @Test
@@ -233,7 +234,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata")),
       versionId = 0
     )
-    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
     // committed offset should exist with fetch version 0
@@ -247,7 +248,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(2L, "metadata", -1L)),
       versionId = 1
     )
-    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest1).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest1).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -258,7 +259,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", SystemTime.milliseconds - 2*24*60*60*1000L)),
       versionId = 1
     )
-    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -270,7 +271,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       versionId = 2,
       retentionMs = 1000 * 60 * 60L
     )
-    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest3).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest3).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(4L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -282,7 +283,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
       versionId = 2,
       retentionMs = 0L
     )
-    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest4).commitStatus.get(topicPartition).get)
+    assertEquals(Errors.NONE.code, simpleConsumer.commitOffsets(commitRequest4).commitStatus.get(topicPartition).get)
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
@@ -303,7 +304,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     ))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
-    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
   }
 }

