kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/3] kafka git commit: KAFKA-1910; Refactor new consumer and fixed a bunch of corner cases / unit tests; reviewed by Onur Karaman and Jay Kreps
Date Tue, 10 Mar 2015 18:20:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 67940c43e -> 0b92cec1e


http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
new file mode 100644
index 0000000..1de22b9
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.ConsumerMetadataResponse;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class CoordinatorTest {
+
+    private String topicName = "test";
+    private String groupId = "test-group";
+    private TopicPartition tp = new TopicPartition(topicName, 0);
+    private long retryBackoffMs = 0L;
+    private long sessionTimeoutMs = 10L;
+    private String rebalanceStrategy = "not-matter";
+    private MockTime time = new MockTime();
+    private MockClient client = new MockClient(time);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+    private Node node = cluster.nodes().get(0);
+    private SubscriptionState subscriptions = new SubscriptionState();
+    private Metrics metrics = new Metrics(time);
+    private Map<String, String> metricTags = new LinkedHashMap<String, String>();
+
+    private Coordinator coordinator = new Coordinator(client,
+        groupId,
+        retryBackoffMs,
+        sessionTimeoutMs,
+        rebalanceStrategy,
+        metadata,
+        subscriptions,
+        metrics,
+        "consumer" + groupId,
+        metricTags,
+        time);
+
+    @Before
+    public void setup() {
+        metadata.update(cluster, time.milliseconds());
+        client.setNode(node);
+    }
+
+    @Test
+    public void testNormalHeartbeat() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // normal heartbeat
+        time.sleep(sessionTimeoutMs);
+        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
+        assertEquals(1, client.inFlightRequestCount());
+        client.respond(heartbeatResponse(Errors.NONE.code()));
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // consumer_coordinator_not_available will mark coordinator as unknown
+        time.sleep(sessionTimeoutMs);
+        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
+        assertEquals(1, client.inFlightRequestCount());
+        client.respond(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
+        time.sleep(sessionTimeoutMs);
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testNotCoordinator() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // not_coordinator will mark coordinator as unknown
+        time.sleep(sessionTimeoutMs);
+        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
+        assertEquals(1, client.inFlightRequestCount());
+        client.respond(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
+        time.sleep(sessionTimeoutMs);
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testIllegalGeneration() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // illegal_generation will cause re-partition
+        subscriptions.subscribe(topicName);
+        subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+
+        time.sleep(sessionTimeoutMs);
+        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
+        assertEquals(1, client.inFlightRequestCount());
+        client.respond(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
+        time.sleep(sessionTimeoutMs);
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(subscriptions.partitionAssignmentNeeded());
+    }
+
+    @Test
+    public void testCoordinatorDisconnect() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // coordinator disconnect will mark coordinator as unknown
+        time.sleep(sessionTimeoutMs);
+        coordinator.maybeHeartbeat(time.milliseconds()); // should send out the heartbeat
+        assertEquals(1, client.inFlightRequestCount());
+        client.respond(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
+        time.sleep(sessionTimeoutMs);
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testNormalJoinGroup() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // normal join group
+        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
+        assertEquals(Collections.singletonList(tp),
+            coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
+        assertEquals(0, client.inFlightRequestCount());
+    }
+
+    @Test
+    public void testReJoinGroup() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // disconnected from original coordinator will cause re-discover and join again
+        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
+        assertEquals(Collections.singletonList(tp),
+            coordinator.assignPartitions(Collections.singletonList(topicName), time.milliseconds()));
+        assertEquals(0, client.inFlightRequestCount());
+    }
+
+
+    @Test
+    public void testCommitOffsetNormal() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // sync commit
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+
+        // async commit
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        client.respond(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+    }
+
+    @Test
+    public void testCommitOffsetError() {
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // async commit with coordinator not available
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(coordinator.coordinatorUnknown());
+        // resume
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // async commit with not coordinator
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), false, time.milliseconds());
+        assertEquals(1, client.poll(0, time.milliseconds()).size());
+        assertTrue(coordinator.coordinatorUnknown());
+        // resume
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // sync commit with not_coordinator
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+
+        // sync commit with coordinator disconnected
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        coordinator.commitOffsets(Collections.singletonMap(tp, 100L), true, time.milliseconds());
+    }
+
+
+    @Test
+    public void testFetchOffset() {
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+
+        // normal fetch
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+        // fetch with loading in progress
+        client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+        // fetch with not coordinator
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+        assertEquals(100L, (long) coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).get(tp));
+
+        // fetch with no fetchable offsets
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NO_OFFSETS_FETCHABLE.code(), "", 100L));
+        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+
+        // fetch with offset topic unknown
+        client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
+        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+
+        // fetch with offset -1
+        client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
+        assertEquals(0, coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()).size());
+    }
+
+    private Struct consumerMetadataResponse(Node node, short error) {
+        ConsumerMetadataResponse response = new ConsumerMetadataResponse(error, node);
+        return response.toStruct();
+    }
+
+    private Struct heartbeatResponse(short error) {
+        HeartbeatResponse response = new HeartbeatResponse(error);
+        return response.toStruct();
+    }
+
+    private Struct joinGroupResponse(int generationId, String consumerId, List<TopicPartition> assignedPartitions, short error) {
+        JoinGroupResponse response = new JoinGroupResponse(error, generationId, consumerId, assignedPartitions);
+        return response.toStruct();
+    }
+
+    private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+        OffsetCommitResponse response = new OffsetCommitResponse(responseData);
+        return response.toStruct();
+    }
+
+    private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
+        OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
+        return response.toStruct();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
new file mode 100644
index 0000000..de03ff1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class FetcherTest {
+
+    private String topicName = "test";
+    private String groupId = "test-group";
+    private TopicPartition tp = new TopicPartition(topicName, 0);
+    private long retryBackoffMs = 0L;
+    private int minBytes = 1;
+    private int maxWaitMs = 0;
+    private int fetchSize = 1000;
+    private String offsetReset = "EARLIEST";
+    private MockTime time = new MockTime();
+    private MockClient client = new MockClient(time);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+    private Node node = cluster.nodes().get(0);
+    private SubscriptionState subscriptions = new SubscriptionState();
+    private Metrics metrics = new Metrics(time);
+    private Map<String, String> metricTags = new LinkedHashMap<String, String>();
+
+    private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+
+    private Fetcher<byte[], byte[]> fetcher = new Fetcher<byte[], byte[]>(client,
+        retryBackoffMs,
+        minBytes,
+        maxWaitMs,
+        fetchSize,
+        true, // check crc
+        offsetReset,
+        new ByteArrayDeserializer(),
+        new ByteArrayDeserializer(),
+        metadata,
+        subscriptions,
+        metrics,
+        "consumer" + groupId,
+        metricTags,
+        time);
+
+    @Before
+    public void setup() throws Exception {
+        metadata.update(cluster, time.milliseconds());
+        client.setNode(node);
+
+        records.append(1L, "key".getBytes(), "value-1".getBytes());
+        records.append(2L, "key".getBytes(), "value-2".getBytes());
+        records.append(3L, "key".getBytes(), "value-3".getBytes());
+        records.close();
+        records.rewind();
+    }
+
+    @Test
+    public void testFetchNormal() {
+        List<ConsumerRecord<byte[], byte[]>> records;
+        subscriptions.subscribe(tp);
+        subscriptions.fetched(tp, 0);
+        subscriptions.consumed(tp, 0);
+
+        // normal fetch
+        fetcher.initFetches(cluster, time.milliseconds());
+        client.respond(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        client.poll(0, time.milliseconds());
+        records = fetcher.fetchedRecords().get(tp);
+        assertEquals(3, records.size());
+        assertEquals(4L, (long) subscriptions.fetched(tp)); // this is the next fetching position
+        assertEquals(4L, (long) subscriptions.consumed(tp));
+        long offset = 1;
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            assertEquals(offset, record.offset());
+            offset += 1;
+        }
+    }
+
+    @Test
+    public void testFetchFailed() {
+        List<ConsumerRecord<byte[], byte[]>> records;
+        subscriptions.subscribe(tp);
+        subscriptions.fetched(tp, 0);
+        subscriptions.consumed(tp, 0);
+
+        // fetch with not leader
+        fetcher.initFetches(cluster, time.milliseconds());
+        client.respond(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
+        client.poll(0, time.milliseconds());
+        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+
+        // fetch with unknown topic partition
+        fetcher.initFetches(cluster, time.milliseconds());
+        client.respond(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
+        client.poll(0, time.milliseconds());
+        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
+
+        // fetch with out of range
+        subscriptions.fetched(tp, 5);
+        fetcher.initFetches(cluster, time.milliseconds());
+        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
+        client.poll(0, time.milliseconds());
+        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0L, (long) subscriptions.fetched(tp));
+        assertEquals(0L, (long) subscriptions.consumed(tp));
+    }
+
+    @Test
+    public void testFetchOutOfRange() {
+        List<ConsumerRecord<byte[], byte[]>> records;
+        subscriptions.subscribe(tp);
+        subscriptions.fetched(tp, 5);
+        subscriptions.consumed(tp, 5);
+
+        // fetch with out of range
+        fetcher.initFetches(cluster, time.milliseconds());
+        client.respond(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
+        client.prepareResponse(listOffsetResponse(Collections.singletonList(0L), Errors.NONE.code()));
+        client.poll(0, time.milliseconds());
+        assertEquals(0, fetcher.fetchedRecords().size());
+        assertEquals(0L, (long) subscriptions.fetched(tp));
+        assertEquals(0L, (long) subscriptions.consumed(tp));
+    }
+
+    private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
+        FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
+        return response.toStruct();
+    }
+
+    private Struct listOffsetResponse(List<Long> offsets, short error) {
+        ListOffsetResponse response = new ListOffsetResponse(Collections.singletonMap(tp, new ListOffsetResponse.PartitionData(error, offsets)));
+        return response.toStruct();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
new file mode 100644
index 0000000..ecc78ce
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class HeartbeatTest {
+
+    private long timeout = 300L;
+    private MockTime time = new MockTime();
+    private Heartbeat heartbeat = new Heartbeat(timeout, -1L);
+
+    @Test
+    public void testShouldHeartbeat() {
+        heartbeat.sentHeartbeat(time.milliseconds());
+        time.sleep((long) ((float) timeout / Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL * 1.1));
+        assertTrue(heartbeat.shouldHeartbeat(time.milliseconds()));
+    }
+
+    @Test
+    public void testShouldNotHeartbeat() {
+        heartbeat.sentHeartbeat(time.milliseconds());
+        time.sleep(timeout / (2 * Heartbeat.HEARTBEATS_PER_SESSION_INTERVAL));
+        assertFalse(heartbeat.shouldHeartbeat(time.milliseconds()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 090087a..e000cf8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -16,7 +16,8 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static java.util.Arrays.asList;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index c1bc406..d34d27e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -76,6 +76,7 @@ public class RecordAccumulatorTest {
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
+        batch.records.rewind();
         Iterator<LogEntry> iter = batch.records.iterator();
         for (int i = 0; i < appends; i++) {
             LogEntry entry = iter.next();
@@ -104,6 +105,7 @@ public class RecordAccumulatorTest {
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
+        batch.records.rewind();
         Iterator<LogEntry> iter = batch.records.iterator();
         LogEntry entry = iter.next();
         assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -156,6 +158,7 @@ public class RecordAccumulatorTest {
             List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
             if (batches != null) {
                 for (RecordBatch batch : batches) {
+                    batch.records.rewind();
                     for (LogEntry entry : batch.records)
                         read++;
                     accum.deallocate(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index ea56c99..24274a6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -16,6 +16,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -27,11 +28,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -76,7 +76,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
-        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
+        client.respond(produceResponse(tp, offset, Errors.NONE.code()));
         sender.run(time.milliseconds());
         assertEquals("All requests completed.", offset, (long) client.inFlightRequestCount());
         sender.run(time.milliseconds());
@@ -110,7 +110,7 @@ public class SenderTest {
         sender.run(time.milliseconds()); // resend
         assertEquals(1, client.inFlightRequestCount());
         long offset = 0;
-        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
+        client.respond(produceResponse(tp, offset, Errors.NONE.code()));
         sender.run(time.milliseconds());
         assertTrue("Request should have retried and completed", future.isDone());
         assertEquals(offset, future.get().offset());
@@ -138,17 +138,11 @@ public class SenderTest {
         }
     }
 
-    private Struct produceResponse(String topic, int part, long offset, int error) {
-        Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
-        Struct response = struct.instance("responses");
-        response.set("topic", topic);
-        Struct partResp = response.instance("partition_responses");
-        partResp.set("partition", part);
-        partResp.set("error_code", (short) error);
-        partResp.set("base_offset", offset);
-        response.set("partition_responses", new Object[] {partResp});
-        struct.set("responses", new Object[] {response});
-        return struct;
+    private Struct produceResponse(TopicPartition tp, long offset, int error) {
+        ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset);
+        Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
+        ProduceResponse response = new ProduceResponse(partResp);
+        return response.toStruct();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index e343327..42f8f5e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -50,7 +50,9 @@ public class MemoryRecordsTest {
             recs2.append(i, toArray(r.key()), toArray(r.value()));
         }
         recs1.close();
+        recs1.rewind();
         recs2.close();
+        recs2.rewind();
 
         for (int iteration = 0; iteration < 2; iteration++) {
             for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index eedc2f5..eb1eb4a 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -49,6 +49,9 @@ object ErrorMapping {
   val MessageSetSizeTooLargeCode: Short = 18
   val NotEnoughReplicasCode : Short = 19
   val NotEnoughReplicasAfterAppendCode: Short = 20
+  // 21: InvalidRequiredAcks
+  // 22: IllegalConsumerGeneration
+  val NoOffsetsCommittedCode: Short = 23
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -70,7 +73,8 @@ object ErrorMapping {
       classOf[InvalidTopicException].asInstanceOf[Class[Throwable]] -> InvalidTopicCode,
       classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
       classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
-      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode
+      classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
+      classOf[NoOffsetsCommittedException].asInstanceOf[Class[Throwable]] -> NoOffsetsCommittedCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala b/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala
new file mode 100644
index 0000000..2a68e87
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NoOffsetsCommittedException.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that no offset has been committed for the requested
+ * topic partition and consumer group, so an offset fetch request
+ * has no stored offset to return for that partition.
+ */
+class NoOffsetsCommittedException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 4cabffe..1584a92 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -48,7 +48,7 @@ case class OffsetMetadataAndError(offset: Long,
 }
 
 object OffsetMetadataAndError {
-  val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError)
+  val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoOffsetsCommittedCode)
   val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode)
   val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
   val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 21790a5..456b602 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -25,6 +25,7 @@ import kafka.utils._
 import scala.collection.mutable.HashMap
 
 import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import org.apache.kafka.common.requests.JoinGroupRequest
 
 
 /**
@@ -114,9 +115,15 @@ class ConsumerCoordinator(val config: KafkaConfig,
     if (!consumerGroupRegistries.contains(groupId))
       createNewGroup(groupId, partitionAssignmentStrategy)
 
+    val groupRegistry = consumerGroupRegistries(groupId)
+
     // if the consumer id is unknown or it does exists in
     // the group yet, register this consumer to the group
-    // TODO
+    if (consumerId.equals(JoinGroupRequest.UNKNOWN_CONSUMER_ID)) {
+      createNewConsumer(groupId, groupRegistry.generateNextConsumerId, topics, sessionTimeoutMs)
+    } else if (!groupRegistry.memberRegistries.contains(consumerId)) {
+      createNewConsumer(groupId, consumerId, topics, sessionTimeoutMs)
+    }
 
     // add a delayed join-group operation to the purgatory
     // TODO
@@ -146,9 +153,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
    * Process a heartbeat request from a consumer
    */
   def consumerHeartbeat(groupId: String,
-                      consumerId: String,
-                      generationId: Int,
-                      responseCallback: Short => Unit) {
+                        consumerId: String,
+                        generationId: Int,
+                        responseCallback: Short => Unit) {
 
     // check that the group already exists
     // TODO
@@ -171,21 +178,28 @@ class ConsumerCoordinator(val config: KafkaConfig,
     // TODO: this is just a stub for new consumer testing,
     // TODO: needs to be replaced with the logic above
     // TODO --------------------------------------------------------------
-    // always return OK for heartbeat immediately
-    responseCallback(Errors.NONE.code)
+    // check if the consumer already exists, if yes return OK,
+    // otherwise return illegal generation error
+    if (consumerGroupRegistries.contains(groupId)
+      && consumerGroupRegistries(groupId).memberRegistries.contains(consumerId))
+      responseCallback(Errors.NONE.code)
+    else
+      responseCallback(Errors.ILLEGAL_GENERATION.code)
   }
 
   /**
    * Create a new consumer
    */
-  private def createNewConsumer(consumerId: String,
+  private def createNewConsumer(groupId: String,
+                                consumerId: String,
                                 topics: List[String],
-                                sessionTimeoutMs: Int,
-                                groupRegistry: GroupRegistry) {
-    debug("Registering consumer " + consumerId + " for group " + groupRegistry.groupId)
+                                sessionTimeoutMs: Int) {
+    debug("Registering consumer " + consumerId + " for group " + groupId)
 
     // create the new consumer registry entry
-    // TODO: specify consumerId as unknown and update at the end of the prepare-rebalance phase
+    val consumerRegistry = new ConsumerRegistry(groupId, consumerId, topics, sessionTimeoutMs)
+
+    consumerGroupRegistries(groupId).memberRegistries.put(consumerId, consumerRegistry)
 
     // check if the partition assignment strategy is consistent with the group
     // TODO
@@ -202,7 +216,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
     // start preparing group partition rebalance
     // TODO
 
-    info("Registered consumer " + consumerId + " for group " + groupRegistry.groupId)
+    info("Registered consumer " + consumerId + " for group " + groupId)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
index b65c04d..2f57970 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
@@ -32,10 +32,10 @@ import java.util.HashMap
  *  1. subscribed topic list
  *  2. assigned partitions for the subscribed topics.
  */
-class ConsumerRegistry(val consumerId: String,
-                       val subscribedTopics: List[String],
-                       val sessionTimeoutMs: Int,
-                       val groupRegistry: GroupRegistry) {
+class ConsumerRegistry(val groupId: String,
+                       val consumerId: String,
+                       val topics: List[String],
+                       val sessionTimeoutMs: Int) {
 
   /* number of expired heartbeat recorded */
   val numExpiredHeartbeat = new AtomicInteger(0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index b1248e9..6a6bc7b 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -43,6 +43,6 @@ class DelayedHeartbeat(sessionTimeout: Long,
   /* mark all consumers within the heartbeat as heartbeat timed out */
   override def onComplete() {
     for (registry <- bucket.consumerRegistryList)
-      expireCallback(registry.groupRegistry.groupId, registry.consumerId)
+      expireCallback(registry.groupId, registry.consumerId)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
index 7d17e10..94ef582 100644
--- a/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupRegistry.scala
@@ -18,6 +18,7 @@
 package kafka.coordinator
 
 import scala.collection.mutable
+import java.util.concurrent.atomic.AtomicInteger
 
 sealed trait GroupStates { def state: Byte }
 
@@ -69,6 +70,10 @@ class GroupRegistry(val groupId: String,
 
   val state: GroupState = new GroupState()
 
-  var generationId: Int = 1
+  val generationId = new AtomicInteger(1)
+
+  val nextConsumerId = new AtomicInteger(1)
+
+  def generateNextConsumerId = groupId + "-" + nextConsumerId.getAndIncrement
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 378a74d..dddef93 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -311,7 +311,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
 
       val canShutdown = isShuttingDown.compareAndSet(false, true)
-      if (canShutdown) {
+      if (canShutdown && shutdownLatch.getCount > 0) {
         Utils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
         if(socketServer != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
new file mode 100644
index 0000000..ef59ed9
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.CommitType
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
+
+import kafka.utils.{ShutdownableThread, TestUtils, Logging}
+import kafka.server.OffsetManager
+
+import java.util.ArrayList
+import org.junit.Assert._
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * Integration tests for the new consumer that cover basic usage as well as server failures
+ */
+class ConsumerTest extends IntegrationTestHarness with Logging {
+
+  val producerCount = 1
+  val consumerCount = 2
+  val serverCount = 3
+
+  val topic = "topic"
+  val part = 0
+  val tp = new TopicPartition(topic, part)
+
+  // configure the servers and clients
+  this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
+  this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
+  this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
+  this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+  
+  override def setUp() {
+    super.setUp()
+
+    // create the test topic with all the brokers as replicas
+    TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
+  }
+
+  def testSimpleConsumption() {
+    val numRecords = 10000
+    sendRecords(numRecords)
+
+    assertEquals(0, this.consumers(0).subscriptions.size)
+    this.consumers(0).subscribe(tp)
+    assertEquals(1, this.consumers(0).subscriptions.size)
+    
+    this.consumers(0).seek(tp, 0)
+    consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
+  }
+
+  def testAutoOffsetReset() {
+    sendRecords(1)
+    this.consumers(0).subscribe(tp)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  def testSeek() {
+    val consumer = this.consumers(0)
+    val totalRecords = 50L
+    sendRecords(totalRecords.toInt)
+    consumer.subscribe(tp)
+
+    consumer.seekToEnd(tp)
+    assertEquals(totalRecords, consumer.position(tp))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp)
+    assertEquals(0, consumer.position(tp), 0)
+    consumeRecords(consumer, numRecords = 1, startingOffset = 0)
+
+    val mid = totalRecords / 2
+    consumer.seek(tp, mid)
+    assertEquals(mid, consumer.position(tp))
+    consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt)
+  }
+
+  def testGroupConsumption() {
+    sendRecords(10)
+    this.consumers(0).subscribe(topic)
+    consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
+  }
+
+  def testPositionAndCommit() {
+    sendRecords(5)
+
+    // committed() on a partition with no committed offset throws an exception
+    intercept[NoOffsetForPartitionException] {
+      this.consumers(0).committed(new TopicPartition(topic, 15))
+    }
+
+    // position() on a partition that we aren't subscribed to throws an exception
+    intercept[IllegalArgumentException] {
+      this.consumers(0).position(new TopicPartition(topic, 15))
+    }
+
+    this.consumers(0).subscribe(tp)
+
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals(0L, this.consumers(0).committed(tp))
+
+    consumeRecords(this.consumers(0), 5, 0)
+    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp))
+    this.consumers(0).commit(CommitType.SYNC)
+    assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp))
+
+    sendRecords(1)
+
+    // another consumer in the same group should get the same position
+    this.consumers(1).subscribe(tp)
+    consumeRecords(this.consumers(1), 1, 5)
+  }
+
+  def testPartitionsFor() {
+    val numParts = 2
+    TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
+    val parts = this.consumers(0).partitionsFor("part-test")
+    assertNotNull(parts)
+    assertEquals(2, parts.length)
+    assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
+  }
+
+  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
+
+  /*
+   * 1. Produce a bunch of messages
+   * 2. Then consume the messages while killing and restarting brokers at random
+   */
+  def consumeWithBrokerFailures(numIters: Int) {
+    val numRecords = 1000
+    sendRecords(numRecords)
+    this.producers.map(_.close)
+
+    var consumed = 0
+    val consumer = this.consumers(0)
+    consumer.subscribe(topic)
+
+    val scheduler = new BounceBrokerScheduler(numIters)
+    scheduler.start()
+
+    while (scheduler.isRunning.get()) {
+      for (record <- consumer.poll(100)) {
+        assertEquals(consumed.toLong, record.offset())
+        consumed += 1
+      }
+      consumer.commit(CommitType.SYNC)
+
+      if (consumed == numRecords) {
+        consumer.seekToBeginning()
+        consumed = 0
+      }
+    }
+    scheduler.shutdown()
+  }
+
+  def testSeekAndCommitWithBrokerFailures() = seekAndCommitWithBrokerFailures(5)
+
+  def seekAndCommitWithBrokerFailures(numIters: Int) {
+    val numRecords = 1000
+    sendRecords(numRecords)
+    this.producers.map(_.close)
+
+    val consumer = this.consumers(0)
+    consumer.subscribe(tp)
+    consumer.seek(tp, 0)
+
+    val scheduler = new BounceBrokerScheduler(numIters)
+    scheduler.start()
+
+    while(scheduler.isRunning.get()) {
+      val coin = TestUtils.random.nextInt(3)
+      if (coin == 0) {
+        info("Seeking to end of log")
+        consumer.seekToEnd()
+        assertEquals(numRecords.toLong, consumer.position(tp))
+      } else if (coin == 1) {
+        val pos = TestUtils.random.nextInt(numRecords).toLong
+        info("Seeking to " + pos)
+        consumer.seek(tp, pos)
+        assertEquals(pos, consumer.position(tp))
+      } else if (coin == 2) {
+        info("Committing offset.")
+        consumer.commit(CommitType.SYNC)
+        assertEquals(consumer.position(tp), consumer.committed(tp))
+      }
+    }
+  }
+  
+  def testPartitionReassignmentCallback() {
+    val callback = new TestConsumerReassignmentCallback()
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
+    val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumer0.subscribe(topic)
+        
+    // the initial subscription should cause a callback execution
+    while(callback.callsToAssigned == 0)
+      consumer0.poll(50)
+    
+    // get metadata for the topic
+    var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+    while(parts == null)
+      parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+    assertEquals(1, parts.size)
+    assertNotNull(parts(0).leader())
+    
+    // shutdown the coordinator
+    val coordinator = parts(0).leader().id()
+    this.servers(coordinator).shutdown()
+    
+    // this should cause another callback execution
+    while(callback.callsToAssigned < 2)
+      consumer0.poll(50)
+    assertEquals(2, callback.callsToAssigned)
+    assertEquals(2, callback.callsToRevoked)
+
+    consumer0.close()
+  }
+
+  private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
+    var callsToAssigned = 0
+    var callsToRevoked = 0
+    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+      info("onPartitionsAssigned called.")
+      callsToAssigned += 1
+    }
+    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
+      info("onPartitionsRevoked called.")
+      callsToRevoked += 1
+    } 
+  }
+
+  private class BounceBrokerScheduler(val numIters: Int) extends ShutdownableThread("daemon-bounce-broker", false)
+  {
+    var iter: Int = 0
+
+    override def doWork(): Unit = {
+      killRandomBroker()
+      restartDeadBrokers()
+
+      iter += 1
+      if (iter == numIters)
+        initiateShutdown()
+      else
+        Thread.sleep(500)
+    }
+  }
+
+  private def sendRecords(numRecords: Int) {
+    val futures = (0 until numRecords).map { i =>
+      this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))
+    }
+    futures.map(_.get)
+  }
+
+  private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) {
+    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    val maxIters = numRecords * 300
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50))
+        records.add(record)
+      if(iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.");
+      iters += 1
+    }
+    for (i <- 0 until numRecords) {
+      val record = records.get(i)
+      val offset = startingOffset + i
+      assertEquals(topic, record.topic())
+      assertEquals(part, record.partition())
+      assertEquals(offset.toLong, record.offset())
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 82fe4c9..5b7e366 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -19,14 +19,11 @@ package kafka.api
 
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.scalatest.junit.JUnit3Suite
-import collection._
 import kafka.utils.TestUtils
 import java.util.Properties
-import java.util.Arrays
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import kafka.server.KafkaConfig
+import kafka.server.{OffsetManager, KafkaConfig}
 import kafka.integration.KafkaServerTestHarness
 import scala.collection.mutable.Buffer
 
@@ -62,6 +59,13 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       producers += new KafkaProducer(producerConfig)
     for(i <- 0 until consumerCount)
       consumers += new KafkaConsumer(consumerConfig)
+
+    // create the consumer offset topic
+    TestUtils.createTopic(zkClient, OffsetManager.OffsetsTopicName,
+      serverConfig.getProperty("offsets.topic.num.partitions").toInt,
+      serverConfig.getProperty("offsets.topic.replication.factor").toInt,
+      servers,
+      servers(0).offsetManager.offsetsTopicConfig)
   }
   
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 8246e12..84689e1 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.api.test
+package kafka.api
 
 import org.junit.Test
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index dc0512b..062790f 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -75,7 +75,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
    */
   def restartDeadBrokers() {
     for(i <- 0 until servers.length if !alive(i)) {
-      servers(i) = TestUtils.createServer(configs(i))
+      servers(i).startup()
       alive(i) = true
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index ea9b315..e4d0435 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -159,7 +159,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
     assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+    assertEquals(ErrorMapping.NoOffsetsCommittedCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
     assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
     assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0b92cec1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6ce1807..52c7920 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -48,6 +48,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import collection.Iterable
 
 import scala.collection.Map
+import org.apache.kafka.clients.consumer.KafkaConsumer
 
 /**
  * Utility functions to help with testing
@@ -407,6 +408,27 @@ object TestUtils extends Logging {
   }
 
   /**
+   * Create a new consumer with a few pre-configured properties.
+   */
+  def createNewConsumer(brokerList: String,
+                        groupId: String,
+                        autoOffsetReset: String = "earliest",
+                        partitionFetchSize: Long = 4096L) : KafkaConsumer[Array[Byte],Array[Byte]] = {
+    import org.apache.kafka.clients.consumer.ConsumerConfig
+
+    val consumerProps= new Properties()
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset)
+    consumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, partitionFetchSize.toString)
+    consumerProps.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
+    consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    return new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+  }
+
+  /**
    * Create a default producer config properties map with the given metadata broker list
    */
   def getProducerConfig(brokerList: String): Properties = {


Mime
View raw message