kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: MINOR: Follow-up minor improvements/cleanup for KAFKA-3396
Date Tue, 04 Oct 2016 00:23:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a23859e56 -> 7115c66ae


MINOR: Follow-up minor improvements/cleanup for KAFKA-3396

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1946 from hachikuji/followup-for-kafka-3396


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7115c66a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7115c66a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7115c66a

Branch: refs/heads/trunk
Commit: 7115c66aefb11efa802e61a42bcc13fadf92598d
Parents: a23859e
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Oct 3 16:49:50 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Oct 3 16:49:50 2016 -0700

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java |   4 +-
 .../clients/consumer/internals/Fetcher.java     |  28 ++--
 .../clients/producer/internals/Sender.java      |   8 +-
 .../common/requests/OffsetCommitResponse.java   |   1 +
 .../common/requests/OffsetFetchResponse.java    |   2 +-
 .../internals/ConsumerCoordinatorTest.java      |  20 +++
 .../src/main/scala/kafka/admin/AdminUtils.scala |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  17 ++-
 core/src/main/scala/kafka/utils/Logging.scala   |   2 +
 .../kafka/api/AuthorizerIntegrationTest.scala   | 133 +++++++++++--------
 .../kafka/api/EndToEndAuthorizationTest.scala   | 115 ++++++----------
 .../kafka/api/IntegrationTestHarness.scala      |  18 +--
 docs/upgrade.html                               |   7 +-
 13 files changed, 186 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bc77a7a..bd95409 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -675,7 +675,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     return;
                 } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                     log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
-                    future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
+                    future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
                     return;
                 } else {
                     log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
@@ -739,7 +739,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                         coordinatorDead();
                         future.raise(error);
                     } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                        future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message()));
+                        future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                     }
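
The effect on applications: the KafkaException raised here propagates out of a synchronous commit. A minimal sketch of catching it, assuming an already-configured consumer and a hypothetical topic (not part of this commit):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    public class CommitExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("group.id", "demo-group");              // hypothetical group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("demo-topic", 0); // hypothetical topic
                try {
                    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(42L)));
                } catch (KafkaException e) {
                    // Raised when the broker answers UNKNOWN_TOPIC_OR_PARTITION: the
                    // partition may not exist, or Describe access may be missing.
                    System.err.println("Commit failed: " + e.getMessage());
                }
            }
        }
    }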

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fe19dbe..17ab398 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -592,11 +592,14 @@ public class Fetcher<K, V> {
                 log.debug("Cannot search by timestamp for partition {} because the message format version " +
                         "is before 0.10.0", topicPartition);
                 timestampOffsetMap.put(topicPartition, null);
-            } else if (error == Errors.NOT_LEADER_FOR_PARTITION
-                    || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+            } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                 log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                         topicPartition);
                 future.raise(error);
+            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                log.warn("Received unknown topic or partition error in ListOffset request for partition {}. The topic/partition " +
+                        "may not exist or the user may not have Describe access to it", topicPartition);
+                future.raise(error);
             } else {
                 log.warn("Attempt to fetch offsets for partition {} failed due to: {}",
                         topicPartition, error.message());
@@ -664,13 +667,14 @@ public class Fetcher<K, V> {
         int bytes = 0;
         int recordsCount = 0;
         PartitionRecords<K, V> parsedRecords = null;
+        Errors error = Errors.forCode(partition.errorCode);
 
         try {
             if (!subscriptions.isFetchable(tp)) {
                 // this can happen when a rebalance happened or a partition consumption paused
                 // while fetch is still in-flight
                 log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
-            } else if (partition.errorCode == Errors.NONE.code()) {
+            } else if (error == Errors.NONE) {
                 // we are interested in this fetch only if the beginning offset matches the
                 // current consumed position
                 Long position = subscriptions.position(tp);
@@ -700,10 +704,14 @@ public class Fetcher<K, V> {
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                 }
-            } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                    || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+            } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
+                log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
+                this.metadata.requestUpdate();
+            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
+                        "may not exist or the user may not have Describe access to it", tp);
                 this.metadata.requestUpdate();
-            } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
+            } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
                 if (fetchOffset != subscriptions.position(tp)) {
                     log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
                             "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
@@ -713,13 +721,13 @@ public class Fetcher<K, V> {
                 } else {
                     throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
                 }
-            } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
+            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                 log.warn("Not authorized to read from topic {}.", tp.topic());
                 throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
-            } else if (partition.errorCode == Errors.UNKNOWN.code()) {
+            } else if (error == Errors.UNKNOWN) {
                 log.warn("Unknown error fetching data for topic-partition {}", tp);
             } else {
-                throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
+                throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
             }
         } finally {
             completedFetch.metricAggregator.record(tp, bytes, recordsCount);
@@ -727,7 +735,7 @@ public class Fetcher<K, V> {
 
         // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
         // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
-        if (bytes > 0 || partition.errorCode != Errors.NONE.code())
+        if (bytes > 0 || error != Errors.NONE)
             subscriptions.movePartitionToEnd(tp);
 
         return parsedRecords;
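
The parseFetchedData changes above resolve the wire-level error code into the Errors enum once and then compare enum constants rather than raw shorts. The pattern in isolation (the literal code below is illustrative):

    import org.apache.kafka.common.protocol.Errors;

    public class ErrorCodeExample {
        public static void main(String[] args) {
            short wireCode = 3; // UNKNOWN_TOPIC_OR_PARTITION on the wire
            Errors error = Errors.forCode(wireCode); // resolve the code once up front
            if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION)
                System.out.println(error.exceptionName()); // readable name for log messages
            else if (error != Errors.NONE)
                System.out.println("unexpected error code " + error.code());
        }
    }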

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 6471dad..8fc7f2c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
@@ -322,8 +323,13 @@ public class Sender implements Runnable {
             if (error != Errors.NONE)
                 this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
         }
-        if (error.exception() instanceof InvalidMetadataException)
+        if (error.exception() instanceof InvalidMetadataException) {
+            if (error.exception() instanceof UnknownTopicOrPartitionException)
+                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
+                        "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
             metadata.requestUpdate();
+        }
+
         // Unmute the completed partition.
         if (guaranteeMessageOrder)
             this.accumulator.unmutePartition(batch.topicPartition);
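
Context for the nested check above: UnknownTopicOrPartitionException extends InvalidMetadataException in the clients exception hierarchy, so the inner instanceof only adds the warning; the set of errors that triggers a metadata refresh is unchanged. A standalone sketch of the dispatch (println stands in for log.warn):

    import org.apache.kafka.common.errors.InvalidMetadataException;
    import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

    public class SenderDispatchExample {
        public static void main(String[] args) {
            RuntimeException exception = new UnknownTopicOrPartitionException("demo");
            if (exception instanceof InvalidMetadataException) {
                // subclass check adds the hint without changing refresh behavior
                if (exception instanceof UnknownTopicOrPartitionException)
                    System.out.println("warn: topic may not exist or Describe access may be missing");
                // metadata.requestUpdate() runs at this point in the real Sender
            }
        }
    }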

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 71dd490..1dfda93 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -41,6 +41,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
     /**
      * Possible error codes:
      *
+     * UNKNOWN_TOPIC_OR_PARTITION (3)
      * OFFSET_METADATA_TOO_LARGE (12)
      * GROUP_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 6cf93a0..1715777 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -47,7 +47,7 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
     /**
     * Possible error codes:
      *
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)  <- only for request v0
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)
      *  GROUP_LOAD_IN_PROGRESS (14)
      *  NOT_COORDINATOR_FOR_GROUP (16)
      *  TOPIC_AUTHORIZATION_FAILED (29)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 924a582..3957615 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1056,6 +1056,15 @@ public class ConsumerCoordinatorTest {
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
     }
 
+    @Test(expected = KafkaException.class)
+    public void testCommitUnknownTopicOrPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorReady();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code())));
+        coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+    }
+
     @Test(expected = OffsetMetadataTooLarge.class)
     public void testCommitOffsetMetadataTooLarge() {
         // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
@@ -1133,6 +1142,17 @@ public class ConsumerCoordinatorTest {
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
 
+    @Test(expected = KafkaException.class)
+    public void testRefreshOffsetUnknownTopicOrPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorReady();
+
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.needRefreshCommits();
+        client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
+        coordinator.refreshCommittedOffsetsIfNeeded();
+    }
+
     @Test
     public void testRefreshOffsetNotCoordinatorForConsumer() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 7873028..96f09b0 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -325,7 +325,7 @@ object AdminUtils extends Logging with AdminUtilities {
           case e2: Throwable => throw new AdminOperationException(e2)
         }
       } else {
-        throw new UnknownTopicOrPartitionException("topic %s to delete does not exist".format(topic))
+        throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d765c8a..c1b723f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -215,7 +215,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
   }
 
-
   /**
    * Handle an offset commit request
    */
@@ -237,7 +236,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case (topicPartition, _) => {
           val authorizedForDescribe = authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic))
           val exists = metadataCache.contains(topicPartition.topic)
-          if (!authorizedForDescribe && exists) 
+          if (!authorizedForDescribe && exists)
               debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
                 s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION")
           authorizedForDescribe && exists
@@ -254,7 +253,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++
           nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
 
-        if (logger.isDebugEnabled()) //optimizing code as it's a loop
+        if (isDebugEnabled)
           combinedCommitStatus.foreach { case (topicPartition, errorCode) =>
             if (errorCode != Errors.NONE.code) {
               debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
@@ -1195,17 +1194,17 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
     val deleteTopicRequest = request.body.asInstanceOf[DeleteTopicsRequest]
 
-    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition( topic =>
+    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition { topic =>
       authorize(request.session, Describe, new Resource(auth.Topic, topic)) && metadataCache.contains(topic)
-    )
+    }
 
-    val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition( topic =>
+    val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition { topic =>
       authorize(request.session, Delete, new Resource(auth.Topic, topic))
-    )
+    }
     
     def sendResponseCallback(results: Map[String, Errors]): Unit = {
-      val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map( topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
-          unauthorizedForDeleteTopics.map( topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
+      val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++
+          unauthorizedForDeleteTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results
       val respHeader = new ResponseHeader(request.header.correlationId)
       val responseBody = new DeleteTopicsResponse(completeResults.asJava)
       trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index d34c464..2df5878 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -47,6 +47,8 @@ trait Logging {
     CoreUtils.swallow(logger.trace, action)
   }
 
+  def isDebugEnabled: Boolean = logger.isDebugEnabled
+
   def debug(msg: => String): Unit = {
     if (logger.isDebugEnabled())
       logger.debug(msgWithLogIdent(msg))
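
The new isDebugEnabled accessor lets callers like KafkaApis guard an entire loop, so per-iteration message strings are only built when DEBUG is actually on. The same guard pattern in generic SLF4J terms (a sketch, not from this commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GuardedDebugExample {
        private static final Logger logger = LoggerFactory.getLogger(GuardedDebugExample.class);

        public static void main(String[] args) {
            if (logger.isDebugEnabled()) { // one check instead of one per iteration
                for (int i = 0; i < 1000; i++)
                    logger.debug("per-partition status " + i); // string built only when enabled
            }
        }
    }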

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index be41581..55e8e4f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -38,13 +38,8 @@ import org.junit.{After, Assert, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
-import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.util.{Failure, Success}
 
 import org.apache.kafka.common.KafkaException
-import java.util.HashMap
 import kafka.admin.AdminUtils
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
@@ -60,7 +55,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val correlationId = 0
   val clientId = "client-Id"
   val tp = new TopicPartition(topic, part)
-
   val topicAndPartition = new TopicAndPartition(topic, part)
   val group = "my-group"
   val topicResource = new Resource(Topic, topic)
@@ -157,6 +151,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
     for (i <- 0 until producerCount)
       producers += TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+        maxBlockMs = 3000,
         acks = 1)
     for (i <- 0 until consumerCount)
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
@@ -258,7 +253,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
-  def testAuthorization() {
+  def testAuthorizationWithTopicExisting() {
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
       ApiKeys.METADATA -> createMetadataRequest,
       ApiKeys.PRODUCE -> createProduceRequest,
@@ -282,15 +277,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
       val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false)
-      for ((resource, acls) <- RequestKeysToAcls(key))
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false)
+
+      val resourceToAcls = RequestKeysToAcls(key)
+      resourceToAcls.get(topicResource).map { acls =>
+        val describeAcls = TopicDescribeAcl(topicResource)
+        val isAuthorized =  describeAcls == acls
+        addAndVerifyAcls(describeAcls, topicResource)
+        sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true)
+        removeAllAcls
+      }
+
+      for ((resource, acls) <- resourceToAcls)
         addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true)
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false)
     }
   }
 
   /*
-   * checking that whether the topic exists or not, when unauthorized, FETCH and PRODUCE do not leak the topic name
+   * even if the topic doesn't exist, request APIs should not leak the topic name
    */
   @Test
   def testAuthorizationWithTopicNotExisting() {
@@ -298,20 +303,33 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     AdminUtils.deleteTopic(zkUtils, deleteTopic)
     TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
-    
+
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
       ApiKeys.PRODUCE -> createProduceRequest,
       ApiKeys.FETCH -> createFetchRequest,
+      ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
+      ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
+      ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
       removeAllAcls
       val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, topicExists = false)
-      for ((resource, acls) <- RequestKeysToAcls(key))
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, isAuthorizedTopicDescribe = false, topicExists = false)
+
+      val resourceToAcls = RequestKeysToAcls(key)
+      resourceToAcls.get(topicResource).map { acls =>
+        val describeAcls = TopicDescribeAcl(topicResource)
+        val isAuthorized =  describeAcls == acls
+        addAndVerifyAcls(describeAcls, topicResource)
+        sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = isAuthorized, isAuthorizedTopicDescribe = true, topicExists = false)
+        removeAllAcls
+      }
+
+      for ((resource, acls) <- resourceToAcls)
         addAndVerifyAcls(acls, resource)
-      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, topicExists = false)
+      sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, isAuthorizedTopicDescribe = false, topicExists = false)
     }
   }
 
@@ -412,20 +430,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     consumeRecords(this.consumers.head)
   }
 
-  @Test
+  @Test(expected = classOf[KafkaException])
   def testConsumeWithoutTopicDescribeAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
-    try {
-      this.consumers.head.assign(List(tp).asJava)
-      consumeRecords(this.consumers.head)
-      Assert.fail("should have thrown exception")
-    } catch {
-      case e: KafkaException => //expected
-    }
+    this.consumers.head.assign(List(tp).asJava)
+
+    // the consumer should raise an exception if it receives UNKNOWN_TOPIC_OR_PARTITION
+    // from the ListOffsets response when looking up the initial position.
+    consumeRecords(this.consumers.head)
   }
 
   @Test
@@ -502,8 +518,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       Assert.fail("Expected TopicAuthorizationException")
     } catch {
       case e: TopicAuthorizationException => //expected
-    } 
-
+    }
   }
 
   @Test
@@ -511,7 +526,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
 
-    //create a unmatched topic
+    // create an unmatched topic
     val unmatchedTopic = "unmatched"
     TestUtils.createTopic(zkUtils, unmatchedTopic, 1, 1, this.servers)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)),  new Resource(Topic, unmatchedTopic))
@@ -524,15 +539,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
     consumeRecords(consumer)
 
-    // set the subscription pattern to an internal topic that the consumer has no read permission for, but since
-    // `exclude.internal.topics` is true by default, the subscription should be empty and no authorization exception
-    // should be thrown
+    // set the subscription pattern to an internal topic that the consumer has read permission to. Since
+    // internal topics are not included, we should not be assigned any partitions from this topic
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)),  new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName))
     consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener)
-    assertTrue(consumer.poll(50).isEmpty)
+    consumer.poll(0)
+    assertTrue(consumer.subscription().isEmpty)
+    assertTrue(consumer.assignment().isEmpty)
   }
 
   @Test
-  def testPatternSubscriptionMatchingInternalTopicWithNoPermission() {
+  def testPatternSubscriptionMatchingInternalTopic() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
@@ -545,9 +562,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
       securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
     try {
+      // ensure that internal topics are not included if no permission
       consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
       consumeRecords(consumer)
-      assertEquals(Set[String](topic).asJava, consumer.subscription)
+      assertEquals(Set(topic).asJava, consumer.subscription)
+
+      // now authorize the user for the internal topic and verify that we can subscribe
+      addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic, kafka.common.Topic.GroupMetadataTopicName))
+      consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener)
+      consumer.poll(0)
+      assertEquals(Set(kafka.common.Topic.GroupMetadataTopicName), consumer.subscription.asScala)
     } finally consumer.close()
   }
 
@@ -741,27 +765,35 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                                             request: AbstractRequest,
                                             resources: Set[ResourceType],
                                             isAuthorized: Boolean,
+                                            isAuthorizedTopicDescribe: Boolean,
                                             topicExists: Boolean = true): AbstractRequestResponse = {
     val resp = send(request, apiKey)
     val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
-    val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response)
-
-    val possibleErrorCodes = resources.flatMap { resourceType =>
-      if (resourceType == Topic)
-          // When completely unauthorized topic resources must return an UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names
-          Seq(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-      else
-          Seq(resourceType.errorCode)
+    val error = Errors.forCode(RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response))
+
+    val authorizationErrorCodes = resources.flatMap { resourceType =>
+      if (resourceType == Topic) {
+        if (isAuthorized)
+          Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(Topic.errorCode))
+        else if (!isAuthorizedTopicDescribe)
+          Set(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        else
+          Set(Errors.forCode(Topic.errorCode))
+      } else {
+        Set(Errors.forCode(resourceType.errorCode))
+      }
     }
 
     if (topicExists)
       if (isAuthorized)
-        assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode))
+        assertFalse(s"${apiKey} should be allowed. Found unexpected authorization error $error", authorizationErrorCodes.contains(error))
       else
-        assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode))
+        assertTrue(s"${apiKey} should be forbidden. Found error $error but expected one of $authorizationErrorCodes", authorizationErrorCodes.contains(error))
+    else if (resources == Set(Topic))
+      assertEquals(s"${apiKey} had an unexpected error", Errors.UNKNOWN_TOPIC_OR_PARTITION, error)
     else
-      assertEquals(s"${apiKey} - Found error code $errorCode", Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), errorCode) 
-      
+      assertNotEquals(s"${apiKey} had an unexpected error", Errors.TOPIC_AUTHORIZATION_FAILED, error)
+
     response
   }
 
@@ -786,7 +818,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) -- acls, servers.head.apis.authorizer.get, resource)
   }
 
-
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
                              numRecords: Int = 1,
                              startingOffset: Int = 0,
@@ -794,13 +825,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                              part: Int = part) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
 
-    val future = Future {
-      while (records.size < numRecords) 
-        for (record <- consumer.poll(50).asScala)
-          records.add(record)
-      records
-    }
-    val result = Await.result(future, 10 seconds)
+    TestUtils.waitUntilTrue(() => {
+      for (record <- consumer.poll(50).asScala)
+        records.add(record)
+      records.size == numRecords
+    }, "Failed to receive all expected records from the consumer")
 
     for (i <- 0 until numRecords) {
       val record = records.get(i)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 2f5858c..4e6c740 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -19,28 +19,23 @@ package kafka.api
 
 import java.io.File
 import java.util.ArrayList
-import java.util.concurrent.{ExecutionException, TimeoutException => JTimeoutException}
+import java.util.concurrent.ExecutionException
 
 import kafka.admin.AclCommand
 import kafka.common.TopicAndPartition
 import kafka.security.auth._
 import kafka.server._
 import kafka.utils._
-
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig}
-import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig, KafkaProducer}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.{TopicPartition,KafkaException}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException,TimeoutException}
+import org.apache.kafka.common.errors.{GroupAuthorizationException, TimeoutException, TopicAuthorizationException}
 import org.junit.Assert._
-import org.junit.{Test, After, Before}
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.util.{Failure, Success}
 
 /**
   * The test cases here verify that a producer authorized to publish to a topic
@@ -60,7 +55,7 @@ import scala.util.{Failure, Success}
   * SaslTestHarness here directly because it extends ZooKeeperTestHarness, and we
   * would end up with ZooKeeperTestHarness twice.
   */
-trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
+abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
   override val producerCount = 1
   override val consumerCount = 2
   override val serverCount = 3
@@ -181,11 +176,11 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
 
   override def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
     TestUtils.createNewProducer(brokerList,
-                                  maxBlockMs = 5000L,
-                                  securityProtocol = this.securityProtocol,
-                                  trustStoreFile = this.trustStoreFile,
-                                  saslProperties = this.saslProperties,
-                                  props = Some(producerConfig))
+                                maxBlockMs = 3000L,
+                                securityProtocol = this.securityProtocol,
+                                trustStoreFile = this.trustStoreFile,
+                                saslProperties = this.saslProperties,
+                                props = Some(producerConfig))
   }
   
   /**
@@ -206,7 +201,6 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
     setAclsAndProduce()
     consumers.head.assign(List(tp).asJava)
     consumeRecords(this.consumers.head, numRecords)
-    debug("Finished consuming")
   }
 
   @Test
@@ -214,7 +208,6 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
     setAclsAndProduce()
     consumers.head.subscribe(List(topic).asJava)
     consumeRecords(this.consumers.head, numRecords)
-    debug("Finished consuming")
   }
 
   private def setAclsAndProduce() {
@@ -224,27 +217,16 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
       TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
     }
-    //Produce records
-    debug("Starting to send records")
     sendRecords(numRecords, tp)
-    //Consume records
-    debug("Finished sending and starting to consume records")
   }
 
   /**
     * Tests that a producer fails to publish messages when the appropriate ACL
     * isn't set.
     */
-  @Test
+  @Test(expected = classOf[TimeoutException])
   def testNoProduceWithoutDescribeAcl {
-    //Produce records
-    debug("Starting to send records")
-    try{
-      sendRecords(numRecords, tp)
-      fail("exception expected")
-    } catch {
-      case e: TimeoutException => //expected
-    }
+    sendRecords(numRecords, tp)
   }
 
   @Test
@@ -253,13 +235,12 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
     }
-    //Produce records
-    debug("Starting to send records")
     try{
       sendRecords(numRecords, tp)
       fail("exception expected")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case e: TopicAuthorizationException =>
+        assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     }
   }
   
@@ -267,31 +248,21 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
     * Tests that a consumer fails to consume messages without the appropriate
     * ACL set.
     */
-  @Test
+  @Test(expected = classOf[KafkaException])
   def testNoConsumeWithoutDescribeAclViaAssign {
     noConsumeWithoutDescribeAclSetup
     consumers.head.assign(List(tp).asJava)
-
-    try {
-      consumeRecords(this.consumers.head)
-      fail("exception expected")
-    } catch {
-       case e: KafkaException => //expected
-    }
+    // the exception is expected when the consumer attempts to lookup offsets
+    consumeRecords(this.consumers.head)
   }
   
-  @Test
+  @Test(expected = classOf[TimeoutException])
   def testNoConsumeWithoutDescribeAclViaSubscribe {
     noConsumeWithoutDescribeAclSetup
     consumers.head.subscribe(List(topic).asJava)
-
-    try {
-      consumeRecords(this.consumers.head)
-      fail("exception expected")
-    } catch {
-      case e: JTimeoutException => //expected
-    }
-  } 
+    // this should timeout since the consumer will not be able to fetch any metadata for the topic
+    consumeRecords(this.consumers.head, timeout = 3000)
+  }
   
   private def noConsumeWithoutDescribeAclSetup {
     AclCommand.main(produceAclArgs)
@@ -300,18 +271,14 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
     }
-    //Produce records
-    debug("Starting to send records")
+
     sendRecords(numRecords, tp)
 
-    //Deleting topic ACL
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
     }
-    
-    debug("Finished sending and starting to consume records")
   }
  
   /**
@@ -327,7 +294,8 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
       consumeRecords(this.consumers.head)
       fail("Topic authorization exception expected")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case e: TopicAuthorizationException =>
+        assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     }
   }
   
@@ -340,22 +308,19 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
       consumeRecords(this.consumers.head)
       fail("Topic authorization exception expected")
     } catch {
-      case e: TopicAuthorizationException => //expected
+      case e: TopicAuthorizationException =>
+        assertEquals(Set(topic).asJava, e.unauthorizedTopics())
     }
   }
   
   private def noConsumeWithDescribeAclSetup {
-    AclCommand.main(produceAclArgs) 
+    AclCommand.main(produceAclArgs)
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
       TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource)
     }
-    //Produce records
-    debug("Starting to send records")
     sendRecords(numRecords, tp)
-    //Consume records
-    debug("Finished sending and starting to consume records")
   }
 
   /**
@@ -368,17 +333,14 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource)
     }
-    //Produce records
-    debug("Starting to send records")
     sendRecords(numRecords, tp)
-    //Consume records
-    debug("Finished sending and starting to consume records")
     consumers.head.assign(List(tp).asJava)
     try {
       consumeRecords(this.consumers.head)
       fail("Topic authorization exception expected")
     } catch {
-      case e: GroupAuthorizationException => //expected
+      case e: GroupAuthorizationException =>
+        assertEquals(group, e.groupId())
     }
   }
   
@@ -399,16 +361,17 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
                              numRecords: Int = 1,
                              startingOffset: Int = 0,
                              topic: String = topic,
-                             part: Int = part) {
+                             part: Int = part,
+                             timeout: Long = 10000) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
 
-    val future = Future {
-      while (records.size < numRecords) 
-        for (record <- consumer.poll(50).asScala)
-          records.add(record)
-      records
+    val deadlineMs = System.currentTimeMillis() + timeout
+    while (records.size < numRecords && System.currentTimeMillis() < deadlineMs) {
+      for (record <- consumer.poll(50).asScala)
+        records.add(record)
     }
-    val result = Await.result(future, 10 seconds)
+    if (records.size < numRecords)
+      throw new TimeoutException
 
     for (i <- 0 until numRecords) {
       val record = records.get(i)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index ffca431..ca020a6 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -79,7 +79,6 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       servers.head.groupCoordinator.offsetsTopicConfigs)
   }
 
-  //extracted method to allow for different params in some specific tests
   def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
       TestUtils.createNewProducer(brokerList,
                                   securityProtocol = this.securityProtocol,
@@ -88,7 +87,6 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
                                   props = Some(producerConfig))
   }
   
-  //extracted method to allow for different params in some specific tests
   def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
       TestUtils.createNewConsumer(brokerList,
                                   securityProtocol = this.securityProtocol,
@@ -100,21 +98,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   @After
   override def tearDown() {
     producers.foreach(_.close())
-    
-    consumers.foreach { consumer => 
-      breakable {
-        while(true) {
-          try {
-            consumer.close
-            break
-          } catch {
-            //short wait to make sure that woken up consumer can be closed without spurious ConcurrentModificationException
-            case e: ConcurrentModificationException => Thread.sleep(100L)
-          }
-        }
-      }
-    }
-    
+    consumers.foreach(_.close())
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7115c66a/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 1b1c593..372bf14 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -64,8 +64,11 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk
     <li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li>
     <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
     <li> The new Java Consumer now allows users to search offsets by timestamp on partitions. </li>
-    <li> When using an Authorizer and a user hasn't got <b>Describe</b> authorization on a topic, the broker will no longer return TOPIC_AUTHORIZATION_FAILED errors  
-         but just UNKNOWN_TOPIC_OR_PARTITION errors, to avoid leaking topic names.</li>
+    <li> When using an Authorizer and a user doesn't have <b>Describe</b> authorization on a topic, the broker will no
+         longer return TOPIC_AUTHORIZATION_FAILED errors to requests since this leaks topic names. Instead, the UNKNOWN_TOPIC_OR_PARTITION
+         error code will be returned. This may cause unexpected timeouts or delays when using the producer and consumer since
+         Kafka clients will typically retry automatically on unknown topic errors. You should consult the client logs if you
+         suspect this could be happening.</li>
 </ul>
 
 <h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New Protocol Versions</a></h5>
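
Since the upgrade note above warns that clients may now time out rather than fail fast when Describe access is missing, a sketch of what an application would observe (broker address, topic name, and max.block.ms value are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.TimeoutException;

    public class RestrictedTopicExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("max.block.ms", "3000"); // bound the wait for topic metadata
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Without Describe access the topic looks unknown, metadata never
                // arrives, and send() gives up after max.block.ms.
                producer.send(new ProducerRecord<>("restricted-topic", "key", "value"));
            } catch (TimeoutException e) {
                System.err.println("Timed out; check client logs for UNKNOWN_TOPIC_OR_PARTITION warnings");
            }
        }
    }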

