kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: MINOR: Move compression stream construction into CompressionType
Date Thu, 16 Feb 2017 23:29:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6c839395b -> 022d2017a


MINOR: Move compression stream construction into CompressionType

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2536 from hachikuji/minor-move-compression-io-construction


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/022d2017
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/022d2017
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/022d2017

Branch: refs/heads/trunk
Commit: 022d2017a781286d223d80d8eae505fbc3fb369c
Parents: 6c83939
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Feb 16 15:22:55 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Feb 16 15:22:55 2017 -0800

----------------------------------------------------------------------
 .../common/record/ByteBufferInputStream.java    |  36 ++---
 .../common/record/ByteBufferOutputStream.java   |  53 +++----
 .../kafka/common/record/CompressionType.java    | 147 ++++++++++++++++++-
 .../common/record/MemoryRecordsBuilder.java     | 137 +----------------
 .../org/apache/kafka/common/record/Record.java  |   2 +-
 .../kafka/common/record/RecordsIterator.java    |   3 +-
 .../clients/consumer/internals/FetcherTest.java |   3 +-
 7 files changed, 191 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
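
Taken together, these changes replace the static, switch-based stream construction
in MemoryRecordsBuilder with polymorphic wrapForOutput/wrapForInput methods on each
CompressionType constant. A minimal sketch of the resulting call site, assuming the
Kafka clients classes below are on the classpath (the class name and payload are
illustrative, not part of the commit):

import org.apache.kafka.common.record.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;

import java.io.DataOutputStream;
import java.nio.ByteBuffer;

public class CallSiteSketch {
    public static void main(String[] args) throws Exception {
        ByteBufferOutputStream bufferStream =
                new ByteBufferOutputStream(ByteBuffer.allocate(1024));
        // Codec selection is now a virtual call on the enum constant rather
        // than a switch statement inside MemoryRecordsBuilder.
        DataOutputStream appendStream = new DataOutputStream(
                CompressionType.GZIP.wrapForOutput(bufferStream, Record.CURRENT_MAGIC_VALUE, 1024));
        appendStream.writeBytes("illustrative payload");
        appendStream.close();
    }
}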


http://git-wip-us.apache.org/repos/asf/kafka/blob/022d2017/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
index 37e4766..fca45cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferInputStream.java
@@ -16,41 +16,33 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.DataInputStream;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 /**
  * A byte buffer backed input stream
  */
-public final class ByteBufferInputStream extends DataInputStream {
+public final class ByteBufferInputStream extends InputStream {
+    private final ByteBuffer buffer;
 
     public ByteBufferInputStream(ByteBuffer buffer) {
-        super(new UnderlyingInputStream(buffer));
+        this.buffer = buffer;
     }
 
-    private static final class UnderlyingInputStream extends InputStream {
-        private final ByteBuffer buffer;
-
-        public UnderlyingInputStream(ByteBuffer buffer) {
-            this.buffer = buffer;
+    public int read() {
+        if (!buffer.hasRemaining()) {
+            return -1;
         }
+        return buffer.get() & 0xFF;
+    }
 
-        public int read() {
-            if (!buffer.hasRemaining()) {
-                return -1;
-            }
-            return buffer.get() & 0xFF;
+    public int read(byte[] bytes, int off, int len) {
+        if (!buffer.hasRemaining()) {
+            return -1;
         }
 
-        public int read(byte[] bytes, int off, int len) {
-            if (!buffer.hasRemaining()) {
-                return -1;
-            }
-
-            len = Math.min(len, buffer.remaining());
-            buffer.get(bytes, off, len);
-            return len;
-        }
+        len = Math.min(len, buffer.remaining());
+        buffer.get(bytes, off, len);
+        return len;
     }
 }
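
Note that the class now extends InputStream directly rather than DataInputStream, so
callers that need primitive reads wrap it themselves (as the RecordsIterator hunk
further down does). A small usage sketch, with made-up data:

import org.apache.kafka.common.record.ByteBufferInputStream;

import java.nio.ByteBuffer;

public class InputSketch {
    public static void main(String[] args) {
        // Drains the remaining bytes of a ByteBuffer through the InputStream API.
        ByteBufferInputStream in =
                new ByteBufferInputStream(ByteBuffer.wrap(new byte[] {10, 20, 30}));
        int b;
        while ((b = in.read()) != -1)  // read() returns -1 once the buffer is exhausted
            System.out.println(b);     // prints 10, 20, 30
    }
}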

http://git-wip-us.apache.org/repos/asf/kafka/blob/022d2017/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index 3fb7f49..13609d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -16,54 +16,43 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.DataOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
  * A byte buffer backed output stream
  */
-public class ByteBufferOutputStream extends DataOutputStream {
+public class ByteBufferOutputStream extends OutputStream {
 
     private static final float REALLOCATION_FACTOR = 1.1f;
 
+    private ByteBuffer buffer;
+
     public ByteBufferOutputStream(ByteBuffer buffer) {
-        super(new UnderlyingOutputStream(buffer));
+        this.buffer = buffer;
     }
 
-    public ByteBuffer buffer() {
-        return ((UnderlyingOutputStream) out).buffer;
+    public void write(int b) {
+        if (buffer.remaining() < 1)
+            expandBuffer(buffer.capacity() + 1);
+        buffer.put((byte) b);
     }
 
-    public static class UnderlyingOutputStream extends OutputStream {
-        private ByteBuffer buffer;
-
-        public UnderlyingOutputStream(ByteBuffer buffer) {
-            this.buffer = buffer;
-        }
-
-        public void write(int b) {
-            if (buffer.remaining() < 1)
-                expandBuffer(buffer.capacity() + 1);
-            buffer.put((byte) b);
-        }
-
-        public void write(byte[] bytes, int off, int len) {
-            if (buffer.remaining() < len)
-                expandBuffer(buffer.capacity() + len);
-            buffer.put(bytes, off, len);
-        }
+    public void write(byte[] bytes, int off, int len) {
+        if (buffer.remaining() < len)
+            expandBuffer(buffer.capacity() + len);
+        buffer.put(bytes, off, len);
+    }
 
-        public ByteBuffer buffer() {
-            return buffer;
-        }
+    public ByteBuffer buffer() {
+        return buffer;
+    }
 
-        private void expandBuffer(int size) {
-            int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
-            ByteBuffer temp = ByteBuffer.allocate(expandSize);
-            temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
-            buffer = temp;
-        }
+    private void expandBuffer(int size) {
+        int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+        buffer = temp;
     }
 
 }
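
One behavioral detail worth keeping in mind: expandBuffer allocates a fresh buffer
(the larger of the requested size and capacity * 1.1) and copies the written bytes
over, so a ByteBuffer reference taken before a write may be stale afterwards and
callers should re-fetch via buffer(). A sketch with illustrative sizes:

import org.apache.kafka.common.record.ByteBufferOutputStream;

import java.nio.ByteBuffer;

public class ExpansionSketch {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocate(2);
        ByteBufferOutputStream out = new ByteBufferOutputStream(original);
        out.write(new byte[] {1, 2, 3, 4, 5}, 0, 5);  // remaining (2) < len (5) triggers expandBuffer(7)
        System.out.println(out.buffer() == original);  // false: a new, larger buffer was allocated
        System.out.println(out.buffer().position());   // 5
    }
}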

http://git-wip-us.apache.org/repos/asf/kafka/blob/022d2017/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index e1d4754..62265dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -16,11 +16,91 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.KafkaException;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
 /**
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
+    NONE(0, "none", 1.0f) {
+        @Override
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+            return buffer;
+        }
+
+        @Override
+        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+            return buffer;
+        }
+    },
+
+    GZIP(1, "gzip", 0.5f) {
+        @Override
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+            try {
+                return new GZIPOutputStream(buffer, bufferSize);
+            } catch (Exception e) {
+                throw new KafkaException(e);
+            }
+        }
+
+        @Override
+        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+            try {
+                return new GZIPInputStream(buffer);
+            } catch (Exception e) {
+                throw new KafkaException(e);
+            }
+        }
+    },
+
+    SNAPPY(2, "snappy", 0.5f) {
+        @Override
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+            try {
+                return (OutputStream) SNAPPY_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer, bufferSize);
+            } catch (Exception e) {
+                throw new KafkaException(e);
+            }
+        }
+
+        @Override
+        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+            try {
+                return (InputStream) SNAPPY_INPUT_STREAM_SUPPLIER.get().newInstance(buffer);
+            } catch (Exception e) {
+                throw new KafkaException(e);
+            }
+        }
+    },
+
+    LZ4(3, "lz4", 0.5f) {
+        @Override
+        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) {
+            try {
+                return (OutputStream) LZ4_OUTPUT_STREAM_SUPPLIER.get().newInstance(buffer,
+                        messageVersion == Record.MAGIC_VALUE_V0);
+            } catch (Exception e) {
+                throw new KafkaException(e);
+            }
+        }
+
+        @Override
+        public InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion) {
+            try {
+                return (InputStream) LZ4_INPUT_STREAM_SUPPLIER.get().newInstance(buffer,
+                        messageVersion == Record.MAGIC_VALUE_V0);
+            } catch (Exception e) {
+                throw new KafkaException(e);
+            }
+        }
+    };
 
     public final int id;
     public final String name;
@@ -32,6 +112,10 @@ public enum CompressionType {
         this.rate = rate;
     }
 
+    public abstract OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize);
+
+    public abstract InputStream wrapForInput(ByteBufferInputStream buffer, byte messageVersion);
+
     public static CompressionType forId(int id) {
         switch (id) {
             case 0:
@@ -60,4 +144,65 @@ public enum CompressionType {
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }
 
+    // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
+    // caching constructors to avoid invoking of Class.forName method for each batch
+    private static final MemoizingConstructorSupplier SNAPPY_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyOutputStream")
+                    .getConstructor(OutputStream.class, Integer.TYPE);
+        }
+    });
+
+    private static final MemoizingConstructorSupplier LZ4_OUTPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
+                    .getConstructor(OutputStream.class, Boolean.TYPE);
+        }
+    });
+
+    private static final MemoizingConstructorSupplier SNAPPY_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyInputStream")
+                    .getConstructor(InputStream.class);
+        }
+    });
+
+    private static final MemoizingConstructorSupplier LZ4_INPUT_STREAM_SUPPLIER = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
+                    .getConstructor(InputStream.class, Boolean.TYPE);
+        }
+    });
+
+    private interface ConstructorSupplier {
+        Constructor get() throws ClassNotFoundException, NoSuchMethodException;
+    }
+
+    // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
+    private static class MemoizingConstructorSupplier {
+        final ConstructorSupplier delegate;
+        transient volatile boolean initialized;
+        transient Constructor value;
+
+        public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
+            this.delegate = delegate;
+        }
+
+        public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
+            if (!initialized) {
+                synchronized (this) {
+                    if (!initialized) {
+                        value = delegate.get();
+                        initialized = true;
+                    }
+                }
+            }
+            return value;
+        }
+    }
+
 }
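
A round-trip sketch through the two new methods; the codec choice, payload, and
buffer sizes are illustrative, and the single read() call is enough only because
the payload is tiny:

import org.apache.kafka.common.record.ByteBufferInputStream;
import org.apache.kafka.common.record.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RoundTripSketch {
    public static void main(String[] args) throws Exception {
        ByteBufferOutputStream bufferStream =
                new ByteBufferOutputStream(ByteBuffer.allocate(64));
        try (OutputStream out = CompressionType.GZIP.wrapForOutput(
                bufferStream, Record.CURRENT_MAGIC_VALUE, 1024)) {
            out.write("hello, kafka".getBytes(StandardCharsets.UTF_8));
        }

        ByteBuffer compressed = bufferStream.buffer();  // re-fetch: may have been reallocated
        compressed.flip();  // switch the buffer from writing to reading

        InputStream in = CompressionType.GZIP.wrapForInput(
                new ByteBufferInputStream(compressed), Record.CURRENT_MAGIC_VALUE);
        byte[] plain = new byte[64];
        int n = in.read(plain, 0, plain.length);
        System.out.println(new String(plain, 0, n, StandardCharsets.UTF_8));  // hello, kafka
    }
}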

http://git-wip-us.apache.org/repos/asf/kafka/blob/022d2017/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a17c0e9..39f21a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -18,15 +18,9 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
 
 /**
  * This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}.
@@ -34,10 +28,9 @@ import java.util.zip.GZIPOutputStream;
  * format conversion.
  */
 public class MemoryRecordsBuilder {
-
-    static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
-    static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
-    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
+    private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
+    private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
 
@@ -51,40 +44,6 @@ public class MemoryRecordsBuilder {
         }
     }
 
-    // dynamically load the snappy and lz4 classes to avoid runtime dependency if we are not using compression
-    // caching constructors to avoid invoking of Class.forName method for each batch
-    private static MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class, Integer.TYPE);
-        }
-    });
-
-    private static MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
-                .getConstructor(OutputStream.class, Boolean.TYPE);
-        }
-    });
-
-    private static MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.xerial.snappy.SnappyInputStream")
-                .getConstructor(InputStream.class);
-        }
-    });
-
-    private static MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
-        @Override
-        public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
-            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
-                .getConstructor(InputStream.class, Boolean.TYPE);
-        }
-    });
-
     private final TimestampType timestampType;
     private final CompressionType compressionType;
     private final DataOutputStream appendStream;
@@ -146,7 +105,8 @@ public class MemoryRecordsBuilder {
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
+        appendStream = new DataOutputStream(compressionType.wrapForOutput(bufferStream, magic,
+                COMPRESSION_DEFAULT_BUFFER_SIZE));
     }
 
     public ByteBuffer buffer() {
@@ -399,93 +359,6 @@ public class MemoryRecordsBuilder {
         return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten();
     }
 
-    private static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) {
-        try {
-            switch (type) {
-                case NONE:
-                    return buffer;
-                case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
-                case SNAPPY:
-                    try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
-                        return new DataOutputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                case LZ4:
-                    try {
-                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer,
-                                messageVersion == Record.MAGIC_VALUE_V0);
-                        return new DataOutputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                default:
-                    throw new IllegalArgumentException("Unknown compression type: " + type);
-            }
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
-        try {
-            switch (type) {
-                case NONE:
-                    return buffer;
-                case GZIP:
-                    return new DataInputStream(new GZIPInputStream(buffer));
-                case SNAPPY:
-                    try {
-                        InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
-                        return new DataInputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                case LZ4:
-                    try {
-                        InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
-                                messageVersion == Record.MAGIC_VALUE_V0);
-                        return new DataInputStream(stream);
-                    } catch (Exception e) {
-                        throw new KafkaException(e);
-                    }
-                default:
-                    throw new IllegalArgumentException("Unknown compression type: " + type);
-            }
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    private interface ConstructorSupplier {
-        Constructor get() throws ClassNotFoundException, NoSuchMethodException;
-    }
-
-    // this code is based on Guava's @see{com.google.common.base.Suppliers.MemoizingSupplier}
-    private static class MemoizingConstructorSupplier {
-        final ConstructorSupplier delegate;
-        transient volatile boolean initialized;
-        transient Constructor value;
-
-        public MemoizingConstructorSupplier(ConstructorSupplier delegate) {
-            this.delegate = delegate;
-        }
-
-        public Constructor get() throws NoSuchMethodException, ClassNotFoundException {
-            if (!initialized) {
-                synchronized (this) {
-                    if (!initialized) {
-                        value = delegate.get();
-                        initialized = true;
-                    }
-                }
-            }
-            return value;
-        }
-    }
-
     public static class RecordsInfo {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
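
The ConstructorSupplier/MemoizingConstructorSupplier machinery deleted here now lives
in CompressionType (see the earlier hunk). It is a double-checked-locking memoizer:
the reflective Class.forName lookup runs at most once, and the volatile flag safely
publishes the cached Constructor to other threads. A generic sketch of the same
pattern, with hypothetical names:

// Hypothetical generic form of MemoizingConstructorSupplier.
class Memoizer<T> {
    interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    private final ThrowingSupplier<T> delegate;
    private volatile boolean initialized;  // volatile write publishes `value`
    private T value;

    Memoizer(ThrowingSupplier<T> delegate) {
        this.delegate = delegate;
    }

    T get() throws Exception {
        if (!initialized) {              // fast path: no locking once initialized
            synchronized (this) {
                if (!initialized) {      // re-check under the lock
                    value = delegate.get();
                    initialized = true;
                }
            }
        }
        return value;
    }
}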

http://git-wip-us.apache.org/repos/asf/kafka/blob/022d2017/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 9dca544..51bbe35 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -479,7 +479,7 @@ public final class Record {
                               CompressionType compressionType,
                               TimestampType timestampType) {
         try {
-            ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+            DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
             write(out, magic, timestamp, key, value, compressionType, timestampType);
         } catch (IOException e) {
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/022d2017/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index 792a857..07c9197 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -147,7 +147,8 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
 
             CompressionType compressionType = wrapperRecord.compressionType();
             ByteBuffer buffer = wrapperRecord.value();
-            DataInputStream stream = MemoryRecordsBuilder.wrapForInput(new ByteBufferInputStream(buffer), compressionType, wrapperRecord.magic());
+            DataInputStream stream = new DataInputStream(compressionType.wrapForInput(new ByteBufferInputStream(buffer),
+                    wrapperRecord.magic()));
             LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
 
             long wrapperRecordOffset = wrapperEntry.offset();

http://git-wip-us.apache.org/repos/asf/kafka/blob/022d2017/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index b27802d..fec9251 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -62,6 +62,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -217,7 +218,7 @@ public class FetcherTest {
     @Test
     public void testParseInvalidRecord() throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
 
         byte magic = Record.CURRENT_MAGIC_VALUE;
         byte[] key = "foo".getBytes();

