kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [6/7] kafka git commit: KAFKA-1760: New consumer.
Date Fri, 30 Jan 2015 02:39:36 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index bdf4b26..416d703 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -9,53 +9,98 @@
  * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
  * specific language governing permissions and limitations under the License.
-*/
+ */
 package org.apache.kafka.clients.consumer;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
 
 /**
- * A container that holds the list {@link ConsumerRecord} per partition for a particular
topic. There is one for every topic returned by a 
- * {@link Consumer#poll(long)} operation. 
+ * A container that holds the list of {@link ConsumerRecord}s per partition,
+ * across all topics, returned by a
+ * {@link Consumer#poll(long)} operation.
  */
-public class ConsumerRecords<K,V> {
+public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>>
{
 
-    private final String topic;
-    private final Map<Integer, List<ConsumerRecord<K,V>>> recordsPerPartition;
-    
-    public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord<K,V>>>
records) {
-        this.topic = topic;
-        this.recordsPerPartition = records;
+    private final Map<TopicPartition, List<ConsumerRecord<K,V>>> records;
+
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K,V>>>
records) {
+        this.records = records; // keeps a reference to the caller's map; no copy is made
     }
-    
+
     /**
-     * @param partitions The input list of partitions for a particular topic. If no partitions
are 
-     * specified, returns records for all partitions
-     * @return The list of {@link ConsumerRecord}s associated with the given partitions.
+     * Get just the records for the given partition
+     * 
+     * @param partition The partition to get records for
      */
-    public List<ConsumerRecord<K,V>> records(int... partitions) {
-        List<ConsumerRecord<K,V>> recordsToReturn = new ArrayList<ConsumerRecord<K,V>>();
-        if(partitions.length == 0) {
-            // return records for all partitions
-            for(Entry<Integer, List<ConsumerRecord<K,V>>> record : recordsPerPartition.entrySet())
{
-                recordsToReturn.addAll(record.getValue());
-            }
-        } else {
-           for(int partition : partitions) {
-               List<ConsumerRecord<K,V>> recordsForThisPartition = recordsPerPartition.get(partition);
-               recordsToReturn.addAll(recordsForThisPartition);
-           }
+    public Iterable<ConsumerRecord<K,V>> records(TopicPartition partition) {
+        List<ConsumerRecord<K,V>> recs = this.records.get(partition);
+        if (recs == null) // no entry for this partition: hand back an empty list rather than null
+            return Collections.emptyList();
+        else
+            return recs;
+    }
+
+    /**
+     * Get just the records for the given topic, across all of its partitions (throws IllegalArgumentException if topic is null)
+     */
+    public Iterable<ConsumerRecord<K,V>> records(String topic) {
+        if (topic == null)
+            throw new IllegalArgumentException("Topic must be non-null.");
+        List<List<ConsumerRecord<K,V>>> recs = new ArrayList<List<ConsumerRecord<K,V>>>();
+        for (Map.Entry<TopicPartition, List<ConsumerRecord<K,V>>> entry : records.entrySet()) {
+            if (entry.getKey().topic().equals(topic)) // match on the key's topic name; a TopicPartition never equals a String
+                recs.add(entry.getValue());
         }
-        return recordsToReturn;
+        return new ConcatenatedIterable<K,V>(recs);
     }
 
+    @Override
+    public Iterator<ConsumerRecord<K,V>> iterator() {
+        return new ConcatenatedIterable<K,V>(records.values()).iterator(); // chain every per-partition list in the map into one iteration
+    }
+    
     /**
-     * @return The topic of all records associated with this instance
+     * The number of records for all topics
      */
-    public String topic() {
-        return this.topic;
+    public int count() {
+        int count = 0;
+        for(List<ConsumerRecord<K,V>> recs: this.records.values())
+            count += recs.size(); // add up the size of each per-partition list
+        return count;
+    }
+
+    private static class ConcatenatedIterable<K,V> implements Iterable<ConsumerRecord<K,V>> {
+        // Presents a sequence of iterables as a single iterable, visiting each inner iterable in turn.
+        private final Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables;
+
+        public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables) {
+            this.iterables = iterables;
+        }
+
+        @Override
+        public Iterator<ConsumerRecord<K,V>> iterator() {
+            return new AbstractIterator<ConsumerRecord<K,V>>() {
+                Iterator<? extends Iterable<ConsumerRecord<K,V>>> iters = iterables.iterator();
+                Iterator<ConsumerRecord<K,V>> current;
+                // Loop rather than check once, so empty inner iterables are skipped instead of having next() called on them.
+                public ConsumerRecord<K,V> makeNext() {
+                    while (current == null || !current.hasNext()) {
+                        if (iters.hasNext())
+                            current = iters.next().iterator();
+                        else
+                            return allDone();
+                    }
+                    return current.next();
+                }
+            };
+        }
     }
+
 }


Mime
View raw message