sqoop-commits mailing list archives

From: b...@apache.org
Subject: svn commit: r1190430 [1/5] - in /incubator/sqoop/trunk: ./ src/java/com/cloudera/sqoop/io/ src/java/com/cloudera/sqoop/lib/ src/java/org/apache/sqoop/io/ src/java/org/apache/sqoop/lib/
Date: Fri, 28 Oct 2011 16:32:45 GMT
Author: blee
Date: Fri Oct 28 16:32:43 2011
New Revision: 1190430

URL: http://svn.apache.org/viewvc?rev=1190430&view=rev
Log:
SQOOP-379 Migrate lib and io packages to new name space

Added:
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/FixedLengthInputStream.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobFile.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobReaderCache.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/NamedFifo.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittableBufferedWriter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittingOutputStream.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/io/UnsupportedCodecException.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BigDecimalSerializer.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BlobRef.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BooleanParser.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ClobRef.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/DelimiterSet.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldFormatter.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMapProcessor.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMappable.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/JdbcWritableBridge.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LargeObjectLoader.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobRef.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobSerializer.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ProcessingException.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/RecordParser.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/SqoopRecord.java   (with props)
Modified:
    incubator/sqoop/trunk/build.xml
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/FixedLengthInputStream.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobFile.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobReaderCache.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/NamedFifo.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittingOutputStream.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/UnsupportedCodecException.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/BigDecimalSerializer.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/BlobRef.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/BooleanParser.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/ClobRef.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/DelimiterSet.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/FieldFormatter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/FieldMapProcessor.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/FieldMappable.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/JdbcWritableBridge.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/LargeObjectLoader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/LobRef.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/LobSerializer.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/ProcessingException.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/RecordParser.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/lib/SqoopRecord.java
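
The diffs below establish the pattern used across the migration: each old
com.cloudera.sqoop class is kept as a thin, deprecated shim over its new
org.apache.sqoop counterpart, so existing imports keep compiling while the
implementation moves to the new namespace. The two shim styles look roughly
like the sketch below (a simplified illustration, not the committed source;
the real signatures are in the per-file diffs that follow):

    // com/cloudera/sqoop/io/CodecMap.java -- utility classes forward their
    // constants and static methods to the new namespace.
    package com.cloudera.sqoop.io;

    /**
     * @deprecated use org.apache.sqoop.io.CodecMap instead.
     */
    public final class CodecMap {
      public static final String NONE = org.apache.sqoop.io.CodecMap.NONE;

      private CodecMap() {
      }

      public static String getCodecClassName(String codecName)
          throws UnsupportedCodecException {
        return org.apache.sqoop.io.CodecMap.getCodecClassName(codecName);
      }
    }

    // com/cloudera/sqoop/io/FixedLengthInputStream.java -- concrete classes
    // become subclasses whose constructors delegate to the new implementation.
    package com.cloudera.sqoop.io;

    /**
     * @deprecated use org.apache.sqoop.io.FixedLengthInputStream instead.
     */
    public class FixedLengthInputStream
        extends org.apache.sqoop.io.FixedLengthInputStream {

      public FixedLengthInputStream(java.io.InputStream stream, long maxLen) {
        super(stream, maxLen);
      }
    }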

Modified: incubator/sqoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/build.xml?rev=1190430&r1=1190429&r2=1190430&view=diff
==============================================================================
--- incubator/sqoop/trunk/build.xml (original)
+++ incubator/sqoop/trunk/build.xml Fri Oct 28 16:32:43 2011
@@ -808,7 +808,6 @@
       <arg value="+%Y" />
     </exec>
     <javadoc
-      packagenames="com.cloudera.sqoop.lib.*"
       destdir="${build.javadoc}"
       author="true"
       version="true"
@@ -818,6 +817,7 @@
       bottom="Copyright &amp;copy; ${year} The Apache Software Foundation">
       <packageset dir="${src.dir}">
         <include name="com/cloudera/sqoop/lib/**" />
+        <include name="org/apache/sqoop/lib/**" />
       </packageset>
       <classpath>
         <path refid="compile.classpath" />

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java?rev=1190430&r1=1190429&r2=1190430&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/CodecMap.java Fri Oct 28 16:32:43 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,22 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package com.cloudera.sqoop.io;
 
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Provides a mapping from codec names to concrete implementation class names.
+ *
+ * @deprecated use org.apache.sqoop.io.CodecMap instead.
+ * @see org.apache.sqoop.io.CodecMap
  */
 public final class CodecMap {
 
@@ -40,33 +34,10 @@ public final class CodecMap {
   // Note: do not add more values here, since codecs are discovered using the
   // standard Hadoop mechanism (io.compression.codecs). See
   // CompressionCodecFactory.
-  public static final String NONE = "none";
-  public static final String DEFLATE = "deflate";
-  public static final String LZO = "lzo";
-  public static final String LZOP = "lzop";
-
-  private static Map<String, String> codecNames;
-  static {
-    codecNames = new TreeMap<String, String>();
-
-    // Register the names of codecs we know about.
-    codecNames.put(NONE,    null);
-    codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
-    codecNames.put(LZO,     "com.hadoop.compression.lzo.LzoCodec");
-    codecNames.put(LZOP,     "com.hadoop.compression.lzo.LzopCodec");
-
-    // add more from Hadoop CompressionCodecFactory
-    for (Class<? extends CompressionCodec> cls
-        : CompressionCodecFactory.getCodecClasses(new Configuration())) {
-      String simpleName = cls.getSimpleName();
-      String codecName = simpleName;
-      if (simpleName.endsWith("Codec")) {
-        codecName = simpleName.substring(0, simpleName.length()
-            - "Codec".length());
-      }
-      codecNames.put(codecName.toLowerCase(), cls.getCanonicalName());
-    }
-  }
+  public static final String NONE = org.apache.sqoop.io.CodecMap.NONE;
+  public static final String DEFLATE = org.apache.sqoop.io.CodecMap.DEFLATE;
+  public static final String LZO = org.apache.sqoop.io.CodecMap.LZO;
+  public static final String LZOP = org.apache.sqoop.io.CodecMap.LZOP;
 
   private CodecMap() {
   }
@@ -79,11 +50,7 @@ public final class CodecMap {
    */
   public static String getCodecClassName(String codecName)
       throws UnsupportedCodecException {
-    if (!codecNames.containsKey(codecName)) {
-      throw new UnsupportedCodecException(codecName);
-    }
-
-    return codecNames.get(codecName);
+    return org.apache.sqoop.io.CodecMap.getCodecClassName(codecName);
   }
 
   /**
@@ -94,79 +61,13 @@ public final class CodecMap {
    */
   public static CompressionCodec getCodec(String codecName,
       Configuration conf) throws UnsupportedCodecException {
-    // Try standard Hadoop mechanism first
-    CompressionCodec codec = getCodecByName(codecName, conf);
-    if (codec != null) {
-      return codec;
-    }
-    // Fall back to Sqoop mechanism
-    String codecClassName = null;
-    try {
-      codecClassName = getCodecClassName(codecName);
-      if (null == codecClassName) {
-        return null;
-      }
-      Class<? extends CompressionCodec> codecClass =
-          (Class<? extends CompressionCodec>)
-          conf.getClassByName(codecClassName);
-      return (CompressionCodec) ReflectionUtils.newInstance(
-          codecClass, conf);
-    } catch (ClassNotFoundException cnfe) {
-      throw new UnsupportedCodecException("Cannot find codec class "
-          + codecClassName + " for codec " + codecName);
-    }
-  }
-
-  /**
-   * Find the relevant compression codec for the codec's canonical class name
-   * or by codec alias.
-   * <p>
-   * Codec aliases are case insensitive.
-   * <p>
-   * The codec alias is the short class name (without the package name).
-   * If the short class name ends with 'Codec', then there are two aliases for
-   * the codec, the complete short class name and the short class name without
-   * the 'Codec' ending. For example for the 'GzipCodec' codec class name the
-   * alias are 'gzip' and 'gzipcodec'.
-   * <p>
-   * Note: When HADOOP-7323 is available this method can be replaced with a call
-   * to CompressionCodecFactory.
-   * @param classname the canonical class name of the codec or the codec alias
-   * @return the codec object or null if none matching the name were found
-   */
-  private static CompressionCodec getCodecByName(String codecName,
-      Configuration conf) {
-    List<Class<? extends CompressionCodec>> codecs =
-      CompressionCodecFactory.getCodecClasses(conf);
-    for (Class<? extends CompressionCodec> cls : codecs) {
-      if (codecMatches(cls, codecName)) {
-        return ReflectionUtils.newInstance(cls, conf);
-      }
-    }
-    return null;
-  }
-
-  private static boolean codecMatches(Class<? extends CompressionCodec> cls,
-      String codecName) {
-    String simpleName = cls.getSimpleName();
-    if (cls.getName().equals(codecName)
-        || simpleName.equalsIgnoreCase(codecName)) {
-      return true;
-    }
-    if (simpleName.endsWith("Codec")) {
-      String prefix = simpleName.substring(0, simpleName.length()
-          - "Codec".length());
-      if (prefix.equalsIgnoreCase(codecName)) {
-        return true;
-      }
-    }
-    return false;
+    return org.apache.sqoop.io.CodecMap.getCodec(codecName, conf);
   }
 
   /**
    * Return the set of available codec names.
    */
   public static Set<String> getCodecNames() {
-    return codecNames.keySet();
+    return org.apache.sqoop.io.CodecMap.getCodecNames();
   }
 }
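
Because the constants and static methods above now forward to
org.apache.sqoop.io.CodecMap, existing callers of the old class keep working
unchanged. A minimal usage sketch (illustrative only, not part of this commit;
the "gzip" alias is just an example name resolved through Hadoop's codec
discovery):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;

    import com.cloudera.sqoop.io.CodecMap;
    import com.cloudera.sqoop.io.UnsupportedCodecException;

    public class CodecLookupExample {
      public static void main(String[] args) throws UnsupportedCodecException {
        Configuration conf = new Configuration();
        // The deprecated entry point delegates to org.apache.sqoop.io.CodecMap,
        // which tries Hadoop's codec aliases first and Sqoop's name map second.
        CompressionCodec codec = CodecMap.getCodec("gzip", conf);
        System.out.println(codec == null
            ? "no codec configured"
            : codec.getClass().getName());
      }
    }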

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/FixedLengthInputStream.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/FixedLengthInputStream.java?rev=1190430&r1=1190429&r2=1190430&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/FixedLengthInputStream.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/FixedLengthInputStream.java Fri Oct 28 16:32:43 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,76 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package com.cloudera.sqoop.io;
 
 import java.io.InputStream;
-import java.io.IOException;
-
-import org.apache.commons.io.input.CloseShieldInputStream;
-import org.apache.commons.io.input.CountingInputStream;
-import org.apache.commons.io.input.ProxyInputStream;
 
 /**
  * Provides an InputStream that can consume a fixed maximum number of bytes
  * from an underlying stream. Closing the FixedLengthInputStream does not
  * close the underlying stream. After reading the maximum number of available
  * bytes this acts as though EOF has been reached.
+ *
+ * @deprecated use org.apache.sqoop.io.FixedLengthInputStream instead.
+ * @see org.apache.sqoop.io.FixedLengthInputStream
  */
-public class FixedLengthInputStream extends ProxyInputStream {
-
-  private CountingInputStream countingIn;
-  private long maxBytes;
-
-  public FixedLengthInputStream(InputStream stream, long maxLen) {
-    super(new CountingInputStream(new CloseShieldInputStream(stream)));
-
-    // Save a correctly-typed reference to the underlying stream.
-    this.countingIn = (CountingInputStream) this.in;
-    this.maxBytes = maxLen;
-  }
-
-  /** @return the number of bytes already consumed by the client. */
-  private long consumed() {
-    return countingIn.getByteCount();
-  }
-
-  /**
-   * @return number of bytes remaining to be read before the limit
-   * is reached.
-   */
-  private long toLimit() {
-    return maxBytes - consumed();
-  }
-
-  @Override
-  public int available() throws IOException {
-    return (int) Math.min(toLimit(), countingIn.available());
-  }
-
-  @Override
-  public int read() throws IOException {
-    if (toLimit() > 0) {
-      return super.read();
-    } else {
-      return -1; // EOF.
-    }
-  }
-
-  @Override
-  public int read(byte [] buf) throws IOException {
-    return read(buf, 0, buf.length);
-  }
+public class FixedLengthInputStream
+  extends org.apache.sqoop.io.FixedLengthInputStream {
 
-  @Override
-  public int read(byte [] buf, int start, int count) throws IOException {
-    long limit = toLimit();
-    if (limit == 0) {
-      return -1; // EOF.
-    } else {
-      return super.read(buf, start, (int) Math.min(count, limit));
-    }
-  }
+   public FixedLengthInputStream(InputStream stream, long maxLen) {
+     super(stream, maxLen);
+   }
 }
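
The stream class shows the subclass form of the shim: only the constructor
survives, delegating to the new implementation. A small usage sketch of the
documented behaviour, reading a bounded prefix of a stream (illustrative only,
not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import com.cloudera.sqoop.io.FixedLengthInputStream;

    public class BoundedReadExample {
      public static void main(String[] args) throws IOException {
        InputStream base =
            new ByteArrayInputStream("hello world".getBytes("UTF-8"));
        // Expose at most the first 5 bytes; afterwards the stream reports EOF,
        // and closing it does not close 'base'.
        InputStream bounded = new FixedLengthInputStream(base, 5);
        byte[] buf = new byte[16];
        int n = bounded.read(buf);                    // reads at most 5 bytes
        System.out.println(new String(buf, 0, n, "UTF-8"));
        System.out.println(bounded.read());           // -1: limit reached
        bounded.close();                              // 'base' stays open
      }
    }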
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobFile.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobFile.java?rev=1190430&r1=1190429&r2=1190430&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobFile.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobFile.java Fri Oct 28 16:32:43 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,56 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package com.cloudera.sqoop.io;
 
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.UnsupportedEncodingException;
-import java.util.AbstractMap;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 
-import org.apache.commons.io.output.CloseShieldOutputStream;
-import org.apache.commons.io.output.CountingOutputStream;
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.CompressorStream;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DecompressorStream;
-
-import com.cloudera.sqoop.util.RandomHash;
+import org.apache.hadoop.fs.Path;
 
 /**
  * File format which stores large object records.
@@ -75,1698 +30,55 @@ import com.cloudera.sqoop.util.RandomHas
  * Each record is assigned an id and can be accessed by id efficiently by
  * consulting an index at the end of the file.
  *
- * The LobFile format is specified at:
- * http://wiki.github.com/cloudera/sqoop/sip-3
+ * @deprecated use org.apache.sqoop.io.LobFile instead.
+ * @see org.apache.sqoop.io.LobFile
  */
 public final class LobFile {
 
   private LobFile() {
   }
 
-  public static final Log LOG = LogFactory.getLog(LobFile.class.getName());
+  public static final Log LOG = org.apache.sqoop.io.LobFile.LOG;
+
+  public static final int LATEST_LOB_VERSION =
+      org.apache.sqoop.io.LobFile.LATEST_LOB_VERSION;
 
-  public static final int LATEST_LOB_VERSION = 0;
-  static final char [] HEADER_ID_STR = { 'L', 'O', 'B' };
+  // Must be in sync with org.apache.sqoop.io.LobFile.HEADER_ID_STR
+  static final char [] HEADER_ID_STR = 
+      org.apache.sqoop.io.LobFile.HEADER_ID_STR;
 
   // Value for entryId to write to the beginning of an IndexSegment.
-  static final long SEGMENT_HEADER_ID = -1;
+  static final long SEGMENT_HEADER_ID =
+      org.apache.sqoop.io.LobFile.SEGMENT_HEADER_ID;
 
   // Value for entryId to write before the finale.
-  static final long SEGMENT_OFFSET_ID = -2;
+  static final long SEGMENT_OFFSET_ID =
+      org.apache.sqoop.io.LobFile.SEGMENT_OFFSET_ID;
 
   // Value for entryID to write before the IndexTable
-  static final long INDEX_TABLE_ID = -3;
-
-  /**
-   * Represents a header block in a LobFile. Can write a new header
-   * block (and generate a record start mark), or read an existing
-   * header block.
-   */
-  private static class LobFileHeader implements Writable {
-
-    private int version;
-    private RecordStartMark startMark;
-    private MetaBlock metaBlock;
-
-    /**
-     * Create a new LobFileHeader.
-     */
-    public LobFileHeader() {
-      this.version = LATEST_LOB_VERSION;
-      this.startMark = new RecordStartMark();
-      this.metaBlock = new MetaBlock();
-    }
-
-    /**
-     * Read a LobFileHeader from an existing file.
-     */
-    public LobFileHeader(DataInput in) throws IOException {
-      readFields(in);
-    }
-
-    /**
-     * Write a LobFile header to an output sink.
-     */
-    public void write(DataOutput out) throws IOException {
-      // Start with the file type identification.
-      for (char c : HEADER_ID_STR) {
-        out.writeByte((int) c);
-      }
-
-      // Write the format version
-      WritableUtils.writeVInt(out, this.version);
-
-      startMark.write(out);
-      metaBlock.write(out);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      char [] chars = new char[3];
-      for (int i = 0; i < 3; i++) {
-        chars[i] = (char) in.readByte();
-      }
-
-      // Check that these match what we expect. Throws IOE if not.
-      checkHeaderChars(chars);
-
-      this.version = WritableUtils.readVInt(in);
-      if (this.version != LATEST_LOB_VERSION) {
-        // Right now we only have one version we can handle.
-        throw new IOException("Unexpected LobFile version " + this.version);
-      }
-
-      this.startMark = new RecordStartMark(in);
-      this.metaBlock = new MetaBlock(in);
-    }
-
-    /**
-     * Checks that a header array matches the standard LobFile header.
-     * Additional data at the end of the headerStamp is ignored.
-     * @param headerStamp the header bytes received from the file.
-     * @throws IOException if it doesn't.
-     */
-    private void checkHeaderChars(char [] headerStamp) throws IOException {
-      if (headerStamp.length != HEADER_ID_STR.length) {
-        throw new IOException("Invalid LobFile header stamp: expected length "
-            + HEADER_ID_STR.length);
-      }
-      for (int i = 0; i < HEADER_ID_STR.length; i++) {
-        if (headerStamp[i] != HEADER_ID_STR[i]) {
-          throw new IOException("Invalid LobFile header stamp");
-        }
-      }
-    }
-
-    /**
-     * @return the format version number for this LobFile
-     */
-    public int getVersion() {
-      return version;
-    }
-
-    /**
-     * @return the RecordStartMark for this LobFile.
-     */
-    public RecordStartMark getStartMark() {
-      return startMark;
-    }
-
-    /**
-     * @return the MetaBlock for this LobFile.
-     */
-    public MetaBlock getMetaBlock() {
-      return metaBlock;
-    }
-  }
-
-  /**
-   * Holds a RecordStartMark -- a 16 byte randomly-generated
-   * sync token. Can read a RSM from an input source, or can
-   * generate a new one.
-   */
-  private static class RecordStartMark implements Writable {
-
-    // This is a 16-byte array.
-    public static final int START_MARK_LENGTH = 16;
-
-    private byte [] startBytes;
-
-    public RecordStartMark() {
-      generateStartMark();
-    }
-
-    public RecordStartMark(DataInput in) throws IOException {
-      readFields(in);
-    }
-
-    public byte [] getBytes() {
-      byte [] out = new byte[START_MARK_LENGTH];
-      System.arraycopy(this.startBytes, 0, out, 0, START_MARK_LENGTH);
-      return out;
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      this.startBytes = new byte[START_MARK_LENGTH];
-      in.readFully(this.startBytes);
-    }
-
-    public void write(DataOutput out) throws IOException {
-      out.write(this.startBytes);
-    }
-
-    /**
-     * Generate a new random RecordStartMark.
-     */
-    private void generateStartMark() {
-      this.startBytes = RandomHash.generateMD5Bytes();
-    }
-  }
-
-  /**
-   * Represents the metadata block stored in the header of a LobFile.
-   */
-  private static class MetaBlock extends AbstractMap<String, BytesWritable>
-      implements Writable {
-
-    // Strings which typically appear in the metablock have canonical names.
-    public static final String ENTRY_ENCODING_KEY = "EntryEncoding";
-    public static final String COMPRESSION_CODEC_KEY = "CompressionCodec";
-    public static final String ENTRIES_PER_SEGMENT_KEY = "EntriesPerSegment";
-
-    // Standard entry encodings.
-    public static final String CLOB_ENCODING = "CLOB";
-    public static final String BLOB_ENCODING = "BLOB";
-
-    private Map<String, BytesWritable> entries;
-
-    public MetaBlock() {
-      entries = new TreeMap<String, BytesWritable>();
-    }
-
-    public MetaBlock(DataInput in) throws IOException {
-      entries = new TreeMap<String, BytesWritable>();
-      readFields(in);
-    }
-
-    public MetaBlock(Map<String, BytesWritable> map) {
-      entries = new TreeMap<String, BytesWritable>();
-      for (Map.Entry<String, BytesWritable> entry : map.entrySet()) {
-        entries.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    @Override
-    public Set<Map.Entry<String, BytesWritable>> entrySet() {
-      return entries.entrySet();
-    }
-
-    @Override
-    public BytesWritable put(String k, BytesWritable v) {
-      BytesWritable old = entries.get(k);
-      entries.put(k, v);
-      return old;
-    }
-
-    public BytesWritable put(String k, String v) {
-      try {
-        return put(k, new BytesWritable(v.getBytes("UTF-8")));
-      } catch (UnsupportedEncodingException uee) {
-        // Shouldn't happen; UTF-8 is always supported.
-        throw new RuntimeException(uee);
-      }
-    }
-
-    @Override
-    public BytesWritable get(Object k) {
-      return entries.get(k);
-    }
-
-    public String getString(Object k) {
-      BytesWritable bytes = get(k);
-      if (null == bytes) {
-        return null;
-      } else {
-        try {
-          return new String(bytes.getBytes(), 0, bytes.getLength(), "UTF-8");
-        } catch (UnsupportedEncodingException uee) {
-          // Shouldn't happen; UTF-8 is always supported.
-          throw new RuntimeException(uee);
-        }
-      }
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      int numEntries = WritableUtils.readVInt(in);
-      entries.clear();
-      for (int i = 0; i < numEntries; i++) {
-        String key = Text.readString(in);
-        BytesWritable val = new BytesWritable();
-        val.readFields(in);
-        entries.put(key, val);
-      }
-    }
-
-    public void write(DataOutput out) throws IOException {
-      int numEntries = entries.size();
-      WritableUtils.writeVInt(out, numEntries);
-      for (Map.Entry<String, BytesWritable> entry : entries.entrySet()) {
-        Text.writeString(out, entry.getKey());
-        entry.getValue().write(out);
-      }
-    }
-  }
-
-  /**
-   * Class that represents the IndexSegment entries in a LobIndex.
-   */
-  private static class IndexSegment implements Writable {
-
-    // The main body of the IndexSegment: the record lengths
-    // of all the records in the IndexSegment.
-    private BytesWritable recordLenBytes;
-
-    // The length of the previously recorded field (used when
-    // generating an index). Intermediate state used in calculation
-    // of the lastIndexOffset.
-    private long prevLength;
-
-    // Used to write VLong-encoded lengths into a temp
-    // array, which are then copied into recordLenBytes.
-    private DataOutputBuffer outputBuffer;
-
-    // The IndexTableEntry that describes this IndexSegment in the IndexTable.
-    private IndexTableEntry tableEntry;
-
-    public IndexSegment(IndexTableEntry tableEntry) {
-      this.recordLenBytes = new BytesWritable();
-      this.outputBuffer = new DataOutputBuffer(10); // max VLong size.
-      this.tableEntry = tableEntry;
-    }
-
-    /**
-     * Read an IndexSegment from an existing file.
-     */
-    public IndexSegment(IndexTableEntry tableEntry, DataInput in)
-        throws IOException {
-      this.recordLenBytes = new BytesWritable();
-      this.outputBuffer = new DataOutputBuffer(10);
-      this.tableEntry = tableEntry;
-      readFields(in);
-    }
-
-    /**
-     * @return the IndexTableEntry describing this IndexSegment in the
-     * IndexTable.
-     */
-    public IndexTableEntry getTableEntry() {
-      return tableEntry;
-    }
-
-    /**
-     * Add a recordLength to the recordLenBytes array.
-     */
-    public void addRecordLen(long recordLen) throws IOException {
-      // Allocate space for the new bytes.
-      int numBytes = WritableUtils.getVIntSize(recordLen);
-      recordLenBytes.setSize(recordLenBytes.getLength() + numBytes);
-
-      // Write the new bytes into a temporary buffer wrapped in a DataOutput.
-      outputBuffer.reset();
-      WritableUtils.writeVLong(outputBuffer, recordLen);
-
-      // Then copy those new bytes into the end of the recordLenBytes array.
-      System.arraycopy(outputBuffer.getData(), 0, recordLenBytes.getBytes(),
-          recordLenBytes.getLength() - numBytes, numBytes);
-
-      // Now that we've added a new recordLength to the array,
-      // it's the last index. We need to calculate its offset.
-      // This is based on how long the previous record was.
-      this.tableEntry.setLastIndexOffset(
-          this.tableEntry.getLastIndexOffset() + this.prevLength);
-
-      // Save this record's length (unserialized) for calculating
-      // lastIndexOffset for the next record.
-      this.prevLength = recordLen;
-    }
-
-    public void write(DataOutput out) throws IOException {
-      // Write the SEGMENT_HEADER_ID to distinguish this from a LobRecord.
-      WritableUtils.writeVLong(out, SEGMENT_HEADER_ID);
-
-      // The length of the main body of the segment is the length of the
-      // data byte array.
-      int segmentBytesLen = recordLenBytes.getLength();
-      WritableUtils.writeVLong(out, segmentBytesLen);
-
-      // Write the body of the segment.
-      out.write(recordLenBytes.getBytes(), 0, segmentBytesLen);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      // After the RecordStartMark, we expect to get a SEGMENT_HEADER_ID (-1).
-      long segmentId = WritableUtils.readVLong(in);
-      if (SEGMENT_HEADER_ID != segmentId) {
-        throw new IOException("Expected segment header id " + SEGMENT_HEADER_ID
-            + "; got " + segmentId);
-      }
-
-      // Get the length of the rest of the segment, in bytes.
-      long length = WritableUtils.readVLong(in);
-
-      // Now read the actual main byte array.
-      if (length > Integer.MAX_VALUE) {
-        throw new IOException("Unexpected oversize data array length: "
-            + length);
-      } else if (length < 0) {
-        throw new IOException("Unexpected undersize data array length: "
-            + length);
-      }
-      byte [] segmentData = new byte[(int) length];
-      in.readFully(segmentData);
-      recordLenBytes = new BytesWritable(segmentData);
-
-      reset(); // Reset the iterator allowing the user to yield offset/lengths.
-    }
-
-
-    // The following methods are used by a Reader to walk through the index
-    // segment and get data about the records described in this segment of
-    // the index.
-
-    private DataInputBuffer dataInputBuf;
-
-    // The following two fields are advanced by the next() method.
-    private long curOffset; // offset into the file of the current record.
-    private long curLen; // length of the current record in bytes.
-
-    // Used to allow rewindOnce() to go backwards a single position in the
-    // iterator.
-    private int prevInputBufPos; // prev offset into dataInputBuf.
-    private long prevOffset;
-    private long prevLen;
-
-    /**
-     * Resets the record index iterator.
-     */
-    public void reset() {
-      this.dataInputBuf = null;
-    }
-
-    /**
-     * Aligns the iteration capability to return info about the next
-     * record in the IndexSegment. Must be called before the first
-     * record.
-     * @return true if there is another record described in this IndexSegment.
-     */
-    public boolean next() {
-      this.prevOffset = this.curOffset;
-      if (null == dataInputBuf) {
-        // We need to set up the iterator; this is the first use.
-        if (null == recordLenBytes) {
-          return false; // We don't have any records?
-        }
-
-        this.dataInputBuf = new DataInputBuffer();
-        this.dataInputBuf.reset(recordLenBytes.getBytes(),
-            0, recordLenBytes.getLength());
-
-        this.curOffset = this.tableEntry.getFirstIndexOffset();
-        this.prevOffset = 0;
-      } else {
-        this.curOffset += this.curLen;
-      }
-
-      boolean available = dataInputBuf.getPosition() < dataInputBuf.getLength();
-      if (available) {
-        this.prevInputBufPos = dataInputBuf.getPosition();
-        // Then read out the next record length.
-        try {
-          this.prevLen = this.curLen;
-          this.curLen = WritableUtils.readVLong(dataInputBuf);
-        } catch (IOException ioe) {
-          // Shouldn't happen; data in DataInputBuffer is materialized.
-          throw new RuntimeException(ioe);
-        }
-      }
-
-      return available;
-    }
-
-    /**
-     * Undoes a single call to next(). This cannot be called twice in a row;
-     * before calling this again, next() must be called in the interim. This
-     * makes a subsequent call to next() yield the same iterated values as the
-     * previous call.
-     */
-    public void rewindOnce() {
-      // Move the buffer backwards so we deserialize the same VLong with
-      // the next call.
-      if (prevInputBufPos == 0) {
-        // We actually rewound the first next() in the iterator.
-        // Just reset the iterator to the beginning. Otherwise we'll
-        // backfill it with bogus data.
-        reset();
-      } else {
-        // Use the normal codepath; move the serialization buffer
-        // backwards and restores the previously yielded values.
-        dataInputBuf.reset(recordLenBytes.getBytes(), prevInputBufPos,
-            recordLenBytes.getLength() - prevInputBufPos);
-
-        // And restore the previously-yielded values.
-        this.curLen = this.prevLen;
-        this.curOffset = this.prevOffset;
-      }
-    }
-
-    /**
-     * Returns the length of the current record.
-     * You must call next() and it must return true before calling this method.
-     * @return the length in bytes of the current record.
-     */
-    public long getCurRecordLen() {
-      return curLen;
-    }
-
-    /**
-     * Returns the offset of the current record from the beginning of the file.
-     * You must call next() and it must return true before calling this method.
-     * @return the offset in bytes from the beginning of the file for the
-     * current record.
-     */
-    public long getCurRecordStart() {
-      return curOffset;
-    }
-  }
-
-  /**
-   * Describes an IndexSegment. This is one entry in the IndexTable. It
-   * holds the physical location of the IndexSegment in the file, as well
-   * as the range of entryIds and byte ranges corresponding to records
-   * described by the index subset in the IndexSegment.
-   */
-  private static class IndexTableEntry implements Writable {
-    private long segmentOffset;
-    private long firstIndexId;
-    private long firstIndexOffset;
-    private long lastIndexOffset;
-
-    public IndexTableEntry() {
-    }
-
-    public IndexTableEntry(DataInput in) throws IOException {
-      readFields(in);
-    }
-
-    private void setSegmentOffset(long offset) {
-      this.segmentOffset = offset;
-    }
-
-    private void setFirstIndexId(long id) {
-      this.firstIndexId = id;
-    }
-
-    private void setFirstIndexOffset(long offset) {
-      this.firstIndexOffset = offset;
-    }
-
-    private void setLastIndexOffset(long offset) {
-      this.lastIndexOffset = offset;
-    }
-
-    public void write(DataOutput out) throws IOException {
-      WritableUtils.writeVLong(out, segmentOffset);
-      WritableUtils.writeVLong(out, firstIndexId);
-      WritableUtils.writeVLong(out, firstIndexOffset);
-      WritableUtils.writeVLong(out, lastIndexOffset);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      segmentOffset = WritableUtils.readVLong(in);
-      firstIndexId = WritableUtils.readVLong(in);
-      firstIndexOffset = WritableUtils.readVLong(in);
-      lastIndexOffset = WritableUtils.readVLong(in);
-    }
-
-    /**
-     * @return the entryId of the first record indexed by this segment.
-     */
-    public long getFirstIndexId() {
-      return this.firstIndexId;
-    }
-
-    /**
-     * @return the offset of the first record indexed by this segment.
-     */
-    public long getFirstIndexOffset() {
-      return this.firstIndexOffset;
-    }
-
-    /**
-     * @return the offset of the last record indexed by this segment.
-     */
-    public long getLastIndexOffset() {
-      return this.lastIndexOffset;
-    }
-
-    /**
-     * @return the offset from the start of the file of the IndexSegment
-     * data itself.
-     */
-    public long getSegmentOffset() {
-      return this.segmentOffset;
-    }
-
-    /**
-     * Inform whether the user's requested offset corresponds
-     * to a record that starts in this IndexSegment. If this
-     * returns true, the requested offset may actually be in
-     * a previous IndexSegment.
-     * @param off the offset of the start of a record to test.
-     * @return true if the user's requested offset is in this
-     * or a previous IndexSegment.
-     */
-    public boolean containsOffset(long off) {
-      return off <= getLastIndexOffset();
-    }
-  }
-
-  /**
-   * Stores the locations and ranges indexed by each IndexSegment.
-   */
-  private static class IndexTable
-      implements Iterable<IndexTableEntry>, Writable {
-    private List<IndexTableEntry> tableEntries;
-
-    public IndexTable() {
-      tableEntries = new ArrayList<IndexTableEntry>();
-    }
-
-    public IndexTable(DataInput in) throws IOException {
-      readFields(in);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      long recordTypeId = WritableUtils.readVLong(in);
-      if (recordTypeId != INDEX_TABLE_ID) {
-        // We expected to read an IndexTable.
-        throw new IOException("Expected IndexTable; got record with typeId="
-            + recordTypeId);
-      }
-
-      int tableCount = WritableUtils.readVInt(in);
-
-      tableEntries = new ArrayList<IndexTableEntry>(tableCount);
-      for (int i = 0; i < tableCount; i++) {
-        tableEntries.add(new IndexTableEntry(in));
-      }
-    }
-
-    public void write(DataOutput out) throws IOException {
-      // Start with the record type id.
-      WritableUtils.writeVLong(out, INDEX_TABLE_ID);
-
-      // Then the count of the records.
-      WritableUtils.writeVInt(out, tableEntries.size());
-
-      // Followed by the table itself.
-      for (IndexTableEntry entry : tableEntries) {
-        entry.write(out);
-      }
-    }
-
-    public void add(IndexTableEntry entry) {
-      tableEntries.add(entry);
-    }
-
-    public IndexTableEntry get(int i) {
-      return tableEntries.get(i);
-    }
-
-    public int size() {
-      return tableEntries.size();
-    }
-
-    public Iterator<IndexTableEntry> iterator() {
-      return tableEntries.iterator();
-    }
-  }
-
-  /**
-   * Class that writes out a LobFile. Instantiate via LobFile.create().
-   */
-  public abstract static class Writer implements Closeable {
-
-    /**
-     * If this Writer is writing to a physical LobFile, then this returns
-     * the file path it is writing to. Otherwise it returns null.
-     * @return the fully-qualified path being written to by this writer.
-     */
-    public abstract Path getPath();
-
-    /**
-     * Finishes writing the LobFile and closes underlying handles.
-     */
-    public abstract void close() throws IOException;
-
-    @Override
-    protected synchronized void finalize() throws Throwable {
-      close();
-      super.finalize();
-    }
-
-    /**
-     * Terminates the current record and writes any trailing zero-padding
-     * required by the specified record size.
-     * This is implicitly called between consecutive writeBlobRecord() /
-     * writeClobRecord() calls.
-     */
-    public abstract void finishRecord() throws IOException;
-
-    /**
-     * Declares a new BLOB record to be written to the file.
-     * @param len the "claimed" number of bytes that will be written to
-     * this record. The actual number of bytes may differ.
-     */
-    public abstract OutputStream writeBlobRecord(long len) throws IOException;
-
-    /**
-     * Declares a new CLOB record to be written to the file.
-     * @param len the claimed number of characters that will be written to
-     * this record. The actual number of characters may differ.
-     */
-    public abstract java.io.Writer writeClobRecord(long len)
-        throws IOException;
-
-    /**
-     * Report the current position in the output file.
-     * @return the number of bytes written through this Writer.
-     */
-    public abstract long tell() throws IOException;
-
-    /**
-     * Checks whether an underlying stream is present or null.
-     * @param out the stream to check for null-ness.
-     * @throws IOException if out is null.
-     */
-    protected void checkForNull(OutputStream out) throws IOException {
-      if (null == out) {
-        throw new IOException("Writer has been closed.");
-      }
-    }
-  }
-
-  /**
-   * Concrete writer implementation for LobFile format version 0.
-   * Instantiate via LobFile.create().
-   */
-  private static class V0Writer extends Writer {
-    public static final Log LOG = LogFactory.getLog(
-        V0Writer.class.getName());
-
-    private Configuration conf;
-    private Path path;
-    private boolean isCharData;
-    private LobFileHeader header;
-
-    private String codecName;
-    private CompressionCodec codec;
-    private Compressor compressor;
-
-    // The LobIndex we are constructing.
-    private LinkedList<IndexSegment> indexSegments;
-    // Number of entries in the current IndexSegment.
-    private int entriesInSegment;
-    private IndexTable indexTable;
-
-    // Number of entries that can be written to a single IndexSegment.
-    private int maxEntriesPerSegment;
-
-    // By default we write this many entries per IndexSegment.
-    static final int DEFAULT_MAX_SEGMENT_ENTRIES = 4096;
-
-    // Our OutputStream to the underlying file.
-    private DataOutputStream out;
-
-    // 'out' is layered on top of this stream, which gives us a count
-    // of how much data we've written so far.
-    private CountingOutputStream countingOut;
-
-    // State regarding the current record being written.
-    private long curEntryId; // entryId of the current LOB being written.
-    private long curClaimedLen; // The user claims a length for a record.
-
-    // The user's OutputStream and/or Writer that writes to us.
-    private OutputStream userOutputStream;
-    private java.io.Writer userWriter;
-
-    // The userCountingOutputStream may be the same as userOutputStream;
-    // but if the user is writing through a compressor, it is actually
-    // underneath of it. This tells us how many compressed bytes were
-    // really written.
-    private CountingOutputStream userCountingOutputStream;
-
-    /**
-     * Creates a LobFile Writer for file format version 0.
-     * @param p the path to create.
-     * @param conf the configuration to use to interact with the filesystem.
-     * @param isCharData true if this is for CLOBs, false for BLOBs.
-     * @param codecName the compression codec to use (or null for none).
-     * @param entriesPerSegment the number of index entries per IndexSegment.
-     */
-    V0Writer(Path p, Configuration conf, boolean isCharData,
-        String codecName, int entriesPerSegment) throws IOException {
-
-      this.path = LobReaderCache.qualify(p, conf);
-      this.conf = conf;
-      this.isCharData = isCharData;
-      this.header = new LobFileHeader();
-      this.indexSegments = new LinkedList<IndexSegment>();
-      this.indexTable = new IndexTable();
-      this.maxEntriesPerSegment = entriesPerSegment;
-
-      this.codecName = codecName;
-      if (this.codecName != null) {
-        this.codec = CodecMap.getCodec(codecName, conf);
-        if (null != this.codec) {
-          this.compressor = codec.createCompressor();
-        }
-      }
-
-      init();
-    }
-
-    /**
-     * Open the file and write its header.
-     */
-    private void init() throws IOException {
-      FileSystem fs = this.path.getFileSystem(conf);
-      FSDataOutputStream fsOut = fs.create(this.path);
-      this.countingOut = new CountingOutputStream(
-          new BufferedOutputStream(fsOut));
-      this.out = new DataOutputStream(this.countingOut);
-
-      // put any necessary config strings into the header.
-      MetaBlock m = this.header.getMetaBlock();
-      if (isCharData) {
-        m.put(MetaBlock.ENTRY_ENCODING_KEY, MetaBlock.CLOB_ENCODING);
-      } else {
-        m.put(MetaBlock.ENTRY_ENCODING_KEY, MetaBlock.BLOB_ENCODING);
-      }
-
-      if (null != codec) {
-        m.put(MetaBlock.COMPRESSION_CODEC_KEY, this.codecName);
-      }
-
-      // Serialize the value of maxEntriesPerSegment as a VInt in a byte array
-      // and put that into the metablock as ENTRIES_PER_SEGMENT_KEY.
-      int segmentBufLen = WritableUtils.getVIntSize(this.maxEntriesPerSegment);
-      DataOutputBuffer entriesPerSegBuf = new DataOutputBuffer(segmentBufLen);
-      WritableUtils.writeVInt(entriesPerSegBuf, this.maxEntriesPerSegment);
-      byte [] entriesPerSegArray =
-          Arrays.copyOf(entriesPerSegBuf.getData(), segmentBufLen);
-      m.put(MetaBlock.ENTRIES_PER_SEGMENT_KEY,
-          new BytesWritable(entriesPerSegArray));
-
-      // Write the file header to the file.
-      this.header.write(out);
-
-      // Now we're ready to accept record data from the user.
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public Path getPath() {
-      return this.path;
-    }
-
-    @Override
-    /**
-     * {@inheritDoc}
-     */
-    public long tell() throws IOException {
-      checkForNull(this.out);
-      this.out.flush();
-      return this.countingOut.getByteCount();
-    }
-
-    @Override
-    /**
-     * {@inheritDoc}
-     */
-    public void close() throws IOException {
-      finishRecord();
-      writeIndex();
-      if (this.out != null) {
-        this.out.close();
-        this.out = null;
-      }
-
-      if (this.countingOut != null) {
-        this.countingOut.close();
-        this.countingOut = null;
-      }
-    }
-
-    @Override
-    /**
-     * {@inheritDoc}
-     */
-    public void finishRecord() throws IOException {
-      if (null != this.userWriter) {
-        this.userWriter.close();
-        this.userWriter = null;
-      }
-
-      if (null != this.userCountingOutputStream) {
-
-        // If there is a wrapping stream for compression,
-        // close this first.
-        if (null != this.userOutputStream
-            && this.userOutputStream != this.userCountingOutputStream) {
-          this.userOutputStream.close();
-        }
-
-        // Now close the "main" stream.
-        this.userCountingOutputStream.close();
-
-        // Write the true length of the current record to the index.
-        updateIndex(this.userCountingOutputStream.getByteCount()
-            + RecordStartMark.START_MARK_LENGTH
-            + WritableUtils.getVIntSize(curEntryId)
-            + WritableUtils.getVIntSize(curClaimedLen));
-
-        this.userOutputStream = null;
-        this.userCountingOutputStream = null;
-      }
-
-      if (null != this.out) {
-        out.flush();
-      }
-    }
-
-    /**
-     * Write in the current IndexSegment, the true compressed length of the
-     * record we just finished writing.
-     * @param curRecordLen the true length in bytes of the compressed record.
-     */
-    private void updateIndex(long curRecordLen) throws IOException {
-      LOG.debug("Adding index entry: id=" + curEntryId
-          + "; len=" + curRecordLen);
-      indexSegments.getLast().addRecordLen(curRecordLen);
-      entriesInSegment++;
-      curEntryId++;
-    }
-
-    /**
-     * Write the index itself to the file.
-     */
-    private void writeIndex() throws IOException {
-
-      // Write out all the segments in turn.
-      // As we do so, reify their offsets into the IndexTable.
-      for (IndexSegment segment : indexSegments) {
-        long segmentOffset = tell();
-        segment.getTableEntry().setSegmentOffset(segmentOffset);
-
-        header.getStartMark().write(out);
-        segment.write(out);
-      }
-
-      long indexTableStartPos = tell(); // Save for the end of the file.
-      LOG.debug("IndexTable offset: " + indexTableStartPos);
-
-      header.getStartMark().write(out);
-      indexTable.write(out); // write the IndexTable record.
-
-      // Write the finale that tells us where the IndexTable begins.
-      header.getStartMark().write(out);
-      WritableUtils.writeVLong(out, SEGMENT_OFFSET_ID);
-      WritableUtils.writeVLong(out, indexTableStartPos);
-    }
-
-    /**
-     * Prepare to index a new record that will soon be written to the file.
-   * If this is the first record in the current IndexSegment, we need
-     * to record its entryId and the current file position.
-     */
-    private void startRecordIndex() throws IOException {
-      if (entriesInSegment == maxEntriesPerSegment
-          || indexSegments.size() == 0) {
-        // The current segment is full. Start a new one.
-        this.entriesInSegment = 0;
-        IndexTableEntry tableEntry = new IndexTableEntry();
-        IndexSegment curSegment = new IndexSegment(tableEntry);
-        this.indexSegments.add(curSegment);
-
-        long filePos = tell();
-        LOG.debug("Starting IndexSegment; first id=" + curEntryId
-            + "; off=" + filePos);
-        tableEntry.setFirstIndexId(curEntryId);
-        tableEntry.setFirstIndexOffset(filePos);
-        tableEntry.setLastIndexOffset(filePos);
-        this.indexTable.add(tableEntry);
-      }
-    }
-
-    @Override
-    /**
-     * {@inheritDoc}
-     */
-    public OutputStream writeBlobRecord(long claimedLen) throws IOException {
-      finishRecord(); // finish any previous record.
-      checkForNull(this.out);
-      startRecordIndex();
-      this.header.getStartMark().write(out);
-      LOG.debug("Starting new record; id=" + curEntryId
-          + "; claimedLen=" + claimedLen);
-      WritableUtils.writeVLong(out, curEntryId);
-      WritableUtils.writeVLong(out, claimedLen);
-      this.curClaimedLen = claimedLen;
-      this.userCountingOutputStream = new CountingOutputStream(
-          new CloseShieldOutputStream(out));
-      if (null == this.codec) {
-        // No codec; pass thru the same OutputStream to the user.
-        this.userOutputStream = this.userCountingOutputStream;
-      } else {
-        // Wrap our CountingOutputStream in a compressing OutputStream to
-        // give to the user.
-        this.compressor.reset();
-        this.userOutputStream = new CompressorStream(
-            this.userCountingOutputStream, compressor);
-      }
-
-      return this.userOutputStream;
-    }
-
-    @Override
-    /**
-     * {@inheritDoc}
-     */
-    public java.io.Writer writeClobRecord(long len) throws IOException {
-      if (!isCharData) {
-        throw new IOException(
-            "Can only write CLOB data to a Clob-specific LobFile");
-      }
-
-      // Get a binary handle to the record and wrap it in a java.io.Writer.
-      writeBlobRecord(len);
-      this.userWriter = new OutputStreamWriter(userOutputStream);
-      return this.userWriter;
-    }
-  }
+  static final long INDEX_TABLE_ID = org.apache.sqoop.io.LobFile.INDEX_TABLE_ID;
 
   /**
-   * Class that can read a LobFile. Create with LobFile.open().
+   * @deprecated use org.apache.sqoop.io.LobFile.Writer
+   * @see org.apache.sqoop.io.LobFile.Writer
    */
-  public abstract static class Reader implements Closeable {
-    /**
-     * If this Reader is reading from a physical LobFile, then this returns
-     * the file path it is reading from. Otherwise it returns null.
-     * @return the fully-qualified path being read by this reader.
-     */
-    public abstract Path getPath();
-
-    /**
-     * Report the current position in the file. Note that the internal
-     * cursor may move in an unpredictable fashion; e.g., to fetch
-     * additional data from the index stored at the end of the file.
-     * Clients may be more interested in the getRecordOffset() method
-     * which returns the starting offset of the current record.
-     * @return the current offset from the start of the file in bytes.
-     */
-    public abstract long tell() throws IOException;
-
-    /**
-     * Move the file pointer to the first available full record beginning at
-     * position 'pos', relative to the start of the file.  After calling
-     * seek(), you will need to call next() to move to the record itself.
-     * @param pos the position to seek to or past.
-     */
-    public abstract void seek(long pos) throws IOException;
-
-    /**
-     * Advances to the next record in the file.
-     * @return true if another record exists, or false if the
-     * end of the file has been reached.
-     */
-    public abstract boolean next() throws IOException;
-
-    /**
-     * @return true if we have aligned the Reader (through a call to next())
-     * onto a record.
-     */
-    public abstract boolean isRecordAvailable();
-
-    /**
-     * Reports the length of the record to the user.
-     * If next() has not been called, or seek() has been called without
-     * a subsequent call to next(), or next() returned false, the return
-     * value of this method is undefined.
-     * @return the 'claimedLen' field of the current record. For
-     * character-based records, this is often in characters, not bytes.
-     * Records may have more bytes associated with them than are reported
-     * by this method, but never fewer.
-     */
-    public abstract long getRecordLen();
-
-    /**
-     * Return the entryId of the current record to the user.
-     * If next() has not been called, or seek() has been called without
-     * a subsequent call to next(), or next() returned false, the return
-     * value of this method is undefined.
-     * @return the 'entryId' field of the current record.
-     */
-    public abstract long getRecordId();
-
-    /**
-     * Return the byte offset at which the current record starts.
-     * If next() has not been called, or seek() has been called without
-     * a subsequent call to next(), or next() returned false, the return
-     * value of this method is undefined.
-     * @return the byte offset of the beginning of the current record.
-     */
-    public abstract long getRecordOffset();
-
-    /**
-     * @return an InputStream allowing the user to read the next binary
-     * record from the file.
-     */
-    public abstract InputStream readBlobRecord() throws IOException;
-
-    /**
-     * @return a java.io.Reader allowing the user to read the next character
-     * record from the file.
-     */
-    public abstract java.io.Reader readClobRecord() throws IOException;
-
-    /**
-     * Closes the reader.
-     */
-    public abstract void close() throws IOException;
-
-    /**
-     * Checks whether an underlying stream is present or null.
-     * @param in the stream to check for null-ness.
-     * @throws IOException if in is null.
-     */
-    protected void checkForNull(InputStream in) throws IOException {
-      if (null == in) {
-        throw new IOException("Reader has been closed.");
-      }
-    }
-
-    /**
-     * @return true if the Reader.close() method has been called.
-     */
-    public abstract boolean isClosed();
-
-    @Override
-    protected synchronized void finalize() throws Throwable {
-      close();
-      super.finalize();
-    }
+  public abstract static class Writer
+    extends org.apache.sqoop.io.LobFile.Writer {
   }
 
   /**
-   * Reader implementation for LobFile format version 0. Acquire with
-   * LobFile.open().
+   * @deprecated use org.apache.sqoop.io.LobFile.Reader instead.
+   * @see org.apache.sqoop.io.LobFile.Reader
    */
-  private static class V0Reader extends Reader {
-    public static final Log LOG = LogFactory.getLog(
-        V0Reader.class.getName());
-
-    // Forward seeks of up to this size are performed by reading, not seeking.
-    private static final long MAX_CONSUMPTION_WIDTH = 512 * 1024;
-
-    private LobFileHeader header;
-
-    private Configuration conf;
-
-    // Codec to use to decompress the file.
-    private CompressionCodec codec;
-    private Decompressor decompressor;
-
-    // Length of the entire file.
-    private long fileLen;
-
-    // State bit set to true after we've called next() and successfully
-    // aligned on a record. If true, we can hand an InputStream back to
-    // the user.
-    private boolean isAligned;
-
-    // After we've aligned on a record, this contains the record's
-    // reported length. In the presence of compression, etc, this may
-    // not represent its true length in the file.
-    private long claimedRecordLen;
-
-    // After we've aligned on a record, this contains its entryId.
-    private long curEntryId;
-
-    // After we've aligned on a record, this contains the offset of the
-    // beginning of its RSM from the start of the file.
-    private long curRecordOffset;
-
-    // After we've aligned on a record, this contains the record's
-    // true length from the index.
-    private long indexRecordLen;
-
-    // tmp buffer used to consume RecordStartMarks during alignment.
-    private byte [] tmpRsmBuf;
-
-    // The actual file stream itself, which we can move around (e.g. with
-    // seeking).
-    private FSDataInputStream underlyingInput;
-
-    // The data deserializer we typically place on top of this.
-    // If we use underlyingInput.seek(), then we instantiate a new
-    // dataIn on top of it.
-    private DataInputStream dataIn;
-
-    // The user accesses the current record through a stream memoized here.
-    // We retain a pointer here so that we can forcibly close the old
-    // userInputStream when they want to align on the next record.
-    private InputStream userInputStream;
-
-    // The current index segment to read record lengths from.
-    private IndexSegment curIndexSegment;
-
-    // The offset into the indexTable of the curIndexSegment.
-    private int curIndexSegmentId;
-
-    // The IndexTable that provides fast pointers to the IndexSegments.
-    private IndexTable indexTable;
-
-    // The path being opened.
-    private Path path;
-
-    // Users should use LobFile.open() instead of directly calling this.
-    V0Reader(Path path, Configuration conf, LobFileHeader header,
-        DataInputStream dis, FSDataInputStream stream, long fileLen)
-        throws IOException {
-      this.path = LobReaderCache.qualify(path, conf);
-      this.conf = conf;
-      this.header = header;
-      this.dataIn = dis;
-      this.underlyingInput = stream;
-      this.isAligned = false;
-      this.tmpRsmBuf = new byte[RecordStartMark.START_MARK_LENGTH];
-      this.fileLen = fileLen;
-      LOG.debug("Opening LobFile path: " + path);
-      openCodec();
-      openIndex();
-    }
-
-    /**
-     * If the user has specified a compression codec in the header metadata,
-     * create an instance of it.
-     */
-    private void openCodec() throws IOException {
-      String codecName = header.getMetaBlock().getString(
-          MetaBlock.COMPRESSION_CODEC_KEY);
-      if (null != codecName) {
-        LOG.debug("Decompressing file with codec: " + codecName);
-        this.codec = CodecMap.getCodec(codecName, conf);
-        if (null != this.codec) {
-          this.decompressor = codec.createDecompressor();
-        }
-      }
-    }
-
-    /**
-     * Get the first index segment out of the file; determine
-     * where that is by loading the index locator at the end of
-     * the file.
-     */
-    private void openIndex() throws IOException {
-      // Jump to the end of the file.
-      // At the end of the file is a RSM followed by two VLongs;
-      // the first of these is the value -2 (one byte) and the
-      // second of these is the offset of the beginning of the index (up to
-      // 9 bytes).
-      internalSeek(fileLen - RecordStartMark.START_MARK_LENGTH - 10);
-
-      byte [] finaleBuffer = new byte[RecordStartMark.START_MARK_LENGTH + 10];
-      this.dataIn.readFully(finaleBuffer);
-
-      // Figure out where in the finaleBuffer the RSM actually starts,
-      // as the finale might not fully fill the finaleBuffer.
-      int rsmStart = findRecordStartMark(finaleBuffer);
-      if (-1 == rsmStart) {
-        throw new IOException(
-            "Corrupt file index; could not find index start offset.");
-      }
-
-      // Wrap a buffer around those two vlongs.
-      int vlongStart = rsmStart + RecordStartMark.START_MARK_LENGTH;
-      DataInputBuffer inBuf = new DataInputBuffer();
-      inBuf.reset(finaleBuffer, vlongStart, finaleBuffer.length - vlongStart);
-
-      long offsetMarker = WritableUtils.readVLong(inBuf);
-      if (SEGMENT_OFFSET_ID != offsetMarker) {
-        // This isn't the correct signature; we got an RSM ahead of some
-        // other data.
-        throw new IOException("Invalid segment offset id: " + offsetMarker);
-      }
-
-      // This will contain the position of the IndexTable.
-      long indexTableStart = WritableUtils.readVLong(inBuf);
-      LOG.debug("IndexTable begins at " + indexTableStart);
-
-      readIndexTable(indexTableStart);
-
-      // Set up to read records from the beginning of the file. This
-      // starts with the first IndexSegment.
-      curIndexSegmentId = 0;
-      loadIndexSegment();
-
-      // This has moved the file pointer all over but we don't need to
-      // worry about resetting it now. The next() method will seek the
-      // file pointer to the first record when the user is ready to
-      // consume it.
-    }
-
-    /**
-     * Load the entire IndexTable into memory and decode it.
-     */
-    private void readIndexTable(long indexTableOffset) throws IOException {
-      internalSeek(indexTableOffset);
-
-      // Read the RecordStartMark ahead of the IndexTable.
-      this.dataIn.readFully(tmpRsmBuf);
-      if (!matchesRsm(tmpRsmBuf)) {
-        throw new IOException("Expected record start mark before IndexTable");
-      }
-
-      this.indexTable = new IndexTable(dataIn);
-    }
-
-    /**
-     * Ingest the next IndexSegment.
-     */
-    private void readNextIndexSegment() throws IOException {
-      this.curIndexSegmentId++;
-      loadIndexSegment();
-    }
-
-    /**
-     * Load curIndexSegment with the segment specified by curIndexSegmentId.
-     * The file pointer will be moved to the position after this segment.
-     * If the segment id does not exist, then the curIndexSegment will be
-     * set to null.
-     */
-    private void loadIndexSegment() throws IOException {
-      if (indexTable.size() <= curIndexSegmentId || curIndexSegmentId < 0) {
-        // We've iterated past the last IndexSegment. Set this to null
-        // and return; the next() method will then return false.
-        this.curIndexSegment = null;
-        return;
-      }
-
-      // Otherwise, seek to the segment and load it.
-      IndexTableEntry tableEntry = indexTable.get(curIndexSegmentId);
-      long segmentOffset = tableEntry.getSegmentOffset();
-      internalSeek(segmentOffset);
-      readPositionedIndexSegment();
-    }
-
-    /**
-     * When the underlying stream is aligned on the RecordStartMark
-     * ahead of an IndexSegment, read in the next IndexSegment.
-     * After this method the curIndexSegment contains the next
-     * IndexSegment to read in the file; if the entire index has been
-     * read in this fashion, curIndexSegment will be null.
-     */
-    private void readPositionedIndexSegment() throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reading index segment at " + tell());
-      }
-
-      // Read the RecordStartMark ahead of the IndexSegment.
-      this.dataIn.readFully(tmpRsmBuf);
-      if (!matchesRsm(tmpRsmBuf)) {
-        throw new IOException("Expected record start mark before IndexSegment");
-      }
-
-      // Read the IndexSegment proper.
-      this.curIndexSegment = new IndexSegment(
-          this.indexTable.get(curIndexSegmentId), this.dataIn);
-    }
-
-    /**
-     * @return true if the bytes in 'buf' starting at 'offset' match
-     * the RecordStartMark.
-     * @param rsm the RecordStartMark
-     * @param buf the buffer to check
-     * @param offset the offset into buf to begin checking.
-     */
-    private boolean matchesRsm(byte [] rsm, byte [] buf, int offset) {
-      for (int i = 0; i < RecordStartMark.START_MARK_LENGTH; i++) {
-        if (buf[i + offset] != rsm[i]) {
-          return false; // Mismatch at position i.
-        }
-      }
-
-      return true; // Matched the whole thing.
-    }
-
-    private boolean matchesRsm(byte [] buf, int offset) {
-      return matchesRsm(this.header.getStartMark().getBytes(),
-          buf, offset);
-    }
-
-    private boolean matchesRsm(byte [] buf) {
-      return matchesRsm(buf, 0);
-    }
-
-    /**
-     * @return the offset in 'buf' where a RecordStartMark begins, or -1
-     * if the RecordStartMark is not present in the buffer.
-     */
-    private int findRecordStartMark(byte [] buf) {
-      byte [] rsm = this.header.getStartMark().getBytes();
-
-      for (int i = 0; i < buf.length; i++) {
-        if (matchesRsm(rsm, buf, i)) {
-          return i;
-        }
-      }
-
-      return -1; // couldn't find it.
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public Path getPath() {
-      return this.path;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public long tell() throws IOException {
-      checkForNull(this.underlyingInput);
-      return this.underlyingInput.getPos();
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void seek(long pos) throws IOException {
-      closeUserStream();
-      checkForNull(this.underlyingInput);
-      this.isAligned = false;
-      searchForRecord(pos);
-    }
-
-    /**
-     * Search the index for the first record starting on or after 'start'.
-     * @param start the offset in the file where we should start looking
-     * for a record.
-     */
-    private void searchForRecord(long start) throws IOException {
-      LOG.debug("Looking for the first record at/after offset " + start);
-
-      // Scan through the IndexTable until we find the IndexSegment
-      // that contains the offset.
-      for (int i = 0; i < indexTable.size(); i++) {
-        IndexTableEntry tableEntry = indexTable.get(i);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Checking index table entry for range: "
-              + tableEntry.getFirstIndexOffset() + ", "
-              + tableEntry.getLastIndexOffset());
-        }
-
-        if (tableEntry.containsOffset(start)) {
-          // Seek to the IndexSegment associated with this tableEntry.
-          curIndexSegmentId = i;
-          loadIndexSegment();
-
-          // Use this index segment. The record index iterator
-          // is at the beginning of the IndexSegment, since we just
-          // read it in.
-          LOG.debug("Found matching index segment.");
-          while (this.curIndexSegment.next()) {
-            long curStart = this.curIndexSegment.getCurRecordStart();
-            if (curStart >= start) {
-              LOG.debug("Found seek target record with offset " + curStart);
-              // This is the first record to meet this criterion.
-              // Rewind the index iterator by one so that the next()
-              // method will do the right thing. next() will also
-              // take care of actually seeking to the correct position
-              // in the file to read the record proper.
-              this.curIndexSegment.rewindOnce();
-              return;
-            }
-          }
-
-          // If it wasn't actually in this IndexSegment, then we've
-          // got a corrupt IndexTableEntry; the entry represented that
-          // the segment ran longer than it actually does.
-          throw new IOException("IndexTableEntry claims last offset of "
-              + tableEntry.getLastIndexOffset()
-              + " but IndexSegment ends early."
-              + " The IndexTable appears corrupt.");
-        }
-      }
-
-      // If we didn't return inside the loop, then we've searched the entire
-      // file and it's not there. Advance the IndexSegment iterator to
-      // the end of the road so that next() returns false.
-      this.curIndexSegmentId = indexTable.size();
-      loadIndexSegment();
-    }
-
-    /**
-     * Read data from the stream and discard it.
-     * @param numBytes number of bytes to read and discard.
-     */
-    private void consumeBytes(int numBytes) throws IOException {
-      int remaining = numBytes;
-      while (remaining > 0) {
-        int received = dataIn.skipBytes(remaining);
-        if (received < 1) {
-          throw new IOException("Could not consume additional bytes");
-        }
-        remaining -= received;
-      }
-    }
-
-    /**
-     * Seek to position 'pos' (offset from start of file). If this
-     * is nearby, actually just consume data from the underlying
-     * stream rather than doing a real seek.
-     * @param targetPos the position to seek to, expressed as an offset
-     * from the start of the file.
-     */
-    private void internalSeek(long targetPos) throws IOException {
-      long curPos = this.underlyingInput.getPos();
-      LOG.debug("Internal seek: target=" + targetPos + "; cur=" + curPos);
-      long distance = targetPos - curPos;
-      if (targetPos == curPos) {
-        LOG.debug("(no motion required)");
-        return; // We're already there!
-      } else if (targetPos > curPos && distance < MAX_CONSUMPTION_WIDTH) {
-        // We're "close enough" that we should just read it.
-        LOG.debug("Advancing by " + distance + " bytes.");
-        consumeBytes((int) distance);
-      } else {
-        LOG.debug("Direct seek to target");
-        this.underlyingInput.seek(targetPos);
-        this.dataIn = new DataInputStream(this.underlyingInput);
-      }
-    }
-
-    /**
-     * Close any stream to an open record that was opened by a user.
-     */
-    private void closeUserStream() throws IOException {
-      if (this.userInputStream != null) {
-        this.userInputStream.close();
-        this.userInputStream = null;
-      }
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public boolean next() throws IOException {
-      LOG.debug("Checking for next record");
-      checkForNull(this.underlyingInput);
-      // If the user has opened a record stream, it is now void.
-      closeUserStream();
-      this.isAligned = false; // false until proven true.
-
-      // Get the position of the next record start.
-      // Check the index: is there another record?
-      if (null == curIndexSegment) {
-        LOG.debug("Index is finished; false");
-        return false; // No index remains. Ergo, no more records.
-      }
-      boolean moreInSegment = curIndexSegment.next();
-      if (!moreInSegment) {
-        // The current IndexSegment has been exhausted. Move to the next.
-        LOG.debug("Loading next index segment.");
-        readNextIndexSegment();
-        if (null == curIndexSegment) {
-          LOG.debug("Index is finished; false");
-          return false; // No index; no records.
-        }
-
-        // Try again with the next IndexSegment.
-        moreInSegment = curIndexSegment.next();
-      }
-
-      if (!moreInSegment) {
-        // Nothing left in the last IndexSegment.
-        LOG.debug("Last index segment is finished; false.");
-        this.curIndexSegment = null;
-        return false;
-      }
-
-      // Determine where the next record starts.
-      this.indexRecordLen = this.curIndexSegment.getCurRecordLen();
-      this.curRecordOffset = this.curIndexSegment.getCurRecordStart();
-
-      LOG.debug("Next record starts at position: " + this.curRecordOffset
-          + "; indexedLen=" + this.indexRecordLen);
-
-      // Make sure we're at the target position.
-      internalSeek(this.curRecordOffset);
-
-      // We are now on top of the next record's RecordStartMark.
-      // Consume the RSM and the record header.
-      this.dataIn.readFully(this.tmpRsmBuf);
-      if (!matchesRsm(tmpRsmBuf)) {
-        // No rsm? No dice.
-        throw new IOException("Index contains bogus offset.");
-      }
-
-      this.curEntryId = WritableUtils.readVLong(this.dataIn);
-      if (this.curEntryId < 0) {
-        // We've moved past the end of the records and started
-        // trying to consume the index. This is the EOF from
-        // the client's perspective.
-        LOG.debug("Indexed position is itself an IndexSegment; false.");
-        return false;
-      }
-      LOG.debug("Aligned on record id=" + this.curEntryId);
-
-      this.claimedRecordLen = WritableUtils.readVLong(this.dataIn);
-      LOG.debug("Record has claimed length " + this.claimedRecordLen);
-      // We are now aligned on the start of the user's data.
-      this.isAligned = true;
-      return true;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public boolean isRecordAvailable() {
-      return this.isAligned;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public long getRecordLen() {
-      return this.claimedRecordLen;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public long getRecordId() {
-      return this.curEntryId;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public long getRecordOffset() {
-      return this.curRecordOffset;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public InputStream readBlobRecord() throws IOException {
-      if (!isRecordAvailable()) {
-        // we're not currently aligned on a record-start.
-        // Try to get the next one.
-        if (!next()) {
-          // No more records available.
-          throw new EOFException("End of file reached.");
-        }
-      }
-
-      // Ensure any previously-open user record stream is closed.
-      closeUserStream();
-
-      // Mark this record as consumed.
-      this.isAligned = false;
-
-      // The length of the stream we can return to the user is
-      // the indexRecordLen minus the length of any per-record headers.
-      // That includes the RecordStartMark, the entryId, and the claimedLen.
-      long streamLen = this.indexRecordLen - RecordStartMark.START_MARK_LENGTH
-          - WritableUtils.getVIntSize(this.curEntryId)
-          - WritableUtils.getVIntSize(this.claimedRecordLen);
-      LOG.debug("Yielding stream to user with length " + streamLen);
-      this.userInputStream = new FixedLengthInputStream(this.dataIn, streamLen);
-      if (this.codec != null) {
-        // The user needs to decompress the data; wrap the InputStream.
-        decompressor.reset();
-        this.userInputStream = new DecompressorStream(
-            this.userInputStream, decompressor);
-      }
-      return this.userInputStream;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public java.io.Reader readClobRecord() throws IOException {
-      // Get a handle to the binary reader and then wrap it.
-      InputStream is = readBlobRecord();
-      return new InputStreamReader(is);
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void close() throws IOException {
-      closeUserStream();
-
-      if (null != dataIn) {
-        dataIn.close();
-        dataIn = null;
-      }
-
-      if (null != underlyingInput) {
-        underlyingInput.close();
-        underlyingInput = null;
-      }
-
-      this.isAligned = false;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public boolean isClosed() {
-      return this.underlyingInput == null;
-    }
+  public abstract static class Reader
+    extends org.apache.sqoop.io.LobFile.Reader {
   }
 
   /**
    * Creates a LobFile Reader configured to read from the specified file.
    */
   public static Reader open(Path p, Configuration conf) throws IOException {
-    FileSystem fs = p.getFileSystem(conf);
-    FileStatus [] stats = fs.listStatus(p);
-    if (null == stats || stats.length == 0) {
-      throw new IOException("Could not find file: " + p);
-    }
-    FSDataInputStream fis = fs.open(p);
-    DataInputStream dis = new DataInputStream(fis);
-    LobFileHeader header = new LobFileHeader(dis);
-    int version = header.getVersion();
-
-    if (version == 0) {
-      return new V0Reader(p, conf, header, dis, fis, stats[0].getLen());
-    } else {
-      throw new IOException("No reader available for LobFile version "
-          + version);
-    }
+    return org.apache.sqoop.io.LobFile.open(p, conf);
   }
 
   /**
@@ -1775,7 +87,7 @@ public final class LobFile {
    * @param conf the configuration to use to interact with the filesystem.
    */
   public static Writer create(Path p, Configuration conf) throws IOException {
-    return create(p, conf, false);
+    return org.apache.sqoop.io.LobFile.create(p, conf, false);
   }
 
   /**
@@ -1786,7 +98,7 @@ public final class LobFile {
    */
   public static Writer create(Path p, Configuration conf, boolean isCharData)
       throws IOException {
-    return create(p, conf, isCharData, null);
+    return org.apache.sqoop.io.LobFile.create(p, conf, isCharData, null);
   }
 
   /**
@@ -1798,8 +110,7 @@ public final class LobFile {
    */
   public static Writer create(Path p, Configuration conf, boolean isCharData,
       String codec) throws IOException {
-    return create(p, conf, isCharData, codec,
-        V0Writer.DEFAULT_MAX_SEGMENT_ENTRIES);
+    return org.apache.sqoop.io.LobFile.create(p, conf, isCharData, codec);
   }
 
   /**
@@ -1813,7 +124,8 @@ public final class LobFile {
   public static Writer create(Path p, Configuration conf, boolean isCharData,
       String codec, int entriesPerSegment)
       throws IOException {
-    return new V0Writer(p, conf, isCharData, codec, entriesPerSegment);
+    return org.apache.sqoop.io.LobFile.create(
+        p, conf, isCharData, codec, entriesPerSegment);
   }
 }
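
Editor's note: the V0Reader deleted above implements the whole record-iteration API (next(), readBlobRecord(), getRecordId(), getRecordLen(), close()), which this commit moves to org.apache.sqoop.io.LobFile. A minimal sketch of how a caller drives that API through the migrated class follows; the file path and the record handling are illustrative only, not part of this diff:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.io.LobFile;

    public class LobFileReadExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Path is illustrative; open() reads the header and hands back a
        // version-appropriate reader.
        LobFile.Reader reader = LobFile.open(new Path("/tmp/example.lob"), conf);
        try {
          while (reader.next()) {               // align on the next record
            long id = reader.getRecordId();     // entryId of the current record
            long len = reader.getRecordLen();   // claimed length; characters for CLOB data
            InputStream in = reader.readBlobRecord();
            // ... consume 'in'; it is invalidated by the next call to next() or seek()
          }
        } finally {
          reader.close();
        }
      }
    }

As in the removed code, readClobRecord() wraps the same stream in a java.io.Reader for character records.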
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobReaderCache.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobReaderCache.java?rev=1190430&r1=1190429&r2=1190430&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobReaderCache.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/LobReaderCache.java Fri Oct 28 16:32:43 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,19 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package com.cloudera.sqoop.io;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 /**
  * A cache of open LobFile.Reader objects.
@@ -38,17 +30,13 @@ import org.apache.hadoop.conf.Configurat
  * instances, it is most useful to have a single global cache. This cache is
  * internally synchronized; only one thread can insert or retrieve a reader
  * from the cache at a time.
+ *
+ * @deprecated use org.apache.sqoop.io.LobReaderCache instead.
+ * @see org.apache.sqoop.io.LobReaderCache
  */
-public final class LobReaderCache {
+public final class LobReaderCache extends org.apache.sqoop.io.LobReaderCache {
 
-  public static final Log LOG = LogFactory.getLog(
-      LobReaderCache.class.getName());
-
-  private Map<Path, LobFile.Reader> readerMap;
-
-  private LobReaderCache() {
-    this.readerMap = new TreeMap<Path, LobFile.Reader>();
-  }
+  public static final Log LOG = org.apache.sqoop.io.LobReaderCache.LOG;
 
   private static final LobReaderCache CACHE;
   static {
@@ -71,79 +59,7 @@ public final class LobReaderCache {
    */
   public static Path qualify(Path path, Configuration conf)
       throws IOException {
-    if (null == path) {
-      return null;
-    }
-
-    FileSystem fs = path.getFileSystem(conf);
-    if (null == fs) {
-      fs = FileSystem.get(conf);
-    }
-    return path.makeQualified(fs);
-  }
-
-  /**
-   * Open a LobFile for read access, returning a cached reader if one is
-   * available, or a new reader otherwise.
-   * @param path the path to the LobFile to open
-   * @param conf the configuration to use to access the FS.
-   * @throws IOException if there's an error opening the file.
-   */
-  public LobFile.Reader get(Path path, Configuration conf)
-      throws IOException {
-
-    LobFile.Reader reader = null;
-    Path canonicalPath = qualify(path, conf);
-    // Look up an entry in the cache.
-    synchronized(this) {
-      reader = readerMap.remove(canonicalPath);
-    }
-
-    if (null != reader && !reader.isClosed()) {
-      // Cache hit. return it.
-      LOG.debug("Using cached reader for " + canonicalPath);
-      return reader;
-    }
-
-    // Cache miss; open the file.
-    LOG.debug("No cached reader available for " + canonicalPath);
-    return LobFile.open(path, conf);
-  }
-
-  /**
-   * Return a reader back to the cache. If there's already a reader for
-   * this path, then the current reader is closed.
-   * @param reader the opened reader. Any record-specific subreaders should be
-   * closed.
-   * @throws IOException if there's an error accessing the path's filesystem.
-   */
-  public void recycle(LobFile.Reader reader) throws IOException {
-    Path canonicalPath = reader.getPath();
-
-    // Check if the cache has a reader for this path already. If not, add this.
-    boolean cached = false;
-    synchronized(this) {
-      if (readerMap.get(canonicalPath) == null) {
-        LOG.debug("Caching reader for path: " + canonicalPath);
-        readerMap.put(canonicalPath, reader);
-        cached = true;
-      }
-    }
-
-    if (!cached) {
-      LOG.debug("Reader already present for path: " + canonicalPath
-          + "; closing.");
-      reader.close();
-    }
-  }
-
-  @Override
-  protected synchronized void finalize() throws Throwable {
-    for (LobFile.Reader r : readerMap.values()) {
-      r.close();
-    }
-
-    super.finalize();
+    return org.apache.sqoop.io.LobReaderCache.qualify(path, conf);
   }
 }
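
Editor's note: the cache methods deleted above, get() and recycle(), now come from org.apache.sqoop.io.LobReaderCache. A minimal usage sketch follows; the static getCache() accessor and the path are assumptions for illustration and are not shown in this diff:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.io.LobFile;
    import org.apache.sqoop.io.LobReaderCache;

    public class LobReaderCacheExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path lobPath = new Path("/tmp/example.lob");        // illustrative path
        LobReaderCache cache = LobReaderCache.getCache();   // accessor assumed; not part of this diff
        LobFile.Reader reader = cache.get(lobPath, conf);   // cached reader if one is open, else a new one
        try {
          while (reader.next()) {
            // ... read the current record ...
          }
        } finally {
          cache.recycle(reader);  // returned to the cache, or closed if that path is already cached
        }
      }
    }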
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/NamedFifo.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/NamedFifo.java?rev=1190430&r1=1190429&r2=1190430&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/NamedFifo.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/NamedFifo.java Fri Oct 28 16:32:43 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,82 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.io;
 
 import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.util.Shell;
-import org.apache.log4j.Logger;
 
 /**
  * A named FIFO channel.
+ *
+ * @deprecated use org.apache.sqoop.io.NamedFifo instead.
+ * @see org.apache.sqoop.io.NamedFifo
  */
-public class NamedFifo {
+public class NamedFifo extends org.apache.sqoop.io.NamedFifo {
 
-  private static final Logger LOG = Logger.getLogger(NamedFifo.class);
-
-  private File fifoFile;
-
-  /** Create a named FIFO object at the local fs path given by 'pathname'. */
   public NamedFifo(String pathname) {
-    this.fifoFile = new File(pathname);
+    super(pathname);
   }
 
-  /** Create a named FIFO object at the local fs path given by the 'fifo' File
-   * object. */
   public NamedFifo(File fifo) {
-    this.fifoFile = fifo;
-  }
-
-  /**
-   * Return the File object representing the FIFO.
-   */
-  public File getFile() {
-    return this.fifoFile;
-  }
-
-  /**
-   * Create a named FIFO object.
-   * The pipe will be created with permissions 0600.
-   * @throws IOException on failure.
-   */
-  public void create() throws IOException {
-    create(0600);
-  }
-
-  /**
-   * Create a named FIFO object with the specified fs permissions.
-   * This depends on the 'mknod' or 'mkfifo' (Mac OS X) system utility
-   * existing. (for example, provided by Linux coreutils). This object
-   * will be deleted when the process exits.
-   * @throws IOException on failure.
-   */
-  public void create(int permissions) throws IOException {
-    String filename = fifoFile.toString();
-
-    // Format permissions as a mode string in base 8.
-    String modeStr = Integer.toString(permissions, 8);
-
-    // Create the FIFO itself.
-    try {
-      String output = Shell.execCommand("mknod", "--mode=0" + modeStr,
-          filename, "p");
-      LOG.info("mknod output:\n"+output);
-    } catch (IOException ex) {
-      LOG.info("IO error running mknod: " + ex.getMessage());
-      LOG.debug("IO error running mknod", ex);
-    }
-    if (!this.fifoFile.exists()) {
-      LOG.info("mknod failed, falling back to mkfifo");
-      String output = Shell.execCommand("mkfifo", "-m", "0" + modeStr,
-          filename);
-      LOG.info("mkfifo output:\n"+output);
-    }
-
-    // Schedule the FIFO to be cleaned up when we exit.
-    this.fifoFile.deleteOnExit();
+    super(fifo);
   }
 }
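
Editor's note: the FIFO-creation logic removed above (mknod with an mkfifo fallback) now lives in org.apache.sqoop.io.NamedFifo. A small sketch of typical use, with an illustrative path:

    import java.io.File;
    import java.io.IOException;
    import org.apache.sqoop.io.NamedFifo;

    public class NamedFifoExample {
      public static void main(String[] args) throws IOException {
        // Path is illustrative; create() shells out to mknod, falling back to mkfifo.
        NamedFifo fifo = new NamedFifo("/tmp/example-pipe");
        fifo.create(0600);               // octal permission mode; the pipe is deleted on JVM exit
        File endpoint = fifo.getFile();
        // One process can now read from 'endpoint' while another writes to it;
        // note that opening a FIFO for reading blocks until a writer attaches.
      }
    }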
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java?rev=1190430&r1=1190429&r2=1190430&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java Fri Oct 28 16:32:43 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,56 +18,27 @@
 
 package com.cloudera.sqoop.io;
 
-import java.io.BufferedWriter;
-import java.io.OutputStreamWriter;
-import java.io.IOException;
+import org.apache.sqoop.io.SplittingOutputStream;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * A BufferedWriter implementation that wraps around a SplittingOutputStream
  * and allows splitting of the underlying stream.
  * Splits occur at allowSplit() calls, or newLine() calls.
+ *
+ * @deprecated use org.apache.sqoop.io.SplittableBufferedWriter instead.
+ * @see org.apache.sqoop.io.SplittableBufferedWriter
  */
-public class SplittableBufferedWriter extends BufferedWriter {
-
-  public static final Log LOG = LogFactory.getLog(
-      SplittableBufferedWriter.class.getName());
-
-  private SplittingOutputStream splitOutputStream;
-  private boolean alwaysFlush;
+public class SplittableBufferedWriter
+  extends org.apache.sqoop.io.SplittableBufferedWriter {
 
   public SplittableBufferedWriter(
       final SplittingOutputStream splitOutputStream) {
-    super(new OutputStreamWriter(splitOutputStream));
-
-    this.splitOutputStream = splitOutputStream;
-    this.alwaysFlush = false;
+    super(splitOutputStream);
   }
 
-  /** For testing. */
   SplittableBufferedWriter(final SplittingOutputStream splitOutputStream,
       final boolean alwaysFlush) {
-    super(new OutputStreamWriter(splitOutputStream));
-
-    this.splitOutputStream = splitOutputStream;
-    this.alwaysFlush = alwaysFlush;
-  }
-
-  public void newLine() throws IOException {
-    super.newLine();
-    this.allowSplit();
-  }
-
-  public void allowSplit() throws IOException {
-    if (alwaysFlush) {
-      this.flush();
-    }
-    if (this.splitOutputStream.wouldSplit()) {
-      LOG.debug("Starting new split");
-      this.flush();
-      this.splitOutputStream.allowSplit();
-    }
+    super(splitOutputStream, alwaysFlush);
   }
 }
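
Editor's note: the split-on-newline behaviour removed above is unchanged in org.apache.sqoop.io.SplittableBufferedWriter. A rough usage sketch follows; the SplittingOutputStream constructor arguments (destination directory, file prefix, cutoff size in bytes, codec) are assumptions, since that class's signature is not part of this diff:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.io.SplittableBufferedWriter;
    import org.apache.sqoop.io.SplittingOutputStream;

    public class SplitWriterExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Constructor arguments assumed for illustration only.
        SplittingOutputStream out = new SplittingOutputStream(
            conf, new Path("/tmp/out"), "data-", 64L * 1024 * 1024, null);
        SplittableBufferedWriter writer = new SplittableBufferedWriter(out);
        try {
          writer.write("a,row,of,data");
          writer.newLine();  // also calls allowSplit(), so files only roll over on record boundaries
        } finally {
          writer.close();
        }
      }
    }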


