kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: KAFKA-2399; Replace `Stream.continually` with `Iterator.continually`
Date Wed, 09 Dec 2015 01:22:12 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d09188aa3 -> b5f5be69e


KAFKA-2399; Replace `Stream.continually` with `Iterator.continually`

`Iterator.continually` is more efficient (it doesn't allocate a `Cons` instance per element)
and we don't need the extra functionality provided by `Stream.continually`.
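
As an illustration of the point above, here is a minimal sketch of the polling pattern that `ProducerSendThread` uses, written with `Iterator.continually`. The names `DrainSketch`, `Shutdown`, and `drain` are hypothetical and are not part of the Kafka code; only the `Iterator.continually(...).takeWhile(...).foreach` shape is taken from the diff. Each `queue.poll` result is handed to `foreach` and then dropped, whereas a `Stream` would wrap every element in a `Cons` cell.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

object DrainSketch {
  // Hypothetical sentinel, compared by reference (ne), standing in for
  // the shutdownCommand used by the real ProducerSendThread.
  val Shutdown: String = new String("shutdown")

  // Polls the queue until the sentinel arrives and returns the non-null
  // items seen before it. A null poll result (timeout) keeps the loop
  // going, so this only terminates once the sentinel has been enqueued.
  def drain(queue: LinkedBlockingQueue[String], timeoutMs: Long): List[String] = {
    val seen = ListBuffer.empty[String]
    Iterator.continually(queue.poll(timeoutMs, TimeUnit.MILLISECONDS))
      .takeWhile(item => if (item != null) item ne Shutdown else true)
      .foreach(item => if (item != null) seen += item)
    seen.toList
  }
}
```

Because the iterator is consumed exactly once by `foreach`, none of the memoization that `Stream` provides is needed here.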

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ewen Cheslack-Postava

Closes #106 from ijuma/kafka-2399-replace-stream-continually


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5f5be69
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5f5be69
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5f5be69

Branch: refs/heads/trunk
Commit: b5f5be69ea99f3b434b187129e7319607009a8f3
Parents: d09188a
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Tue Dec 8 17:22:03 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Dec 8 17:22:03 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/producer/async/ProducerSendThread.scala  | 2 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala               | 7 ++-----
 2 files changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5f5be69/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2ccf82a..8a903f3 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -63,7 +63,7 @@ class ProducerSendThread[K,V](val threadName: String,
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
-    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
+    Iterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                       .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
       currentQueueItem =>
         val elapsed = (SystemTime.milliseconds - lastSend)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b5f5be69/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index e25468f..b70a1e6 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -228,11 +228,8 @@ object CoreUtils extends Logging {
    * @param coll An iterable over the underlying collection.
    * @return A circular iterator over the collection.
    */
-  def circularIterator[T](coll: Iterable[T]) = {
-    val stream: Stream[T] =
-      for (forever <- Stream.continually(1); t <- coll) yield t
-    stream.iterator
-  }
+  def circularIterator[T](coll: Iterable[T]) =
+    for (_ <- Iterator.continually(1); t <- coll) yield t
 
   /**
    * Replace the given string suffix with the new suffix. If the string doesn't end with the given suffix throw an exception.
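
The new `circularIterator` body can be tried in isolation. The sketch below copies the for-comprehension from the diff into a hypothetical wrapper object, `CircularSketch`, and adds an explicit `Iterator[T]` return type for readability. The outer `Iterator.continually(1)` never ends, and each of its elements triggers a fresh traversal of `coll`.

```scala
object CircularSketch {
  // Same for-comprehension as the new CoreUtils.circularIterator.
  // It desugars to Iterator.continually(1).flatMap(_ => coll.map(t => t)),
  // so the collection is re-traversed once per outer element.
  def circularIterator[T](coll: Iterable[T]): Iterator[T] =
    for (_ <- Iterator.continually(1); t <- coll) yield t
}
```

For example, `CircularSketch.circularIterator(List(1, 2, 3)).take(7).toList` yields `List(1, 2, 3, 1, 2, 3, 1)`. One caveat that follows from this shape: with an empty collection, `hasNext` would keep looking for a next element and never return, so callers should pass a non-empty collection.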

