kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1173797 [3/10] - in /incubator/kafka/trunk/clients/csharp: ./ lib/StyleCop/ src/Kafka/ src/Kafka/Kafka.Client/ src/Kafka/Kafka.Client/Cfg/ src/Kafka/Kafka.Client/Cluster/ src/Kafka/Kafka.Client/Consumers/ src/Kafka/Kafka.Client/Exceptions/...
Date Wed, 21 Sep 2011 19:17:25 GMT
Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs?rev=1173797&r1=1173796&r2=1173797&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs Wed Sep 21 19:17:19 2011
@@ -1,204 +1,296 @@
-using System;
-using System.Net.Sockets;
-using System.Threading;
-using Kafka.Client.Request;
-
-namespace Kafka.Client
-{
-    /// <summary>
-    /// Callback made when a message request is finished being sent asynchronously.
-    /// </summary>
-    /// <typeparam name="T">
-    /// Must be of type <see cref="AbstractRequest"/> and represents the type of message 
-    /// sent to Kafka.
-    /// </typeparam>
-    /// <param name="request">The request that was sent to the server.</param>
-    public delegate void MessageSent<T>(RequestContext<T> request) where T : AbstractRequest;
-
-    /// <summary>
-    /// Manages connections to the Kafka.
-    /// </summary>
-    public class KafkaConnection : IDisposable
-    {
-        /// <summary>
-        /// TCP client that connects to the server.
-        /// </summary>
-        private TcpClient _client;
-
-        /// <summary>
-        /// Initializes a new instance of the KafkaConnection class.
-        /// </summary>
-        /// <param name="server">The server to connect to.</param>
-        /// <param name="port">The port to connect to.</param>
-        public KafkaConnection(string server, int port)
-        {
-            Server = server;
-            Port = port;
-
-            // connection opened
-            _client = new TcpClient(server, port);
-        }
-
-        /// <summary>
-        /// Gets the server to which the connection is to be established.
-        /// </summary>
-        public string Server { get; private set; }
-        
-        /// <summary>
-        /// Gets the port to which the connection is to be established.
-        /// </summary>
-        public int Port { get; private set; }
-
-        /// <summary>
-        /// Readds data from the server.
-        /// </summary>
-        /// <remarks>
-        /// Defauls the amount of time that a read operation blocks waiting for data to <see cref="Timeout.Infinite"/>.
-        /// </remarks>
-        /// <param name="size">The number of bytes to read from the server.</param>
-        /// <returns>The data read from the server as a byte array.</returns>
-        public byte[] Read(int size)
-        {
-            return Read(size, Timeout.Infinite);
-        }
-
-        /// <summary>
-        /// Readds data from the server.
-        /// </summary>
-        /// <param name="size">The number of bytes to read from the server.</param>
-        /// <param name="readTimeout">The amount of time that a read operation blocks waiting for data.</param>
-        /// <returns>The data read from the server as a byte array.</returns>
-        public byte[] Read(int size, int readTimeout)
-        {
-            NetworkStream stream = _client.GetStream();
-            stream.ReadTimeout = readTimeout;
-
-            byte[] bytes = new byte[size];
-            bool readComplete = false;
-            int numberOfTries = 0;
-
-            while (!readComplete && numberOfTries < 1000)
-            {
-                if (stream.DataAvailable)
-                {
-                    stream.Read(bytes, 0, size);
-                    readComplete = true;
-                }
-                else
-                {
-                    // wait until the server is ready to send some stuff.
-                    numberOfTries++;
-                    Thread.Sleep(10);
-                }
-            } 
-            
-            return bytes;
-        }
-        
-        /// <summary>
-        /// Writes a producer request to the server asynchronously.
-        /// </summary>
-        /// <param name="request">The request to make.</param>
-        /// <param name="callback">The code to execute once the message is completely sent.</param>
-        public void BeginWrite(ProducerRequest request, MessageSent<ProducerRequest> callback)
-        {
-            NetworkStream stream = _client.GetStream();
-            RequestContext<ProducerRequest> ctx = new RequestContext<ProducerRequest>(stream, request);
-
-            byte[] data = request.GetBytes();
-            stream.BeginWrite(
-                data, 
-                0, 
-                data.Length, 
-                delegate(IAsyncResult asyncResult)
-                {
-                    RequestContext<ProducerRequest> context = (RequestContext<ProducerRequest>)asyncResult.AsyncState;
-
-                    if (callback != null)
-                    {
-                        callback(context);
-                    }
-
-                    context.NetworkStream.EndWrite(asyncResult);
-                    context.NetworkStream.Dispose();
-                }, 
-                ctx);
-        }
-
-        /// <summary>
-        /// Writes a producer request to the server asynchronously.
-        /// </summary>
-        /// <remarks>
-        /// The default callback simply calls the <see cref="NetworkStream.EndWrite"/>. This is
-        /// basically a low level fire and forget call.
-        /// </remarks>
-        /// <param name="data">The data to send to the server.</param>
-        public void BeginWrite(byte[] data)
-        {
-            NetworkStream stream = _client.GetStream();
-            stream.BeginWrite(data, 0, data.Length, (asyncResult) => ((NetworkStream)asyncResult.AsyncState).EndWrite(asyncResult), stream);
-        }
-
-        /// <summary>
-        /// Writes data to the server.
-        /// </summary>
-        /// <remarks>
-        /// Write timeout is defaulted to infinite.
-        /// </remarks>
-        /// <param name="data">The data to write to the server.</param>
-        public void Write(byte[] data)
-        {
-            Write(data, Timeout.Infinite);
-        }
-
-        /// <summary>
-        /// Writes a producer request to the server.
-        /// </summary>
-        /// <remarks>
-        /// Write timeout is defaulted to infitite.
-        /// </remarks>
-        /// <param name="request">The <see cref="ProducerRequest"/> to send to the server.</param>
-        public void Write(ProducerRequest request)
-        {
-            Write(request.GetBytes());
-        }
-
-        /// <summary>
-        /// Writes a multi-producer request to the server.
-        /// </summary>
-        /// <remarks>
-        /// Write timeout is defaulted to infitite.
-        /// </remarks>
-        /// <param name="request">The <see cref="MultiProducerRequest"/> to send to the server.</param>
-        public void Write(MultiProducerRequest request)
-        {
-            Write(request.GetBytes());
-        }
-
-        /// <summary>
-        /// Writes data to the server.
-        /// </summary>
-        /// <param name="data">The data to write to the server.</param>
-        /// <param name="writeTimeout">The amount of time that a write operation blocks waiting for data.</param>
-        public void Write(byte[] data, int writeTimeout)
-        {
-            NetworkStream stream = _client.GetStream();
-            stream.WriteTimeout = writeTimeout;
-
-            // Send the message to the connected TcpServer. 
-            stream.Write(data, 0, data.Length);
-        }
-
-        /// <summary>
-        /// Close the connection to the server.
-        /// </summary>
-        public void Dispose()
-        {
-            if (_client != null)
-            {
-                _client.GetStream().Close();
-                _client.Close();
-            }
-        }
-    }
-}
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client
+{
+    using System;
+    using System.IO;
+    using System.Net.Sockets;
+    using System.Threading;
+    using Kafka.Client.Producers.Async;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Manages connections to the Kafka.
+    /// </summary>
+    public class KafkaConnection : IDisposable
+    {
+        /// <summary>
+        /// TCP client that connects to the server.
+        /// </summary>
+        private readonly TcpClient client;
+
+        private volatile bool disposed;
+
+        /// <summary>
+        /// Initializes a new instance of the KafkaConnection class.
+        /// </summary>
+        /// <param name="server">The server to connect to.</param>
+        /// <param name="port">The port to connect to.</param>
+        public KafkaConnection(string server, int port)
+        {
+            Server = server;
+            Port = port;
+
+            // connection opened
+            client = new TcpClient(server, port);
+        }
+
+        /// <summary>
+        /// Gets the server to which the connection is to be established.
+        /// </summary>
+        public string Server { get; private set; }
+        
+        /// <summary>
+        /// Gets the port to which the connection is to be established.
+        /// </summary>
+        public int Port { get; private set; }
+
+        /// <summary>
+        /// Readds data from the server.
+        /// </summary>
+        /// <remarks>
+        /// Defauls the amount of time that a read operation blocks waiting for data to <see cref="Timeout.Infinite"/>.
+        /// </remarks>
+        /// <param name="size">The number of bytes to read from the server.</param>
+        /// <returns>The data read from the server as a byte array.</returns>
+        public byte[] Read(int size)
+        {
+            this.EnsuresNotDisposed();
+            return Read(size, Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Readds data from the server.
+        /// </summary>
+        /// <param name="size">The number of bytes to read from the server.</param>
+        /// <param name="readTimeout">The amount of time that a read operation blocks waiting for data.</param>
+        /// <returns>The data read from the server as a byte array.</returns>
+        public byte[] Read(int size, int readTimeout)
+        {
+            this.EnsuresNotDisposed();
+            byte[] bytes;
+            NetworkStream stream = client.GetStream();
+            stream.ReadTimeout = readTimeout;
+            int numberOfTries = 0;
+
+            int readSize = size;
+            if (client.ReceiveBufferSize < size)
+            {
+                readSize = client.ReceiveBufferSize;
+            }
+
+            using (var ms = new MemoryStream())
+            {
+                var bytesToRead = new byte[client.ReceiveBufferSize];
+
+                while (true)
+                {
+                    int numberOfBytesRead = stream.Read(bytesToRead, 0, readSize);
+                    if (numberOfBytesRead > 0)
+                    {
+                        ms.Write(bytesToRead, 0, numberOfBytesRead);
+                    }
+
+                    if (ms.Length >= size)
+                    {
+                        break;
+                    }
+
+                    if (numberOfBytesRead == 0)
+                    {
+                        if (numberOfTries >= 1000)
+                        {
+                            break;
+                        }
+
+                        numberOfTries++;
+                        Thread.Sleep(10);
+                    }
+                }
+
+                bytes = new byte[ms.Length];
+                ms.Seek(0, SeekOrigin.Begin);
+                ms.Read(bytes, 0, (int)ms.Length);
+            }
+
+            return bytes;
+        }
+
+        /// <summary>
+        /// Writes a producer request to the server asynchronously.
+        /// </summary>
+        /// <param name="request">The request to make.</param>
+        public void BeginWrite(ProducerRequest request)
+        {
+            this.EnsuresNotDisposed();
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            NetworkStream stream = client.GetStream();
+            byte[] data = request.RequestBuffer.GetBuffer();
+            stream.BeginWrite(data, 0, data.Length, asyncResult => ((NetworkStream)asyncResult.AsyncState).EndWrite(asyncResult), stream);
+        }
+        
+        /// <summary>
+        /// Writes a producer request to the server asynchronously.
+        /// </summary>
+        /// <param name="request">The request to make.</param>
+        /// <param name="callback">The code to execute once the message is completely sent.</param>
+        /// <remarks>
+        /// Do not dispose connection till callback is invoked, 
+        /// otherwise underlying network stream will be closed.
+        /// </remarks>
+        public void BeginWrite(ProducerRequest request, MessageSent<ProducerRequest> callback)
+        {
+            this.EnsuresNotDisposed();
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            if (callback == null)
+            {
+                this.BeginWrite(request);
+                return;
+            }
+
+            NetworkStream stream = client.GetStream();
+            var ctx = new RequestContext<ProducerRequest>(stream, request);
+
+            byte[] data = request.RequestBuffer.GetBuffer();
+            stream.BeginWrite(
+                data,
+                0,
+                data.Length,
+                delegate(IAsyncResult asyncResult)
+                    {
+                        var context = (RequestContext<ProducerRequest>)asyncResult.AsyncState;
+                        callback(context);
+                        context.NetworkStream.EndWrite(asyncResult);
+                    },
+                ctx);
+        }
+
+        /// <summary>
+        /// Writes a producer request to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infitite.
+        /// </remarks>
+        /// <param name="request">The <see cref="ProducerRequest"/> to send to the server.</param>
+        public void Write(ProducerRequest request)
+        {
+            this.EnsuresNotDisposed();
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Writes a multi-producer request to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infitite.
+        /// </remarks>
+        /// <param name="request">The <see cref="MultiProducerRequest"/> to send to the server.</param>
+        public void Write(MultiProducerRequest request)
+        {
+            this.EnsuresNotDisposed();
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Writes data to the server.
+        /// </summary>
+        /// <param name="data">The data to write to the server.</param>
+        /// <param name="writeTimeout">The amount of time that a write operation blocks waiting for data.</param>
+        private void Write(byte[] data, int writeTimeout)
+        {
+            NetworkStream stream = this.client.GetStream();
+            stream.WriteTimeout = writeTimeout;
+
+            // Send the message to the connected TcpServer. 
+            stream.Write(data, 0, data.Length);
+        }
+
+        /// <summary>
+        /// Writes a fetch request to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infitite.
+        /// </remarks>
+        /// <param name="request">The <see cref="FetchRequest"/> to send to the server.</param>
+        public void Write(FetchRequest request)
+        {
+            this.EnsuresNotDisposed();
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Writes a multifetch request to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infitite.
+        /// </remarks>
+        /// <param name="request">The <see cref="MultiFetchRequest"/> to send to the server.</param>
+        public void Write(MultiFetchRequest request)
+        {
+            this.EnsuresNotDisposed();
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Writes a offset request to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infitite.
+        /// </remarks>
+        /// <param name="request">The <see cref="OffsetRequest"/> to send to the server.</param>
+        public void Write(OffsetRequest request)
+        {
+            this.EnsuresNotDisposed();
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Close the connection to the server.
+        /// </summary>
+        public void Dispose()
+        {
+            if (this.disposed)
+            {
+                return;
+            }
+
+            this.disposed = true;
+            if (this.client != null)
+            {
+                this.client.GetStream().Close();
+                this.client.Close();
+            }
+        }
+
+        /// <summary>
+        /// Ensures that object was not disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
+            {
+                throw new ObjectDisposedException(this.GetType().Name);
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BoundedBuffer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BoundedBuffer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BoundedBuffer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BoundedBuffer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Messages
+{
+    using System.IO;
+
+    /// <summary>
+    /// Wrapper over memory set with fixed capacity
+    /// </summary>
+    internal class BoundedBuffer : MemoryStream
+    {
+        /// <summary>
+        /// Initializes a new instance of the <see cref="BoundedBuffer"/> class.
+        /// </summary>
+        /// <param name="size">
+        /// The max size of stream.
+        /// </param>
+        public BoundedBuffer(int size)
+            : base(new byte[size], 0, size, true, true)
+        {
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Messages
+{
+    using System;
+    using System.Collections.Generic;
+    using System.IO;
+    using System.Linq;
+    using System.Text;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// A collection of messages stored as memory stream
+    /// </summary>
+    public class BufferedMessageSet : MessageSet
+    {
+        /// <summary>
+        /// Gets the error code
+        /// </summary>
+        public int ErrorCode { get; private set; }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="BufferedMessageSet"/> class.
+        /// </summary>
+        /// <param name="messages">
+        /// The list of messages.
+        /// </param>
+        public BufferedMessageSet(IEnumerable<Message> messages) : this(messages, ErrorMapping.NoError)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="BufferedMessageSet"/> class.
+        /// </summary>
+        /// <param name="messages">
+        /// The list of messages.
+        /// </param>
+        /// <param name="errorCode">
+        /// The error code.
+        /// </param>
+        public BufferedMessageSet(IEnumerable<Message> messages, int errorCode)
+        {
+            int length = GetMessageSetSize(messages);
+            this.Messages = messages;
+            this.SetBuffer = new BoundedBuffer(length);
+            this.WriteTo(this.SetBuffer);
+            this.ErrorCode = errorCode;
+        }
+
+        /// <summary>
+        /// Gets set internal buffer
+        /// </summary>
+        public MemoryStream SetBuffer { get; private set; }
+
+        /// <summary>
+        /// Gets the list of messages.
+        /// </summary>
+        public IEnumerable<Message> Messages { get; private set; }
+
+        /// <summary>
+        /// Gets the total set size.
+        /// </summary>
+        public override int SetSize
+        {
+            get 
+            {
+                return (int)this.SetBuffer.Length;
+            }
+        }
+
+        /// <summary>
+        /// Writes content into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public sealed override void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+            using (var writer = new KafkaBinaryWriter(output))
+            {
+                this.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Writes content into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public sealed override void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+            foreach (var message in this.Messages)
+            {
+                writer.Write(message.Size);
+                message.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Gets string representation of set
+        /// </summary>
+        /// <returns>
+        /// String representation of set
+        /// </returns>
+        public override string ToString()
+        {
+            using (var reader = new KafkaBinaryReader(this.SetBuffer))
+            {
+                return ParseFrom(reader, this.SetSize);
+            }
+        }
+
+        /// <summary>
+        /// Helper method to get string representation of set
+        /// </summary>
+        /// <param name="reader">
+        /// The reader.
+        /// </param>
+        /// <param name="count">
+        /// The count.
+        /// </param>
+        /// <returns>
+        /// String representation of set
+        /// </returns>
+        internal static string ParseFrom(KafkaBinaryReader reader, int count)
+        {
+            Guard.Assert<ArgumentNullException>(() => reader != null);
+            var sb = new StringBuilder();
+            int i = 1;
+            while (reader.BaseStream.Position != reader.BaseStream.Length)
+            {
+                sb.Append("Message ");
+                sb.Append(i);
+                sb.Append(" {Length: ");
+                int msgSize = reader.ReadInt32();
+                sb.Append(msgSize);
+                sb.Append(", ");
+                sb.Append(Message.ParseFrom(reader, msgSize));
+                sb.AppendLine("} ");
+                i++;
+            }
+
+            return sb.ToString();
+        }
+
+        internal static BufferedMessageSet ParseFrom(byte[] bytes)
+        {
+            var messages = new List<Message>();
+            int processed = 0;
+            int length = bytes.Length - 4;
+            while (processed <= length)
+            {
+                int messageSize = BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(processed).Take(4).ToArray()), 0);
+                messages.Add(Message.ParseFrom(bytes.Skip(processed).Take(messageSize + 4).ToArray()));
+                processed += 4 + messageSize;
+            }
+
+            return new BufferedMessageSet(messages);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Messages
+{
+    using System;
+    using System.IO;
+    using System.Linq;
+    using System.Text;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Message send to Kafaka server
+    /// </summary>
+    /// <remarks>
+    /// Format:
+    /// 1 byte "magic" identifier to allow format changes
+    /// 4 byte CRC32 of the payload
+    /// N - 5 byte payload
+    /// </remarks>
+    public class Message : IWritable
+    {
+        private const byte DefaultMagicValue = 0;
+        private const byte DefaultMagicLength = 1;
+        private const byte DefaultCrcLength = 4;
+        private const int DefaultHeaderSize = DefaultMagicLength + DefaultCrcLength;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Message"/> class.
+        /// </summary>
+        /// <param name="payload">
+        /// The payload.
+        /// </param>
+        /// <param name="checksum">
+        /// The checksum.
+        /// </param>
+        /// <remarks>
+        /// Initializes with default magic number
+        /// </remarks>
+        public Message(byte[] payload, byte[] checksum)
+            : this(payload, DefaultMagicValue, checksum)
+        {
+            Guard.Assert<ArgumentNullException>(() => payload != null);
+            Guard.Assert<ArgumentNullException>(() => checksum != null);
+            Guard.Assert<ArgumentException>(() => checksum.Length == 4);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Message"/> class.
+        /// </summary>
+        /// <param name="payload">
+        /// The payload.
+        /// </param>
+        /// <remarks>
+        /// Initializes the magic number as default and the checksum as null. It will be automatically computed.
+        /// </remarks>
+        public Message(byte[] payload)
+            : this(payload, DefaultMagicValue)
+        {
+            Guard.Assert<ArgumentNullException>(() => payload != null);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the Message class.
+        /// </summary>
+        /// <remarks>
+        /// Initializes the checksum as null.  It will be automatically computed.
+        /// </remarks>
+        /// <param name="payload">The data for the payload.</param>
+        /// <param name="magic">The magic identifier.</param>
+        public Message(byte[] payload, byte magic)
+            : this(payload, magic, Crc32Hasher.Compute(payload))
+        {
+            Guard.Assert<ArgumentNullException>(() => payload != null);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the Message class.
+        /// </summary>
+        /// <param name="payload">The data for the payload.</param>
+        /// <param name="magic">The magic identifier.</param>
+        /// <param name="checksum">The checksum for the payload.</param>
+        public Message(byte[] payload, byte magic, byte[] checksum)
+        {
+            Guard.Assert<ArgumentNullException>(() => payload != null);
+            Guard.Assert<ArgumentNullException>(() => checksum != null);
+
+            int length = DefaultHeaderSize + payload.Length;
+            this.Payload = payload;
+            this.Magic = magic;
+            this.Checksum = checksum;
+            this.MessageBuffer = new BoundedBuffer(length);
+            this.WriteTo(this.MessageBuffer);
+        }
+
+        /// <summary>
+        /// Gets internal message buffer.
+        /// </summary>
+        public MemoryStream MessageBuffer { get; private set; }
+
+        /// <summary>
+        /// Gets the payload.
+        /// </summary>
+        public byte[] Payload { get; private set; }
+
+        /// <summary>
+        /// Gets the magic bytes.
+        /// </summary>
+        public byte Magic { get; private set; }
+
+        /// <summary>
+        /// Gets the CRC32 checksum for the payload.
+        /// </summary>
+        public byte[] Checksum { get; private set; }
+
+        /// <summary>
+        /// Gets the total size of message.
+        /// </summary>
+        public int Size
+        {
+            get
+            {
+                return (int)this.MessageBuffer.Length;
+            }
+        }
+
+        /// <summary>
+        /// Gets the payload size.
+        /// </summary>
+        public int PayloadSize
+        {
+            get
+            {
+                return this.Payload.Length;
+            }
+        }
+
+        /// <summary>
+        /// Writes message data into given message buffer
+        /// </summary>
+        /// <param name="output">
+        /// The output.
+        /// </param>
+        public void WriteTo(MemoryStream output)
+        {
+            Guard.Assert<ArgumentNullException>(() => output != null);
+
+            using (var writer = new KafkaBinaryWriter(output))
+            {
+                this.WriteTo(writer);
+            }
+        }
+
+        /// <summary>
+        /// Writes message data using given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public void WriteTo(KafkaBinaryWriter writer)
+        {
+            Guard.Assert<ArgumentNullException>(() => writer != null);
+
+            writer.Write(this.Magic);
+            writer.Write(this.Checksum);
+            writer.Write(this.Payload);
+        }
+
+        /// <summary>
+        /// Try to show the payload as decoded to UTF-8.
+        /// </summary>
+        /// <returns>The decoded payload as string.</returns>
+        public override string ToString()
+        {
+            using (var reader = new KafkaBinaryReader(this.MessageBuffer))
+            {
+                return ParseFrom(reader, this.Size);
+            }
+        }
+
+        /// <summary>
+        /// Creates string representation of message
+        /// </summary>
+        /// <param name="reader">
+        /// The reader.
+        /// </param>
+        /// <param name="count">
+        /// The count.
+        /// </param>
+        /// <returns>
+        /// String representation of message
+        /// </returns>
+        public static string ParseFrom(KafkaBinaryReader reader, int count)
+        {
+            Guard.Assert<ArgumentNullException>(() => reader != null);
+            var sb = new StringBuilder();
+            int payloadSize = count - DefaultHeaderSize;
+            sb.Append("Magic: ");
+            sb.Append(reader.ReadByte());
+            sb.Append(", Checksum: ");
+            for (int i = 0; i < 4; i++)
+            {
+                sb.Append("[");
+                sb.Append(reader.ReadByte());
+                sb.Append("]");
+            }
+
+            sb.Append(", topic: ");
+            var encodedPayload = reader.ReadBytes(payloadSize);
+            try
+            {
+                sb.Append(Encoding.UTF8.GetString(encodedPayload));
+            }
+            catch (Exception)
+            {
+                sb.Append("n/a");
+            }
+
+            return sb.ToString();
+        }
+
+        /// <summary>
+        /// Parses a message from a byte array given the format Kafka likes. 
+        /// </summary>
+        /// <param name="data">The data for a message.</param>
+        /// <returns>The message.</returns>
+        [Obsolete("Use KafkaBinaryReader instead")]
+        public static Message ParseFrom(byte[] data)
+        {
+            int size = BitConverter.ToInt32(BitWorks.ReverseBytes(data.Take(4).ToArray()), 0);
+            byte magic = data[4];
+            byte[] checksum = data.Skip(5).Take(4).ToArray();
+            byte[] payload = data.Skip(9).Take(size).ToArray();
+
+            return new Message(payload, magic, checksum);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Messages
+{
+    using System;
+    using System.Collections.Generic;
+    using System.IO;
+    using System.Linq;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// A set of messages. A message set has a fixed serialized form, though the container
+    /// for the bytes could be either in-memory or on disk.
+    /// </summary>
+    /// <remarks>
+    /// Format:
+    /// 4 byte size containing an integer N
+    /// N message bytes as described in the message class
+    /// </remarks>
+    public abstract class MessageSet : IWritable
+    {
+        protected const byte DefaultMessageLengthSize = 4;
+
+        /// <summary>
+        /// Gives the size of a size-delimited entry in a message set
+        /// </summary>
+        /// <param name="message">
+        /// The message.
+        /// </param>
+        /// <returns>
+        /// Size of message
+        /// </returns>
+        public static int GetEntrySize(Message message)
+        {
+            Guard.Assert<ArgumentNullException>(() => message != null);
+
+            return message.Size + DefaultMessageLengthSize;
+        }
+
+        /// <summary>
+        /// Gives the size of a list of messages
+        /// </summary>
+        /// <param name="messages">
+        /// The messages.
+        /// </param>
+        /// <returns>
+        /// Size of all messages
+        /// </returns>
+        public static int GetMessageSetSize(IEnumerable<Message> messages)
+        {
+            return messages == null ? 0 : messages.Sum(x => GetEntrySize(x));
+        }
+
+        /// <summary>
+        /// Gets the total size of this message set in bytes
+        /// </summary>
+        public abstract int SetSize { get; }
+
+        /// <summary>
+        /// Writes content into given stream
+        /// </summary>
+        /// <param name="output">
+        /// The output stream.
+        /// </param>
+        public abstract void WriteTo(MemoryStream output);
+
+        /// <summary>
+        /// Writes content into given writer
+        /// </summary>
+        /// <param name="writer">
+        /// The writer.
+        /// </param>
+        public abstract void WriteTo(KafkaBinaryWriter writer);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Async
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Sends messages encapsulated in request to Kafka server asynchronously
+    /// </summary>
+    public class AsyncProducer : IAsyncProducer, IDisposable
+    {
+        private readonly AsyncProducerConfig config;
+        private readonly ICallbackHandler callbackHandler;
+        private KafkaConnection connection = null;
+
+        /// <summary>
+        /// Gets producer config
+        /// </summary>
+        public AsyncProducerConfig Config
+        {
+            get { return config; }
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AsyncProducer"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The producer config.
+        /// </param>
+        public AsyncProducer(AsyncProducerConfig config)
+            : this(
+                config,
+                ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AsyncProducer"/> class.
+        /// </summary>
+        /// <param name="config">
+        /// The producer config.
+        /// </param>
+        /// <param name="callbackHandler">
+        /// The callback invoked when a request is finished being sent.
+        /// </param>
+        public AsyncProducer(
+            AsyncProducerConfig config,
+            ICallbackHandler callbackHandler)
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+
+            this.config = config;
+            this.callbackHandler = callbackHandler;
+            this.connection = new KafkaConnection(this.config.Host, this.config.Port);
+        }
+
+        /// <summary>
+        /// Sends request to Kafka server asynchronously
+        /// </summary>
+        /// <param name="request">
+        /// The request.
+        /// </param>
+        public void Send(ProducerRequest request)
+        {
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            Guard.Assert<ArgumentException>(() => request.MessageSet.Messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
+            if (this.callbackHandler != null)
+            {
+                this.Send(request, this.callbackHandler.Handle);
+            }
+            else
+            {
+                connection.BeginWrite(request);
+            }
+        }
+
+        /// <summary>
+        /// Sends request to Kafka server asynchronously
+        /// </summary>
+        /// <param name="request">
+        /// The request.
+        /// </param>
+        /// <param name="callback">
+        /// The callback invoked when a request is finished being sent.
+        /// </param>
+        public void Send(ProducerRequest request, MessageSent<ProducerRequest> callback)
+        {
+            Guard.Assert<ArgumentNullException>(() => request != null);
+            Guard.Assert<ArgumentNullException>(() => request.MessageSet != null);
+            Guard.Assert<ArgumentNullException>(() => request.MessageSet.Messages != null);
+            Guard.Assert<ArgumentException>(
+                () => request.MessageSet.Messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
+            
+            connection.BeginWrite(request, callback);
+        }
+
+        /// <summary>
+        /// Constructs request and sent it to Kafka server asynchronously
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="partition">
+        /// The partition.
+        /// </param>
+        /// <param name="messages">
+        /// The list of messages to sent.
+        /// </param>
+        public void Send(string topic, int partition, IEnumerable<Message> messages)
+        {
+            Guard.Assert<ArgumentNullException>(() => !string.IsNullOrEmpty(topic));
+            Guard.Assert<ArgumentNullException>(() => messages != null);
+            Guard.Assert<ArgumentException>(() => messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
+            
+            this.Send(new ProducerRequest(topic, partition, messages));
+        }
+
+        /// <summary>
+        /// Constructs request and sent it to Kafka server asynchronously
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="partition">
+        /// The partition.
+        /// </param>
+        /// <param name="messages">
+        /// The list of messages to sent.
+        /// </param>
+        /// <param name="callback">
+        /// The callback invoked when a request is finished being sent.
+        /// </param>
+        public void Send(string topic, int partition, IEnumerable<Message> messages, MessageSent<ProducerRequest> callback)
+        {
+            Guard.Assert<ArgumentNullException>(() => !string.IsNullOrEmpty(topic));
+            Guard.Assert<ArgumentNullException>(() => messages != null);
+            Guard.Assert<ArgumentException>(() => messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
+            
+            this.Send(new ProducerRequest(topic, partition, messages), callback);
+        }
+
+        public void Dispose()
+        {
+            if (connection != null)
+            {
+                connection.Dispose();
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Async
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using System.Linq;
+    using System.Reflection;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+    using Kafka.Client.Serialization;
+    using Kafka.Client.Utils;
+    using log4net;
+
+    /// <summary>
+    /// Pool of asynchronous producers used by high-level API
+    /// </summary>
+    /// <typeparam name="TData">The type of the data.</typeparam>
+    internal class AsyncProducerPool<TData> : ProducerPool<TData>
+        where TData : class 
+    {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        private readonly IDictionary<int, IAsyncProducer> asyncProducers;
+        
+        /// <summary>
+        /// Factory method used to instantiating asynchronous producer pool
+        /// </summary>
+        /// <param name="config">
+        /// The asynchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="cbkHandler">
+        /// The callback invoked after new broker is added.
+        /// </param>
+        /// <returns>
+        /// Instantiated asynchronous producer pool
+        /// </returns>
+        public static AsyncProducerPool<TData> CreateAsyncPool(
+            ProducerConfig config, 
+            IEncoder<TData> serializer, 
+            ICallbackHandler cbkHandler)
+        {
+            return new AsyncProducerPool<TData>(config, serializer, cbkHandler);
+        }
+
+        /// <summary>
+        /// Factory method used to instantiating asynchronous producer pool
+        /// </summary>
+        /// <param name="config">
+        /// The asynchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <returns>
+        /// Instantiated asynchronous producer pool
+        /// </returns>
+        public static AsyncProducerPool<TData> CreateAsyncPool(
+            ProducerConfig config,
+            IEncoder<TData> serializer)
+        {
+            return new AsyncProducerPool<TData>(config, serializer);
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AsyncProducerPool{TData}"/> class. 
+        /// </summary>
+        /// <param name="config">
+        /// The asynchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="asyncProducers">
+        /// The list of asynchronous producers.
+        /// </param>
+        /// <param name="cbkHandler">
+        /// The callback invoked after new broker is added.
+        /// </param>
+        /// <remarks>
+        /// Should be used for testing purpose only
+        /// </remarks>
+        private AsyncProducerPool(
+            ProducerConfig config, 
+            IEncoder<TData> serializer, 
+            IDictionary<int, IAsyncProducer> asyncProducers, 
+            ICallbackHandler cbkHandler)
+            : base(config, serializer, cbkHandler)
+        {
+            this.asyncProducers = asyncProducers;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AsyncProducerPool{TData}"/> class. 
+        /// </summary>
+        /// <param name="config">
+        /// The asynchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        /// <param name="cbkHandler">
+        /// The callback invoked after new broker is added.
+        /// </param>
+        private AsyncProducerPool(
+            ProducerConfig config, 
+            IEncoder<TData> serializer, 
+            ICallbackHandler cbkHandler)
+            : this(config, serializer, new Dictionary<int, IAsyncProducer>(), cbkHandler)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AsyncProducerPool{TData}"/> class. 
+        /// </summary>
+        /// <param name="config">
+        /// The asynchronous producer pool configuration.
+        /// </param>
+        /// <param name="serializer">
+        /// The serializer.
+        /// </param>
+        private AsyncProducerPool(ProducerConfig config, IEncoder<TData> serializer)
+            : this(
+            config,
+            serializer,
+            new Dictionary<int, IAsyncProducer>(),
+            ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+        {
+        }
+
+        /// <summary>
+        /// Selects an asynchronous producer, for
+        /// the specified broker id and calls the send API on the selected
+        /// producer to publish the data to the specified broker partition.
+        /// </summary>
+        /// <param name="poolData">The producer pool request object.</param>
+        /// <remarks>
+        /// Used for multi-topic request
+        /// </remarks>
+        public override void Send(IEnumerable<ProducerPoolData<TData>> poolData)
+        {
+            Guard.Assert<ArgumentNullException>(() => poolData != null);
+            Dictionary<int, List<ProducerPoolData<TData>>> distinctBrokers = poolData.GroupBy(
+                x => x.BidPid.BrokerId, x => x)
+                .ToDictionary(x => x.Key, x => x.ToList());
+            foreach (var broker in distinctBrokers)
+            {
+                Logger.DebugFormat(CultureInfo.CurrentCulture, "Fetching async producer for broker id: {0}", broker.Key);
+                var producer = this.asyncProducers[broker.Key];
+                IEnumerable<ProducerRequest> requests = broker.Value.Select(x => new ProducerRequest(
+                    x.Topic,
+                    x.BidPid.PartId,
+                    new BufferedMessageSet(x.Data.Select(y => this.Serializer.ToMessage(y)))));
+                foreach (var request in requests)
+                {
+                    producer.Send(request);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Add a new asynchronous producer to the pool.
+        /// </summary>
+        /// <param name="broker">The broker informations.</param>
+        public override void AddProducer(Broker broker)
+        {
+            Guard.Assert<ArgumentNullException>(() => broker != null);
+            var asyncConfig = new AsyncProducerConfig
+                {
+                    Host = broker.Host,
+                    Port = broker.Port,
+                    QueueTime = this.Config.QueueTime,
+                    QueueSize = this.Config.QueueSize,
+                    BatchSize = this.Config.BatchSize,
+                    SerializerClass = this.Config.SerializerClass
+                };
+            var asyncProducer = new AsyncProducer(asyncConfig, this.CallbackHandler);
+            Logger.InfoFormat(
+                CultureInfo.CurrentCulture,
+                "Creating async producer for broker id = {0} at {1}:{2}",
+                broker.Id,
+                broker.Host,
+                broker.Port);
+            this.asyncProducers.Add(broker.Id, asyncProducer);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Async
+{
+    using System.Collections.Generic;
+    using Kafka.Client.Messages;
+    using Kafka.Client.Requests;
+
+    /// <summary>
+    /// Sends messages encapsulated in request to Kafka server asynchronously
+    /// </summary>
+    public interface IAsyncProducer
+    {
+        /// <summary>
+        /// Sends request to Kafka server asynchronously
+        /// </summary>
+        /// <param name="request">
+        /// The request.
+        /// </param>
+        void Send(ProducerRequest request);
+
+        /// <summary>
+        /// Sends request to Kafka server asynchronously
+        /// </summary>
+        /// <param name="request">
+        /// The request.
+        /// </param>
+        /// <param name="callback">
+        /// The callback invoked when a request is finished being sent.
+        /// </param>
+        void Send(ProducerRequest request, MessageSent<ProducerRequest> callback);
+
+        /// <summary>
+        /// Constructs request and sent it to Kafka server asynchronously
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="partition">
+        /// The partition.
+        /// </param>
+        /// <param name="messages">
+        /// The list of messages to sent.
+        /// </param>
+        void Send(string topic, int partition, IEnumerable<Message> messages);
+
+        /// <summary>
+        /// Constructs request and sent it to Kafka server asynchronously
+        /// </summary>
+        /// <param name="topic">
+        /// The topic.
+        /// </param>
+        /// <param name="partition">
+        /// The partition.
+        /// </param>
+        /// <param name="messages">
+        /// The list of messages to sent.
+        /// </param>
+        /// <param name="callback">
+        /// The callback invoked when a request is finished being sent.
+        /// </param>
+        void Send(string topic, int partition, IEnumerable<Message> messages, MessageSent<ProducerRequest> callback);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/ICallbackHandler.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/ICallbackHandler.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/ICallbackHandler.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/ICallbackHandler.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Async
+{
+    using Kafka.Client.Requests;
+
+    /// <summary>
+    /// Performs action when a producer request is finished being sent asynchronously.
+    /// </summary>
+    public interface ICallbackHandler
+    {
+        /// <summary>
+        /// Performs action when a producer request is finished being sent asynchronously.
+        /// </summary>
+        /// <param name="context">
+        /// The sent request context.
+        /// </param>
+        void Handle(RequestContext<ProducerRequest> context);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/MessageSent.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/MessageSent.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/MessageSent.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/MessageSent.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Async
+{
+    using Kafka.Client.Requests;
+
+    /// <summary>
+    /// Callback made when a message request is finished being sent asynchronously.
+    /// </summary>
+    /// <typeparam name="T">
+    /// Must be of type <see cref="AbstractRequest"/> and represents the type of message 
+    /// sent to Kafka.
+    /// </typeparam>
+    /// <param name="request">The request that was sent to the server.</param>
+    public delegate void MessageSent<T>(RequestContext<T> request) where T : AbstractRequest;
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducer.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducer.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    using System;
+    using System.Collections.Generic;
+
+    /// <summary>
+    /// High-level Producer API that exposing all the producer functionality through a single API to the client
+    /// </summary>
+    /// <typeparam name="TKey">The type of the key.</typeparam>
+    /// <typeparam name="TData">The type of the data.</typeparam>
+    public interface IProducer<TKey, TData> : IDisposable
+        where TKey : class
+        where TData : class 
+    {
+        /// <summary>
+        /// Sends the data to a single topic, partitioned by key, using either the
+        /// synchronous or the asynchronous producer.
+        /// </summary>
+        /// <param name="data">The producer data object that encapsulates the topic, key and message data.</param>
+        void Send(ProducerData<TKey, TData> data);
+
+        /// <summary>
+        /// Sends the data to a multiple topics, partitioned by key, using either the
+        /// synchronous or the asynchronous producer.
+        /// </summary>
+        /// <param name="data">The producer data object that encapsulates the topic, key and message data.</param>
+        void Send(IEnumerable<ProducerData<TKey, TData>> data);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers
+{
+    using System.Collections.Generic;
+    using Kafka.Client.Cluster;
+
+    /// <summary>
+    /// Pool of producers used by producer high-level API
+    /// </summary>
+    /// <typeparam name="TData">The type of the data.</typeparam>
+    internal interface IProducerPool<TData>
+    {
+        /// <summary>
+        /// Selects either a synchronous or an asynchronous producer, for
+        /// the specified broker id and calls the send API on the selected
+        /// producer to publish the data to the specified broker partition.
+        /// </summary>
+        /// <param name="poolData">The producer pool request object.</param>
+        /// <remarks>
+        /// Used for single-topic request
+        /// </remarks>
+        void Send(ProducerPoolData<TData> poolData);
+
+        /// <summary>
+        /// Selects either a synchronous or an asynchronous producer, for
+        /// the specified broker id and calls the send API on the selected
+        /// producer to publish the data to the specified broker partition.
+        /// </summary>
+        /// <param name="poolData">The producer pool request object.</param>
+        /// <remarks>
+        /// Used for multi-topic request
+        /// </remarks>
+        void Send(IEnumerable<ProducerPoolData<TData>> poolData);
+
+        /// <summary>
+        /// Add a new producer, either synchronous or asynchronous, to the pool
+        /// </summary>
+        /// <param name="broker">The broker informations.</param>
+        void AddProducer(Broker broker);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Partitioning
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Globalization;
+    using Kafka.Client.Cfg;
+    using Kafka.Client.Cluster;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Fetch broker info like ID, host and port from configuration.
+    /// </summary>
+    /// <remarks>
+    /// Used when zookeeper based auto partition discovery is disabled
+    /// </remarks>
+    internal class ConfigBrokerPartitionInfo : IBrokerPartitionInfo
+    {
+        private readonly ProducerConfig config;
+        private IDictionary<int, Broker> brokers;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="ConfigBrokerPartitionInfo"/> class.
+        /// </summary>
+        /// <param name="config">The config.</param>
+        public ConfigBrokerPartitionInfo(ProducerConfig config)
+        {
+            Guard.Assert<ArgumentNullException>(() => config != null);
+            this.config = config;
+            this.InitializeBrokers();
+        }
+
+        /// <summary>
+        /// Gets a mapping from broker ID to the host and port for all brokers
+        /// </summary>
+        /// <returns>
+        /// Mapping from broker ID to the host and port for all brokers
+        /// </returns>
+        public IDictionary<int, Broker> GetAllBrokerInfo()
+        {
+            return this.brokers;
+        }
+
+        /// <summary>
+        /// Gets a mapping from broker ID to partition IDs
+        /// </summary>
+        /// <param name="topic">The topic for which this information is to be returned</param>
+        /// <returns>
+        /// Mapping from broker ID to partition IDs
+        /// </returns>
+        /// <remarks>Partition ID would be allways 0</remarks>
+        public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
+        {
+            Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(topic));
+            var partitions = new SortedSet<Partition>();
+            foreach (var item in this.brokers)
+            {
+                partitions.Add(new Partition(item.Key, 0));
+            }
+
+            return partitions;
+        }
+
+        /// <summary>
+        /// Gets the host and port information for the broker identified by the given broker ID
+        /// </summary>
+        /// <param name="brokerId">The broker ID.</param>
+        /// <returns>
+        /// Host and port of broker
+        /// </returns>
+        public Broker GetBrokerInfo(int brokerId)
+        {
+            return this.brokers.ContainsKey(brokerId) ? this.brokers[brokerId] : null;
+        }
+
+        /// <summary>
+        /// Releasing unmanaged resources if any are used.
+        /// </summary>
+        /// <remarks>Do nothing</remarks>
+        public void Dispose()
+        {
+        }
+
+        /// <summary>
+        /// Initialize list of brokers from configuration
+        /// </summary>
+        private void InitializeBrokers()
+        {
+            if (this.brokers != null)
+            {
+                return;
+            }
+
+            this.brokers = new Dictionary<int, Broker>();
+            string[] brokersInfoList = this.config.BrokerPartitionInfo.Split(',');
+            foreach (string item in brokersInfoList)
+            {
+                var parts = item.Split(':');
+                int id = int.Parse(parts[0], CultureInfo.InvariantCulture);
+                int port = int.Parse(parts[2], CultureInfo.InvariantCulture);
+                this.brokers.Add(id, new Broker(id, parts[1], parts[1], port));
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Partitioning
+{
+    using System;
+    using Kafka.Client.Utils;
+
+    /// <summary>
+    /// Default partitioner using hash code to calculate partition
+    /// </summary>
+    /// <typeparam name="TKey">The type of the key.</typeparam>
+    public class DefaultPartitioner<TKey> : IPartitioner<TKey>
+        where TKey : class 
+    {
+        private static readonly Random Randomizer = new Random();
+
+        /// <summary>
+        /// Uses the key to calculate a partition bucket id for routing
+        /// the data to the appropriate broker partition
+        /// </summary>
+        /// <param name="key">The key.</param>
+        /// <param name="numPartitions">The num partitions.</param>
+        /// <returns>ID between 0 and numPartitions-1</returns>
+        /// <remarks>
+        /// Used hash code to calculate partition
+        /// </remarks>
+        public int Partition(TKey key, int numPartitions)
+        {
+            Guard.Assert<ArgumentOutOfRangeException>(() => numPartitions > 0);
+            return key == null 
+                ? Randomizer.Next(numPartitions) 
+                : key.GetHashCode() % numPartitions;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IBrokerPartitionInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IBrokerPartitionInfo.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IBrokerPartitionInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IBrokerPartitionInfo.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Partitioning
+{
+    using System;
+    using System.Collections.Generic;
+    using Kafka.Client.Cluster;
+
+    /// <summary>
+    /// Retrieves brokers and partitions info
+    /// </summary>
+    internal interface IBrokerPartitionInfo : IDisposable
+    {
+        /// <summary>
+        /// Gets a mapping from broker ID to the host and port for all brokers
+        /// </summary>
+        /// <returns>Mapping from broker ID to the host and port for all brokers</returns>
+        IDictionary<int, Broker> GetAllBrokerInfo();
+
+        /// <summary>
+        /// Gets a mapping from broker ID to partition IDs
+        /// </summary>
+        /// <param name="topic">The topic for which this information is to be returned</param>
+        /// <returns>Mapping from broker ID to partition IDs</returns>
+        SortedSet<Partition> GetBrokerPartitionInfo(string topic);
+
+        /// <summary>
+        /// Gets the host and port information for the broker identified by the given broker ID
+        /// </summary>
+        /// <param name="brokerId">The broker ID.</param>
+        /// <returns>Host and port of broker</returns>
+        Broker GetBrokerInfo(int brokerId);
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IPartitioner.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IPartitioner.cs?rev=1173797&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IPartitioner.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/IPartitioner.cs Wed Sep 21 19:17:19 2011
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+namespace Kafka.Client.Producers.Partitioning
+{
+    /// <summary>
+    /// User-defined partitioner
+    /// </summary>
+    /// <typeparam name="TKey">The type of the key.</typeparam>
+    public interface IPartitioner<TKey>
+        where TKey : class 
+    {
+        /// <summary>
+        /// Uses the key to calculate a partition bucket id for routing
+        /// the data to the appropriate broker partition
+        /// </summary>
+        /// <param name="key">The key.</param>
+        /// <param name="numPartitions">The num partitions.</param>
+        /// <returns>ID between 0 and numPartitions-1</returns>
+        int Partition(TKey key, int numPartitions);
+    }
+}



Mime
View raw message