kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2487: change kafka.examples.Consumer to use the new java consumer
Date Fri, 16 Oct 2015 00:23:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 50a076d1e -> 5338f8432


KAFKA-2487: change kafka.examples.Consumer to use the new java consumer

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Guozhang Wang

Closes #297 from SinghAsDev/KAFKA-2487


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5338f843
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5338f843
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5338f843

Branch: refs/heads/trunk
Commit: 5338f8432f74054671e9da59ba6f97abb81a03f2
Parents: 50a076d
Author: Ashish Singh <asingh@cloudera.com>
Authored: Thu Oct 15 17:28:23 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Oct 15 17:28:23 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/kafka/examples/Consumer.java  | 72 ++++++++++----------
 1 file changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5338f843/examples/src/main/java/kafka/examples/Consumer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 8af64d8..3bb93ee 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -16,52 +16,52 @@
  */
 package kafka.examples;
 
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
 import java.util.Properties;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
 
+import kafka.utils.ShutdownableThread;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 
-public class Consumer extends Thread
+public class Consumer extends ShutdownableThread
 {
-  private final ConsumerConnector consumer;
+  private final KafkaConsumer<Integer, String> consumer;
   private final String topic;
-  
+
   public Consumer(String topic)
   {
-    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
-            createConsumerConfig());
+    super("KafkaConsumerExample", false);
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+    consumer = new KafkaConsumer<>(props);
     this.topic = topic;
   }
 
-  private static ConsumerConfig createConsumerConfig()
-  {
-    Properties props = new Properties();
-    props.put("zookeeper.connect", KafkaProperties.zkConnect);
-    props.put("group.id", KafkaProperties.groupId);
-    props.put("zookeeper.session.timeout.ms", "400");
-    props.put("zookeeper.sync.time.ms", "200");
-    props.put("auto.commit.interval.ms", "1000");
-
-    return new ConsumerConfig(props);
+  @Override
+  public void doWork() {
+    consumer.subscribe(Collections.singletonList(this.topic));
+    ConsumerRecords<Integer, String> records = consumer.poll(1000);
+    for (ConsumerRecord<Integer, String> record : records) {
+      System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
+    }
+  }
 
+  @Override
+  public String name() {
+    return null;
   }
- 
-  public void run() {
-    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-    topicCountMap.put(topic, 1);
-    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
-    for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
-      System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt()
+
-          ", " +
-          "" + new String(messageAndMetadata.message()) + ")");
-    }
+
+  @Override
+  public boolean isInterruptible() {
+    return false;
   }
-}
+}
\ No newline at end of file


Mime
View raw message