kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1152970 [12/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Added: incubator/kafka/trunk/clients/go/src/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/consumer.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/consumer.go (added)
+++ incubator/kafka/trunk/clients/go/src/consumer.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,184 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+package kafka
+
+import (
+  "log"
+  "os"
+  "net"
+  "time"
+  "encoding/binary"
+)
+
+type BrokerConsumer struct {
+  broker  *Broker
+  offset  uint64
+  maxSize uint32
+}
+
+// Create a new broker consumer
+// hostname - host and optionally port, delimited by ':'
+// topic to consume
+// partition to consume from
+// offset to start consuming from
+// maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published)
+func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer {
+  return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
+    offset:  offset,
+    maxSize: maxSize}
+}
+
+// Simplified consumer that defaults the offset and maxSize to 0.
+// hostname - host and optionally port, delimited by ':'
+// topic to consume
+// partition to consume from
+func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer {
+  return &BrokerConsumer{broker: newBroker(hostname, topic, partition),
+    offset:  0,
+    maxSize: 0}
+}
+
+
+func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan bool) (int, os.Error) {
+  conn, err := consumer.broker.connect()
+  if err != nil {
+    return -1, err
+  }
+
+  num := 0
+  done := make(chan bool, 1)
+  go func() {
+    for {
+      _, err := consumer.consumeWithConn(conn, func(msg *Message) {
+        msgChan <- msg
+        num += 1
+      })
+
+      if err != nil {
+        if err != os.EOF {
+          log.Println("Fatal Error: ", err)
+        }
+        break
+      }
+      time.Sleep(pollTimeoutMs * 1000000)
+    }
+    done <- true
+  }()
+
+  // wait to be told to stop..
+  <-quit
+  conn.Close()
+  close(msgChan)
+  <-done
+  return num, err
+}
+
+type MessageHandlerFunc func(msg *Message)
+
+func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc) (int, os.Error) {
+  conn, err := consumer.broker.connect()
+  if err != nil {
+    return -1, err
+  }
+  defer conn.Close()
+
+  num, err := consumer.consumeWithConn(conn, handlerFunc)
+
+  if err != nil {
+    log.Println("Fatal Error: ", err)
+  }
+
+  return num, err
+}
+
+
+func (consumer *BrokerConsumer) consumeWithConn(conn *net.TCPConn, handlerFunc MessageHandlerFunc) (int, os.Error) {
+  _, err := conn.Write(consumer.broker.EncodeConsumeRequest(consumer.offset, consumer.maxSize))
+  if err != nil {
+    return -1, err
+  }
+
+  length, payload, err := consumer.broker.readResponse(conn)
+
+  if err != nil {
+    return -1, err
+  }
+
+  num := 0
+  if length > 2 {
+    // parse out the messages
+    var currentOffset uint64 = 0
+    for currentOffset <= uint64(length-4) {
+      msg := Decode(payload[currentOffset:])
+      if msg == nil {
+        return num, os.NewError("Error Decoding Message")
+      }
+      msg.offset = consumer.offset + currentOffset
+      currentOffset += uint64(4 + msg.totalLength)
+      handlerFunc(msg)
+      num += 1
+    }
+    // update the broker's offset for next consumption
+    consumer.offset += currentOffset
+  }
+
+  return num, err
+}
+
+
+// Get a list of valid offsets (up to maxNumOffsets) before the given time,
+// where time is in milliseconds (-1 for the latest offset available, -2 for the smallest offset available).
+// The result is a list of offsets, in descending order.
+func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, os.Error) {
+  offsets := make([]uint64, 0)
+
+  conn, err := consumer.broker.connect()
+  if err != nil {
+    return offsets, err
+  }
+
+  defer conn.Close()
+
+  _, err = conn.Write(consumer.broker.EncodeOffsetRequest(time, maxNumOffsets))
+  if err != nil {
+    return offsets, err
+  }
+
+  length, payload, err := consumer.broker.readResponse(conn)
+  if err != nil {
+    return offsets, err
+  }
+
+  if length > 4 {
+    // get the number of offsets
+    numOffsets := binary.BigEndian.Uint32(payload[0:])
+    var currentOffset uint64 = 4
+    for currentOffset < uint64(length-4) && uint32(len(offsets)) < numOffsets {
+      offset := binary.BigEndian.Uint64(payload[currentOffset:])
+      offsets = append(offsets, offset)
+      currentOffset += 8 // offset size
+    }
+  }
+
+  return offsets, err
+}
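
A minimal usage sketch of the consumer API added above, with hypothetical host/topic/size values; it assumes the kafka package from this commit is importable (as the tools below do) and built with the Go toolchain of the time:

    package main

    import (
      "fmt"
      "kafka"
    )

    func main() {
      // start at offset 0 and allow messages up to 1 MB
      consumer := kafka.NewBrokerConsumer("localhost:9092", "test", 0, 0, 1048576)

      num, err := consumer.Consume(func(msg *kafka.Message) {
        fmt.Println("consumed:", msg.PayloadString())
      })
      if err != nil {
        fmt.Println("consume error:", err)
      }
      fmt.Println("messages consumed:", num)

      // list up to 10 offsets before the latest point in the log (time = -1)
      offsets, err := consumer.GetOffsets(-1, 10)
      if err != nil {
        fmt.Println("offsets error:", err)
      }
      fmt.Println("offsets:", offsets)
    }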

Added: incubator/kafka/trunk/clients/go/src/converts.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/converts.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/converts.go (added)
+++ incubator/kafka/trunk/clients/go/src/converts.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,53 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+package kafka
+
+
+import (
+  "encoding/binary"
+)
+
+
+func uint16bytes(value int) []byte {
+  result := make([]byte, 2)
+  binary.BigEndian.PutUint16(result, uint16(value))
+  return result
+}
+
+func uint32bytes(value int) []byte {
+  result := make([]byte, 4)
+  binary.BigEndian.PutUint32(result, uint32(value))
+  return result
+}
+
+func uint32toUint32bytes(value uint32) []byte {
+  result := make([]byte, 4)
+  binary.BigEndian.PutUint32(result, value)
+  return result
+}
+
+func uint64ToUint64bytes(value uint64) []byte {
+  result := make([]byte, 8)
+  binary.BigEndian.PutUint64(result, value)
+  return result
+}

Added: incubator/kafka/trunk/clients/go/src/kafka.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/kafka.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/kafka.go (added)
+++ incubator/kafka/trunk/clients/go/src/kafka.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,97 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+package kafka
+
+import (
+  "log"
+  "net"
+  "os"
+  "fmt"
+  "encoding/binary"
+  "strconv"
+  "io"
+  "bufio"
+)
+
+const (
+  MAGIC_DEFAULT = 0
+  NETWORK       = "tcp"
+)
+
+
+type Broker struct {
+  topic     string
+  partition int
+  hostname  string
+}
+
+func newBroker(hostname string, topic string, partition int) *Broker {
+  return &Broker{topic: topic,
+    partition: partition,
+    hostname:  hostname}
+}
+
+
+func (b *Broker) connect() (conn *net.TCPConn, error os.Error) {
+  raddr, err := net.ResolveTCPAddr(NETWORK, b.hostname)
+  if err != nil {
+    log.Println("Fatal Error: ", err)
+    return nil, err
+  }
+  conn, err = net.DialTCP(NETWORK, nil, raddr)
+  if err != nil {
+    log.Println("Fatal Error: ", err)
+    return nil, err
+  }
+  return conn, error
+}
+
+// readResponse returns the response length, the payload with the leading 2-byte error code stripped, and any error
+func (b *Broker) readResponse(conn *net.TCPConn) (uint32, []byte, os.Error) {
+  reader := bufio.NewReader(conn)
+  length := make([]byte, 4)
+  lenRead, err := io.ReadFull(reader, length)
+  if err != nil {
+    return 0, []byte{}, err
+  }
+  if lenRead != 4 || lenRead < 0 {
+    return 0, []byte{}, os.NewError("invalid length of the packet length field")
+  }
+
+  expectedLength := binary.BigEndian.Uint32(length)
+  messages := make([]byte, expectedLength)
+  lenRead, err = io.ReadFull(reader, messages)
+  if err != nil {
+    return 0, []byte{}, err
+  }
+
+  if uint32(lenRead) != expectedLength {
+    return 0, []byte{}, os.NewError(fmt.Sprintf("Fatal Error: Unexpected Length: %d  expected:  %d", lenRead, expectedLength))
+  }
+
+  errorCode := binary.BigEndian.Uint16(messages[0:2])
+  if errorCode != 0 {
+    return 0, []byte{}, os.NewError(strconv.Uitoa(uint(errorCode)))
+  }
+  return expectedLength, messages[2:], nil
+}
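
For illustration, readResponse expects each broker response to be framed as a 4-byte big-endian length, a 2-byte big-endian error code, and then the payload; the payload is returned with the error code stripped. The sketch below builds and re-parses such a frame with only the standard binary package; buildResponseFrame is a hypothetical helper, not part of the client:

    package main

    import (
      "encoding/binary"
      "fmt"
    )

    // buildResponseFrame lays out <LENGTH: uint32><ERROR CODE: uint16><PAYLOAD: bytes>,
    // where LENGTH counts the error code plus the payload (not the length field itself).
    func buildResponseFrame(errorCode uint16, payload []byte) []byte {
      frame := make([]byte, 4+2+len(payload))
      binary.BigEndian.PutUint32(frame[0:], uint32(2+len(payload)))
      binary.BigEndian.PutUint16(frame[4:], errorCode)
      copy(frame[6:], payload)
      return frame
    }

    func main() {
      frame := buildResponseFrame(0, []byte("hello"))
      length := binary.BigEndian.Uint32(frame[0:4])
      errorCode := binary.BigEndian.Uint16(frame[4:6])
      fmt.Printf("length=%d errorCode=%d payload=%q\n", length, errorCode, frame[6:])
    }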

Added: incubator/kafka/trunk/clients/go/src/message.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/message.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/message.go (added)
+++ incubator/kafka/trunk/clients/go/src/message.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,107 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+
+package kafka
+
+import (
+  "hash/crc32"
+  "encoding/binary"
+  "bytes"
+  "log"
+)
+
+
+type Message struct {
+  magic       byte
+  checksum    [4]byte
+  payload     []byte
+  offset      uint64 // only used after decoding
+  totalLength uint32 // total length of the message (decoding)
+}
+
+func (m *Message) Offset() uint64 {
+  return m.offset
+}
+
+func (m *Message) Payload() []byte {
+  return m.payload
+}
+
+func (m *Message) PayloadString() string {
+  return string(m.payload)
+}
+
+func NewMessage(payload []byte) *Message {
+  message := &Message{}
+  message.magic = byte(MAGIC_DEFAULT)
+  binary.BigEndian.PutUint32(message.checksum[0:], crc32.ChecksumIEEE(payload))
+  message.payload = payload
+  return message
+}
+
+// MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>
+func (m *Message) Encode() []byte {
+  msgLen := 1 + 4 + len(m.payload)
+  msg := make([]byte, 4+msgLen)
+  binary.BigEndian.PutUint32(msg[0:], uint32(msgLen))
+  msg[4] = m.magic
+  copy(msg[5:], m.checksum[0:])
+  copy(msg[9:], m.payload)
+  return msg
+}
+
+func Decode(packet []byte) *Message {
+  length := binary.BigEndian.Uint32(packet[0:])
+  if length > uint32(len(packet[4:])) {
+    log.Printf("length mismatch, expected at least: %X, was: %X\n", length, len(packet[4:]))
+    return nil
+  }
+  msg := Message{}
+  msg.totalLength = length
+  msg.magic = packet[4]
+  copy(msg.checksum[:], packet[5:9])
+  payloadLength := length - 1 - 4
+  msg.payload = packet[9 : 9+payloadLength]
+
+  payloadChecksum := make([]byte, 4)
+  binary.BigEndian.PutUint32(payloadChecksum, crc32.ChecksumIEEE(msg.payload))
+  if !bytes.Equal(payloadChecksum, msg.checksum[:]) {
+    log.Printf("checksum mismatch, expected: %X was: %X\n", payloadChecksum, msg.checksum[:])
+    return nil
+  }
+  return &msg
+}
+
+func (msg *Message) Print() {
+  log.Println("----- Begin Message ------")
+  log.Printf("magic: %X\n", msg.magic)
+  log.Printf("checksum: %X\n", msg.checksum)
+  if len(msg.payload) < 1048576 { // 1 MB 
+    log.Printf("payload: %X\n", msg.payload)
+    log.Printf("payload(string): %s\n", msg.PayloadString())
+  } else {
+    log.Printf("long payload, length: %d\n", len(msg.payload))
+  }
+  log.Printf("offset: %d\n", msg.offset)
+  log.Println("----- End Message ------")
+}
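
A short round-trip sketch of the message framing above (<MESSAGE LENGTH: uint32><MAGIC: 1 byte><CHECKSUM: uint32><PAYLOAD>), assuming the kafka package from this commit is importable:

    package main

    import (
      "fmt"
      "kafka"
    )

    func main() {
      msg := kafka.NewMessage([]byte("hello world"))

      // Encode prepends the length, magic byte and CRC32 checksum to the payload
      encoded := msg.Encode()
      fmt.Printf("encoded bytes: % x\n", encoded)

      // Decode verifies the length and checksum and returns nil on a mismatch
      decoded := kafka.Decode(encoded)
      if decoded == nil {
        fmt.Println("decode failed")
        return
      }
      fmt.Println("round-tripped payload:", decoded.PayloadString())
    }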

Added: incubator/kafka/trunk/clients/go/src/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/publisher.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/publisher.go (added)
+++ incubator/kafka/trunk/clients/go/src/publisher.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,59 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+package kafka
+
+import (
+  "container/list"
+  "os"
+)
+
+
+type BrokerPublisher struct {
+  broker *Broker
+}
+
+func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher {
+  return &BrokerPublisher{broker: newBroker(hostname, topic, partition)}
+}
+
+
+func (b *BrokerPublisher) Publish(message *Message) (int, os.Error) {
+  messages := list.New()
+  messages.PushBack(message)
+  return b.BatchPublish(messages)
+}
+
+func (b *BrokerPublisher) BatchPublish(messages *list.List) (int, os.Error) {
+  conn, err := b.broker.connect()
+  if err != nil {
+    return -1, err
+  }
+  defer conn.Close()
+  // TODO: MULTIPRODUCE
+  num, err := conn.Write(b.broker.EncodePublishRequest(messages))
+  if err != nil {
+    return -1, err
+  }
+
+  return num, err
+}
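
A minimal publishing sketch against the API above (hypothetical host/topic values; assumes the kafka package from this commit is importable):

    package main

    import (
      "container/list"
      "fmt"
      "kafka"
    )

    func main() {
      publisher := kafka.NewBrokerPublisher("localhost:9092", "test", 0)

      // publish a single message
      if _, err := publisher.Publish(kafka.NewMessage([]byte("hello"))); err != nil {
        fmt.Println("publish error:", err)
      }

      // BatchPublish takes a *list.List of *kafka.Message values
      batch := list.New()
      batch.PushBack(kafka.NewMessage([]byte("one")))
      batch.PushBack(kafka.NewMessage([]byte("two")))
      if _, err := publisher.BatchPublish(batch); err != nil {
        fmt.Println("batch publish error:", err)
      }
    }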

Added: incubator/kafka/trunk/clients/go/src/request.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/request.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/request.go (added)
+++ incubator/kafka/trunk/clients/go/src/request.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,108 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+package kafka
+
+import (
+  "encoding/binary"
+  "bytes"
+  "container/list"
+)
+
+
+type RequestType uint16
+
+// Request Types
+const (
+  REQUEST_PRODUCE      RequestType = 0
+  REQUEST_FETCH        = 1
+  REQUEST_MULTIFETCH   = 2
+  REQUEST_MULTIPRODUCE = 3
+  REQUEST_OFFSETS      = 4
+)
+
+
+// Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>
+func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer {
+  request := bytes.NewBuffer([]byte{})
+  request.Write(uint32bytes(0)) // placeholder for request size
+  request.Write(uint16bytes(int(requestType)))
+  request.Write(uint16bytes(len(b.topic)))
+  request.WriteString(b.topic)
+  request.Write(uint32bytes(b.partition))
+
+  return request
+}
+
+// After writing to the buffer is complete, encode the size of the request into the placeholder at the start of the request.
+func encodeRequestSize(request *bytes.Buffer) {
+  binary.BigEndian.PutUint32(request.Bytes()[0:], uint32(request.Len()-4))
+}
+
+// <Request Header><TIME: uint64><MAX NUMBER of OFFSETS: uint32>
+func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte {
+  request := b.EncodeRequestHeader(REQUEST_OFFSETS)
+  // specific to offset request
+  request.Write(uint64ToUint64bytes(uint64(time)))
+  request.Write(uint32toUint32bytes(maxNumOffsets))
+
+  encodeRequestSize(request)
+
+  return request.Bytes()
+}
+
+
+// <Request Header><OFFSET: uint64><MAX SIZE: uint32>
+func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte {
+  request := b.EncodeRequestHeader(REQUEST_FETCH)
+  // specific to consume request
+  request.Write(uint64ToUint64bytes(offset))
+  request.Write(uint32toUint32bytes(maxSize))
+
+  encodeRequestSize(request)
+
+  return request.Bytes()
+}
+
+
+// <Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>
+func (b *Broker) EncodePublishRequest(messages *list.List) []byte {
+  // 4 + 2 + 2 + topicLength + 4 + 4
+  request := b.EncodeRequestHeader(REQUEST_PRODUCE)
+
+  messageSetSizePos := request.Len()
+  request.Write(uint32bytes(0)) // placeholder message len
+
+  written := 0
+  for element := messages.Front(); element != nil; element = element.Next() {
+    message := element.Value.(*Message)
+    wrote, _ := request.Write(message.Encode())
+    written += wrote
+  }
+
+  // now write the accumulated size of the message set into its placeholder
+  binary.BigEndian.PutUint32(request.Bytes()[messageSetSizePos:], uint32(written))
+  // now write the size of the whole request into the leading uint32
+  encodeRequestSize(request)
+
+  return request.Bytes()
+}
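
To make the request layout concrete, the sketch below assembles the fetch (consume) request documented above by hand, using only the standard binary package; buildFetchRequest is a hypothetical helper for illustration and is not part of the client:

    package main

    import (
      "bytes"
      "encoding/binary"
      "fmt"
    )

    // buildFetchRequest lays out
    // <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes>
    // <PARTITION: uint32><OFFSET: uint64><MAX SIZE: uint32>,
    // where REQUEST_SIZE excludes the size field itself.
    func buildFetchRequest(topic string, partition uint32, offset uint64, maxSize uint32) []byte {
      body := new(bytes.Buffer)
      binary.Write(body, binary.BigEndian, uint16(1)) // REQUEST_FETCH
      binary.Write(body, binary.BigEndian, uint16(len(topic)))
      body.WriteString(topic)
      binary.Write(body, binary.BigEndian, partition)
      binary.Write(body, binary.BigEndian, offset)
      binary.Write(body, binary.BigEndian, maxSize)

      frame := new(bytes.Buffer)
      binary.Write(frame, binary.BigEndian, uint32(body.Len()))
      frame.Write(body.Bytes())
      return frame.Bytes()
    }

    func main() {
      fmt.Printf("% x\n", buildFetchRequest("test", 0, 0, 1048576))
    }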

Added: incubator/kafka/trunk/clients/go/src/timing.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/src/timing.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/src/timing.go (added)
+++ incubator/kafka/trunk/clients/go/src/timing.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,50 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+
+package kafka
+
+import (
+  "log"
+  "time"
+)
+
+type Timing struct {
+  label string
+  start int64
+  stop  int64
+}
+
+func StartTiming(label string) *Timing {
+  return &Timing{label: label, start: time.Nanoseconds(), stop: 0}
+}
+
+func (t *Timing) Stop() {
+  t.stop = time.Nanoseconds()
+}
+
+func (t *Timing) Print() {
+  if t.stop == 0 {
+    t.Stop()
+  }
+  log.Printf("%s took: %f ms\n", t.label, float64((time.Nanoseconds()-t.start))/1000000)
+}
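
A trivial usage sketch for the Timing helper (assumes the kafka package from this commit and the pre-Go 1 time API it targets):

    package main

    import (
      "kafka"
      "time"
    )

    func main() {
      timing := kafka.StartTiming("sleep")
      time.Sleep(50000000) // 50 ms expressed in nanoseconds
      timing.Print()       // Stop() is called implicitly if it has not run yet
    }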

Added: incubator/kafka/trunk/clients/go/tools/consumer/Makefile
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/consumer/Makefile?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/consumer/Makefile (added)
+++ incubator/kafka/trunk/clients/go/tools/consumer/Makefile Mon Aug  1 23:41:24 2011
@@ -0,0 +1,7 @@
+include $(GOROOT)/src/Make.inc
+
+TARG=consumer
+GOFILES=\
+	consumer.go\
+
+include $(GOROOT)/src/Make.cmd

Added: incubator/kafka/trunk/clients/go/tools/consumer/consumer.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/consumer/consumer.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/consumer/consumer.go (added)
+++ incubator/kafka/trunk/clients/go/tools/consumer/consumer.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,113 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+
+package main
+
+import (
+  "kafka"
+  "flag"
+  "fmt"
+  "os"
+  "strconv"
+  "os/signal"
+  "syscall"
+)
+
+var hostname string
+var topic string
+var partition int
+var offset uint64
+var maxSize uint
+var writePayloadsTo string
+var consumerForever bool
+var printmessage bool
+
+func init() {
+  flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server")
+  flag.StringVar(&topic, "topic", "test", "topic to consume from")
+  flag.IntVar(&partition, "partition", 0, "partition to consume from")
+  flag.Uint64Var(&offset, "offset", 0, "offset to start consuming from")
+  flag.UintVar(&maxSize, "maxsize", 1048576, "maximum fetch size in bytes")
+  flag.StringVar(&writePayloadsTo, "writeto", "", "write payloads to this file")
+  flag.BoolVar(&consumerForever, "consumeforever", false, "loop forever consuming")
+  flag.BoolVar(&printmessage, "printmessage", true, "print the message details to stdout")
+}
+
+
+func main() {
+  flag.Parse()
+  fmt.Println("Consuming Messages :")
+  fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition)
+  fmt.Println(" ---------------------- ")
+  broker := kafka.NewBrokerConsumer(hostname, topic, partition, offset, uint32(maxSize))
+
+  var payloadFile *os.File = nil
+  if len(writePayloadsTo) > 0 {
+    var err os.Error
+    payloadFile, err = os.Create(writePayloadsTo)
+    if err != nil {
+      fmt.Println("Error opening file: ", err)
+      payloadFile = nil
+    }
+  }
+
+  consumerCallback := func(msg *kafka.Message) {
+    if printmessage {
+      msg.Print()
+    }
+    if payloadFile != nil {
+      payloadFile.Write([]byte("Message at: " + strconv.Uitoa64(msg.Offset()) + "\n"))
+      payloadFile.Write(msg.Payload())
+      payloadFile.Write([]byte("\n-------------------------------\n"))
+    }
+  }
+
+  if consumerForever {
+    quit := make(chan bool, 1)
+    go func() {
+      for {
+        sig := <-signal.Incoming
+        if sig.(signal.UnixSignal) == syscall.SIGINT {
+          quit <- true
+        }
+      }
+    }()
+
+    msgChan := make(chan *kafka.Message)
+    go broker.ConsumeOnChannel(msgChan, 10, quit)
+    for msg := range msgChan {
+      if msg != nil {
+        consumerCallback(msg)
+      } else {
+        break
+      }
+    }
+  } else {
+    broker.Consume(consumerCallback)
+  }
+
+  if payloadFile != nil {
+    payloadFile.Close()
+  }
+
+}

Added: incubator/kafka/trunk/clients/go/tools/offsets/Makefile
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/offsets/Makefile?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/offsets/Makefile (added)
+++ incubator/kafka/trunk/clients/go/tools/offsets/Makefile Mon Aug  1 23:41:24 2011
@@ -0,0 +1,7 @@
+include $(GOROOT)/src/Make.inc
+
+TARG=offsets
+GOFILES=\
+	offsets.go\
+
+include $(GOROOT)/src/Make.cmd

Added: incubator/kafka/trunk/clients/go/tools/offsets/offsets.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/offsets/offsets.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/offsets/offsets.go (added)
+++ incubator/kafka/trunk/clients/go/tools/offsets/offsets.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,62 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+
+package main
+
+import (
+  "kafka"
+  "flag"
+  "fmt"
+)
+
+var hostname string
+var topic string
+var partition int
+var offsets uint
+var time int64
+
+func init() {
+  flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server")
+  flag.StringVar(&topic, "topic", "test", "topic to read offsets from")
+  flag.IntVar(&partition, "partition", 0, "partition to read offsets from")
+  flag.UintVar(&offsets, "offsets", 1, "number of offsets returned")
+  flag.Int64Var(&time, "time", -1, "return offsets before this timestamp: time in ms / -1 (latest) / -2 (earliest)")
+}
+
+
+func main() {
+  flag.Parse()
+  fmt.Println("Offsets :")
+  fmt.Printf("From: %s, topic: %s, partition: %d\n", hostname, topic, partition)
+  fmt.Println(" ---------------------- ")
+  broker := kafka.NewBrokerOffsetConsumer(hostname, topic, partition)
+
+  offsets, err := broker.GetOffsets(time, uint32(offsets))
+  if err != nil {
+    fmt.Println("Error: ", err)
+  }
+  fmt.Printf("Offsets found: %d\n", len(offsets))
+  for i := 0; i < len(offsets); i++ {
+    fmt.Printf("Offset[%d] = %d\n", i, offsets[i])
+  }
+}

Added: incubator/kafka/trunk/clients/go/tools/publisher/Makefile
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/publisher/Makefile?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/publisher/Makefile (added)
+++ incubator/kafka/trunk/clients/go/tools/publisher/Makefile Mon Aug  1 23:41:24 2011
@@ -0,0 +1,7 @@
+include $(GOROOT)/src/Make.inc
+
+TARG=publisher
+GOFILES=\
+	publisher.go\
+
+include $(GOROOT)/src/Make.cmd

Added: incubator/kafka/trunk/clients/go/tools/publisher/publisher.go
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/tools/publisher/publisher.go?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/go/tools/publisher/publisher.go (added)
+++ incubator/kafka/trunk/clients/go/tools/publisher/publisher.go Mon Aug  1 23:41:24 2011
@@ -0,0 +1,75 @@
+/*
+ *  Copyright (c) 2011 NeuStar, Inc.
+ *  All rights reserved.  
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at 
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  
+ *  NeuStar, the Neustar logo and related names and logos are registered
+ *  trademarks, service marks or tradenames of NeuStar, Inc. All other 
+ *  product names, company names, marks, logos and symbols may be trademarks
+ *  of their respective owners.
+ */
+
+package main
+
+import (
+  "kafka"
+  "flag"
+  "fmt"
+  "os"
+)
+
+var hostname string
+var topic string
+var partition int
+var message string
+var messageFile string
+
+func init() {
+  flag.StringVar(&hostname, "hostname", "localhost:9092", "host:port string for the kafka server")
+  flag.StringVar(&topic, "topic", "test", "topic to publish to")
+  flag.IntVar(&partition, "partition", 0, "partition to publish to")
+  flag.StringVar(&message, "message", "", "message to publish")
+  flag.StringVar(&messageFile, "messagefile", "", "read message from this file")
+}
+
+func main() {
+  flag.Parse()
+  fmt.Println("Publishing :", message)
+  fmt.Printf("To: %s, topic: %s, partition: %d\n", hostname, topic, partition)
+  fmt.Println(" ---------------------- ")
+  broker := kafka.NewBrokerPublisher(hostname, topic, partition)
+
+  if len(message) == 0 && len(messageFile) != 0 {
+    file, err := os.Open(messageFile)
+    if err != nil {
+      fmt.Println("Error: ", err)
+      return
+    }
+    stat, err := file.Stat()
+    if err != nil {
+      fmt.Println("Error: ", err)
+      return
+    }
+    payload := make([]byte, stat.Size)
+    file.Read(payload)
+    timing := kafka.StartTiming("Sending")
+    broker.Publish(kafka.NewMessage(payload))
+    timing.Print()
+    file.Close()
+  } else {
+    timing := kafka.StartTiming("Sending")
+    broker.Publish(kafka.NewMessage([]byte(message)))
+    timing.Print()
+  }
+}

Added: incubator/kafka/trunk/clients/php/LICENSE
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/LICENSE?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/LICENSE (added)
+++ incubator/kafka/trunk/clients/php/LICENSE Mon Aug  1 23:41:24 2011
@@ -0,0 +1,203 @@
+
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright 2011 LinkedIn
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+

Added: incubator/kafka/trunk/clients/php/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/README.md?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/README.md (added)
+++ incubator/kafka/trunk/clients/php/README.md Mon Aug  1 23:41:24 2011
@@ -0,0 +1,20 @@
+# kafka-php
+kafka-php allows you to produce messages to the Kafka distributed publish/subscribe messaging service.
+
+## Requirements
+Minimum PHP version: 5.3.3.
+You need access to a Kafka instance and must be able to connect to it over TCP. You can obtain a copy of Kafka and setup instructions at https://github.com/kafka-dev/kafka
+
+## Installation
+Add the lib directory to the include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).
+
+## Usage
+The examples directory contains an example of a Producer and a Consumer.
+
+## Contact for questions
+
+Lorenzo Alberton
+
+l.alberton at(@) quipo.it
+
+http://twitter.com/lorenzoalberton

Added: incubator/kafka/trunk/clients/php/src/examples/autoloader.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/examples/autoloader.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/examples/autoloader.php (added)
+++ incubator/kafka/trunk/clients/php/src/examples/autoloader.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,23 @@
+<?php
+
+spl_autoload_register(function($className)
+{
+	$classFile = str_replace('_', DIRECTORY_SEPARATOR, $className) . '.php';
+	if (function_exists('stream_resolve_include_path')) {
+		$file = stream_resolve_include_path($classFile);
+	} else {
+		foreach (explode(PATH_SEPARATOR, get_include_path()) as $path) {
+			if (file_exists($path . '/' . $classFile)) {
+				$file = $path . '/' . $classFile;
+				break;
+			}
+		}
+	}
+	/* If file is found, store it into the cache, classname <-> file association */
+	if (($file !== false) && ($file !== null)) {
+		include $file;
+		return;
+	}
+
+	throw new RuntimeException($className. ' not found');
+});

Added: incubator/kafka/trunk/clients/php/src/examples/consume.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/examples/consume.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/examples/consume.php (added)
+++ incubator/kafka/trunk/clients/php/src/examples/consume.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,36 @@
+#!/usr/bin/php
+<?php
+
+set_include_path(
+	implode(PATH_SEPARATOR, array(
+		realpath(dirname(__FILE__).'/../lib'),
+		get_include_path(),
+	))
+);
+require 'autoloader.php';
+
+$host = 'localhost';
+$zkPort  = 2181; //zookeeper
+$kPort   = 9092; //kafka server
+$topic   = 'test';
+$maxSize = 1000000;
+$socketTimeout = 5;
+
+$offset    = 0;
+$partition = 0;
+
+$consumer = new Kafka_SimpleConsumer($host, $kPort, $socketTimeout, $maxSize);
+while (true) {	
+	//create a fetch request for topic "test", partition 0, current offset and fetch size of 1MB
+	$fetchRequest = new Kafka_FetchRequest($topic, $partition, $offset, $maxSize);
+	//get the message set from the consumer and print them out
+	$messages = $consumer->fetch($fetchRequest);
+	foreach ($messages as $msg) {
+		echo "\nconsumed[$offset]: " . $msg->payload();
+	}
+	//advance the offset after consuming each message
+	$offset += $messages->validBytes();
+	//echo "\n---[Advancing offset to $offset]------(".date('H:i:s').")";
+	unset($fetchRequest);
+	sleep(2);
+}

Added: incubator/kafka/trunk/clients/php/src/examples/produce.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/examples/produce.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/examples/produce.php (added)
+++ incubator/kafka/trunk/clients/php/src/examples/produce.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,29 @@
+#!/usr/bin/php
+<?php
+
+set_include_path(
+	implode(PATH_SEPARATOR, array(
+		realpath(dirname(__FILE__).'/../lib'),
+		get_include_path(),
+	))
+);
+require 'autoloader.php';
+
+define('PRODUCE_REQUEST_ID', 0);
+
+
+$host = 'localhost';
+$port = 9092;
+$topic = 'test';
+
+$producer = new Kafka_Producer($host, $port);
+$in = fopen('php://stdin', 'r');
+while (true) {
+	echo "\nEnter comma separated messages:\n";
+	$messages = explode(',', fgets($in));
+	foreach (array_keys($messages) as $k) {
+		//$messages[$k] = trim($messages[$k]);
+	}
+	$bytes = $producer->send($messages, $topic);
+	printf("\nSuccessfully sent %d messages (%d bytes)\n\n", count($messages), $bytes);
+}

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Receive.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,154 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Read an entire message set from a stream into an internal buffer
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_BoundedByteBuffer_Receive
+{
+	/**
+	 * @var integer
+	 */
+	protected $size;
+	
+	/**
+	 * @var boolean
+	 */
+	protected $sizeRead = false;
+	
+	/**
+	 * @var integer
+	 */
+	protected $remainingBytes = 0;
+
+	/**
+	 * @var string resource
+	 */
+	public $buffer = null;
+	
+	/**
+	 * @var boolean
+	 */
+	protected $complete = false;
+	
+	/**
+	 *
+	 * @var integer
+	 */
+	protected $maxSize = PHP_INT_MAX;
+	
+	/**
+	 * Constructor
+	 *
+	 * @param integer $maxSize Max buffer size
+	 */
+	public function __construct($maxSize = PHP_INT_MAX) {
+		$this->maxSize = $maxSize;
+	}
+	
+	/**
+	 * Destructor
+	 * 
+	 * @return void
+	 */
+	public function __destruct() {
+		if (is_resource($this->buffer)) {
+			fclose($this->buffer);
+		}
+	}
+	
+	/**
+	 * Read the request size (4 bytes) if not read yet
+	 * 
+	 * @param resource $stream Stream resource
+	 *
+	 * @return integer Number of bytes read
+	 * @throws RuntimeException when size is <=0 or >= $maxSize
+	 */
+	private function readRequestSize($stream) {
+		if (!$this->sizeRead) {
+			$this->size = fread($stream, 4);
+			if ((false === $this->size) || ('' === $this->size)) {
+				$errmsg = 'Received nothing when reading from channel, socket has likely been closed.';
+				throw new RuntimeException($errmsg);
+			}
+			$this->size = array_shift(unpack('N', $this->size));
+			if ($this->size <= 0 || $this->size > $this->maxSize) {
+				throw new RuntimeException($this->size . ' is not a valid message size');
+			}
+			$this->remainingBytes = $this->size;
+			$this->sizeRead = true;
+			return 4;
+		}
+		return 0;
+	}
+	
+	/**
+	 * Read a chunk of data from the stream
+	 * 
+	 * @param resource $stream Stream resource
+	 * 
+	 * @return integer number of read bytes
+	 * @throws RuntimeException when size is <=0 or >= $maxSize
+	 */
+	public function readFrom($stream) {
+		// have we read the request size yet?
+		$read = $this->readRequestSize($stream);
+		// have we allocated the request buffer yet?
+		if (!$this->buffer) {
+			$this->buffer = fopen('php://temp', 'w+b');
+		}
+		// if we have a buffer, read some stuff into it
+		if ($this->buffer && !$this->complete) {
+			$freadBufferSize = min(8192, $this->remainingBytes);
+			if ($freadBufferSize > 0) {
+				//TODO: check that fread returns something
+				$bytesRead = fwrite($this->buffer, fread($stream, $freadBufferSize));
+				$this->remainingBytes -= $bytesRead;
+				$read += $bytesRead;
+			}
+			// did we get everything?
+			if ($this->remainingBytes <= 0) {
+				rewind($this->buffer);
+				$this->complete = true;
+			}
+		}
+		return $read;
+	}
+	
+	/**
+	 * Read all the available bytes in the stream
+	 * 
+	 * @param resource $stream Stream resource
+	 * 
+	 * @return integer number of read bytes
+	 * @throws RuntimeException when size is <=0 or >= $maxSize
+	 */
+	public function readCompletely($stream) {
+		$read = 0;
+		while (!$this->complete) {
+			$read += $this->readFrom($stream);
+		}
+		return $read;
+	}
+}
+
+
+
+  
\ No newline at end of file

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/BoundedByteBuffer/Send.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,118 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Send a request to Kafka
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_BoundedByteBuffer_Send
+{
+	/**
+	 * @var integer
+	 */
+	protected $size;
+	
+	/**
+	 * @var boolean
+	 */
+	protected $sizeWritten = false; 
+
+	/**
+	 * @var string resource
+	 */
+	protected $buffer;
+	
+	/**
+	 * @var boolean
+	 */
+	protected $complete = false;
+	
+	/**
+	 * Constructor
+	 * 
+	 * @param Kafka_FetchRequest $req Request object
+	 */
+	public function __construct(Kafka_FetchRequest $req) {
+		$this->size = $req->sizeInBytes() + 2;
+		$this->buffer = fopen('php://temp', 'w+b');
+		fwrite($this->buffer, pack('n', $req->id));
+		$req->writeTo($this->buffer);
+		rewind($this->buffer);
+		//fseek($this->buffer, $req->getOffset(), SEEK_SET);
+	}
+	
+	/**
+	 * Try to write the request size if we haven't already
+	 * 
+	 * @param resource $stream Stream resource
+	 *
+	 * @return integer Number of bytes read
+	 * @throws RuntimeException when size is <=0 or >= $maxSize
+	 */
+	private function writeRequestSize($stream) {
+		if (!$this->sizeWritten) {
+			if (!fwrite($stream, pack('N', $this->size))) {
+				throw new RuntimeException('Cannot write request to stream (' . error_get_last() . ')');
+			}
+			$this->sizeWritten = true;
+			return 4;
+		}
+		return 0;
+	}
+	
+	/**
+	 * Write a chunk of data to the stream
+	 * 
+	 * @param resource $stream Stream resource
+	 * 
+	 * @return integer number of written bytes
+	 * @throws RuntimeException
+	 */
+	public function writeTo($stream) {
+		// have we written the request size yet?
+		$written = $this->writeRequestSize($stream);
+		
+		// try to write the actual buffer itself
+		if ($this->sizeWritten && !feof($this->buffer)) {
+			//TODO: check that fread returns something
+			$written += fwrite($stream, fread($this->buffer, 8192));
+		}
+		// if we are done, mark it off
+		if (feof($this->buffer)) {
+			$this->complete = true;
+			fclose($this->buffer);
+		}
+		return $written;
+	}
+	
+	/**
+	 * Write the entire request to the stream
+	 * 
+	 * @param resource $stream Stream resource
+	 * 
+	 * @return integer number of written bytes
+	 */
+	public function writeCompletely($stream) {
+		$written = 0;
+		while (!$this->complete) {
+			$written += $this->writeTo($stream);
+		}
+		//echo "\nWritten " . $written . ' bytes ';
+		return $written;
+	}
+}
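
A hedged usage sketch for the send buffer (php://temp stands in for a broker socket here, as in the SendTest below): the request is framed as a 4-byte size, a 2-byte request id and the request body, and writeCompletely() loops over writeTo() until everything is flushed.

<?php
$stream  = fopen('php://temp', 'w+b');
$request = new Kafka_FetchRequest('a test topic', 0, 0, 10000);

$send    = new Kafka_BoundedByteBuffer_Send($request);
$written = $send->writeCompletely($stream);
// $written === 4 (size prefix) + 2 (request id) + $request->sizeInBytes()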

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,71 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Encode messages and messages sets into the kafka protocol
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_Encoder
+{
+	/**
+	 * 1 byte "magic" identifier to allow format changes
+	 * 
+	 * @var integer
+	 */
+	const CURRENT_MAGIC_VALUE = 0;
+	
+	/**
+	 * Encode a message. The format of an N byte message is the following:
+	 *  - 1 byte: "magic" identifier to allow format changes
+	 *  - 4 bytes: CRC32 of the payload
+	 *  - (N - 5) bytes: payload
+	 * 
+	 * @param string $msg Message to encode
+	 *
+	 * @return string
+	 */
+	static public function encode_message($msg) {
+		// <MAGIC_BYTE: 1 byte> <CRC32: 4 bytes bigendian> <PAYLOAD: N bytes>
+		return pack('CN', self::CURRENT_MAGIC_VALUE, crc32($msg)) 
+			 . $msg;
+	}
+
+	/**
+	 * Encode a complete request
+	 * 
+	 * @param string  $topic     Topic
+	 * @param integer $partition Partition number
+	 * @param array   $messages  Array of messages to send
+	 *
+	 * @return string
+	 */
+	static public function encode_produce_request($topic, $partition, array $messages) {
+		// encode messages as <LEN: int><MESSAGE_BYTES>
+		$message_set = '';
+		foreach ($messages as $message) {
+			$encoded = self::encode_message($message);
+			$message_set .= pack('N', strlen($encoded)) . $encoded;
+		}
+		// create the request as <REQUEST_SIZE: int> <REQUEST_ID: short> <TOPIC: bytes> <PARTITION: int> <BUFFER_SIZE: int> <BUFFER: bytes>
+		$data = pack('n', PRODUCE_REQUEST_ID) .
+			pack('n', strlen($topic)) . $topic .
+			pack('N', $partition) .
+			pack('N', strlen($message_set)) . $message_set;
+		return pack('N', strlen($data)) . $data;
+	}
+}
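
A sketch of the wire format these two helpers produce. Note that encode_produce_request() relies on a global PRODUCE_REQUEST_ID constant being defined; the test suite defines it as 0, which matches Kafka_RequestKeys::PRODUCE.

<?php
if (!defined('PRODUCE_REQUEST_ID')) {
    define('PRODUCE_REQUEST_ID', Kafka_RequestKeys::PRODUCE); // 0
}

$msg = Kafka_Encoder::encode_message('hello');
// strlen($msg) === 5 + strlen('hello'): 1 magic byte + 4 CRC32 bytes + payload

$request = Kafka_Encoder::encode_produce_request('test-topic', 0, array('hello', 'world'));
// <size:int32><request id:int16><topic len:int16><topic><partition:int32><msgset len:int32><msgset>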

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/FetchRequest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/FetchRequest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/FetchRequest.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/FetchRequest.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,126 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Represents a request object
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_FetchRequest extends Kafka_Request
+{
+	/**
+	 * @var string
+	 */
+	private $topic;
+	
+	/**
+	 * @var integer
+	 */
+	private $partition;
+	
+	/**
+	 * @var integer
+	 */
+	private $offset;
+	
+	/**
+	 * @var integer
+	 */
+	private $maxSize;
+	
+	/**
+	 * @param string  $topic     Topic
+	 * @param integer $partition Partition
+	 * @param integer $offset    Offset
+	 * @param integer $maxSize   Max buffer size
+	 */
+	public function __construct($topic, $partition = 0, $offset = 0, $maxSize = 1000000) {
+		$this->id        = Kafka_RequestKeys::FETCH;
+		$this->topic     = $topic;
+		$this->partition = $partition;
+		$this->offset    = $offset;
+		$this->maxSize   = $maxSize;
+	}
+	
+	/**
+	 * Write the request to the output stream
+	 * 
+	 * @param resource $stream Output stream
+	 * 
+	 * @return void
+	 */
+	public function writeTo($stream) {
+		//echo "\nWriting request to stream: " . (string)$this;
+		// <topic size: short> <topic: bytes>
+		fwrite($stream, pack('n', strlen($this->topic)) . $this->topic);
+		// <partition: int> <offset: Long> <maxSize: int>
+		fwrite($stream, pack('N', $this->partition));
+		
+		//TODO: the offset is a 64-bit big-endian integer, but pack() has no portable
+		//64-bit big-endian format code, so the upper 32 bits are written as 0 for now
+		fwrite($stream, pack('N2', 0, $this->offset));
+		fwrite($stream, pack('N', $this->maxSize));
+		//echo "\nWritten request to stream: " .(string)$this;
+	}
+	
+	/**
+	 * Get request size in bytes
+	 * 
+	 * @return integer
+	 */
+	public function sizeInBytes() {
+		return 2 + strlen($this->topic) + 4 + 8 + 4;
+	}
+	
+	/**
+	 * Get current offset
+	 *
+	 * @return integer
+	 */
+	public function getOffset() {
+		return $this->offset;
+	}
+	
+	/**
+	 * Get topic
+	 * 
+	 * @return string
+	 */
+	public function getTopic() {
+		return $this->topic;
+	}
+	
+	/**
+	 * Get partition
+	 * 
+	 * @return integer
+	 */
+	public function getPartition() {
+		return $this->partition;
+	}
+	
+	/**
+	 * String representation of the Fetch Request
+	 * 
+	 * @return string
+	 */
+	public function __toString()
+	{
+		return 'topic:' . $this->topic . ', part:' . $this->partition . ' offset:' . $this->offset . ' maxSize:' . $this->maxSize;
+	}
+}
+
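
A brief sketch of building a fetch request and writing it out; as the TODO in writeTo() notes, the 64-bit offset is emitted as two big-endian 32-bit words with the upper half set to 0.

<?php
$req = new Kafka_FetchRequest('test-topic', 0, 14, 1000000);
echo $req->sizeInBytes();    // 2 + strlen('test-topic') + 4 + 8 + 4
echo (string)$req;           // topic:test-topic, part:0 offset:14 maxSize:1000000

$stream = fopen('php://temp', 'w+b');
$req->writeTo($stream);      // <topic len:int16><topic><partition:int32><offset:int64><max size:int32>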

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,127 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * A message. The format of an N byte message is the following:
+ * 1 byte "magic" identifier to allow format changes
+ * 4 byte CRC32 of the payload
+ * N - 5 byte payload
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_Message
+{
+	/*
+	private $currentMagicValue = Kafka_Encoder::CURRENT_MAGIC_VALUE;
+	private $magicOffset   = 0;
+	private $magicLength   = 1;
+	private $crcOffset     = 1; // MagicOffset + MagicLength
+	private $crcLength     = 4;
+	private $payloadOffset = 5; // CrcOffset + CrcLength
+	private $headerSize    = 5; // PayloadOffset
+	*/
+	
+	/**
+	 * @var string
+	 */
+	private $payload = null;
+	
+	/**
+	 * @var integer
+	 */
+	private $size    = 0;
+	
+	/**
+	 * @var integer CRC32 of the payload, as read from the message header
+	 */
+	private $crc     = 0;
+	
+	/**
+	 * Constructor
+	 * 
+	 * @param string $data Message payload
+	 */
+	public function __construct($data) {
+		// <MAGIC: 1 byte> <CRC32: 4 bytes big-endian> <PAYLOAD: N - 5 bytes>
+		$unpacked      = unpack('N', substr($data, 1, 4));
+		$this->crc     = array_shift($unpacked);
+		$this->payload = substr($data, 5);
+		$this->size    = strlen($this->payload);
+	}
+	
+	/**
+	 * Encode a message
+	 * 
+	 * @return string
+	 */
+	public function encode() {
+		return Kafka_Encoder::encode_message($this->payload);
+	}
+	
+	/**
+	 * Get the message size
+	 * 
+	 * @return integer
+	 */
+	public function size() {
+		return $this->size;
+	}
+  
+	/**
+	 * Get the magic value
+	 * 
+	 * @return integer
+	 */
+	public function magic() {
+		return Kafka_Encoder::CURRENT_MAGIC_VALUE;
+	}
+	
+	/**
+	 * Get the message checksum
+	 * 
+	 * @return integer
+	 */
+	public function checksum() {
+		return $this->crc;
+	}
+	
+	/**
+	 * Get the message payload
+	 * 
+	 * @return string
+	 */
+	public function payload() {
+		return $this->payload;
+	}
+	
+	/**
+	 * Verify the message against the checksum
+	 * 
+	 * @return boolean
+	 */
+	public function isValid() {
+		// compare as unsigned strings so the check is portable across 32/64-bit builds
+		return (sprintf('%u', $this->crc) === sprintf('%u', crc32($this->payload)));
+	}
+  
+	/**
+	 * Debug message
+	 * 
+	 * @return string
+	 */
+	public function __toString() {
+		return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', crc = ' . $this->crc .
+			', payload = ' . $this->payload . ')';
+	}
+}
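
A round-trip sketch: frame a payload with Kafka_Encoder, then parse it back through this class.

<?php
$raw = Kafka_Encoder::encode_message('hello kafka');   // <magic:1><crc32:4><payload>

$message = new Kafka_Message($raw);
echo $message->payload();       // "hello kafka"
echo $message->size();          // strlen('hello kafka')
var_dump($message->isValid());  // true when the embedded CRC32 matches the payload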

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/MessageSet.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/MessageSet.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/MessageSet.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/MessageSet.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,122 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * A sequence of messages stored in a byte buffer
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_MessageSet implements Iterator
+{	
+	/**
+	 * @var integer
+	 */
+	protected $validByteCount = 0;
+	
+	/**
+	 * @var boolean
+	 */
+	private $valid = false;
+	
+	/**
+	 * @var array
+	 */
+	private $array = array();
+	
+	/**
+	 * Constructor
+	 * 
+	 * @param resource $stream    Stream resource
+	 * @param integer  $errorCode Error code
+	 */
+	public function __construct($stream, $errorCode = 0) {
+		$data = stream_get_contents($stream);
+		$len = strlen($data);
+		$ptr = 0;
+		while ($ptr <= ($len - 4)) {
+			$size = array_shift(unpack('N', substr($data, $ptr, 4)));
+			$ptr += 4;
+			$this->array[] = new Kafka_Message(substr($data, $ptr, $size));
+			$ptr += $size;
+			$this->validByteCount += 4 + $size;
+		}
+		fclose($stream);
+	}
+	
+	/**
+	 * Get the number of bytes parsed from the stream (message bytes plus their 4-byte length prefixes)
+	 * 
+	 * @return integer
+	 */
+	public function validBytes() {
+		return $this->validByteCount;
+	}
+	
+	/**
+	 * Get message set size in bytes
+	 * 
+	 * @return integer
+	 */
+	public function sizeInBytes() {
+		return $this->validBytes();
+	}
+	
+	/**
+	 * Advance the internal pointer to the next message (Iterator)
+	 * 
+	 * @return void
+	 */
+	public function next() {
+		$this->valid = (FALSE !== next($this->array)); 
+	}	
+	
+	/**
+	 * Check whether the current position holds a message (Iterator)
+	 * 
+	 * @return boolean
+	 */
+	public function valid() {
+		return $this->valid;
+	}
+	
+	/**
+	 * Get the index of the current message (Iterator)
+	 * 
+	 * @return integer
+	 */
+	public function key() {
+		return key($this->array); 
+	}
+	
+	/**
+	 * Get the current message (Iterator)
+	 * 
+	 * @return Kafka_Message 
+	 */
+	public function current() {
+		return current($this->array);
+	}
+	
+	/**
+	 * Reset the internal pointer to the first message (Iterator)
+	 * 
+	 * @return void
+	 */
+	public function rewind() {
+		$this->valid = (FALSE !== reset($this->array)); 
+	}
+}
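
Since the class implements Iterator, a fetched set can be walked with foreach. A sketch using an in-memory stream of <length><message> pairs (the constructor consumes and closes the stream it is given):

<?php
$stream = fopen('php://temp', 'w+b');
foreach (array('first', 'second') as $payload) {
    $encoded = Kafka_Encoder::encode_message($payload);
    fwrite($stream, pack('N', strlen($encoded)) . $encoded);
}
rewind($stream);

$set = new Kafka_MessageSet($stream);
foreach ($set as $message) {
    echo $message->payload(), "\n";   // "first", "second"
}
echo $set->validBytes();              // bytes parsed, including the 4-byte length prefixes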

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,116 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Simple Kafka Producer
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_Producer
+{
+	/**
+	 * @var integer
+	 */
+	protected $request_key;
+
+	/**
+	 * @var resource
+	 */
+	protected $conn;
+	
+	/**
+	 * @var string
+	 */
+	protected $host;
+	
+	/**
+	 * @var integer
+	 */
+	protected $port;
+
+	/**
+	 * Constructor
+	 * 
+	 * @param string  $host Host
+	 * @param integer $port Port
+	 */
+	public function __construct($host, $port) {
+		$this->request_key = 0;
+		$this->host = $host;
+		$this->port = $port;
+	}
+	
+	/**
+	 * Connect to Kafka via a socket
+	 * 
+	 * @return void
+	 * @throws RuntimeException
+	 */
+	public function connect() {
+		if (!is_resource($this->conn)) {
+			$this->conn = stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr);
+		}
+		if (!is_resource($this->conn)) {
+			throw new RuntimeException('Cannot connect to Kafka: ' . $errstr, $errno);
+		}
+	}
+
+	/**
+	 * Close the socket
+	 * 
+	 * @return void
+	 */
+	public function close() {
+		if (is_resource($this->conn)) {
+			fclose($this->conn);
+		}
+	}
+
+	/**
+	 * Send messages to Kafka
+	 * 
+	 * @param array   $messages  Messages to send
+	 * @param string  $topic     Topic
+	 * @param integer $partition Partition
+	 *
+	 * @return integer|boolean Number of bytes written, or false on failure
+	 */
+	public function send(array $messages, $topic, $partition = 0xFFFFFFFF) {
+		$this->connect();
+		return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages));
+	}
+
+	/**
+	 * When serializing, close the socket and save the connection parameters
+	 * so it can connect again
+	 * 
+	 * @return array Properties to save
+	 */
+	public function __sleep() {
+		$this->close();
+		return array('request_key', 'host', 'port');
+	}
+
+	/**
+	 * Nothing to restore on unserialize: the connection is re-opened
+	 * lazily by connect() on the next send()
+	 * 
+	 * @return void
+	 */
+	public function __wakeup() {
+		// connection is re-established lazily via connect()
+	}
+}
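
A minimal producer sketch, assuming a Kafka broker on localhost:9092 (host and port are illustrative) and that the PRODUCE_REQUEST_ID constant expected by Kafka_Encoder is defined:

<?php
if (!defined('PRODUCE_REQUEST_ID')) {
    define('PRODUCE_REQUEST_ID', Kafka_RequestKeys::PRODUCE);
}

$producer = new Kafka_Producer('localhost', 9092);
$producer->send(array('message 1', 'message 2'), 'test-topic');
$producer->close();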

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/Request.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Request.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Request.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Request.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,30 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Abstract Request class
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+abstract class Kafka_Request
+{
+	/**
+	 * @var integer
+	 */
+	public $id;
+}
+

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/RequestKeys.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/RequestKeys.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/RequestKeys.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/RequestKeys.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,30 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Some constants for request keys
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_RequestKeys
+{
+	const PRODUCE      = 0;
+	const FETCH        = 1;
+	const MULTIFETCH   = 2;
+	const MULTIPRODUCE = 3;
+	const OFFSETS      = 4;
+}

Added: incubator/kafka/trunk/clients/php/src/lib/Kafka/SimpleConsumer.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/SimpleConsumer.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/SimpleConsumer.php (added)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/SimpleConsumer.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,142 @@
+<?php
+/**
+ * Kafka Client
+ *
+ * @category  Libraries
+ * @package   Kafka
+ * @author    Lorenzo Alberton <l.alberton@quipo.it>
+ * @copyright 2011 Lorenzo Alberton
+ * @license   http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @version   $Revision: $
+ * @link      http://sna-projects.com/kafka/
+ */
+
+/**
+ * Simple Kafka Consumer
+ *
+ * @category Libraries
+ * @package  Kafka
+ * @author   Lorenzo Alberton <l.alberton@quipo.it>
+ * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache License, Version 2.0
+ * @link     http://sna-projects.com/kafka/
+ */
+class Kafka_SimpleConsumer
+{
+	/**
+	 * @var string
+	 */
+	protected $host             = 'localhost';
+	
+	/**
+	 * @var integer
+	 */
+	protected $port             = 9092;
+	
+	/**
+	 * @var integer
+	 */
+	protected $socketTimeout    = 10;
+	
+	/**
+	 * @var integer
+	 */
+	protected $socketBufferSize = 1000000;
+
+	/**
+	 * @var resource
+	 */
+	protected $conn = null;
+	
+	/**
+	 * Constructor
+	 * 
+	 * @param string  $host             Kafka Hostname
+	 * @param integer $port             Port
+	 * @param integer $socketTimeout    Socket timeout
+	 * @param integer $socketBufferSize Socket max buffer size
+	 */
+	public function __construct($host, $port, $socketTimeout, $socketBufferSize) {
+		$this->host = $host;
+		$this->port = $port;
+		$this->socketTimeout    = $socketTimeout;
+		$this->socketBufferSize = $socketBufferSize;
+	}
+	
+	/**
+	 * Connect to Kafka via socket
+	 * 
+	 * @return void
+	 * @throws RuntimeException when the socket cannot be opened
+	 */
+	public function connect() {
+		if (!is_resource($this->conn)) {
+			$this->conn = stream_socket_client('tcp://' . $this->host . ':' . $this->port, $errno, $errstr);
+			if (!$this->conn) {
+				throw new RuntimeException($errstr, $errno);
+			}
+			stream_set_timeout($this->conn,      $this->socketTimeout);
+			stream_set_read_buffer($this->conn,  $this->socketBufferSize);
+			stream_set_write_buffer($this->conn, $this->socketBufferSize);
+			//echo "\nConnected to ".$this->host.":".$this->port."\n";
+		}
+	}
+
+	/**
+	 * Close the connection
+	 * 
+	 * @return void
+	 */
+	public function close() {
+		if (is_resource($this->conn)) {
+			fclose($this->conn);
+		}
+	}
+
+	/**
+	 * Send a request and fetch the response
+	 * 
+	 * @param Kafka_FetchRequest $req Request
+	 *
+	 * @return Kafka_MessageSet The fetched message set
+	 */
+	public function fetch(Kafka_FetchRequest $req) {
+		$this->connect();
+		$this->sendRequest($req);
+		//echo "\nRequest sent: ".(string)$req."\n";
+		$response = $this->getResponse();
+		//var_dump($response);
+		$this->close();
+		return new Kafka_MessageSet($response['response']->buffer, $response['errorCode']);
+	}
+	
+	/**
+	 * Send the request
+	 * 
+	 * @param Kafka_FetchRequest $req Request
+	 * 
+	 * @return void
+	 */
+	protected function sendRequest(Kafka_FetchRequest $req) {
+		$send = new Kafka_BoundedByteBuffer_Send($req);
+		$send->writeCompletely($this->conn);
+	}
+	
+	/**
+	 * Get the response
+	 * 
+	 * @return array
+	 */
+	protected function getResponse() {
+		$response = new Kafka_BoundedByteBuffer_Receive();
+		$response->readCompletely($this->conn);
+		
+		rewind($response->buffer);
+		// reading the 2-byte error code also positions the buffer at the start of the message set
+		$errorCode = array_shift(unpack('n', fread($response->buffer, 2))); 
+		//rewind($response->buffer);
+		return array(
+			'response'  => $response, 
+			'errorCode' => $errorCode,
+		);
+	}
+	
+}
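
Finally, an end-to-end consumer sketch tying the pieces together (hostname, topic and buffer sizes are illustrative):

<?php
// fetch up to ~1 MB from partition 0 of 'test-topic', starting at offset 0
$consumer = new Kafka_SimpleConsumer('localhost', 9092, 10, 1000000);
$request  = new Kafka_FetchRequest('test-topic', 0, 0, 1000000);

$messages = $consumer->fetch($request);     // Kafka_MessageSet
foreach ($messages as $message) {
    echo $message->payload(), "\n";
}
// in Kafka 0.x the next fetch starts at the previous offset + $messages->validBytes()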

Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/ReceiveTest.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,116 @@
+<?php
+if (!defined('PRODUCE_REQUEST_ID')) {
+	define('PRODUCE_REQUEST_ID', 0);
+}
+
+/**
+ * Description of Kafka_BoundedByteBuffer_ReceiveTest
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_BoundedByteBuffer_ReceiveTest extends PHPUnit_Framework_TestCase
+{
+	private $stream = null;
+	private $size1  = 0;
+	private $msg1   = '';
+	private $size2  = 0;
+	private $msg2   = '';
+	
+	/**
+	 * @var Kafka_BoundedByteBuffer_Receive
+	 */
+	private $obj = null;
+	
+	/**
+	 * Append two message sets to a sample stream to verify that only the first one is read
+	 */
+	public function setUp() {
+		$this->stream = fopen('php://temp', 'w+b');
+		$this->msg1 = 'test message';
+		$this->msg2 = 'another message';
+		$this->size1 = strlen($this->msg1);
+		$this->size2 = strlen($this->msg2);
+		fwrite($this->stream, pack('N', $this->size1));
+		fwrite($this->stream, $this->msg1);
+		fwrite($this->stream, pack('N', $this->size2));
+		fwrite($this->stream, $this->msg2);
+		rewind($this->stream);
+		$this->obj = new Kafka_BoundedByteBuffer_Receive;
+	}
+
+	public function tearDown() {
+		fclose($this->stream);
+		unset($this->obj);
+	}
+	
+	public function testReadFrom() {
+		$this->assertEquals($this->size1 + 4, $this->obj->readFrom($this->stream));
+		$this->assertEquals($this->msg1, stream_get_contents($this->obj->buffer));
+		//test that we don't go beyond the first message set
+		$this->assertEquals(0, $this->obj->readFrom($this->stream));
+		$this->assertEquals($this->size1 + 4, ftell($this->stream));
+	}
+	
+	public function testReadCompletely() {
+		$this->assertEquals($this->size1 + 4, $this->obj->readCompletely($this->stream));
+		$this->assertEquals($this->msg1, stream_get_contents($this->obj->buffer));
+		//test that we don't go beyond the first message set
+		$this->assertEquals(0, $this->obj->readCompletely($this->stream));
+		$this->assertEquals($this->size1 + 4, ftell($this->stream));
+	}
+	
+	public function testReadFromOffset() {
+		fseek($this->stream, $this->size1 + 4);
+		$this->obj = new Kafka_BoundedByteBuffer_Receive;
+		$this->assertEquals($this->size2 + 4, $this->obj->readFrom($this->stream));
+		$this->assertEquals($this->msg2, stream_get_contents($this->obj->buffer));
+		//test that we reached the end of the stream (2nd message set)
+		$this->assertEquals(0, $this->obj->readFrom($this->stream));
+		$this->assertEquals($this->size1 + 4 + $this->size2 + 4, ftell($this->stream));
+	}
+	
+	public function testReadCompletelyOffset() {
+		fseek($this->stream, $this->size1 + 4);
+		$this->obj = new Kafka_BoundedByteBuffer_Receive;
+		$this->assertEquals($this->size2 + 4, $this->obj->readCompletely($this->stream));
+		$this->assertEquals($this->msg2, stream_get_contents($this->obj->buffer));
+		//test that we reached the end of the stream (2nd message set)
+		$this->assertEquals(0, $this->obj->readCompletely($this->stream));
+		$this->assertEquals($this->size1 + 4 + $this->size2 + 4, ftell($this->stream));
+	}
+	
+	/**
+	 * @expectedException RuntimeException
+	 */
+	public function testInvalidStream() {
+		$this->stream = fopen('php://temp', 'w+b');
+		$this->obj->readFrom($this->stream);
+		$this->fail('The above call should throw an exception');	
+	}
+	
+	/**
+	 * @expectedException RuntimeException
+	 */
+	public function testInvalidSizeTooBig() {
+		$maxSize = 10;
+		$this->obj = new Kafka_BoundedByteBuffer_Receive($maxSize);
+		$this->stream = fopen('php://temp', 'w+b');
+		fwrite($this->stream, pack('N', $maxSize + 1));
+		fwrite($this->stream, $this->msg1);
+		rewind($this->stream);
+		$this->obj->readFrom($this->stream);
+		$this->fail('The above call should throw an exception');
+	}
+	
+	/**
+	 * @expectedException RuntimeException
+	 */
+	public function testInvalidSizeNotPositive() {
+		$this->stream = fopen('php://temp', 'w+b');
+		fwrite($this->stream, pack('N', 0));
+		fwrite($this->stream, '');
+		rewind($this->stream);
+		$this->obj->readFrom($this->stream);
+		$this->fail('The above call should throw an exception');
+	}
+}

Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/BoundedByteBuffer/SendTest.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,83 @@
+<?php
+
+/**
+ * Description of Kafka_BoundedByteBuffer_SendTest
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_BoundedByteBuffer_SendTest extends PHPUnit_Framework_TestCase
+{
+	private $stream;
+	private $topic;
+	private $partition;
+	private $offset;
+	
+	/**
+	 * @var Kafka_FetchRequest
+	 */
+	private $req;
+	
+	/**
+	 * @var Kafka_BoundedByteBuffer_Send
+	 */
+	private $obj = null;
+
+	public function setUp() {
+		$this->stream = fopen('php://temp', 'w+b');
+		$this->topic     = 'a test topic';
+		$this->partition = 0;
+		$this->offset    = 0;
+		$maxSize         = 10000;
+		$this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset, $maxSize);
+		$this->obj = new Kafka_BoundedByteBuffer_Send($this->req);
+	}
+
+	public function tearDown() {
+		fclose($this->stream);
+		unset($this->obj);
+	}
+	
+	public function testWriteTo() {
+		// 4 bytes = size
+		// 2 bytes = request ID
+		$this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeTo($this->stream));
+	}
+	
+	public function testWriteCompletely() {
+		// 4 bytes = size
+		// 2 bytes = request ID
+		$this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeCompletely($this->stream));
+	}
+	
+	public function testWriteToWithBigRequest() {
+		$topicSize = 9000;
+		$this->topic = str_repeat('a', $topicSize); //bigger than the fread buffer, 8192
+		$this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset);
+		$this->obj = new Kafka_BoundedByteBuffer_Send($this->req);
+		// 4 bytes = size
+		// 2 bytes = request ID
+		//$this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeTo($this->stream));
+		$written = $this->obj->writeTo($this->stream);
+		$this->assertEquals(4 + 8192, $written);
+		$this->assertTrue($written < $topicSize);
+	}
+	
+	public function testWriteCompletelyWithBigRequest() {
+		$topicSize = 9000;
+		$this->topic = str_repeat('a', $topicSize); //bigger than the fread buffer, 8192
+		$this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset);
+		$this->obj = new Kafka_BoundedByteBuffer_Send($this->req);
+		// 4 bytes = size
+		// 2 bytes = request ID
+		$this->assertEquals(4 + $this->req->sizeInBytes() + 2, $this->obj->writeCompletely($this->stream));
+	}
+	
+	/**
+	 * @expectedException RuntimeException
+	 */
+	public function testWriteInvalidStream() {
+		$this->stream = fopen('php://temp', 'rb'); //read-only mode
+		$this->obj->writeTo($this->stream);
+		$this->fail('the above call should throw an exception');
+	}	
+}

Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/EncoderTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/EncoderTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/EncoderTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/EncoderTest.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,44 @@
+<?php
+if (!defined('PRODUCE_REQUEST_ID')) {
+	define('PRODUCE_REQUEST_ID', 0);
+}
+
+/**
+ * Description of EncoderTest
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_EncoderTest extends PHPUnit_Framework_TestCase
+{
+	public function testEncodedMessageLength() {
+		$test = 'a sample string';
+		$encoded = Kafka_Encoder::encode_message($test);
+		$this->assertEquals(5 + strlen($test), strlen($encoded));
+	}
+	
+	public function testByteArrayContainsString() {
+		$test = 'a sample string';
+		$encoded = Kafka_Encoder::encode_message($test);
+		$this->assertContains($test, $encoded);
+	}
+	
+	public function testEncodedMessages() {
+		$topic     = 'sample topic';
+		$partition = 1;
+		$messages  = array(
+			'test 1',
+			'test 2 abcde',
+		);
+		$encoded = Kafka_Encoder::encode_produce_request($topic, $partition, $messages);
+		$this->assertContains($topic, $encoded);
+		$this->assertContains($partition, $encoded);
+		foreach ($messages as $msg) {
+			$this->assertContains($msg, $encoded);
+		}
+		$size = 4 + 2 + 2 + strlen($topic) + 4 + 4;
+		foreach ($messages as $msg) {
+			$size += 9 + strlen($msg);
+		}
+		$this->assertEquals($size, strlen($encoded));
+	}
+}

Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/FetchRequestTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/FetchRequestTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/FetchRequestTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/FetchRequestTest.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,71 @@
+<?php
+
+/**
+ * Description of FetchRequestTest
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_FetchRequestTest extends PHPUnit_Framework_TestCase
+{
+	private $topic;
+	private $partition;
+	private $offset;
+	private $maxSize;
+	
+	/**
+	 * @var Kafka_FetchRequest
+	 */
+	private $req;
+
+	public function setUp() {
+		$this->topic     = 'a test topic';
+		$this->partition = 0;
+		$this->offset    = 0;
+		$this->maxSize   = 10000;
+		$this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset, $this->maxSize);
+	}
+	
+	public function testRequestSize() {
+		$this->assertEquals(18 + strlen($this->topic) , $this->req->sizeInBytes());
+	}
+	
+	public function testGetters() {
+		$this->assertEquals($this->topic,     $this->req->getTopic());
+		$this->assertEquals($this->offset,    $this->req->getOffset());
+		$this->assertEquals($this->partition, $this->req->getPartition());
+	}
+	
+	public function testWriteTo() {
+		$stream = fopen('php://temp', 'w+b');
+		$this->req->writeTo($stream);
+		rewind($stream);
+		$data = stream_get_contents($stream);
+		fclose($stream);
+		$this->assertEquals(strlen($data), $this->req->sizeInBytes());
+		$this->assertContains($this->topic, $data);
+		$this->assertContains($this->partition, $data);
+	}
+	
+	public function testWriteToOffset() {
+		$this->offset = 14;
+		$this->req = new Kafka_FetchRequest($this->topic, $this->partition, $this->offset, $this->maxSize);
+		$stream = fopen('php://temp', 'w+b');
+		$this->req->writeTo($stream);
+		rewind($stream);
+		//read it back
+		$topicLen = array_shift(unpack('n', fread($stream, 2)));
+		$this->assertEquals(strlen($this->topic), $topicLen);
+		$this->assertEquals($this->topic,     fread($stream, $topicLen));
+		$this->assertEquals($this->partition, array_shift(unpack('N', fread($stream, 4))));
+		$int64bit = unpack('N2', fread($stream, 8));
+		$this->assertEquals($this->offset,    $int64bit[2]);
+		$this->assertEquals($this->maxSize,   array_shift(unpack('N', fread($stream, 4))));
+	}
+	
+	public function testToString() {
+		$this->assertContains('topic:'   . $this->topic,     (string)$this->req);
+		$this->assertContains('part:'    . $this->partition, (string)$this->req);
+		$this->assertContains('offset:'  . $this->offset,    (string)$this->req);
+		$this->assertContains('maxSize:' . $this->maxSize,   (string)$this->req);
+	}
+}
\ No newline at end of file


