Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php Mon Aug 1 23:41:24 2011
@@ -0,0 +1,45 @@
+<?php
+
+/**
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_MessageTest extends PHPUnit_Framework_TestCase
+{
+ private $test;
+ private $encoded;
+ private $msg;
+ public function setUp() {
+ $this->test = 'a sample string';
+ $this->encoded = Kafka_Encoder::encode_message($this->test);
+ $this->msg = new Kafka_Message($this->encoded);
+
+ }
+
+ public function testPayload() {
+ $this->assertEquals($this->test, $this->msg->payload());
+ }
+
+ public function testValid() {
+ $this->assertTrue($this->msg->isValid());
+ }
+
+ public function testEncode() {
+ $this->assertEquals($this->encoded, $this->msg->encode());
+ }
+
+ public function testChecksum() {
+ $this->assertInternalType('integer', $this->msg->checksum());
+ }
+
+ public function testSize() {
+ $this->assertEquals(strlen($this->test), $this->msg->size());
+ }
+
+ public function testToString() {
+ $this->assertInternalType('string', $this->msg->__toString());
+ }
+
+ public function testMagic() {
+ $this->assertInternalType('integer', $this->msg->magic());
+ }
+}
Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php Mon Aug 1 23:41:24 2011
@@ -0,0 +1,59 @@
+<?php
+
+/**
+ * Override connect() method of base class
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_ProducerMock extends Kafka_Producer {
+ public function connect() {
+ if (!is_resource($this->conn)) {
+ $this->conn = fopen('php://temp', 'w+b');
+ }
+ }
+
+ public function getData() {
+ $this->connect();
+ rewind($this->conn);
+ return stream_get_contents($this->conn);
+ }
+}
+
+/**
+ * Description of ProducerTest
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_ProducerTest extends PHPUnit_Framework_TestCase
+{
+ /**
+ * @var Kafka_Producer
+ */
+ private $producer;
+
+ public function setUp() {
+ $this->producer = new Kafka_ProducerMock('localhost', 1234);
+ }
+
+ public function tearDown() {
+ $this->producer->close();
+ unset($this->producer);
+ }
+
+
+ public function testProducer() {
+ $messages = array(
+ 'test 1',
+ 'test 2 abc',
+ );
+ $topic = 'a topic';
+ $partition = 3;
+ $this->producer->send($messages, $topic, $partition);
+ $sent = $this->producer->getData();
+ $this->assertContains($topic, $sent);
+ $this->assertContains($partition, $sent);
+ foreach ($messages as $msg) {
+ $this->assertContains($msg, $sent);
+ }
+ }
+}
Added: incubator/kafka/trunk/clients/php/src/tests/bootstrap.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/bootstrap.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/bootstrap.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/bootstrap.php Mon Aug 1 23:41:24 2011
@@ -0,0 +1,36 @@
+<?php
+
+function test_autoload($className)
+{
+ $classFile = str_replace('_', DIRECTORY_SEPARATOR, $className) . '.php';
+ $file = false;
+ if (function_exists('stream_resolve_include_path')) {
+ $file = stream_resolve_include_path($classFile);
+ } else {
+ foreach (explode(PATH_SEPARATOR, get_include_path()) as $path) {
+ if (file_exists($path . '/' . $classFile)) {
+ $file = $path . '/' . $classFile;
+ break;
+ }
+ }
+ }
+ /* If the class file was found on the include path, load it */
+ if (($file !== false) && ($file !== null)) {
+ include $file;
+ return;
+ }
+
+ throw new RuntimeException($className. ' not found');
+}
+
+// register the autoloader
+spl_autoload_register('test_autoload');
+
+set_include_path(
+ implode(PATH_SEPARATOR, array(
+ realpath(dirname(__FILE__).'/../lib'),
+ get_include_path(),
+ ))
+);
+
+date_default_timezone_set('Europe/London');
+
\ No newline at end of file
Added: incubator/kafka/trunk/clients/php/src/tests/phpunit.xml
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/phpunit.xml?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/phpunit.xml (added)
+++ incubator/kafka/trunk/clients/php/src/tests/phpunit.xml Mon Aug 1 23:41:24 2011
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<phpunit
+ bootstrap="./bootstrap.php"
+ colors="true"
+ backupGlobals="false"
+ backupStaticAttributes="false">
+
+ <testsuite name="Kafka PHP Client Test Suite">
+ <directory>./Kafka</directory>
+ </testsuite>
+
+ <filter>
+ <blacklist>
+ <directory>./</directory>
+ </blacklist>
+ </filter>
+</phpunit>
Added: incubator/kafka/trunk/clients/python/kafka.py
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/python/kafka.py?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/python/kafka.py (added)
+++ incubator/kafka/trunk/clients/python/kafka.py Mon Aug 1 23:41:24 2011
@@ -0,0 +1,68 @@
+# Copyright 2010 LinkedIn
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import socket
+import struct
+import binascii
+import sys
+
+PRODUCE_REQUEST_ID = 0
+
+def encode_message(message):
+ # <MAGIC_BYTE: char> <CRC32: int> <PAYLOAD: bytes>
+ return struct.pack('>B', 0) + \
+ struct.pack('>i', binascii.crc32(message)) + \
+ message
+
+def encode_produce_request(topic, partition, messages):
+ # encode messages as <LEN: int><MESSAGE_BYTES>
+ encoded = [encode_message(message) for message in messages]
+ message_set = ''.join([struct.pack('>i', len(m)) + m for m in encoded])
+
+ # create the request as <REQUEST_SIZE: int> <REQUEST_ID: short> <TOPIC_LENGTH: short> <TOPIC: bytes> <PARTITION: int> <BUFFER_SIZE: int> <BUFFER: bytes>
+ data = struct.pack('>H', PRODUCE_REQUEST_ID) + \
+ struct.pack('>H', len(topic)) + topic + \
+ struct.pack('>i', partition) + \
+ struct.pack('>i', len(message_set)) + message_set
+ return struct.pack('>i', len(data)) + data
+
+
+class KafkaProducer:
+ def __init__(self, host, port):
+ self.REQUEST_KEY = 0
+ self.connection = socket.socket()
+ self.connection.connect((host, port))
+
+ def close(self):
+ self.connection.close()
+
+ def send(self, messages, topic, partition = 0):
+ self.connection.sendall(encode_produce_request(topic, partition, messages))
+
+if __name__ == '__main__':
+ if len(sys.argv) < 4:
+ print >> sys.stderr, 'USAGE: python', sys.argv[0], 'host port topic'
+ sys.exit(1)
+ host = sys.argv[1]
+ port = int(sys.argv[2])
+ topic = sys.argv[3]
+
+ producer = KafkaProducer(host, port)
+
+ while True:
+ print 'Enter comma separated messages: ',
+ line = sys.stdin.readline()
+ messages = line.split(',')
+ producer.send(messages, topic)
+ print 'Sent', len(messages), 'messages successfully'
\ No newline at end of file
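For reference, the two helpers above define the on-the-wire framing used by this client: each message is a 1 byte magic value, a 4 byte big-endian CRC32 of the payload, and the payload itself, and a produce request wraps a length-prefixed set of such messages. Below is a minimal decoding sketch of a single message under that assumed layout; describe_message is an illustrative name, not part of the client.

    import binascii
    import struct

    def describe_message(raw):
        # raw is one message as produced by encode_message above:
        # <MAGIC: 1 byte> <CRC32: 4 bytes, big endian> <PAYLOAD: remaining bytes>
        magic, = struct.unpack('>B', raw[0:1])
        checksum, = struct.unpack('>I', raw[1:5])
        payload = raw[5:]
        return magic, checksum, payload

    # '>I' with masking yields the same four bytes as the client's signed '>i' pack
    encoded = struct.pack('>B', 0) + \
              struct.pack('>I', binascii.crc32(b'hello') & 0xffffffff) + \
              b'hello'
    print(describe_message(encoded))  # -> (0, 907060870, b'hello') on Python 3
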
Added: incubator/kafka/trunk/clients/python/setup.py
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/python/setup.py?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/python/setup.py (added)
+++ incubator/kafka/trunk/clients/python/setup.py Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from distutils.core import setup
+
+setup(
+ name='kafka-python-client',
+ version='0.6',
+ description='This library implements a Kafka client',
+ author='LinkedIn.com',
+ url='https://github.com/kafka-dev/kafka',
+ package_dir={'': '.'},
+ py_modules=[
+ 'kafka',
+ ],
+)
Added: incubator/kafka/trunk/clients/ruby/LICENSE
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/LICENSE?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/LICENSE (added)
+++ incubator/kafka/trunk/clients/ruby/LICENSE Mon Aug 1 23:41:24 2011
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+Copyright 2011 LinkedIn
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
Added: incubator/kafka/trunk/clients/ruby/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/README.md?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/README.md (added)
+++ incubator/kafka/trunk/clients/ruby/README.md Mon Aug 1 23:41:24 2011
@@ -0,0 +1,62 @@
+# kafka-rb
+kafka-rb allows you to produce messages to the Kafka distributed publish/subscribe messaging service.
+
+## Requirements
+You need to have access to your Kafka instance and be able to connect through TCP. You can obtain a copy of Kafka, and instructions on how to set it up, at https://github.com/kafka-dev/kafka
+
+## Installation
+sudo gem install kafka-rb
+
+(the code works fine with JRuby, Ruby 1.8.x and Ruby 1.9.x)
+
+## Usage
+
+### Sending a simple message
+
+ require 'kafka'
+ producer = Kafka::Producer.new
+ message = Kafka::Message.new("some random message content")
+ producer.send(message)
+
+### Sending a sequence of messages
+
+ require 'kafka'
+ producer = Kafka::Producer.new
+ message1 = Kafka::Message.new("some random message content")
+ message2 = Kafka::Message.new("some more content")
+ producer.send([message1, message2])
+
+### Batching a bunch of messages using the block syntax
+
+ require 'kafka'
+ producer = Kafka::Producer.new
+ producer.batch do |messages|
+ puts "Batching a send of multiple messages.."
+ messages << Kafka::Message.new("first message to send")
+ messages << Kafka::Message.new("second message to send")
+ end
+
+* the messages will be sent all at once, after the block finishes executing
+
+### Consuming messages one by one
+
+ require 'kafka'
+ consumer = Kafka::Consumer.new
+ messages = consumer.consume
+
+
+### Consuming messages using a block loop
+
+ require 'kafka'
+ consumer = Kafka::Consumer.new
+ consumer.loop do |messages|
+ puts "Received"
+ puts messages
+ end
+
+
+Contact for questions
+
+alejandrocrosa at(@) gmail.com
+
+http://twitter.com/alejandrocrosa
Added: incubator/kafka/trunk/clients/ruby/Rakefile
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/Rakefile?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/Rakefile (added)
+++ incubator/kafka/trunk/clients/ruby/Rakefile Mon Aug 1 23:41:24 2011
@@ -0,0 +1,61 @@
+require 'rubygems'
+require 'rake/gempackagetask'
+require 'rubygems/specification'
+require 'date'
+require 'rspec/core/rake_task'
+
+GEM = 'kafka-rb'
+GEM_NAME = 'Kafka Client'
+GEM_VERSION = '0.0.5'
+AUTHORS = ['Alejandro Crosa']
+EMAIL = "alejandrocrosa@gmail.com"
+HOMEPAGE = "http://github.com/acrosa/kafka-rb"
+SUMMARY = "A Ruby client for the Kafka distributed publish/subscribe messaging service"
+DESCRIPTION = "kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service."
+
+spec = Gem::Specification.new do |s|
+ s.name = GEM
+ s.version = GEM_VERSION
+ s.platform = Gem::Platform::RUBY
+ s.has_rdoc = true
+ s.extra_rdoc_files = ["LICENSE"]
+ s.summary = SUMMARY
+ s.description = DESCRIPTION
+ s.authors = AUTHORS
+ s.email = EMAIL
+ s.homepage = HOMEPAGE
+ s.add_development_dependency "rspec"
+ s.require_path = 'lib'
+ s.autorequire = GEM
+ s.files = %w(LICENSE README.md Rakefile) + Dir.glob("{lib,tasks,spec}/**/*")
+end
+
+task :default => :spec
+
+desc "Run specs"
+RSpec::Core::RakeTask.new do |t|
+ t.pattern = FileList['spec/**/*_spec.rb']
+ t.rspec_opts = %w(-fs --color)
+end
+
+Rake::GemPackageTask.new(spec) do |pkg|
+ pkg.gem_spec = spec
+end
+
+desc "install the gem locally"
+task :install => [:package] do
+ sh %{sudo gem install pkg/#{GEM}-#{GEM_VERSION}}
+end
+
+desc "create a gemspec file"
+task :make_spec do
+ File.open("#{GEM}.gemspec", "w") do |file|
+ file.puts spec.to_ruby
+ end
+end
+
+desc "Run all examples with RCov"
+RSpec::Core::RakeTask.new(:rcov) do |t|
+ t.pattern = FileList['spec/**/*_spec.rb']
+ t.rcov = true
+end
Added: incubator/kafka/trunk/clients/ruby/TODO
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/TODO?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/TODO (added)
+++ incubator/kafka/trunk/clients/ruby/TODO Mon Aug 1 23:41:24 2011
@@ -0,0 +1 @@
+* should persist the offset somewhere (currently thinking alternatives)
Added: incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec (added)
+++ incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec Mon Aug 1 23:41:24 2011
@@ -0,0 +1,32 @@
+# -*- encoding: utf-8 -*-
+
+Gem::Specification.new do |s|
+ s.name = %q{kafka-rb}
+ s.version = "0.0.5"
+
+ s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
+ s.authors = ["Alejandro Crosa"]
+ s.autorequire = %q{kafka-rb}
+ s.date = %q{2011-01-13}
+ s.description = %q{kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service.}
+ s.email = %q{alejandrocrosa@gmail.com}
+ s.extra_rdoc_files = ["LICENSE"]
+ s.files = ["LICENSE", "README.md", "Rakefile", "lib/kafka", "lib/kafka/batch.rb", "lib/kafka/consumer.rb", "lib/kafka/io.rb", "lib/kafka/message.rb", "lib/kafka/producer.rb", "lib/kafka/request_type.rb", "lib/kafka.rb", "spec/batch_spec.rb", "spec/consumer_spec.rb", "spec/io_spec.rb", "spec/kafka_spec.rb", "spec/message_spec.rb", "spec/producer_spec.rb", "spec/spec_helper.rb"]
+ s.homepage = %q{http://github.com/acrosa/kafka-rb}
+ s.require_paths = ["lib"]
+ s.rubygems_version = %q{1.3.7}
+ s.summary = %q{A Ruby client for the Kafka distributed publish/subscribe messaging service}
+
+ if s.respond_to? :specification_version then
+ current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
+ s.specification_version = 3
+
+ if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
+ s.add_development_dependency(%q<rspec>, [">= 0"])
+ else
+ s.add_dependency(%q<rspec>, [">= 0"])
+ end
+ else
+ s.add_dependency(%q<rspec>, [">= 0"])
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/lib/kafka.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,13 @@
+require 'socket'
+require 'zlib'
+
+require File.join(File.dirname(__FILE__), "kafka", "io")
+require File.join(File.dirname(__FILE__), "kafka", "request_type")
+require File.join(File.dirname(__FILE__), "kafka", "error_codes")
+require File.join(File.dirname(__FILE__), "kafka", "batch")
+require File.join(File.dirname(__FILE__), "kafka", "message")
+require File.join(File.dirname(__FILE__), "kafka", "producer")
+require File.join(File.dirname(__FILE__), "kafka", "consumer")
+
+module Kafka
+end
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,13 @@
+module Kafka
+ class Batch
+ attr_accessor :messages
+
+ def initialize
+ self.messages = []
+ end
+
+ def << (message)
+ self.messages << message
+ end
+ end
+end
\ No newline at end of file
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,135 @@
+module Kafka
+ class Consumer
+
+ include Kafka::IO
+
+ CONSUME_REQUEST_TYPE = Kafka::RequestType::FETCH
+ MAX_SIZE = 1048576 # 1 MB
+ DEFAULT_POLLING_INTERVAL = 2 # 2 seconds
+ MAX_OFFSETS = 100
+
+ attr_accessor :topic, :partition, :offset, :max_size, :request_type, :polling
+
+ def initialize(options = {})
+ self.topic = options[:topic] || "test"
+ self.partition = options[:partition] || 0
+ self.host = options[:host] || "localhost"
+ self.port = options[:port] || 9092
+ self.offset = options[:offset] || -2
+ self.max_size = options[:max_size] || MAX_SIZE
+ self.request_type = options[:request_type] || CONSUME_REQUEST_TYPE
+ self.polling = options[:polling] || DEFAULT_POLLING_INTERVAL
+ self.connect(self.host, self.port)
+
+ if @offset < 0
+ send_offsets_request
+ offsets = read_offsets_response
+ raise Exception, "No offsets for #@topic-#@partition" if offsets.empty?
+ @offset = offsets[0]
+ end
+ end
+
+ # REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE
+ def request_size
+ 2 + 2 + topic.length + 4 + 8 + 4
+ end
+
+ def encode_request_size
+ [self.request_size].pack("N")
+ end
+
+ def encode_request(request_type, topic, partition, offset, max_size)
+ request_type = [request_type].pack("n")
+ topic = [topic.length].pack('n') + topic
+ partition = [partition].pack("N")
+ offset = [offset].pack("Q").reverse # DIY 64bit big endian integer
+ max_size = [max_size].pack("N")
+
+ request_type + topic + partition + offset + max_size
+ end
+
+ # REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + TIME + MAX NUMBER OF OFFSETS
+ def offsets_request_size
+ 2 + 2 + topic.length + 4 + 8 + 4
+ end
+
+ def encode_offsets_request_size
+ [offsets_request_size].pack('N')
+ end
+
+ # Query the server for the offsets
+ def encode_offsets_request(topic, partition, time, max_offsets)
+ req = [Kafka::RequestType::OFFSETS].pack('n')
+ topic = [topic.length].pack('n') + topic
+ partition = [partition].pack('N')
+ time = [time].pack("q").reverse # DIY 64bit big endian integer
+ max_offsets = [max_offsets].pack('N')
+
+ req + topic + partition + time + max_offsets
+ end
+
+ def consume
+ self.send_consume_request # request data
+ data = self.read_data_response # read data response
+ self.parse_message_set_from(data) # parse message set
+ end
+
+ def loop(&block)
+ messages = []
+ while(true) do
+ messages = self.consume
+ block.call(messages) if messages && !messages.empty?
+ sleep(self.polling)
+ end
+ end
+
+ def read_data_response
+ data_length = self.socket.read(4).unpack("N").shift # read length
+ data = self.socket.read(data_length) # read message set
+ data[2, data.length] # skip the 2 byte error code at the start of the response
+ end
+
+ def send_consume_request
+ self.write(self.encode_request_size) # write request_size
+ self.write(self.encode_request(self.request_type, self.topic, self.partition, self.offset, self.max_size)) # write request
+ end
+
+ def send_offsets_request
+ self.write(self.encode_offsets_request_size) # write request_size
+ self.write(self.encode_offsets_request(@topic, @partition, -2, MAX_OFFSETS)) # write request
+ end
+
+ def read_offsets_response
+ data_length = self.socket.read(4).unpack('N').shift # read length
+ data = self.socket.read(data_length) # read message
+
+ pos = 0
+ error_code = data[pos,2].unpack('n')[0]
+ raise Exception, Kafka::ErrorCodes::to_s(error_code) if error_code != Kafka::ErrorCodes::NO_ERROR
+
+ pos += 2
+ count = data[pos,4].unpack('N')[0]
+ pos += 4
+
+ res = []
+ while pos != data.size
+ res << data[pos,8].reverse.unpack('q')[0]
+ pos += 8
+ end
+
+ res
+ end
+
+ def parse_message_set_from(data)
+ messages = []
+ processed = 0
+ length = data.length - 4
+ while(processed <= length) do
+ message_size = data[processed, 4].unpack("N").shift
+ messages << Kafka::Message.parse_from(data[processed, message_size + 4])
+ processed += 4 + message_size
+ end
+ self.offset += processed
+ messages
+ end
+ end
+end
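For comparison, the fetch request built by encode_request above is a 4 byte size prefix followed by the request type, topic length, topic, partition, 8 byte offset, and max size, all big endian. A rough Python sketch of that same framing follows (encode_fetch_request is an illustrative name, not part of either client):

    import struct

    def encode_fetch_request(topic, partition, offset, max_size, request_type=1):
        # <REQUEST_TYPE: 2 bytes> <TOPIC_LENGTH: 2 bytes> <TOPIC>
        # <PARTITION: 4 bytes> <OFFSET: 8 bytes> <MAX_SIZE: 4 bytes>, big endian
        body = struct.pack('>H', request_type)
        body += struct.pack('>H', len(topic)) + topic.encode('utf-8')
        body += struct.pack('>i', partition)
        body += struct.pack('>q', offset)
        body += struct.pack('>i', max_size)
        # the whole request is preceded by its size as a 4 byte integer
        return struct.pack('>i', len(body)) + body

    request = encode_fetch_request('test', 0, 0, 1048576)  # request type 1 = FETCH
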
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,21 @@
+module Kafka
+ module ErrorCodes
+ NO_ERROR = 0
+ OFFSET_OUT_OF_RANGE = 1
+ INVALID_MESSAGE_CODE = 2
+ WRONG_PARTITION_CODE = 3
+ INVALID_FETCH_SIZE_CODE = 4
+
+ STRINGS = {
+ 0 => 'No error',
+ 1 => 'Offset out of range',
+ 2 => 'Invalid message code',
+ 3 => 'Wrong partition code',
+ 4 => 'Invalid fetch size code',
+ }
+
+ def self.to_s(code)
+ STRINGS[code] || 'Unknown error'
+ end
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,39 @@
+module Kafka
+ module IO
+ attr_accessor :socket, :host, :port
+
+ def connect(host, port)
+ raise ArgumentError, "No host or port specified" unless host && port
+ self.host = host
+ self.port = port
+ self.socket = TCPSocket.new(host, port)
+ end
+
+ def reconnect
+ self.disconnect
+ self.socket = self.connect(self.host, self.port)
+ end
+
+ def disconnect
+ self.socket.close rescue nil
+ self.socket = nil
+ end
+
+ def write(data)
+ self.reconnect unless self.socket
+ self.socket.write(data)
+ rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
+ self.reconnect
+ self.socket.write(data) # retry
+ end
+
+ def read(length)
+ begin
+ self.socket.read(length)
+ rescue Errno::EAGAIN
+ self.disconnect
+ raise Errno::EAGAIN, "Timeout reading from the socket"
+ end
+ end
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,35 @@
+module Kafka
+
+ # A message. The format of an N byte message is the following:
+ # 1 byte "magic" identifier to allow format changes
+ # 4 byte CRC32 of the payload
+ # N - 5 byte payload
+ class Message
+
+ MAGIC_IDENTIFIER_DEFAULT = 0
+
+ attr_accessor :magic, :checksum, :payload
+
+ def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
+ self.magic = magic
+ self.payload = payload
+ self.checksum = checksum || self.calculate_checksum
+ end
+
+ def calculate_checksum
+ Zlib.crc32(self.payload)
+ end
+
+ def valid?
+ self.checksum == Zlib.crc32(self.payload)
+ end
+
+ def self.parse_from(binary)
+ size = binary[0, 4].unpack("N").shift.to_i
+ magic = binary[4, 1].unpack("C").shift
+ checksum = binary[5, 4].unpack("N").shift
+ payload = binary[9, size] # 5 = 1 + 4 is Magic + Checksum
+ return Kafka::Message.new(payload, magic, checksum)
+ end
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,49 @@
+module Kafka
+ class Producer
+
+ include Kafka::IO
+
+ PRODUCE_REQUEST_ID = Kafka::RequestType::PRODUCE
+
+ attr_accessor :topic, :partition
+
+ def initialize(options = {})
+ self.topic = options[:topic] || "test"
+ self.partition = options[:partition] || 0
+ self.host = options[:host] || "localhost"
+ self.port = options[:port] || 9092
+ self.connect(self.host, self.port)
+ end
+
+ def encode(message)
+ [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
+ end
+
+ def encode_request(topic, partition, messages)
+ message_set = Array(messages).collect { |message|
+ encoded_message = self.encode(message)
+ [encoded_message.length].pack("N") + encoded_message
+ }.join("")
+
+ request = [PRODUCE_REQUEST_ID].pack("n")
+ topic = [topic.length].pack("n") + topic
+ partition = [partition].pack("N")
+ messages = [message_set.length].pack("N") + message_set
+
+ data = request + topic + partition + messages
+
+ return [data.length].pack("N") + data
+ end
+
+ def send(messages)
+ self.write(self.encode_request(self.topic, self.partition, messages))
+ end
+
+ def batch(&block)
+ batch = Kafka::Batch.new
+ block.call( batch )
+ self.send(batch.messages)
+ batch.messages.clear
+ end
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,9 @@
+module Kafka
+ module RequestType
+ PRODUCE = 0
+ FETCH = 1
+ MULTIFETCH = 2
+ MULTIPRODUCE = 3
+ OFFSETS = 4
+ end
+end
\ No newline at end of file
Added: incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,21 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Batch do
+
+ before(:each) do
+ @batch = Batch.new
+ end
+
+ describe "batch messages" do
+ it "holds all messages to be sent" do
+ @batch.should respond_to(:messages)
+ @batch.messages.class.should eql(Array)
+ end
+
+ it "supports queueing/adding messages to be send" do
+ @batch.messages << mock(Kafka::Message.new("one"))
+ @batch.messages << mock(Kafka::Message.new("two"))
+ @batch.messages.length.should eql(2)
+ end
+ end
+end
\ No newline at end of file
Added: incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,120 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Consumer do
+
+ before(:each) do
+ @mocked_socket = mock(TCPSocket)
+ TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
+ @consumer = Consumer.new
+ end
+
+ describe "Kafka Consumer" do
+
+ it "should have a CONSUME_REQUEST_TYPE" do
+ Consumer::CONSUME_REQUEST_TYPE.should eql(1)
+ @consumer.should respond_to(:request_type)
+ end
+
+ it "should have a topic and a partition" do
+ @consumer.should respond_to(:topic)
+ @consumer.should respond_to(:partition)
+ end
+
+ it "should have a polling option, and a default value" do
+ Consumer::DEFAULT_POLLING_INTERVAL.should eql(2)
+ @consumer.should respond_to(:polling)
+ @consumer.polling.should eql(2)
+ end
+
+ it "should set a topic and partition on initialize" do
+ @consumer = Consumer.new({ :host => "localhost", :port => 9092, :topic => "testing" })
+ @consumer.topic.should eql("testing")
+ @consumer.partition.should eql(0)
+ @consumer = Consumer.new({ :topic => "testing", :partition => 3 })
+ @consumer.partition.should eql(3)
+ end
+
+ it "should set default host and port if none is specified" do
+ @consumer = Consumer.new
+ @consumer.host.should eql("localhost")
+ @consumer.port.should eql(9092)
+ end
+
+ it "should have a default offset, and be able to set it" do
+ @consumer.offset.should eql(0)
+ @consumer = Consumer.new({ :offset => 1111 })
+ @consumer.offset.should eql(1111)
+ end
+
+ it "should have a max size" do
+ Consumer::MAX_SIZE.should eql(1048576)
+ @consumer.max_size.should eql(1048576)
+ end
+
+ it "should return the size of the request" do
+ @consumer.request_size.should eql(24)
+ @consumer.topic = "someothertopicname"
+ @consumer.request_size.should eql(38)
+ @consumer.encode_request_size.should eql([@consumer.request_size].pack("N"))
+ end
+
+ it "should encode a request to consume" do
+ bytes = [Kafka::Consumer::CONSUME_REQUEST_TYPE].pack("n") + ["test".length].pack("n") + "test" + [0].pack("N") + [0].pack("L_") + [Kafka::Consumer::MAX_SIZE].pack("N")
+ @consumer.encode_request(Kafka::Consumer::CONSUME_REQUEST_TYPE, "test", 0, 0, Kafka::Consumer::MAX_SIZE).should eql(bytes)
+ end
+
+ it "should read the response data" do
+ bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
+ @mocked_socket.should_receive(:read).exactly(:twice).and_return(bytes)
+ @consumer.read_data_response.should eql(bytes[2, bytes.length])
+ end
+
+ it "should send a consumer request" do
+ @consumer.stub!(:encode_request_size).and_return(666)
+ @consumer.stub!(:encode_request).and_return("someencodedrequest")
+ @consumer.should_receive(:write).with("someencodedrequest").exactly(:once).and_return(true)
+ @consumer.should_receive(:write).with(666).exactly(:once).and_return(true)
+ @consumer.send_consume_request.should eql(true)
+ end
+
+ it "should parse a message set from bytes" do
+ bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
+ message = @consumer.parse_message_set_from(bytes).first
+ message.payload.should eql("ale")
+ message.checksum.should eql(1120192889)
+ message.magic.should eql(0)
+ message.valid?.should eql(true)
+ end
+
+ it "should consume messages" do
+ @consumer.should_receive(:send_consume_request).and_return(true)
+ @consumer.should_receive(:read_data_response).and_return("")
+ @consumer.consume.should eql([])
+ end
+
+ it "should loop and execute a block with the consumed messages" do
+ @consumer.stub!(:consume).and_return([mock(Kafka::Message)])
+ messages = []
+ messages.should_receive(:<<).exactly(:once).and_return([])
+ @consumer.loop do |message|
+ messages << message
+ break # we don't wanna loop forever on the test
+ end
+ end
+
+ it "should loop (every N seconds, configurable on polling attribute), and execute a block with the consumed messages" do
+ @consumer = Consumer.new({ :polling => 1 })
+ @consumer.stub!(:consume).and_return([mock(Kafka::Message)])
+ messages = []
+ messages.should_receive(:<<).exactly(:twice).and_return([])
+ executed_times = 0
+ @consumer.loop do |message|
+ messages << message
+ executed_times += 1
+ break if executed_times >= 2 # we don't wanna loop forever on the test, only 2 seconds
+ end
+
+ executed_times.should eql(2)
+ end
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/spec/io_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/io_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/io_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/io_spec.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,77 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+class IOTest
+ include Kafka::IO
+end
+
+describe IO do
+
+ before(:each) do
+ @mocked_socket = mock(TCPSocket)
+ TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
+ @io = IOTest.new
+ @io.connect("somehost", 9093)
+ end
+
+ describe "default methods" do
+ it "has a socket, a host and a port" do
+ [:socket, :host, :port].each do |m|
+ @io.should respond_to(m.to_sym)
+ end
+ end
+
+ it "raises an exception if no host and port is specified" do
+ lambda {
+ io = IOTest.new
+ io.connect
+ }.should raise_error(ArgumentError)
+ end
+
+ it "should remember the port and host on connect" do
+ @io.connect("somehost", 9093)
+ @io.host.should eql("somehost")
+ @io.port.should eql(9093)
+ end
+
+ it "should write to a socket" do
+ data = "some data"
+ @mocked_socket.should_receive(:write).with(data).and_return(9)
+ @io.write(data).should eql(9)
+ end
+
+ it "should read from a socket" do
+ length = 200
+ @mocked_socket.should_receive(:read).with(length).and_return(nil)
+ @io.read(length)
+ end
+
+ it "should disconnect on a timeout when reading from a socket (to aviod protocol desync state)" do
+ length = 200
+ @mocked_socket.should_receive(:read).with(length).and_raise(Errno::EAGAIN)
+ @io.should_receive(:disconnect)
+ lambda { @io.read(length) }.should raise_error(Errno::EAGAIN)
+ end
+
+ it "should disconnect" do
+ @io.should respond_to(:disconnect)
+ @mocked_socket.should_receive(:close).and_return(nil)
+ @io.disconnect
+ end
+
+ it "should reconnect" do
+ @mocked_socket.should_receive(:close)
+ @io.should_receive(:connect)
+ @io.reconnect
+ end
+
+ it "should reconnect on a broken pipe error" do
+ [Errno::ECONNABORTED, Errno::EPIPE, Errno::ECONNRESET].each do |error|
+ @mocked_socket.should_receive(:write).exactly(:twice).and_raise(error)
+ @mocked_socket.should_receive(:close).exactly(:once).and_return(nil)
+ lambda {
+ @io.write("some data to send")
+ }.should raise_error(error)
+ end
+ end
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,7 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Kafka do
+
+ before(:each) do
+ end
+end
\ No newline at end of file
Added: incubator/kafka/trunk/clients/ruby/spec/message_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/message_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/message_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/message_spec.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,55 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Message do
+
+ before(:each) do
+ @message = Message.new
+ end
+
+ describe "Kafka Message" do
+ it "should have a default magic number" do
+ Message::MAGIC_IDENTIFIER_DEFAULT.should eql(0)
+ end
+
+ it "should have a magic field, a checksum and a payload" do
+ [:magic, :checksum, :payload].each do |field|
+ @message.should respond_to(field.to_sym)
+ end
+ end
+
+ it "should set a default value of zero" do
+ @message.magic.should eql(Kafka::Message::MAGIC_IDENTIFIER_DEFAULT)
+ end
+
+ it "should allow to set a custom magic number" do
+ @message = Message.new("ale", 1)
+ @message.magic.should eql(1)
+ end
+
+ it "should calculate the checksum (crc32 of a given message)" do
+ @message.payload = "ale"
+ @message.calculate_checksum.should eql(1120192889)
+ @message.payload = "alejandro"
+ @message.calculate_checksum.should eql(2865078607)
+ end
+
+ it "should say if the message is valid using the crc32 signature" do
+ @message.payload = "alejandro"
+ @message.checksum = 2865078607
+ @message.valid?.should eql(true)
+ @message.checksum = 0
+ @message.valid?.should eql(false)
+ @message = Message.new("alejandro", 0, 66666666) # 66666666 is a funny checksum
+ @message.valid?.should eql(false)
+ end
+
+ it "should parse a message from bytes" do
+ bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
+ message = Kafka::Message.parse_from(bytes)
+ message.valid?.should eql(true)
+ message.magic.should eql(0)
+ message.checksum.should eql(1120192889)
+ message.payload.should eql("ale")
+ end
+ end
+end
Added: incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,95 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Producer do
+
+ before(:each) do
+ @mocked_socket = mock(TCPSocket)
+ TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
+ @producer = Producer.new
+ end
+
+ describe "Kafka Producer" do
+ it "should have a PRODUCE_REQUEST_ID" do
+ Producer::PRODUCE_REQUEST_ID.should eql(0)
+ end
+
+ it "should have a topic and a partition" do
+ @producer.should respond_to(:topic)
+ @producer.should respond_to(:partition)
+ end
+
+ it "should set a topic and partition on initialize" do
+ @producer = Producer.new({ :host => "localhost", :port => 9092, :topic => "testing" })
+ @producer.topic.should eql("testing")
+ @producer.partition.should eql(0)
+ @producer = Producer.new({ :topic => "testing", :partition => 3 })
+ @producer.partition.should eql(3)
+ end
+
+ it "should set default host and port if none is specified" do
+ @producer = Producer.new
+ @producer.host.should eql("localhost")
+ @producer.port.should eql(9092)
+ end
+
+ describe "Message Encoding" do
+ it "should encode a message" do
+ message = Kafka::Message.new("alejandro")
+ full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload
+ @producer.encode(message).should eql(full_message)
+ end
+
+ it "should encode an empty message" do
+ message = Kafka::Message.new()
+ full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
+ @producer.encode(message).should eql(full_message)
+ end
+ end
+
+ describe "Request Encoding" do
+ it "should binary encode an empty request" do
+ bytes = @producer.encode_request("test", 0, [])
+ bytes.length.should eql(20)
+ bytes.should eql("\000\000\000\020\000\000\000\004test\000\000\000\000\000\000\000\000")
+ end
+
+ it "should binary encode a request with a message, using a specific wire format" do
+ message = Kafka::Message.new("ale")
+ bytes = @producer.encode_request("test", 3, message)
+ data_size = bytes[0, 4].unpack("N").shift
+ request_id = bytes[4, 2].unpack("n").shift
+ topic_length = bytes[6, 2].unpack("n").shift
+ topic = bytes[8, 4]
+ partition = bytes[12, 4].unpack("N").shift
+ messages_length = bytes[16, 4].unpack("N").shift
+ messages = bytes[20, messages_length]
+
+ bytes.length.should eql(32)
+ data_size.should eql(28)
+ request_id.should eql(0)
+ topic_length.should eql(4)
+ topic.should eql("test")
+ partition.should eql(3)
+ messages_length.should eql(12)
+ end
+ end
+ end
+
+ it "should send messages" do
+ @producer.should_receive(:write).and_return(32)
+ message = Kafka::Message.new("ale")
+ @producer.send(message).should eql(32)
+ end
+
+ describe "Message Batching" do
+ it "should batch messages and send them at once" do
+ message1 = Kafka::Message.new("one")
+ message2 = Kafka::Message.new("two")
+ @producer.should_receive(:send).with([message1, message2]).exactly(:once).and_return(nil)
+ @producer.batch do |messages|
+ messages << message1
+ messages << message2
+ end
+ end
+ end
+end
\ No newline at end of file
Added: incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb Mon Aug 1 23:41:24 2011
@@ -0,0 +1,4 @@
+require 'rubygems'
+require 'kafka'
+
+include Kafka
\ No newline at end of file
Added: incubator/kafka/trunk/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/consumer.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/consumer.properties (added)
+++ incubator/kafka/trunk/config/consumer.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,15 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=127.0.0.1:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=test-consumer-group
+
+#consumer timeout
+#consumer.timeout.ms=5000
Added: incubator/kafka/trunk/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/log4j.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/log4j.properties (added)
+++ incubator/kafka/trunk/config/log4j.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,22 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+#log4j.appender.fileAppender=org.apache.log4j.FileAppender
+#log4j.appender.fileAppender.File=kafka-request.log
+#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+
+
+# Turn on all our debugging info
+log4j.logger.kafka=INFO,stdout
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout
+#log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout
+#log4j.logger.kafka.request.logger=TRACE,fileAppender
+#log4j.additivity.kafka.request.logger=false
+#log4j.logger.kafka.network.Processor=TRACE,fileAppender
+#log4j.additivity.kafka.network.Processor=false
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+
Added: incubator/kafka/trunk/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/server.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/server.properties (added)
+++ incubator/kafka/trunk/config/server.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=0
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost. If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-logs
+
+# the send buffer used by the socket server
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=1
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=2000
+
+# the interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4
Added: incubator/kafka/trunk/config/zookeeper.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/zookeeper.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/zookeeper.properties (added)
+++ incubator/kafka/trunk/config/zookeeper.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
Added: incubator/kafka/trunk/contrib/hadoop-consumer/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/README?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/README (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/README Mon Aug 1 23:41:24 2011
@@ -0,0 +1,66 @@
+This is a Hadoop job that pulls data from a Kafka server into HDFS.
+
+It requires the following inputs from a configuration file
+(test/test.properties is an example)
+
+kafka.etl.topic : the topic to be fetched;
+
+input : input directory containing topic offsets;
+ it can be generated by DataGenerator;
+ the number of files in this directory determines the
+ number of mappers in the Hadoop job;
+
+output : output directory containing kafka data and updated
+ topic offsets;
+
+kafka.request.limit : used to limit the number of events fetched.
+
+KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
+It fetches kafka data from the server. It starts from provided offsets
+(specified by "input") and stops when it reaches the largest available offsets
+or the specified limit (specified by "kafka.request.limit").
+
+KafkaETLJob contains some helper functions to initialize job configuration.
+
+SimpleKafkaETLJob sets up job properties and submits the Hadoop job.
+
+SimpleKafkaETLMapper dumps Kafka data into HDFS.
+
+HOW TO RUN:
+In order to run this, make sure the HADOOP_HOME environment variable points to
+your hadoop installation directory.
+
+1. Compile using "sbt" to create a package for the hadoop consumer code.
+./sbt package
+
+2. Run the hadoop-setup.sh script that enables write permission on the
+ required HDFS directory
+
+3. Produce test events in the Kafka server and generate offset files
+ 1) Start kafka server [ Follow the quick start -
+ http://sna-projects.com/kafka/quickstart.php ]
+
+ 2) Update test/test.properties to change the following parameters:
+ kafka.etl.topic : topic name
+ event.count : number of events to be generated
+ kafka.server.uri : kafka server uri;
+ input : hdfs directory of offset files
+
+ 3) Produce test events to Kafka server and generate offset files
+ ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
+
+4. Fetch generated topic into HDFS:
+ 1) Update test/test.properties to change the following parameters:
+ hadoop.job.ugi : id and group
+ input : input location
+ output : output location
+ kafka.request.limit: limit the number of events to be fetched;
+ -1 means no limitation.
+ hdfs.default.classpath.dir : hdfs location of jars
+
+ 2) Copy jars into HDFS
+ ./copy-jars.sh ${hdfs.default.classpath.dir}
+
+ 3) Fetch data
+ ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
+
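
A minimal Java sketch of loading a configuration file like test/test.properties and
checking for the inputs listed above; the key names come from this README, but the
class itself is only illustrative and is not part of the hadoop-consumer code:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    // Illustrative only: verify that the required ETL job properties are present.
    public class EtlPropsCheck {
        private static final String[] REQUIRED =
            { "kafka.etl.topic", "input", "output", "kafka.request.limit" };

        public static void main(String[] args) throws IOException {
            String path = args.length > 0 ? args[0] : "test/test.properties";
            Properties props = new Properties();
            FileInputStream in = new FileInputStream(path);
            try {
                props.load(in);
            } finally {
                in.close();
            }
            for (String key : REQUIRED) {
                if (props.getProperty(key) == null) {
                    throw new IllegalStateException("missing required property: " + key);
                }
            }
            System.out.println("all required properties present in " + path);
        }
    }
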
Added: incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,55 @@
+#!/bin/bash
+
+if [ $# -lt 1 ];
+then
+ echo "USAGE: $0 dir"
+ exit 1
+fi
+
+base_dir=$(dirname $0)/../..
+
+hadoop=${HADOOP_HOME}/bin/hadoop
+
+echo "$hadoop fs -rmr $1"
+$hadoop fs -rmr $1
+
+echo "$hadoop fs -mkdir $1"
+$hadoop fs -mkdir $1
+
+# include kafka jars
+for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
+do
+ echo "$hadoop fs -put $file $1/"
+ $hadoop fs -put $file $1/
+done
+
+# include kafka core jar
+echo "$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/"
+$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/
+
+# include core lib jars
+for file in $base_dir/core/lib/*.jar;
+do
+ echo "$hadoop fs -put $file $1/"
+ $hadoop fs -put $file $1/
+done
+
+for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
+do
+ echo "$hadoop fs -put $file $1/"
+ $hadoop fs -put $file $1/
+done
+
+# include scala library jar
+echo "$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar; $1/"
+$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar $1/
+
+local_dir=$(dirname $0)
+
+# include hadoop-consumer jars
+for file in $local_dir/lib/*.jar;
+do
+ echo "$hadoop fs -put $file $1/"
+ $hadoop fs -put $file $1/
+done
+
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+hadoop=${HADOOP_HOME}/bin/hadoop
+
+$hadoop fs -chmod ugoa+w /tmp
+
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/avro-1.4.0.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/avro-1.4.0.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/avro-1.4.0.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/commons-logging-1.0.4.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/commons-logging-1.0.4.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/commons-logging-1.0.4.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/hadoop-0.20.2-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/hadoop-0.20.2-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/hadoop-0.20.2-core.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-core-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-core-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-core-asl-1.5.5.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-mapper-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-mapper-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-mapper-asl-1.5.5.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/pig-0.8.0-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/pig-0.8.0-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/pig-0.8.0-core.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/piggybank.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/piggybank.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/piggybank.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh Mon Aug 1 23:41:24 2011
@@ -0,0 +1,51 @@
+#!/bin/bash
+
+if [ $# -lt 1 ];
+then
+ echo "USAGE: $0 classname [opts]"
+ exit 1
+fi
+
+base_dir=$(dirname $0)/../..
+
+# include kafka jars
+for file in $base_dir/core/target/scala_2.8.0/kafka-*.jar
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+local_dir=$(dirname $0)
+
+# include hadoop-consumer jars
+for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/contrib/hadoop-consumer/lib/*.jar;
+do
+ CLASSPATH=$CLASSPATH:$file
+done
+
+CLASSPATH=$CLASSPATH:$base_dir/project/boot/scala-2.8.0/lib/scala-library.jar
+
+echo $CLASSPATH
+
+CLASSPATH=dist:$CLASSPATH:${HADOOP_HOME}/conf
+
+#if [ -z "$KAFKA_OPTS" ]; then
+# KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote"
+#fi
+
+if [ -z "$JAVA_HOME" ]; then
+ JAVA="java"
+else
+ JAVA="$JAVA_HOME/bin/java"
+fi
+
+$JAVA $KAFKA_OPTS -cp $CLASSPATH "$@"
Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,270 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.CRC32;
+import kafka.api.FetchRequest;
+import kafka.javaapi.MultiFetchResponse;
+import kafka.api.OffsetRequest;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import kafka.message.MessageSet;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import java.nio.ByteBuffer;
+
+@SuppressWarnings({ "deprecation"})
+public class KafkaETLContext {
+
+ static protected int MAX_RETRY_TIME = 1;
+ final static String CLIENT_BUFFER_SIZE = "client.buffer.size";
+ final static String CLIENT_TIMEOUT = "client.so.timeout";
+
+ final static int DEFAULT_BUFFER_SIZE = 1 * 1024 * 1024;
+ final static int DEFAULT_TIMEOUT = 60000; // one minute
+
+ final static KafkaETLKey DUMMY_KEY = new KafkaETLKey();
+
+ protected int _index; /*index of context*/
+ protected String _input = null; /*input string*/
+ protected KafkaETLRequest _request = null;
+ protected SimpleConsumer _consumer = null; /*simple consumer*/
+
+ protected long[] _offsetRange = {0, 0}; /*offset range*/
+ protected long _offset = Long.MAX_VALUE; /*current offset*/
+ protected long _count; /*current count*/
+
+ protected MultiFetchResponse _response = null; /*fetch response*/
+ protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
+
+ protected int _retry = 0;
+ protected long _requestTime = 0; /*accumulative request time*/
+ protected long _startTime = -1;
+
+ protected int _bufferSize;
+ protected int _timeout;
+ protected Reporter _reporter;
+
+ protected MultipleOutputs _mos;
+ protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
+
+ public long getTotalBytes() {
+ return (_offsetRange[1] > _offsetRange[0])? _offsetRange[1] - _offsetRange[0] : 0;
+ }
+
+ public long getReadBytes() {
+ return _offset - _offsetRange[0];
+ }
+
+ public long getCount() {
+ return _count;
+ }
+
+ /**
+ * construct using input string
+ */
+ @SuppressWarnings("unchecked")
+ public KafkaETLContext(JobConf job, Props props, Reporter reporter,
+ MultipleOutputs mos, int index, String input)
+ throws Exception {
+
+ _bufferSize = getClientBufferSize(props);
+ _timeout = getClientTimeout(props);
+ System.out.println("bufferSize=" +_bufferSize);
+ System.out.println("timeout=" + _timeout);
+ _reporter = reporter;
+ _mos = mos;
+
+ // read topic and current offset from input
+ _index= index;
+ _input = input;
+ _request = new KafkaETLRequest(input.trim());
+
+ // read data from queue
+ URI uri = _request.getURI();
+ _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize);
+
+ // get available offset range
+ _offsetRange = getOffsetRange();
+ System.out.println("Connected to node " + uri
+ + " beginning reading at offset " + _offsetRange[0]
+ + " latest offset=" + _offsetRange[1]);
+
+ _offset = _offsetRange[0];
+ _count = 0;
+ _requestTime = 0;
+ _retry = 0;
+
+ _startTime = System.currentTimeMillis();
+ }
+
+ public boolean hasMore () {
+ return _messageIt != null && _messageIt.hasNext()
+ || _response != null && _response.iterator().hasNext()
+ || _offset < _offsetRange[1];
+ }
+
+ public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
+ if ( !hasMore() ) return false;
+
+ boolean gotNext = get(key, value);
+
+ if(_response != null) {
+ Iterator<ByteBufferMessageSet> iter = _response.iterator();
+ while ( !gotNext && iter.hasNext()) {
+ ByteBufferMessageSet msgSet = iter.next();
+ if ( hasError(msgSet)) return false;
+ _messageIt = (Iterator<MessageAndOffset>) msgSet.iterator();
+ gotNext = get(key, value);
+ }
+ }
+ return gotNext;
+ }
+
+ public boolean fetchMore () throws IOException {
+ if (!hasMore()) return false;
+
+ FetchRequest fetchRequest =
+ new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize);
+ List<FetchRequest> array = new ArrayList<FetchRequest>();
+ array.add(fetchRequest);
+
+ long tempTime = System.currentTimeMillis();
+ _response = _consumer.multifetch(array);
+ _requestTime += (System.currentTimeMillis() - tempTime);
+
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void output(String fileprefix) throws IOException {
+ String offsetString = _request.toString(_offset);
+
+ if (_offsetOut == null)
+ _offsetOut = (OutputCollector<KafkaETLKey, BytesWritable>)
+ _mos.getCollector("offsets", fileprefix+_index, _reporter);
+ _offsetOut.collect(DUMMY_KEY, new BytesWritable(offsetString.getBytes("UTF-8")));
+
+ }
+
+ public void close() throws IOException {
+ if (_consumer != null) _consumer.close();
+
+ String topic = _request.getTopic();
+ long endTime = System.currentTimeMillis();
+ _reporter.incrCounter(topic, "read-time(ms)", endTime - _startTime);
+ _reporter.incrCounter(topic, "request-time(ms)", _requestTime);
+
+ long bytesRead = _offset - _offsetRange[0];
+ double megaRead = bytesRead / (1024.0*1024.0);
+ _reporter.incrCounter(topic, "data-read(mb)", (long) megaRead);
+ _reporter.incrCounter(topic, "event-count", _count);
+ }
+
+ protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
+ if (_messageIt != null && _messageIt.hasNext()) {
+ MessageAndOffset msgAndOffset = _messageIt.next();
+
+ ByteBuffer buf = msgAndOffset.message().payload();
+ int origSize = buf.remaining();
+ byte[] bytes = new byte[origSize];
+ buf.get(bytes, 0, origSize); // second argument is an offset into the destination array, not the buffer
+ value.set(bytes, 0, origSize);
+
+ key.set(_index, _offset, msgAndOffset.message().checksum());
+
+ _offset += msgAndOffset.offset(); //increase offset
+ _count ++; //increase count
+
+ return true;
+ }
+ else return false;
+ }
+
+ /**
+ * Get offset ranges
+ */
+ protected long[] getOffsetRange() throws IOException {
+
+ /* get smallest and largest offsets*/
+ long[] range = new long[2];
+
+ long[] startOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
+ OffsetRequest.EarliestTime(), 1);
+ if (startOffsets.length != 1)
+ throw new IOException("input:" + _input + " Expect one smallest offset but get "
+ + startOffsets.length);
+ range[0] = startOffsets[0];
+
+ long[] endOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
+ OffsetRequest.LatestTime(), 1);
+ if (endOffsets.length != 1)
+ throw new IOException("input:" + _input + " Expect one latest offset but get "
+ + endOffsets.length);
+ range[1] = endOffsets[0];
+
+ /*adjust range based on input offsets*/
+ if ( _request.isValidOffset()) {
+ long startOffset = _request.getOffset();
+ if (startOffset > range[0]) {
+ System.out.println("Update starting offset with " + startOffset);
+ range[0] = startOffset;
+ }
+ else {
+ System.out.println("WARNING: given starting offset " + startOffset
+ + " is smaller than the smallest one " + range[0]
+ + ". Will ignore it.");
+ }
+ }
+ System.out.println("Using offset range [" + range[0] + ", " + range[1] + "]");
+ return range;
+ }
+
+ /**
+ * Checks the error code of a fetched message set to determine whether
+ * reading can continue.
+ */
+ protected boolean hasError(ByteBufferMessageSet messages)
+ throws IOException {
+ int errorCode = messages.getErrorCode();
+ if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
+ /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
+ Kafka server may delete old files from time to time */
+ System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
+
+ if (_retry >= MAX_RETRY_TIME) return true;
+ _retry++;
+ // get the current offset range
+ _offsetRange = getOffsetRange();
+ _offset = _offsetRange[0];
+ return false;
+ } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
+ throw new IOException(_input + " current offset=" + _offset
+ + " : invalid offset.");
+ } else if (errorCode == ErrorMapping.WrongPartitionCode()) {
+ throw new IOException(_input + " : wrong partition");
+ } else if (errorCode != ErrorMapping.NoError()) {
+ throw new IOException(_input + " current offset=" + _offset
+ + " error:" + errorCode);
+ } else
+ return false;
+ }
+
+ public static int getClientBufferSize(Props props) throws Exception {
+ return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
+ }
+
+ public static int getClientTimeout(Props props) throws Exception {
+ return props.getInt(CLIENT_TIMEOUT, DEFAULT_TIMEOUT);
+ }
+
+}
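
A minimal sketch of the loop a record reader might run on top of KafkaETLContext,
based on the hasMore/fetchMore/getNext/output/close methods above; the committed
KafkaETLRecordReader wires this into Hadoop's RecordReader interface and may differ:

    import java.io.IOException;
    import kafka.etl.KafkaETLContext;
    import kafka.etl.KafkaETLKey;
    import org.apache.hadoop.io.BytesWritable;

    // Illustrative only: drain all available messages from one context.
    class KafkaETLContextUsageSketch {
        static void drain(KafkaETLContext context, String filePrefix) throws IOException {
            KafkaETLKey key = new KafkaETLKey();
            BytesWritable value = new BytesWritable();
            while (context.hasMore()) {
                if (!context.fetchMore()) {      // issue a multi-fetch at the current offset
                    break;
                }
                while (context.getNext(key, value)) {
                    // each key/value pair becomes one input record for the mapper
                }
            }
            context.output(filePrefix);          // persist the final offset for the next run
            context.close();
        }
    }
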
Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java Mon Aug 1 23:41:24 2011
@@ -0,0 +1,61 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import kafka.consumer.SimpleConsumer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+
+@SuppressWarnings("deprecation")
+public class KafkaETLInputFormat
+extends SequenceFileInputFormat<KafkaETLKey, BytesWritable> {
+
+ protected Props _props;
+ protected int _bufferSize;
+ protected int _soTimeout;
+
+ protected Map<Integer, URI> _nodes;
+ protected int _partition;
+ protected int _nodeId;
+ protected String _topic;
+ protected SimpleConsumer _consumer;
+
+ protected MultipleOutputs _mos;
+ protected OutputCollector<BytesWritable, BytesWritable> _offsetOut = null;
+
+ protected long[] _offsetRange;
+ protected long _startOffset;
+ protected long _offset;
+ protected boolean _toContinue = true;
+ protected int _retry;
+ protected long _timestamp;
+ protected long _count;
+ protected boolean _ignoreErrors = false;
+
+ @Override
+ public RecordReader<KafkaETLKey, BytesWritable> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter)
+ throws IOException {
+ return new KafkaETLRecordReader(split, job, reporter);
+ }
+
+ @Override
+ protected boolean isSplitable(FileSystem fs, Path file) {
+ return super.isSplitable(fs, file);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
+ return super.getSplits(conf, numSplits);
+ }
+}
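
A minimal sketch of how a job could plug KafkaETLInputFormat into the old mapred API;
the committed KafkaETLJob/SimpleKafkaETLJob classes do the real configuration, and the
paths used here are placeholders:

    import kafka.etl.KafkaETLInputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    // Illustrative only: read offset files from "input", write fetched data under "output".
    class KafkaETLJobSketch {
        static JobConf configure(String input, String output) {
            JobConf conf = new JobConf(KafkaETLJobSketch.class);
            conf.setJobName("kafka-etl-sketch");
            conf.setInputFormat(KafkaETLInputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path(input));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            return conf;
        }
    }
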