kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-1910 Follow-up; Revert the no-offset-committed error code; reviewed by Joel Koshy
Date Tue, 07 Apr 2015 22:38:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 79f7cca85 -> 013cda2d7


KAFKA-1910 Follow-up; Revert the no-offset-committed error code; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/013cda2d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/013cda2d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/013cda2d

Branch: refs/heads/trunk
Commit: 013cda2d79e1b82755cf948b668f4fb40b4efe62
Parents: 79f7cca
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Tue Apr 7 15:38:36 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Apr 7 15:38:36 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Coordinator.java |  8 +++---
 .../apache/kafka/common/protocol/Errors.java    |  4 +--
 .../org/apache/kafka/common/record/Record.java  |  1 -
 .../consumer/internals/CoordinatorTest.java     |  2 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |  4 +--
 .../common/NoOffsetsCommittedException.scala    | 27 --------------------
 6 files changed, 6 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/013cda2d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 8d44814..e55ab11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -234,16 +234,14 @@ public final class Coordinator {
                         coordinatorDead();
                         offsetsReady = false;
                         Utils.sleep(this.retryBackoffMs);
-                    } else if (data.errorCode == Errors.NO_OFFSETS_FETCHABLE.code()
-                            || data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                    } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                         // just ignore this partition
-                        log.debug("No committed offset for partition " + tp);
+                        log.debug("Unknown topic or partition for " + tp);
                     } else {
                         throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset");
                     }
                 } else if (data.offset >= 0) {
-                    // record the position with the offset (-1 seems to indicate no
-                    // such offset known)
+                    // record the position with the offset (-1 indicates no committed offset to fetch)
                     offsets.put(tp, data.offset);
                 } else {
                     log.debug("No committed offset for partition " + tp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/013cda2d/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ce18a6c..36aa412 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -69,9 +69,7 @@ public enum Errors {
     INVALID_REQUIRED_ACKS(21,
             new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")),
     ILLEGAL_GENERATION(22,
-            new ApiException("Specified consumer generation id is not valid.")),
-    NO_OFFSETS_FETCHABLE(23,
-            new ApiException("No offsets have been committed so far."));
+            new ApiException("Specified consumer generation id is not valid."));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/013cda2d/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index d2332c9..50fac24 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -324,7 +324,6 @@ public final class Record {
                              checksum(),
                              key() == null ? 0 : key().limit(),
                              value() == null ? 0 : value().limit());
-
     }
 
     public boolean equals(Object other) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/013cda2d/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 1de22b9..b06c4a7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -244,7 +244,7 @@ public class CoordinatorTest {
         assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
 
         // fetch with no fetchable offsets
-        client.prepareResponse(offsetFetchResponse(tp, Errors.NO_OFFSETS_FETCHABLE.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
         assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
 
         // fetch with offset topic unknown

http://git-wip-us.apache.org/repos/asf/kafka/blob/013cda2d/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index eb1eb4a..c75c685 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -51,7 +51,6 @@ object ErrorMapping {
   val NotEnoughReplicasAfterAppendCode: Short = 20
   // 21: InvalidRequiredAcks
   // 22: IllegalConsumerGeneration
-  val NoOffsetsCommittedCode: Short = 23
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -73,8 +72,7 @@ object ErrorMapping {
       classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
       classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
       classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
-      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
-      classOf[NoOffsetsCommittedException].asInstanceOf[Class[Throwable]] -> NoOffsetsCommittedCode
+      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */

http://git-wip-us.apache.org/repos/asf/kafka/blob/013cda2d/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala b/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala
deleted file mode 100644
index 2a68e87..0000000
--- a/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Number of insync replicas for the partition is lower than min.insync.replicas
- * This exception is raised when the low ISR size is discovered *after* the message
- * was already appended to the log. Producer retries will cause duplicates.
- */
-class NoOffsetsCommittedException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file


Mime
View raw message