kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1185772 [1/4] - in /incubator/kafka/trunk/clients/csharp/src/Kafka: Kafka.Client/ Kafka.Client/Cfg/ Kafka.Client/Consumers/ Kafka.Client/Exceptions/ Kafka.Client/Messages/ Kafka.Client/Producers/ Kafka.Client/Producers/Async/ Kafka.Client/...
Date Tue, 18 Oct 2011 17:52:16 GMT
Author: junrao
Date: Tue Oct 18 17:52:13 2011
New Revision: 1185772

URL: http://svn.apache.org/viewvc?rev=1185772&view=rev
Log:
Add compression to C# client; patched by Eric Hauser; KAFKA-153

Modified:
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/IAsyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/IProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ConfigBrokerPartitionInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/DefaultPartitioner.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Partitioning/ZKBrokerPartitionInfo.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.StrMsg.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Producer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/ProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/ISyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Sync/SyncProducerPool.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/FetchRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiFetchRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/MultiProducerRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/OffsetRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Requests/ProducerRequest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Serialization/KafkaBinaryReader.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/Guard.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Utils/ReflectionHelper.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperAwareKafkaClientBase.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/IZooKeeperClient.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/BrokerTopicsListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/Listeners/ZKSessionExpireListener.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperClient.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperConnection.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/ZooKeeperIntegration/ZooKeeperStringSerializer.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Config/Integration/App.config
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerRebalancingTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ConsumerTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/IntegrationFixtureBase.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/MockAlwaysZeroPartitioner.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ProducerTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestHelper.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/TestMultipleBrokersHelper.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZKBrokerPartitionInfoTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperAwareProducerTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperClientTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/ZooKeeperConnectionTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageSetTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Producers/PartitioningTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs
    incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/IAsyncProducerConfigShared.cs Tue Oct 18 17:52:13 2011
@@ -17,24 +17,10 @@
 
 namespace Kafka.Client.Cfg
 {
-    using System.Collections.Generic;
-
     internal interface IAsyncProducerConfigShared
     {
-        int QueueTime { get; set; }
-
-        int QueueSize { get; set; }
-
-        int BatchSize { get; set; }
-
         string SerializerClass { get; set; }
 
-        string CallbackHandler { get; set; }
-
-        string EventHandler { get; set; }
-
-        IDictionary<string, string> CallbackHandlerProps { get; set; }
-
-        IDictionary<string, string> EventHandlerProps { get; set; }
+        string CallbackHandlerClass { get; set; }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Cfg/ISyncProducerConfigShared.cs Tue Oct 18 17:52:13 2011
@@ -25,8 +25,6 @@ namespace Kafka.Client.Cfg
 
         int SocketTimeout { get; set; }
 
-        int ReconnectInterval { get; set; }
-
         int MaxMessageSize { get; set; }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Consumer.cs Tue Oct 18 17:52:13 2011
@@ -20,7 +20,6 @@ namespace Kafka.Client.Consumers
     using System;
     using System.Collections.Generic;
     using System.Globalization;
-    using System.Linq;
     using System.Reflection;
     using Kafka.Client.Cfg;
     using Kafka.Client.Exceptions;
@@ -33,25 +32,32 @@ namespace Kafka.Client.Consumers
     /// The low-level API of consumer of Kafka messages
     /// </summary>
     /// <remarks>
-    /// Maintains a connection to a single broker and has a close correspondence 
+    /// Maintains a connection to a single broker and has a close correspondence
     /// to the network requests sent to the server.
     /// Also, is completely stateless.
     /// </remarks>
     public class Consumer : IConsumer
     {
-        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);  
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
 
-        private readonly ConsumerConfig config;
+        private readonly ConsumerConfiguration config;
+        private readonly string host;
+        private readonly int port;
 
         /// <summary>
-        /// Gets the server to which the connection is to be established.
+        /// Initializes a new instance of the <see cref="Consumer"/> class.
         /// </summary>
-        public string Host { get; private set; }
+        /// <param name="config">
+        /// The consumer configuration.
+        /// </param>
+        public Consumer(ConsumerConfiguration config)
+        {
+            Guard.NotNull(config, "config");
 
-        /// <summary>
-        /// Gets the port to which the connection is to be established.
-        /// </summary>
-        public int Port { get; private set; }
+            this.config = config;
+            this.host = config.Broker.Host;
+            this.port = config.Broker.Port;
+        }
 
         /// <summary>
         /// Initializes a new instance of the <see cref="Consumer"/> class.
@@ -59,13 +65,15 @@ namespace Kafka.Client.Consumers
         /// <param name="config">
         /// The consumer configuration.
         /// </param>
-        public Consumer(ConsumerConfig config)
+        /// <param name="host">The host of the broker to connect to.</param>
+        /// <param name="port">The port of the broker to connect to.</param>
+        public Consumer(ConsumerConfiguration config, string host, int port)
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.NotNull(config, "config");
 
             this.config = config;
-            this.Host = config.Host;
-            this.Port = config.Port;
+            this.host = host;
+            this.port = port;
         }
 
         /// <summary>
@@ -78,38 +86,41 @@ namespace Kafka.Client.Consumers
         /// A set of fetched messages.
         /// </returns>
         /// <remarks>
-        /// Offset is passed in on every request, allowing the user to maintain this metadata 
+        /// Offset is passed in on every request, allowing the user to maintain this metadata
         /// however they choose.
         /// </remarks>
         public BufferedMessageSet Fetch(FetchRequest request)
         {
-            BufferedMessageSet result = null;
-            using (var conn = new KafkaConnection(this.Host, this.Port))
+            short tryCounter = 1;
+            while (tryCounter <= this.config.NumberOfTries)
             {
-                short tryCounter = 1;
-                bool success = false;
-                while (!success && tryCounter <= this.config.NumberOfTries)
+                try
                 {
-                    try
+                    using (var conn = new KafkaConnection(
+                        this.host,
+                        this.port,
+                        this.config.BufferSize,
+                        this.config.SocketTimeout))
                     {
-                        result = Fetch(conn, request);
-                        success = true;
+                        conn.Write(request);
+                        int size = conn.Reader.ReadInt32();
+                        return BufferedMessageSet.ParseFrom(conn.Reader, size);
                     }
-                    catch (Exception ex)
+                }
+                catch (Exception ex)
+                {
+                    //// if maximum number of tries reached
+                    if (tryCounter == this.config.NumberOfTries)
                     {
-                        //// if maximum number of tries reached
-                        if (tryCounter == this.config.NumberOfTries)
-                        {
-                            throw;
-                        }
-
-                        tryCounter++;
-                        Logger.InfoFormat(CultureInfo.CurrentCulture, "Fetch reconnect due to {0}", ex);
+                        throw;
                     }
+
+                    tryCounter++;
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "Fetch reconnect due to {0}", ex);
                 }
             }
 
-            return result;
+            return null;
         }
 
         /// <summary>
@@ -128,28 +139,32 @@ namespace Kafka.Client.Consumers
         public IList<BufferedMessageSet> MultiFetch(MultiFetchRequest request)
         {
             var result = new List<BufferedMessageSet>();
-            using (var conn = new KafkaConnection(this.Host, this.Port))
+            short tryCounter = 1;
+            while (tryCounter <= this.config.NumberOfTries)
             {
-                short tryCounter = 1;
-                bool success = false;
-                while (!success && tryCounter <= this.config.NumberOfTries)
+                try
                 {
-                    try
+                    using (var conn = new KafkaConnection(
+                        this.host,
+                        this.port,
+                        this.config.BufferSize,
+                        this.config.SocketTimeout))
                     {
-                        MultiFetch(conn, request, result);
-                        success = true;
+                        conn.Write(request);
+                        int size = conn.Reader.ReadInt32();
+                        return BufferedMessageSet.ParseMultiFrom(conn.Reader, size, request.ConsumerRequests.Count);
                     }
-                    catch (Exception ex)
+                }
+                catch (Exception ex)
+                {
+                    // if maximum number of tries reached
+                    if (tryCounter == this.config.NumberOfTries)
                     {
-                        // if maximum number of tries reached
-                        if (tryCounter == this.config.NumberOfTries)
-                        {
-                            throw;
-                        }
-
-                        tryCounter++;
-                        Logger.InfoFormat(CultureInfo.CurrentCulture, "MultiFetch reconnect due to {0}", ex);
+                        throw;
                     }
+
+                    tryCounter++;
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "MultiFetch reconnect due to {0}", ex);
                 }
             }
 
@@ -167,122 +182,54 @@ namespace Kafka.Client.Consumers
         /// </returns>
         public IList<long> GetOffsetsBefore(OffsetRequest request)
         {
-            var offsets = new List<long>();
-            using (var conn = new KafkaConnection(this.Host, this.Port))
-            {
-                short tryCounter = 1;
-                bool success = false;
-                while (!success && tryCounter <= this.config.NumberOfTries)
-                {
-                    try
-                    {
-                        GetOffsetsBefore(conn, request, offsets);
-                        success = true;
-                    }
-                    catch (Exception ex)
-                    {
-                        // if maximum number of tries reached
-                        if (tryCounter == this.config.NumberOfTries)
+            var result = new List<long>();
+            short tryCounter = 1;
+            while (tryCounter <= this.config.NumberOfTries)
+            {
+                try
+                {
+                    using (var conn = new KafkaConnection(
+                        this.host,
+                        this.port,
+                        this.config.BufferSize,
+                        this.config.SocketTimeout))
+                    {
+                        conn.Write(request);
+                        int size = conn.Reader.ReadInt32();
+                        if (size == 0)
                         {
-                            throw;
+                            return result;
                         }
 
-                        tryCounter++;
-                        Logger.InfoFormat(CultureInfo.CurrentCulture, "GetOffsetsBefore reconnect due to {0}", ex);
-                    }
-                }
-            }
-
-            return offsets;
-        }
-
-        private static BufferedMessageSet Fetch(KafkaConnection conn, FetchRequest request)
-        {
-            conn.Write(request);
-            int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(conn.Read(4)), 0);
-            if (dataLength > 0)
-            {
-                byte[] data = conn.Read(dataLength);
-
-                int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray()), 0);
-                if (errorCode != KafkaException.NoError)
-                {
-                    throw new KafkaException(errorCode);
-                }
-
-                // skip the error code
-                byte[] unbufferedData = data.Skip(2).ToArray();
-                return BufferedMessageSet.ParseFrom(unbufferedData);
-            }
-
-            return null;
-        }
-
-        private static void MultiFetch(KafkaConnection conn, MultiFetchRequest request, IList<BufferedMessageSet> result)
-        {
-            result.Clear();
-            conn.Write(request);
-            int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(conn.Read(4)), 0);
-            if (dataLength <= 0)
-            {
-                return;
-            }
-
-            byte[] data = conn.Read(dataLength);
+                        short errorCode = conn.Reader.ReadInt16();
+                        if (errorCode != KafkaException.NoError)
+                        {
+                            throw new KafkaException(errorCode);
+                        }
 
-            int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray()), 0);
-            if (errorCode != KafkaException.NoError)
-            {
-                throw new KafkaException(errorCode);
-            }
+                        int count = conn.Reader.ReadInt32();
+                        for (int i = 0; i < count; i++)
+                        {
+                            result.Add(conn.Reader.ReadInt64());
+                        }
 
-            // skip the error code
-            byte[] unbufferedData = data.Skip(2).ToArray();
-            for (int i = 0; i < request.ConsumerRequests.Count; i++)
-            {
-                int partLength = BitConverter.ToInt32(BitWorks.ReverseBytes(unbufferedData.Take(4).ToArray()), 0);
-                errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(unbufferedData.Skip(4).Take(2).ToArray()), 0);
-                if (errorCode != KafkaException.NoError)
-                {
-                    throw new KafkaException(errorCode);
+                        return result;
+                    }
                 }
-
-                result.Add(BufferedMessageSet.ParseFrom(unbufferedData.Skip(6).Take(partLength - 2).ToArray()));
-                unbufferedData = unbufferedData.Skip(partLength + 4).ToArray();
-            }
-        }
-
-        private static void GetOffsetsBefore(KafkaConnection conn, OffsetRequest request, IList<long> offsets)
-        {
-            offsets.Clear(); // to make sure the list is clean after some previous attampts to get data
-            conn.Write(request);
-            int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(conn.Read(4)), 0);
-
-            if (dataLength > 0)
-            {
-                byte[] data = conn.Read(dataLength);
-
-                int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray()), 0);
-                if (errorCode != KafkaException.NoError)
+                catch (Exception ex)
                 {
-                    throw new KafkaException(errorCode);
-                }
-
-                // skip the error code and process the rest
-                byte[] unbufferedData = data.Skip(2).ToArray();
-
-                // first four bytes are the number of offsets
-                int numOfOffsets =
-                    BitConverter.ToInt32(BitWorks.ReverseBytes(unbufferedData.Take(4).ToArray()), 0);
+                    //// if maximum number of tries reached
+                    if (tryCounter == this.config.NumberOfTries)
+                    {
+                        throw;
+                    }
 
-                for (int ix = 0; ix < numOfOffsets; ix++)
-                {
-                    int position = (ix * 8) + 4;
-                    offsets.Add(
-                        BitConverter.ToInt64(
-                            BitWorks.ReverseBytes(unbufferedData.Skip(position).Take(8).ToArray()), 0));
+                    tryCounter++;
+                    Logger.InfoFormat(CultureInfo.CurrentCulture, "GetOffsetsBefore reconnect due to {0}", ex);
                 }
             }
+
+            return result;
         }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ConsumerIterator.cs Tue Oct 18 17:52:13 2011
@@ -40,8 +40,10 @@ namespace Kafka.Client.Consumers
         private readonly int consumerTimeoutMs;
         private PartitionTopicInfo currentTopicInfo;
         private ConsumerIteratorState state = ConsumerIteratorState.NotReady;
-        private IEnumerator<Message> current;
+        private IEnumerator<MessageAndOffset> current;
+        private FetchedDataChunk currentDataChunk = null;
         private Message nextItem;
+        private long consumedOffset = -1;
 
         /// <summary>
         /// Initializes a new instance of the <see cref="ConsumerIterator"/> class.
@@ -70,17 +72,27 @@ namespace Kafka.Client.Consumers
             {
                 if (!MoveNext())
                 {
-                    throw new Exception("No element");
+                    throw new NoSuchElementException();
                 }
 
                 state = ConsumerIteratorState.NotReady;
                 if (nextItem != null)
                 {
-                    currentTopicInfo.Consumed(MessageSet.GetEntrySize(nextItem));
+                    if (consumedOffset < 0)
+                    {
+                        throw new IllegalStateException(String.Format(CultureInfo.CurrentCulture, "Offset returned by the message set is invalid {0}.", consumedOffset));
+                    }
+
+                    currentTopicInfo.ResetConsumeOffset(consumedOffset);
+                    if (Logger.IsDebugEnabled)
+                    {
+                        Logger.DebugFormat(CultureInfo.CurrentCulture, "Setting consumed offset to {0}", consumedOffset);
+                    }
+
                     return nextItem;
                 }
 
-                throw new Exception("Expected item but none found.");
+                throw new IllegalStateException("Expected item but none found.");
             }
         }
 
@@ -105,7 +117,7 @@ namespace Kafka.Client.Consumers
         {
             if (state == ConsumerIteratorState.Failed)
             {
-                throw new Exception("Iterator is in failed state");
+                throw new IllegalStateException("Iterator is in failed state");
             }
             
             switch (state)
@@ -148,14 +160,13 @@ namespace Kafka.Client.Consumers
         {
             if (current == null || !current.MoveNext())
             {
-                FetchedDataChunk found;
                 if (consumerTimeoutMs < 0)
                 {
-                    found = this.channel.Take();
+                    currentDataChunk = this.channel.Take();
                 }
                 else
                 {
-                    bool done = channel.TryTake(out found, consumerTimeoutMs);
+                    bool done = channel.TryTake(out currentDataChunk, consumerTimeoutMs);
                     if (!done)
                     {
                         Logger.Debug("Consumer iterator timing out...");
@@ -163,30 +174,32 @@ namespace Kafka.Client.Consumers
                     }
                 }
 
-                if (found.Equals(ZookeeperConsumerConnector.ShutdownCommand))
+                if (currentDataChunk.Equals(ZookeeperConsumerConnector.ShutdownCommand))
                 {
                     Logger.Debug("Received the shutdown command");
-                    channel.Add(found);
+                    channel.Add(currentDataChunk);
                     return this.AllDone();
                 }
 
-                currentTopicInfo = found.TopicInfo;
-                if (currentTopicInfo.GetConsumeOffset() != found.FetchOffset)
+                currentTopicInfo = currentDataChunk.TopicInfo;
+                if (currentTopicInfo.GetConsumeOffset() != currentDataChunk.FetchOffset)
                 {
                     Logger.ErrorFormat(
                         CultureInfo.CurrentCulture,
                         "consumed offset: {0} doesn't match fetch offset: {1} for {2}; consumer may lose data",
                         currentTopicInfo.GetConsumeOffset(),
-                        found.FetchOffset,
+                        currentDataChunk.FetchOffset,
                         currentTopicInfo);
-                    currentTopicInfo.ResetConsumeOffset(found.FetchOffset);
+                    currentTopicInfo.ResetConsumeOffset(currentDataChunk.FetchOffset);
                 }
 
-                current = found.Messages.Messages.GetEnumerator();
+                current = currentDataChunk.Messages.GetEnumerator();
                 current.MoveNext();
             }
 
-            return current.Current;
+            var item = current.Current;
+            consumedOffset = item.Offset;
+            return item.Message;
         }
 
         private Message AllDone()

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/Fetcher.cs Tue Oct 18 17:52:13 2011
@@ -33,7 +33,7 @@ namespace Kafka.Client.Consumers
     internal class Fetcher : IDisposable
     {
         private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
-        private readonly ConsumerConfig config;
+        private readonly ConsumerConfiguration config;
         private readonly IZooKeeperClient zkClient;
         private FetcherRunnable[] fetcherWorkerObjects;
         private volatile bool disposed;
@@ -48,7 +48,7 @@ namespace Kafka.Client.Consumers
         /// <param name="zkClient">
         /// The wrapper above ZooKeeper client.
         /// </param>
-        public Fetcher(ConsumerConfig config, IZooKeeperClient zkClient)
+        public Fetcher(ConsumerConfiguration config, IZooKeeperClient zkClient)
         {
             this.config = config;
             this.zkClient = zkClient;

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/FetcherRunnable.cs Tue Oct 18 17:52:13 2011
@@ -42,7 +42,7 @@ namespace Kafka.Client.Consumers
 
         private readonly IZooKeeperClient zkClient;
 
-        private readonly ConsumerConfig config;
+        private readonly ConsumerConfiguration config;
 
         private readonly Broker broker;
 
@@ -52,14 +52,15 @@ namespace Kafka.Client.Consumers
 
         private bool shouldStop;
 
-        internal FetcherRunnable(string name, IZooKeeperClient zkClient, ConsumerConfig config, Broker broker, List<PartitionTopicInfo> partitionTopicInfos)
+        internal FetcherRunnable(string name, IZooKeeperClient zkClient, ConsumerConfiguration config, Broker broker, List<PartitionTopicInfo> partitionTopicInfos)
         {
             this.name = name;
             this.zkClient = zkClient;
             this.config = config;
             this.broker = broker;
             this.partitionTopicInfos = partitionTopicInfos;
-            this.simpleConsumer = new Consumer(this.config);
+
+            this.simpleConsumer = new Consumer(this.config, broker.Host, broker.Port);
         }
 
         /// <summary>
@@ -87,7 +88,7 @@ namespace Kafka.Client.Consumers
                     var requestList = new List<FetchRequest>();
                     foreach (var partitionTopicInfo in this.partitionTopicInfos)
                     {
-                        var singleRequest = new FetchRequest(partitionTopicInfo.Topic, partitionTopicInfo.Partition.PartId, partitionTopicInfo.GetFetchOffset(), this.config.FetchSize);
+                        var singleRequest = new FetchRequest(partitionTopicInfo.Topic, partitionTopicInfo.Partition.PartId, partitionTopicInfo.GetFetchOffset(), this.config.MaxFetchSize);
                         requestList.Add(singleRequest);
                     }
 
@@ -138,8 +139,8 @@ namespace Kafka.Client.Consumers
                     Logger.Info("Fetched bytes: " + read);
                     if (read == 0)
                     {
-                        Logger.DebugFormat(CultureInfo.CurrentCulture, "backing off {0} ms", this.config.BackOffIncrementMs);
-                        Thread.Sleep(this.config.BackOffIncrementMs);
+                        Logger.DebugFormat(CultureInfo.CurrentCulture, "backing off {0} ms", this.config.BackOffIncrement);
+                        Thread.Sleep(this.config.BackOffIncrement);
                     }
                 }
             }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/IConsumer.cs Tue Oct 18 17:52:13 2011
@@ -31,16 +31,6 @@ namespace Kafka.Client.Consumers
     public interface IConsumer
     {
         /// <summary>
-        /// Gets the server to which the connection is to be established.
-        /// </summary>
-        string Host { get; }
-
-        /// <summary>
-        /// Gets the port to which the connection is to be established.
-        /// </summary>
-        int Port { get; }
-
-        /// <summary>
         /// Fetch a set of messages from a topic.
         /// </summary>
         /// <param name="request">

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumers/ZookeeperConsumerConnector.cs Tue Oct 18 17:52:13 2011
@@ -33,7 +33,7 @@ namespace Kafka.Client.Consumers
     /// The consumer high-level API, that hides the details of brokers from the consumer. 
     /// It also maintains the state of what has been consumed. 
     /// </summary>
-    public class ZookeeperConsumerConnector : ZooKeeperAwareKafkaClientBase, IConsumerConnector
+    public class ZookeeperConsumerConnector : KafkaClientBase, IConsumerConnector
     {
         private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
         
@@ -41,7 +41,7 @@ namespace Kafka.Client.Consumers
         
         internal static readonly FetchedDataChunk ShutdownCommand = new FetchedDataChunk(null, null, -1);
 
-        private readonly ConsumerConfig config;
+        private readonly ConsumerConfiguration config;
        
         private IZooKeeperClient zkClient;
        
@@ -79,8 +79,7 @@ namespace Kafka.Client.Consumers
         /// <param name="enableFetcher">
         /// Indicates whether fetchers should be enabled
         /// </param>
-        public ZookeeperConsumerConnector(ConsumerConfig config, bool enableFetcher)
-            : base(config)
+        public ZookeeperConsumerConnector(ConsumerConfiguration config, bool enableFetcher)
         {
             this.config = config;
             this.enableFetcher = enableFetcher;
@@ -89,8 +88,8 @@ namespace Kafka.Client.Consumers
 
             if (this.config.AutoCommit)
             {
-                Logger.InfoFormat(CultureInfo.CurrentCulture, "starting auto committer every {0} ms", this.config.AutoCommitIntervalMs);
-                scheduler.ScheduleWithRate(this.AutoCommit, this.config.AutoCommitIntervalMs, this.config.AutoCommitIntervalMs);
+                Logger.InfoFormat(CultureInfo.CurrentCulture, "starting auto committer every {0} ms", this.config.AutoCommitInterval);
+                scheduler.ScheduleWithRate(this.AutoCommit, this.config.AutoCommitInterval, this.config.AutoCommitInterval);
             }
         }
 
@@ -210,8 +209,8 @@ namespace Kafka.Client.Consumers
 
         private void ConnectZk()
         {
-            Logger.InfoFormat(CultureInfo.CurrentCulture, "Connecting to zookeeper instance at {0}", this.config.ZkConnect);
-            this.zkClient = new ZooKeeperClient(this.config.ZkConnect, this.config.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer);
+            Logger.InfoFormat(CultureInfo.CurrentCulture, "Connecting to zookeeper instance at {0}", this.config.ZooKeeper.ZkConnect);
+            this.zkClient = new ZooKeeperClient(this.config.ZooKeeper.ZkConnect, this.config.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer);
             this.zkClient.Connect();
         }
 

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Exceptions/ConsumerTimeoutException.cs Tue Oct 18 17:52:13 2011
@@ -16,6 +16,9 @@
  */
 
 using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
 
 namespace Kafka.Client.Exceptions
 {

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj Tue Oct 18 17:52:13 2011
@@ -109,17 +109,19 @@
     </Reference>
   </ItemGroup>
   <ItemGroup>
-    <Compile Include="Cfg\AsyncProducerConfig.cs" />
-    <Compile Include="Cfg\BrokerPartitionInfo.cs" />
-    <Compile Include="Cfg\BrokerPartitionInfoCollection.cs" />
-    <Compile Include="Cfg\Consumer.cs" />
-    <Compile Include="Cfg\ConsumerConfig.cs" />
+    <Compile Include="Cfg\AsyncProducerConfiguration.cs" />
+    <Compile Include="Cfg\BrokerConfiguration.cs" />
+    <Compile Include="Cfg\BrokerConfigurationElement.cs" />
+    <Compile Include="Cfg\BrokerConfigurationElementCollection.cs" />
+    <Compile Include="Cfg\ConsumerConfigurationSection.cs" />
+    <Compile Include="Cfg\ConsumerConfiguration.cs" />
     <Compile Include="Cfg\IAsyncProducerConfigShared.cs" />
     <Compile Include="Cfg\ISyncProducerConfigShared.cs" />
-    <Compile Include="Cfg\KafkaClientConfiguration.cs" />
-    <Compile Include="Cfg\KafkaServer.cs" />
-    <Compile Include="Cfg\ProducerConfig.cs" />
-    <Compile Include="Cfg\ZooKeeperServers.cs" />
+    <Compile Include="Cfg\ProducerConfiguration.cs" />
+    <Compile Include="Cfg\ProducerConfigurationSection.cs" />
+    <Compile Include="Cfg\ZooKeeperConfigurationElement.cs" />
+    <Compile Include="Cfg\ZooKeeperServerConfigurationElement.cs" />
+    <Compile Include="Cfg\ZooKeeperServerConfigurationElementCollection.cs" />
     <Compile Include="Cluster\Cluster.cs" />
     <Compile Include="Cluster\Partition.cs" />
     <Compile Include="Consumers\Consumer.cs" />
@@ -135,7 +137,11 @@
     <Compile Include="Consumers\TopicCount.cs" />
     <Compile Include="Consumers\ZookeeperConsumerConnector.cs" />
     <Compile Include="Exceptions\ConsumerTimeoutException.cs" />
+    <Compile Include="Exceptions\IllegalStateException.cs" />
+    <Compile Include="Exceptions\InvalidMessageSizeException.cs" />
     <Compile Include="Exceptions\MessageSizeTooLargeException.cs" />
+    <Compile Include="Exceptions\NoSuchElementException.cs" />
+    <Compile Include="Exceptions\UnknownCodecException.cs" />
     <Compile Include="Exceptions\ZKRebalancerException.cs" />
     <Compile Include="Exceptions\ZooKeeperException.cs" />
     <Compile Include="Exceptions\ZooKeeperTimeoutException.cs" />
@@ -145,7 +151,12 @@
     <Compile Include="Exceptions\KafkaException.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="KafkaStopWatch.cs" />
     <Compile Include="Messages\BoundedBuffer.cs" />
+    <Compile Include="Messages\CompressionCodec.cs" />
+    <Compile Include="Messages\CompressionCodecs.cs" />
+    <Compile Include="Messages\CompressionUtils.cs" />
+    <Compile Include="Messages\MessageAndOffset.cs" />
     <Compile Include="Producers\Async\AsyncProducerPool.cs" />
     <Compile Include="Producers\Async\MessageSent.cs" />
     <Compile Include="Producers\Producer.StrMsg.cs" />
@@ -153,8 +164,8 @@
     <Compile Include="Serialization\StringEncoder.cs" />
     <Compile Include="Serialization\IWritable.cs" />
     <Compile Include="Producers\ProducerTypes.cs" />
-    <Compile Include="Cfg\SyncProducerConfig.cs" />
-    <Compile Include="Cfg\ZKConfig.cs" />
+    <Compile Include="Cfg\SyncProducerConfiguration.cs" />
+    <Compile Include="Cfg\ZooKeeperConfiguration.cs" />
     <Compile Include="Cluster\Broker.cs" />
     <Compile Include="Producers\Partitioning\ConfigBrokerPartitionInfo.cs" />
     <Compile Include="Producers\Partitioning\DefaultPartitioner.cs" />
@@ -224,7 +235,11 @@
     <Compile Include="ZooKeeperIntegration\IZooKeeperSerializer.cs" />
     <Compile Include="ZooKeeperIntegration\ZooKeeperStringSerializer.cs" />
   </ItemGroup>
-  <ItemGroup />
+  <ItemGroup>
+    <None Include="..\..\..\Settings.StyleCop">
+      <Link>Settings.StyleCop</Link>
+    </None>
+  </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Import Project="..\..\..\lib\StyleCop\Microsoft.StyleCop.Targets" />
-</Project>
\ No newline at end of file
+</Project>

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs Tue Oct 18 17:52:13 2011
@@ -23,6 +23,7 @@ namespace Kafka.Client
     using System.Threading;
     using Kafka.Client.Producers.Async;
     using Kafka.Client.Requests;
+    using Kafka.Client.Serialization;
     using Kafka.Client.Utils;
 
     /// <summary>
@@ -30,9 +31,10 @@ namespace Kafka.Client
     /// </summary>
     public class KafkaConnection : IDisposable
     {
-        /// <summary>
-        /// TCP client that connects to the server.
-        /// </summary>
+        private readonly int bufferSize;
+
+        private readonly int socketTimeout;
+
         private readonly TcpClient client;
 
         private volatile bool disposed;
@@ -42,96 +44,25 @@ namespace Kafka.Client
         /// </summary>
         /// <param name="server">The server to connect to.</param>
         /// <param name="port">The port to connect to.</param>
-        public KafkaConnection(string server, int port)
+        public KafkaConnection(string server, int port, int bufferSize, int socketTimeout)
         {
-            Server = server;
-            Port = port;
+            this.bufferSize = bufferSize;
+            this.socketTimeout = socketTimeout;
 
             // connection opened
-            client = new TcpClient(server, port);
-        }
-
-        /// <summary>
-        /// Gets the server to which the connection is to be established.
-        /// </summary>
-        public string Server { get; private set; }
-        
-        /// <summary>
-        /// Gets the port to which the connection is to be established.
-        /// </summary>
-        public int Port { get; private set; }
-
-        /// <summary>
-        /// Readds data from the server.
-        /// </summary>
-        /// <remarks>
-        /// Defauls the amount of time that a read operation blocks waiting for data to <see cref="Timeout.Infinite"/>.
-        /// </remarks>
-        /// <param name="size">The number of bytes to read from the server.</param>
-        /// <returns>The data read from the server as a byte array.</returns>
-        public byte[] Read(int size)
-        {
-            this.EnsuresNotDisposed();
-            return Read(size, Timeout.Infinite);
-        }
-
-        /// <summary>
-        /// Readds data from the server.
-        /// </summary>
-        /// <param name="size">The number of bytes to read from the server.</param>
-        /// <param name="readTimeout">The amount of time that a read operation blocks waiting for data.</param>
-        /// <returns>The data read from the server as a byte array.</returns>
-        public byte[] Read(int size, int readTimeout)
-        {
-            this.EnsuresNotDisposed();
-            byte[] bytes;
-            NetworkStream stream = client.GetStream();
-            stream.ReadTimeout = readTimeout;
-            int numberOfTries = 0;
-
-            int readSize = size;
-            if (client.ReceiveBufferSize < size)
-            {
-                readSize = client.ReceiveBufferSize;
-            }
-
-            using (var ms = new MemoryStream())
-            {
-                var bytesToRead = new byte[client.ReceiveBufferSize];
-
-                while (true)
+            this.client = new TcpClient(server, port)
                 {
-                    int numberOfBytesRead = stream.Read(bytesToRead, 0, readSize);
-                    if (numberOfBytesRead > 0)
-                    {
-                        ms.Write(bytesToRead, 0, numberOfBytesRead);
-                    }
-
-                    if (ms.Length >= size)
-                    {
-                        break;
-                    }
-
-                    if (numberOfBytesRead == 0)
-                    {
-                        if (numberOfTries >= 1000)
-                        {
-                            break;
-                        }
-
-                        numberOfTries++;
-                        Thread.Sleep(10);
-                    }
-                }
-
-                bytes = new byte[ms.Length];
-                ms.Seek(0, SeekOrigin.Begin);
-                ms.Read(bytes, 0, (int)ms.Length);
-            }
-
-            return bytes;
+                    ReceiveTimeout = socketTimeout,
+                    SendTimeout = socketTimeout,
+                    ReceiveBufferSize = bufferSize,
+                    SendBufferSize = bufferSize
+                };
+            var stream = this.client.GetStream();
+            this.Reader = new KafkaBinaryReader(stream);
         }
 
+        public KafkaBinaryReader Reader { get; private set; }
+
         /// <summary>
         /// Writes a producer request to the server asynchronously.
         /// </summary>
@@ -139,7 +70,7 @@ namespace Kafka.Client
         public void BeginWrite(ProducerRequest request)
         {
             this.EnsuresNotDisposed();
-            Guard.Assert<ArgumentNullException>(() => request != null);
+            Guard.NotNull(request, "request");
             NetworkStream stream = client.GetStream();
             byte[] data = request.RequestBuffer.GetBuffer();
             stream.BeginWrite(data, 0, data.Length, asyncResult => ((NetworkStream)asyncResult.AsyncState).EndWrite(asyncResult), stream);
@@ -157,7 +88,7 @@ namespace Kafka.Client
         public void BeginWrite(ProducerRequest request, MessageSent<ProducerRequest> callback)
         {
             this.EnsuresNotDisposed();
-            Guard.Assert<ArgumentNullException>(() => request != null);
+            Guard.NotNull(request, "request");
             if (callback == null)
             {
                 this.BeginWrite(request);
@@ -191,8 +122,8 @@ namespace Kafka.Client
         public void Write(ProducerRequest request)
         {
             this.EnsuresNotDisposed();
-            Guard.Assert<ArgumentNullException>(() => request != null);
-            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+            Guard.NotNull(request, "request");
+            this.Write(request.RequestBuffer.GetBuffer());
         }
 
         /// <summary>
@@ -205,21 +136,18 @@ namespace Kafka.Client
         public void Write(MultiProducerRequest request)
         {
             this.EnsuresNotDisposed();
-            Guard.Assert<ArgumentNullException>(() => request != null);
-            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+            Guard.NotNull(request, "request");
+            this.Write(request.RequestBuffer.GetBuffer());
         }
 
         /// <summary>
         /// Writes data to the server.
         /// </summary>
         /// <param name="data">The data to write to the server.</param>
-        /// <param name="writeTimeout">The amount of time that a write operation blocks waiting for data.</param>
-        private void Write(byte[] data, int writeTimeout)
+        private void Write(byte[] data)
         {
             NetworkStream stream = this.client.GetStream();
-            stream.WriteTimeout = writeTimeout;
-
-            // Send the message to the connected TcpServer. 
+            //// Send the message to the connected TcpServer. 
             stream.Write(data, 0, data.Length);
         }
 
@@ -233,8 +161,8 @@ namespace Kafka.Client
         public void Write(FetchRequest request)
         {
             this.EnsuresNotDisposed();
-            Guard.Assert<ArgumentNullException>(() => request != null);
-            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+            Guard.NotNull(request, "request");
+            this.Write(request.RequestBuffer.GetBuffer());
         }
 
         /// <summary>
@@ -247,8 +175,8 @@ namespace Kafka.Client
         public void Write(MultiFetchRequest request)
         {
             this.EnsuresNotDisposed();
-            Guard.Assert<ArgumentNullException>(() => request != null);
-            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+            Guard.NotNull(request, "request");
+            this.Write(request.RequestBuffer.GetBuffer());
         }
 
         /// <summary>
@@ -261,8 +189,8 @@ namespace Kafka.Client
         public void Write(OffsetRequest request)
         {
             this.EnsuresNotDisposed();
-            Guard.Assert<ArgumentNullException>(() => request != null);
-            this.Write(request.RequestBuffer.GetBuffer(), Timeout.Infinite);
+            Guard.NotNull(request, "request");
+            this.Write(request.RequestBuffer.GetBuffer());
         }
 
         /// <summary>
@@ -278,7 +206,6 @@ namespace Kafka.Client
             this.disposed = true;
             if (this.client != null)
             {
-                this.client.GetStream().Close();
                 this.client.Close();
             }
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/BufferedMessageSet.cs Tue Oct 18 17:52:13 2011
@@ -15,21 +15,39 @@
  * limitations under the License.
  */
 
+using Kafka.Client.Exceptions;
+
 namespace Kafka.Client.Messages
 {
     using System;
+    using System.Collections;
     using System.Collections.Generic;
+    using System.Globalization;
     using System.IO;
     using System.Linq;
+    using System.Reflection;
     using System.Text;
+    using Kafka.Client.Consumers;
     using Kafka.Client.Serialization;
     using Kafka.Client.Utils;
+    using log4net;
 
     /// <summary>
     /// A collection of messages stored as memory stream
     /// </summary>
-    public class BufferedMessageSet : MessageSet
+    public class BufferedMessageSet : MessageSet, IEnumerable<MessageAndOffset>, IEnumerator<MessageAndOffset>
     {
+        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+        private MemoryStream topIter;
+        private int topIterPosition;
+        private long currValidBytes = 0;
+        private IEnumerator<MessageAndOffset> innerIter = null;
+        private long lastMessageSize = 0;
+        private long deepValidByteCount = -1;
+        private long shallowValidByteCount = -1;
+        private ConsumerIteratorState state = ConsumerIteratorState.NotReady;
+        private MessageAndOffset nextItem;
+
         /// <summary>
         /// Gets the error code
         /// </summary>
@@ -41,7 +59,8 @@ namespace Kafka.Client.Messages
         /// <param name="messages">
         /// The list of messages.
         /// </param>
-        public BufferedMessageSet(IEnumerable<Message> messages) : this(messages, ErrorMapping.NoError)
+        public BufferedMessageSet(IEnumerable<Message> messages)
+            : this(messages, ErrorMapping.NoError)
         {
         }
 
@@ -58,15 +77,34 @@ namespace Kafka.Client.Messages
         {
             int length = GetMessageSetSize(messages);
             this.Messages = messages;
-            this.SetBuffer = new BoundedBuffer(length);
-            this.WriteTo(this.SetBuffer);
             this.ErrorCode = errorCode;
+            this.topIterPosition = 0;
         }
 
         /// <summary>
-        /// Gets set internal buffer
+        /// Initializes a new instance of the <see cref="BufferedMessageSet"/> class with compression.
         /// </summary>
-        public MemoryStream SetBuffer { get; private set; }
+        /// <param name="compressionCodec">compression method</param>
+        /// <param name="messages">messages to add</param>
+        public BufferedMessageSet(CompressionCodecs compressionCodec, IEnumerable<Message> messages)
+        {
+            IEnumerable<Message> messagesToAdd;
+            switch (compressionCodec)
+            {
+                case CompressionCodecs.NoCompressionCodec: // no compression: keep the caller's sequence as given
+                    messagesToAdd = messages;
+                    break;
+                default: // any other codec: Compress yields a single message, stored as a one-element list
+                    var message = CompressionUtils.Compress(messages, compressionCodec);
+                    messagesToAdd = new List<Message>() { message };
+                    break;
+            }
+
+            int length = GetMessageSetSize(messagesToAdd); // NOTE(review): the value is never used in this constructor
+            this.Messages = messagesToAdd;
+            this.ErrorCode = ErrorMapping.NoError;
+            this.topIterPosition = 0; // iteration starts at the first top-level message
+        }
 
         /// <summary>
         /// Gets the list of messages.
@@ -78,12 +116,33 @@ namespace Kafka.Client.Messages
         /// </summary>
         public override int SetSize
         {
-            get 
+            get { return GetMessageSetSize(this.Messages); }
+        }
+
+        public MessageAndOffset Current
+        {
+            get
             {
-                return (int)this.SetBuffer.Length;
+                if (!MoveNext())
+                {
+                    throw new NoSuchElementException();
+                }
+
+                state = ConsumerIteratorState.NotReady;
+                if (nextItem != null)
+                {
+                    return nextItem;
+                }
+
+                throw new IllegalStateException("Expected item but none found.");
             }
         }
 
+        object IEnumerator.Current // non-generic view; forwards to the typed Current property
+        {
+            get { return this.Current; } // the typed getter sets state to NotReady, so every read consumes one item
+        }
+
         /// <summary>
         /// Writes content into given stream
         /// </summary>
@@ -92,7 +151,7 @@ namespace Kafka.Client.Messages
         /// </param>
         public sealed override void WriteTo(MemoryStream output)
         {
-            Guard.Assert<ArgumentNullException>(() => output != null);
+            Guard.NotNull(output, "output");
             using (var writer = new KafkaBinaryWriter(output))
             {
                 this.WriteTo(writer);
@@ -107,7 +166,7 @@ namespace Kafka.Client.Messages
         /// </param>
         public sealed override void WriteTo(KafkaBinaryWriter writer)
         {
-            Guard.Assert<ArgumentNullException>(() => writer != null);
+            Guard.NotNull(writer, "writer");
             foreach (var message in this.Messages)
             {
                 writer.Write(message.Size);
@@ -123,38 +182,16 @@ namespace Kafka.Client.Messages
         /// </returns>
         public override string ToString()
         {
-            using (var reader = new KafkaBinaryReader(this.SetBuffer))
-            {
-                return ParseFrom(reader, this.SetSize);
-            }
-        }
-
-        /// <summary>
-        /// Helper method to get string representation of set
-        /// </summary>
-        /// <param name="reader">
-        /// The reader.
-        /// </param>
-        /// <param name="count">
-        /// The count.
-        /// </param>
-        /// <returns>
-        /// String representation of set
-        /// </returns>
-        internal static string ParseFrom(KafkaBinaryReader reader, int count)
-        {
-            Guard.Assert<ArgumentNullException>(() => reader != null);
             var sb = new StringBuilder();
             int i = 1;
-            while (reader.BaseStream.Position != reader.BaseStream.Length)
+            foreach (var message in this.Messages)
             {
                 sb.Append("Message ");
                 sb.Append(i);
                 sb.Append(" {Length: ");
-                int msgSize = reader.ReadInt32();
-                sb.Append(msgSize);
+                sb.Append(message.Size);
                 sb.Append(", ");
-                sb.Append(Message.ParseFrom(reader, msgSize));
+                sb.Append(message.ToString());
                 sb.AppendLine("} ");
                 i++;
             }
@@ -162,6 +199,77 @@ namespace Kafka.Client.Messages
             return sb.ToString();
         }
 
+        internal static BufferedMessageSet ParseFrom(KafkaBinaryReader reader, int size) // parses one set that occupies `size` bytes of reader
+        {
+            if (size == 0)
+            {
+                return new BufferedMessageSet(Enumerable.Empty<Message>()); // nothing to read at all
+            }
+
+            short errorCode = reader.ReadInt16(); // the set starts with a 2-byte error code
+            if (errorCode != KafkaException.NoError)
+            {
+                throw new KafkaException(errorCode);
+            }
+
+            int readed = 2; // bytes accounted for so far: just the error code
+            if (readed == size)
+            {
+                return new BufferedMessageSet(Enumerable.Empty<Message>()); // error code only, no messages follow
+            }
+
+            var messages = new List<Message>();
+            do
+            {
+                int msgSize = reader.ReadInt32(); // each message is preceded by its 4-byte size
+                readed += 4;
+                Message msg = Message.ParseFrom(reader, msgSize);
+                readed += msgSize; // assumes Message.ParseFrom consumed exactly msgSize bytes - TODO confirm
+                messages.Add(msg);
+            }
+            while (readed < size);
+            if (size != readed) // the last message ran past the declared size
+            {
+                throw new KafkaException(KafkaException.InvalidRetchSizeCode);
+            }
+
+            return new BufferedMessageSet(messages);
+        }
+
+        internal static IList<BufferedMessageSet> ParseMultiFrom(KafkaBinaryReader reader, int size, int count) // parses `count` sets from `size` bytes
+        {
+            var result = new List<BufferedMessageSet>();
+            if (size == 0)
+            {
+                return result; // empty response: no sets, nothing is read from reader
+            }
+
+            int readed = 0; // bytes accounted for so far
+            short errorCode = reader.ReadInt16(); // a single 2-byte error code precedes all sets
+            readed += 2;
+            if (errorCode != KafkaException.NoError)
+            {
+                throw new KafkaException(errorCode);
+            }
+
+            for (int i = 0; i < count; i++)
+            {
+                int partSize = reader.ReadInt32(); // each set is preceded by its 4-byte size
+                readed += 4;
+                var item = ParseFrom(reader, partSize); // the single-set parser handles the set's own error code
+                readed += partSize;
+                result.Add(item);
+            }
+
+            if (size != readed) // the declared total must equal the bytes accounted for
+            {
+                throw new KafkaException(KafkaException.InvalidRetchSizeCode);
+            }
+
+            return result;
+        }
+
+        [Obsolete]
         internal static BufferedMessageSet ParseFrom(byte[] bytes)
         {
             var messages = new List<Message>();
@@ -176,5 +284,124 @@ namespace Kafka.Client.Messages
 
             return new BufferedMessageSet(messages);
         }
+
+        public IEnumerator<MessageAndOffset> GetEnumerator() // the set is its own enumerator
+        {
+            return this; // no new cursor is created: every caller shares this instance's iteration state
+        }
+
+        IEnumerator IEnumerable.GetEnumerator() // non-generic entry point
+        {
+            return GetEnumerator(); // same enumerator as the generic overload
+        }
+
+        private bool InnerDone() // true when there is no inner iterator or it reports no further item
+        {
+            return innerIter == null || !innerIter.MoveNext(); // NOTE(review): the check itself calls MoveNext on innerIter
+        }
+
+        private MessageAndOffset MakeNextOuter() // steps over the top-level messages; returns AllDone() once they are exhausted
+        {
+            if (topIterPosition >= this.Messages.Count())
+            {
+                return AllDone();
+            }
+
+            Message newMessage = this.Messages.ElementAt(topIterPosition); // fetch by position without copying the whole sequence
+            topIterPosition++;
+            switch (newMessage.CompressionCodec)
+            {
+                case CompressionCodecs.NoCompressionCodec:
+                    if (Logger.IsDebugEnabled)
+                    {
+                        Logger.DebugFormat(
+                            CultureInfo.CurrentCulture,
+                            "Message is uncompressed. Valid byte count = {0}",
+                            currValidBytes);
+                    }
+
+                    innerIter = null; // a plain message has nothing nested to iterate
+                    currValidBytes += 4 + newMessage.Size; // 4-byte size prefix plus the message bytes
+                    return new MessageAndOffset(newMessage, currValidBytes);
+                default:
+                    if (Logger.IsDebugEnabled)
+                    {
+                        Logger.DebugFormat(CultureInfo.CurrentCulture, "Message is compressed. Valid byte count = {0}", currValidBytes);
+                    }
+
+                    innerIter = CompressionUtils.Decompress(newMessage).GetEnumerator(); // iterate the decompressed messages next
+                    return MakeNext();
+            }
+        }
+
+        private MessageAndOffset MakeNext() // next item: drains the inner iterator first, then falls back to the outer list
+        {
+            if (Logger.IsDebugEnabled) // NOTE(review): the logged InnerDone() call also invokes innerIter.MoveNext()
+            {
+                Logger.DebugFormat(CultureInfo.CurrentCulture, "MakeNext() in deepIterator: innerDone = {0}", InnerDone());
+            }
+
+            switch (InnerDone())
+            {
+                case true:
+                    return MakeNextOuter();
+                default:
+                    var messageAndOffset = innerIter.Current;
+                    if (!innerIter.MoveNext()) // "has more?" probe; assumes MoveNext does not skip a pending item - TODO confirm
+                    {
+                        currValidBytes += 4 + lastMessageSize; // NOTE(review): lastMessageSize is not assigned in the visible hunks
+                    }
+
+                    return new MessageAndOffset(messageAndOffset.Message, currValidBytes); // re-wrapped with the outer byte count
+            }
+        }
+
+        private MessageAndOffset AllDone() // marks the iteration as finished
+        {
+            state = ConsumerIteratorState.Done;
+            return null; // null is the "no item" result handed back by MakeNextOuter
+        }
+
+        public void Dispose() // required by IEnumerator<T>; the body releases nothing
+        {
+        }
+
+        public bool MoveNext() // reports whether an item is available, computing one only when none is buffered
+        {
+            if (state == ConsumerIteratorState.Failed) // left in Failed when a previous MakeNext() threw
+            {
+                throw new IllegalStateException("Iterator is in failed state");
+            }
+
+            switch (state)
+            {
+                case ConsumerIteratorState.Done:
+                    return false;
+                case ConsumerIteratorState.Ready: // an item is already buffered; repeated calls do not advance
+                    return true;
+                default:
+                    return MaybeComputeNext();
+            }
+        }
+
+        private bool MaybeComputeNext() // buffers the next item in nextItem and updates state accordingly
+        {
+            state = ConsumerIteratorState.Failed; // stays Failed if MakeNext() throws
+            nextItem = MakeNext();
+            if (state == ConsumerIteratorState.Done) // set by AllDone() when the messages are exhausted
+            {
+                return false;
+            }
+            else
+            {
+                state = ConsumerIteratorState.Ready;
+                return true;
+            }
+        }
+
+        public void Reset() // NOTE(review): rewinds only the outer position
+        {
+            this.topIterPosition = 0; // state, innerIter, nextItem and currValidBytes keep their current values
+        }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/Message.cs Tue Oct 18 17:52:13 2011
@@ -21,6 +21,7 @@ namespace Kafka.Client.Messages
     using System.IO;
     using System.Linq;
     using System.Text;
+    using Kafka.Client.Exceptions;
     using Kafka.Client.Serialization;
     using Kafka.Client.Utils;
 
@@ -35,10 +36,27 @@ namespace Kafka.Client.Messages
     /// </remarks>
     public class Message : IWritable
     {
-        private const byte DefaultMagicValue = 0;
+        private const byte DefaultMagicValue = 1;
         private const byte DefaultMagicLength = 1;
         private const byte DefaultCrcLength = 4;
         private const int DefaultHeaderSize = DefaultMagicLength + DefaultCrcLength;
+        private const byte CompressionCodeMask = 3;
+
+        public CompressionCodecs CompressionCodec
+        {
+            get
+            {
+                switch (Magic)
+                {
+                    case 0:
+                        return CompressionCodecs.NoCompressionCodec;
+                    case 1:
+                        return Messages.CompressionCodec.GetCompressionCodec(Attributes & CompressionCodeMask);
+                    default:
+                        throw new KafkaException(KafkaException.InvalidMessageCode);
+                }
+            }
+        }
 
         /// <summary>
         /// Initializes a new instance of the <see cref="Message"/> class.
@@ -53,11 +71,11 @@ namespace Kafka.Client.Messages
         /// Initializes with default magic number
         /// </remarks>
         public Message(byte[] payload, byte[] checksum)
-            : this(payload, DefaultMagicValue, checksum)
+            : this(payload, checksum, CompressionCodecs.NoCompressionCodec)
         {
-            Guard.Assert<ArgumentNullException>(() => payload != null);
-            Guard.Assert<ArgumentNullException>(() => checksum != null);
-            Guard.Assert<ArgumentException>(() => checksum.Length == 4);
+            Guard.NotNull(payload, "payload");
+            Guard.NotNull(checksum, "checksum");
+            Guard.Count(checksum, 4, "checksum");
         }
 
         /// <summary>
@@ -70,9 +88,9 @@ namespace Kafka.Client.Messages
         /// Initializes the magic number as default and the checksum as null. It will be automatically computed.
         /// </remarks>
         public Message(byte[] payload)
-            : this(payload, DefaultMagicValue)
+            : this(payload, CompressionCodecs.NoCompressionCodec)
         {
-            Guard.Assert<ArgumentNullException>(() => payload != null);
+            Guard.NotNull(payload, "payload");
         }
 
         /// <summary>
@@ -83,10 +101,10 @@ namespace Kafka.Client.Messages
         /// </remarks>
         /// <param name="payload">The data for the payload.</param>
         /// <param name="magic">The magic identifier.</param>
-        public Message(byte[] payload, byte magic)
-            : this(payload, magic, Crc32Hasher.Compute(payload))
+        public Message(byte[] payload, CompressionCodecs compressionCodec)
+            : this(payload, Crc32Hasher.Compute(payload), compressionCodec)
         {
-            Guard.Assert<ArgumentNullException>(() => payload != null);
+            Guard.NotNull(payload, "payload");
         }
 
         /// <summary>
@@ -95,25 +113,32 @@ namespace Kafka.Client.Messages
         /// <param name="payload">The data for the payload.</param>
         /// <param name="magic">The magic identifier.</param>
         /// <param name="checksum">The checksum for the payload.</param>
-        public Message(byte[] payload, byte magic, byte[] checksum)
+        public Message(byte[] payload, byte[] checksum, CompressionCodecs compressionCodec)
         {
-            Guard.Assert<ArgumentNullException>(() => payload != null);
-            Guard.Assert<ArgumentNullException>(() => checksum != null);
+            Guard.NotNull(payload, "payload");
+            Guard.NotNull(checksum, "checksum");
+            Guard.Count(checksum, 4, "checksum");
 
             int length = DefaultHeaderSize + payload.Length;
             this.Payload = payload;
-            this.Magic = magic;
+            this.Magic = DefaultMagicValue;
+            
+            if (compressionCodec != CompressionCodecs.NoCompressionCodec)
+            {
+                this.Attributes |=
+                    (byte)(CompressionCodeMask & Messages.CompressionCodec.GetCompressionCodecValue(compressionCodec));
+            }
+
+            if (Magic == 1)
+            {
+                length++;
+            }
+
             this.Checksum = checksum;
-            this.MessageBuffer = new BoundedBuffer(length);
-            this.WriteTo(this.MessageBuffer);
+            this.Size = length;
         }
 
         /// <summary>
-        /// Gets internal message buffer.
-        /// </summary>
-        public MemoryStream MessageBuffer { get; private set; }
-
-        /// <summary>
         /// Gets the payload.
         /// </summary>
         public byte[] Payload { get; private set; }
@@ -129,15 +154,14 @@ namespace Kafka.Client.Messages
         public byte[] Checksum { get; private set; }
 
         /// <summary>
+        /// Gets the Attributes for the message.
+        /// </summary>
+        public byte Attributes { get; private set; }
+
+        /// <summary>
         /// Gets the total size of message.
         /// </summary>
-        public int Size
-        {
-            get
-            {
-                return (int)this.MessageBuffer.Length;
-            }
-        }
+        public int Size { get; private set; }
 
         /// <summary>
         /// Gets the payload size.
@@ -158,7 +182,7 @@ namespace Kafka.Client.Messages
         /// </param>
         public void WriteTo(MemoryStream output)
         {
-            Guard.Assert<ArgumentNullException>(() => output != null);
+            Guard.NotNull(output, "output");
 
             using (var writer = new KafkaBinaryWriter(output))
             {
@@ -174,9 +198,9 @@ namespace Kafka.Client.Messages
         /// </param>
         public void WriteTo(KafkaBinaryWriter writer)
         {
-            Guard.Assert<ArgumentNullException>(() => writer != null);
-
+            Guard.NotNull(writer, "writer");
             writer.Write(this.Magic);
+            writer.Write(this.Attributes);
             writer.Write(this.Checksum);
             writer.Write(this.Payload);
         }
@@ -187,44 +211,27 @@ namespace Kafka.Client.Messages
         /// <returns>The decoded payload as string.</returns>
         public override string ToString()
         {
-            using (var reader = new KafkaBinaryReader(this.MessageBuffer))
+            var sb = new StringBuilder();
+            sb.Append("Magic: ");
+            sb.Append(this.Magic);
+            if (this.Magic == 1)
             {
-                return ParseFrom(reader, this.Size);
+                sb.Append(", Attributes: ");
+                sb.Append(this.Attributes);
             }
-        }
 
-        /// <summary>
-        /// Creates string representation of message
-        /// </summary>
-        /// <param name="reader">
-        /// The reader.
-        /// </param>
-        /// <param name="count">
-        /// The count.
-        /// </param>
-        /// <returns>
-        /// String representation of message
-        /// </returns>
-        public static string ParseFrom(KafkaBinaryReader reader, int count)
-        {
-            Guard.Assert<ArgumentNullException>(() => reader != null);
-            var sb = new StringBuilder();
-            int payloadSize = count - DefaultHeaderSize;
-            sb.Append("Magic: ");
-            sb.Append(reader.ReadByte());
             sb.Append(", Checksum: ");
             for (int i = 0; i < 4; i++)
             {
                 sb.Append("[");
-                sb.Append(reader.ReadByte());
+                sb.Append(this.Checksum[i]);
                 sb.Append("]");
             }
 
             sb.Append(", topic: ");
-            var encodedPayload = reader.ReadBytes(payloadSize);
             try
             {
-                sb.Append(Encoding.UTF8.GetString(encodedPayload));
+                sb.Append(Encoding.UTF8.GetString(this.Payload));
             }
             catch (Exception)
             {
@@ -234,6 +241,63 @@ namespace Kafka.Client.Messages
             return sb.ToString();
         }
 
+        [Obsolete("Use KafkaBinaryReader instead")]
+        public static Message FromMessageBytes(byte[] data)
+        {
+            byte magic = data[0];
+            byte[] checksum;
+            byte[] payload;
+            byte attributes;
+            if (magic == (byte)1)
+            {
+                attributes = data[1];
+                checksum = data.Skip(2).Take(4).ToArray();
+                payload = data.Skip(6).ToArray();
+                return new Message(payload, checksum, Messages.CompressionCodec.GetCompressionCodec(attributes & CompressionCodeMask));
+            }
+            else
+            {
+                checksum = data.Skip(1).Take(4).ToArray();
+                payload = data.Skip(5).ToArray();
+                return new Message(payload, checksum);
+            }
+        }
+
+        internal static Message ParseFrom(KafkaBinaryReader reader, int size)
+        {
+            Message result;
+            int readed = 0;
+            byte magic = reader.ReadByte();
+            readed++;
+            byte[] checksum;
+            byte[] payload;
+            if (magic == 1)
+            {
+                byte attributes = reader.ReadByte();
+                readed++;
+                checksum = reader.ReadBytes(4);
+                readed += 4;
+                payload = reader.ReadBytes(size - (DefaultHeaderSize + 1));
+                readed += size - (DefaultHeaderSize + 1);
+                result = new Message(payload, checksum, Messages.CompressionCodec.GetCompressionCodec(attributes & CompressionCodeMask));
+            }
+            else
+            {
+                checksum = reader.ReadBytes(4);
+                readed += 4;
+                payload = reader.ReadBytes(size - DefaultHeaderSize);
+                readed += size - DefaultHeaderSize;
+                result = new Message(payload, checksum);
+            }
+
+            if (size != readed)
+            {
+                throw new KafkaException(KafkaException.InvalidRetchSizeCode);
+            }
+
+            return result;
+        }
+
         /// <summary>
         /// Parses a message from a byte array given the format Kafka likes. 
         /// </summary>
@@ -244,10 +308,22 @@ namespace Kafka.Client.Messages
         {
             int size = BitConverter.ToInt32(BitWorks.ReverseBytes(data.Take(4).ToArray()), 0);
             byte magic = data[4];
-            byte[] checksum = data.Skip(5).Take(4).ToArray();
-            byte[] payload = data.Skip(9).Take(size).ToArray();
-
-            return new Message(payload, magic, checksum);
+            byte[] checksum;
+            byte[] payload;
+            byte attributes;
+            if (magic == 1)
+            {
+                attributes = data[5];
+                checksum = data.Skip(6).Take(4).ToArray();
+                payload = data.Skip(10).Take(size).ToArray();
+                return new Message(payload, checksum, Messages.CompressionCodec.GetCompressionCodec(attributes & CompressionCodeMask));
+            }
+            else
+            {
+                checksum = data.Skip(5).Take(4).ToArray();
+                payload = data.Skip(9).Take(size).ToArray();
+                return new Message(payload, checksum);
+            }
         }
     }
 }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Messages/MessageSet.cs Tue Oct 18 17:52:13 2011
@@ -48,7 +48,7 @@ namespace Kafka.Client.Messages
         /// </returns>
         public static int GetEntrySize(Message message)
         {
-            Guard.Assert<ArgumentNullException>(() => message != null);
+            Guard.NotNull(message, "message");
 
             return message.Size + DefaultMessageLengthSize;
         }

Modified: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs?rev=1185772&r1=1185771&r2=1185772&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs (original)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producers/Async/AsyncProducer.cs Tue Oct 18 17:52:13 2011
@@ -28,19 +28,16 @@ namespace Kafka.Client.Producers.Async
     /// <summary>
     /// Sends messages encapsulated in request to Kafka server asynchronously
     /// </summary>
-    public class AsyncProducer : IAsyncProducer, IDisposable
+    public class AsyncProducer : IAsyncProducer
     {
-        private readonly AsyncProducerConfig config;
         private readonly ICallbackHandler callbackHandler;
-        private KafkaConnection connection = null;
+        private readonly KafkaConnection connection;
+        private volatile bool disposed;
 
         /// <summary>
         /// Gets producer config
         /// </summary>
-        public AsyncProducerConfig Config
-        {
-            get { return config; }
-        }
+        public AsyncProducerConfiguration Config { get; private set; }
 
         /// <summary>
         /// Initializes a new instance of the <see cref="AsyncProducer"/> class.
@@ -48,10 +45,10 @@ namespace Kafka.Client.Producers.Async
         /// <param name="config">
         /// The producer config.
         /// </param>
-        public AsyncProducer(AsyncProducerConfig config)
+        public AsyncProducer(AsyncProducerConfiguration config)
             : this(
                 config,
-                ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandler))
+                ReflectionHelper.Instantiate<ICallbackHandler>(config.CallbackHandlerClass))
         {
         }
 
@@ -65,14 +62,18 @@ namespace Kafka.Client.Producers.Async
         /// The callback invoked when a request is finished being sent.
         /// </param>
         public AsyncProducer(
-            AsyncProducerConfig config,
+            AsyncProducerConfiguration config,
             ICallbackHandler callbackHandler)
         {
-            Guard.Assert<ArgumentNullException>(() => config != null);
+            Guard.NotNull(config, "config");
 
-            this.config = config;
+            this.Config = config;
             this.callbackHandler = callbackHandler;
-            this.connection = new KafkaConnection(this.config.Host, this.config.Port);
+            this.connection = new KafkaConnection(
+                this.Config.Host,
+                this.Config.Port,
+                this.Config.BufferSize,
+                this.Config.SocketTimeout);
         }
 
         /// <summary>
@@ -83,7 +84,8 @@ namespace Kafka.Client.Producers.Async
         /// </param>
         public void Send(ProducerRequest request)
         {
-            Guard.Assert<ArgumentNullException>(() => request != null);
+            this.EnsuresNotDisposed();
+            Guard.NotNull(request, "request");
             Guard.Assert<ArgumentException>(() => request.MessageSet.Messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
             if (this.callbackHandler != null)
             {
@@ -91,7 +93,7 @@ namespace Kafka.Client.Producers.Async
             }
             else
             {
-                connection.BeginWrite(request);
+                this.connection.BeginWrite(request);
             }
         }
 
@@ -106,12 +108,13 @@ namespace Kafka.Client.Producers.Async
         /// </param>
         public void Send(ProducerRequest request, MessageSent<ProducerRequest> callback)
         {
-            Guard.Assert<ArgumentNullException>(() => request != null);
-            Guard.Assert<ArgumentNullException>(() => request.MessageSet != null);
-            Guard.Assert<ArgumentNullException>(() => request.MessageSet.Messages != null);
+            this.EnsuresNotDisposed();
+            Guard.NotNull(request, "request");
+            Guard.NotNull(request.MessageSet, "request.MessageSet");
+            Guard.NotNull(request.MessageSet.Messages, "request.MessageSet.Messages");
             Guard.Assert<ArgumentException>(
                 () => request.MessageSet.Messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
-            
+
             connection.BeginWrite(request, callback);
         }
 
@@ -129,10 +132,11 @@ namespace Kafka.Client.Producers.Async
         /// </param>
         public void Send(string topic, int partition, IEnumerable<Message> messages)
         {
-            Guard.Assert<ArgumentNullException>(() => !string.IsNullOrEmpty(topic));
-            Guard.Assert<ArgumentNullException>(() => messages != null);
+            this.EnsuresNotDisposed();
+            Guard.NotNullNorEmpty(topic, "topic");
+            Guard.NotNull(messages, "messages");
             Guard.Assert<ArgumentException>(() => messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
-            
+
             this.Send(new ProducerRequest(topic, partition, messages));
         }
 
@@ -153,18 +157,50 @@ namespace Kafka.Client.Producers.Async
         /// </param>
         public void Send(string topic, int partition, IEnumerable<Message> messages, MessageSent<ProducerRequest> callback)
         {
-            Guard.Assert<ArgumentNullException>(() => !string.IsNullOrEmpty(topic));
-            Guard.Assert<ArgumentNullException>(() => messages != null);
+            this.EnsuresNotDisposed();
+            Guard.NotNullNorEmpty(topic, "topic");
+            Guard.NotNull(messages, "messages");
             Guard.Assert<ArgumentException>(() => messages.All(x => x.PayloadSize <= this.Config.MaxMessageSize));
-            
+
             this.Send(new ProducerRequest(topic, partition, messages), callback);
         }
 
+        /// <summary>
+        /// Releases all unmanaged and managed resources
+        /// </summary>
         public void Dispose()
         {
-            if (connection != null)
+            this.Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected virtual void Dispose(bool disposing)
+        {
+            if (!disposing)
+            {
+                return;
+            }
+
+            if (this.disposed)
+            {
+                return;
+            }
+
+            this.disposed = true;
+            if (this.connection != null)
+            {
+                this.connection.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Ensures that object was not disposed
+        /// </summary>
+        private void EnsuresNotDisposed()
+        {
+            if (this.disposed)
             {
-                connection.Dispose();
+                throw new ObjectDisposedException(this.GetType().Name);
             }
         }
     }



Mime
View raw message