sqoop-commits mailing list archives

From: b...@apache.org
Subject: svn commit: r1190430 [3/5] - in /incubator/sqoop/trunk: ./ src/java/com/cloudera/sqoop/io/ src/java/com/cloudera/sqoop/lib/ src/java/org/apache/sqoop/io/ src/java/org/apache/sqoop/lib/
Date: Fri, 28 Oct 2011 16:32:45 GMT
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Provides a mapping from codec names to concrete implementation class names.
+ */
+public final class CodecMap {
+
+  // Supported codec map values
+  // Note: do not add more values here, since codecs are discovered using the
+  // standard Hadoop mechanism (io.compression.codecs). See
+  // CompressionCodecFactory.
+  public static final String NONE = "none";
+  public static final String DEFLATE = "deflate";
+  public static final String LZO = "lzo";
+  public static final String LZOP = "lzop";
+
+  private static Map<String, String> codecNames;
+  static {
+    codecNames = new TreeMap<String, String>();
+
+    // Register the names of codecs we know about.
+    codecNames.put(NONE,    null);
+    codecNames.put(DEFLATE, "org.apache.hadoop.io.compress.DefaultCodec");
+    codecNames.put(LZO,     "com.hadoop.compression.lzo.LzoCodec");
+    codecNames.put(LZOP,    "com.hadoop.compression.lzo.LzopCodec");
+
+    // add more from Hadoop CompressionCodecFactory
+    for (Class<? extends CompressionCodec> cls
+        : CompressionCodecFactory.getCodecClasses(new Configuration())) {
+      String simpleName = cls.getSimpleName();
+      String codecName = simpleName;
+      if (simpleName.endsWith("Codec")) {
+        codecName = simpleName.substring(0, simpleName.length()
+            - "Codec".length());
+      }
+      codecNames.put(codecName.toLowerCase(), cls.getCanonicalName());
+    }
+  }
+
+  private CodecMap() {
+  }
+
+  /**
+   * Given a codec name, return the name of the concrete class
+   * that implements it (or 'null' in the case of the "none" codec).
+   * @throws com.cloudera.sqoop.io.UnsupportedCodecException if a codec cannot
+   * be found with the supplied name.
+   */
+  public static String getCodecClassName(String codecName)
+      throws com.cloudera.sqoop.io.UnsupportedCodecException {
+    if (!codecNames.containsKey(codecName)) {
+      throw new com.cloudera.sqoop.io.UnsupportedCodecException(codecName);
+    }
+
+    return codecNames.get(codecName);
+  }
+
+  /**
+   * Given a codec name, instantiate the concrete class that
+   * implements it.
+   * @throws com.cloudera.sqoop.io.UnsupportedCodecException if a codec cannot
+   * be found with the supplied name.
+   */
+  public static CompressionCodec getCodec(String codecName,
+    Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
+    // Try standard Hadoop mechanism first
+    CompressionCodec codec = getCodecByName(codecName, conf);
+    if (codec != null) {
+      return codec;
+    }
+    // Fall back to Sqoop mechanism
+    String codecClassName = null;
+    try {
+      codecClassName = getCodecClassName(codecName);
+      if (null == codecClassName) {
+        return null;
+      }
+      Class<? extends CompressionCodec> codecClass =
+          (Class<? extends CompressionCodec>)
+          conf.getClassByName(codecClassName);
+      return (CompressionCodec) ReflectionUtils.newInstance(
+          codecClass, conf);
+    } catch (ClassNotFoundException cnfe) {
+      throw new com.cloudera.sqoop.io.UnsupportedCodecException(
+          "Cannot find codec class "
+          + codecClassName + " for codec " + codecName);
+    }
+  }
+
+  /**
+   * Return the set of available codec names.
+   */
+  public static Set<String> getCodecNames() {
+    return codecNames.keySet();
+  }
+
+  /**
+   * Find the relevant compression codec by the codec's canonical class
+   * name or by a codec alias.
+   * <p>
+   * Codec aliases are case insensitive.
+   * <p>
+   * The codec alias is the short class name (without the package name).
+   * If the short class name ends with 'Codec', then there are two aliases
+   * for the codec: the complete short class name and the short class name
+   * without the 'Codec' suffix. For example, for the 'GzipCodec' codec
+   * class name the aliases are 'gzip' and 'gzipcodec'.
+   * <p>
+   * Note: When HADOOP-7323 is available this method can be replaced with
+   * a call to CompressionCodecFactory.
+   * @param codecName the canonical class name of the codec or a codec alias
+   * @param conf the configuration used to locate and instantiate the codec
+   * @return the codec object, or null if no codec matching the name was
+   * found
+   */
+  private static CompressionCodec getCodecByName(String codecName,
+      Configuration conf) {
+    List<Class<? extends CompressionCodec>> codecs =
+      CompressionCodecFactory.getCodecClasses(conf);
+    for (Class<? extends CompressionCodec> cls : codecs) {
+      if (codecMatches(cls, codecName)) {
+        return ReflectionUtils.newInstance(cls, conf);
+      }
+    }
+    return null;
+  }
+
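+  /**
+   * Checks whether a codec class matches a codec name: exactly by its
+   * fully-qualified class name, or case-insensitively by its short class
+   * name or by the short name without the 'Codec' suffix.
+   */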
+  private static boolean codecMatches(Class<? extends CompressionCodec> cls,
+      String codecName) {
+    String simpleName = cls.getSimpleName();
+    if (cls.getName().equals(codecName)
+        || simpleName.equalsIgnoreCase(codecName)) {
+      return true;
+    }
+    if (simpleName.endsWith("Codec")) {
+      String prefix = simpleName.substring(0, simpleName.length()
+          - "Codec".length());
+      if (prefix.equalsIgnoreCase(codecName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
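
A minimal usage sketch, assuming a default Hadoop Configuration with the
standard codecs on the classpath (CodecMapExample is a hypothetical client
class):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.sqoop.io.CodecMap;

    public class CodecMapExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "gzip" and "gzipcodec" are both case-insensitive aliases for
        // org.apache.hadoop.io.compress.GzipCodec.
        CompressionCodec codec = CodecMap.getCodec("gzip", conf);
        System.out.println(null == codec
            ? "none" : codec.getClass().getName());
        // List every registered codec alias, including "none".
        System.out.println(CodecMap.getCodecNames());
      }
    }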

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/CodecMap.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/FixedLengthInputStream.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/FixedLengthInputStream.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/FixedLengthInputStream.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/FixedLengthInputStream.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.input.CloseShieldInputStream;
+import org.apache.commons.io.input.CountingInputStream;
+import org.apache.commons.io.input.ProxyInputStream;
+
+/**
+ * Provides an InputStream that can consume a fixed maximum number of bytes
+ * from an underlying stream. Closing the FixedLengthInputStream does not
+ * close the underlying stream. After the maximum number of bytes has been
+ * read, this stream acts as though EOF has been reached.
+ */
+public class FixedLengthInputStream extends ProxyInputStream {
+
+  private CountingInputStream countingIn;
+  private long maxBytes;
+
+  public FixedLengthInputStream(InputStream stream, long maxLen) {
+    super(new CountingInputStream(new CloseShieldInputStream(stream)));
+
+    // Save a correctly-typed reference to the underlying stream.
+    this.countingIn = (CountingInputStream) this.in;
+    this.maxBytes = maxLen;
+  }
+
+  /** @return the number of bytes already consumed by the client. */
+  private long consumed() {
+    return countingIn.getByteCount();
+  }
+
+  /**
+   * @return number of bytes remaining to be read before the limit
+   * is reached.
+   */
+  private long toLimit() {
+    return maxBytes - consumed();
+  }
+
+  @Override
+  public int available() throws IOException {
+    return (int) Math.min(toLimit(), countingIn.available());
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (toLimit() > 0) {
+      return super.read();
+    } else {
+      return -1; // EOF.
+    }
+  }
+
+  @Override
+  public int read(byte [] buf) throws IOException {
+    return read(buf, 0, buf.length);
+  }
+
+  @Override
+  public int read(byte [] buf, int start, int count) throws IOException {
+    long limit = toLimit();
+    if (limit == 0) {
+      return -1; // EOF.
+    } else {
+      return super.read(buf, start, (int) Math.min(count, limit));
+    }
+  }
+}
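
A minimal usage sketch, assuming the class above is on the classpath
(FixedLengthExample and the sample data are hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    import org.apache.sqoop.io.FixedLengthInputStream;

    public class FixedLengthExample {
      public static void main(String[] args) throws Exception {
        InputStream underlying =
            new ByteArrayInputStream("hello, world".getBytes("UTF-8"));
        // Expose at most the first five bytes ("hello") to the consumer.
        InputStream limited = new FixedLengthInputStream(underlying, 5);
        int c;
        while ((c = limited.read()) != -1) {
          System.out.print((char) c);
        }
        // Closing 'limited' does not close 'underlying'; the constructor
        // wraps it in a CloseShieldInputStream.
        limited.close();
        System.out.println();
      }
    }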

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/FixedLengthInputStream.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobFile.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobFile.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobFile.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobFile.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,1821 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.io.output.CloseShieldOutputStream;
+import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DecompressorStream;
+
+import com.cloudera.sqoop.io.LobReaderCache;
+import com.cloudera.sqoop.util.RandomHash;
+
+/**
+ * File format which stores large object records.
+ * The format allows large objects to be read through individual InputStreams
+ * to allow reading without full materialization of a single record.
+ * Each record is assigned an id and can be accessed by id efficiently by
+ * consulting an index at the end of the file.
+ *
+ * The LobFile format is specified at:
+ * http://wiki.github.com/cloudera/sqoop/sip-3
+ */
+public final class LobFile {
+
+  public static final Log LOG = LogFactory.getLog(LobFile.class.getName());
+  public static final int LATEST_LOB_VERSION = 0;
+
+  public static final char[] HEADER_ID_STR = { 'L', 'O', 'B' };
+
+  // Value for entryId to write to the beginning of an IndexSegment.
+  public static final long SEGMENT_HEADER_ID = -1;
+
+  // Value for entryId to write before the finale.
+  public static final long SEGMENT_OFFSET_ID = -2;
+
+  // Value for entryId to write before the IndexTable.
+  public static final long INDEX_TABLE_ID = -3;
+
+  private LobFile() {
+  }
+
+  /**
+   * Creates a LobFile Reader configured to read from the specified file.
+   */
+  public static com.cloudera.sqoop.io.LobFile.Reader
+      open(Path p, Configuration conf) throws IOException {
+    FileSystem fs = p.getFileSystem(conf);
+    FileStatus [] stats = fs.listStatus(p);
+    if (null == stats || stats.length == 0) {
+      throw new IOException("Could not find file: " + p);
+    }
+    FSDataInputStream fis = fs.open(p);
+    DataInputStream dis = new DataInputStream(fis);
+    LobFileHeader header = new LobFileHeader(dis);
+    int version = header.getVersion();
+
+    if (version == 0) {
+      return new V0Reader(p, conf, header, dis, fis, stats[0].getLen());
+    } else {
+      throw new IOException("No reader available for LobFile version "
+          + version);
+    }
+  }
+
+  /**
+   * Creates a LobFile Writer.
+   * @param p the path to create.
+   * @param conf the configuration to use to interact with the filesystem.
+   * @param isCharData true if this is for CLOBs, false for BLOBs.
+   * @param codec the compression codec to use (or null for none).
+   * @param entriesPerSegment number of entries per index segment.
+   */
+  public static com.cloudera.sqoop.io.LobFile.Writer
+            create(Path p, Configuration conf, boolean isCharData,
+            String codec, int entriesPerSegment)
+      throws IOException {
+    return new V0Writer(p, conf, isCharData, codec, entriesPerSegment);
+  }
+
+  /**
+   * Creates a LobFile Writer.
+   * @param p the path to create.
+   * @param conf the configuration to use to interact with the filesystem.
+   * @param isCharData true if this is for CLOBs, false for BLOBs.
+   * @param codec the compression codec to use (or null for none).
+   */
+  public static com.cloudera.sqoop.io.LobFile.Writer
+            create(Path p, Configuration conf, boolean isCharData,
+            String codec) throws IOException {
+    return create(p, conf, isCharData, codec,
+        V0Writer.DEFAULT_MAX_SEGMENT_ENTRIES);
+  }
+
+  /**
+   * Creates a LobFile Writer configured for uncompressed data.
+   * @param p the path to create.
+   * @param conf the configuration to use to interact with the filesystem.
+   * @param isCharData true if this is for CLOBs, false for BLOBs.
+   */
+  public static com.cloudera.sqoop.io.LobFile.Writer
+            create(Path p, Configuration conf, boolean isCharData)
+      throws IOException {
+    return create(p, conf, isCharData, null);
+  }
+
+  /**
+   * Creates a LobFile Writer configured for uncompressed binary data.
+   * @param p the path to create.
+   * @param conf the configuration to use to interact with the filesystem.
+   */
+  public static com.cloudera.sqoop.io.LobFile.Writer
+            create(Path p, Configuration conf) throws IOException {
+    return create(p, conf, false);
+  }
+
+  /**
+   * Class that writes out a LobFile. Instantiate via LobFile.create().
+   */
+  public abstract static class Writer implements Closeable {
+    /**
+     * If this Writer is writing to a physical LobFile, then this returns
+     * the file path it is writing to. Otherwise it returns null.
+     * @return the fully-qualified path being written to by this writer.
+     */
+    public abstract Path getPath();
+
+    /**
+     * Finishes writing the LobFile and closes underlying handles.
+     */
+    public abstract void close() throws IOException;
+
+    @Override
+    protected synchronized void finalize() throws Throwable {
+      close();
+      super.finalize();
+    }
+
+    /**
+     * Terminates the current record and writes any trailing zero-padding
+     * required by the specified record size.
+     * This is implicitly called between consecutive writeBlobRecord() /
+     * writeClobRecord() calls.
+     */
+    public abstract void finishRecord() throws IOException;
+
+    /**
+     * Declares a new BLOB record to be written to the file.
+     * @param len the "claimed" number of bytes that will be written to
+     * this record. The actual number of bytes may differ.
+     */
+    public abstract OutputStream writeBlobRecord(long len) throws IOException;
+
+    /**
+     * Declares a new CLOB record to be written to the file.
+     * @param len the claimed number of characters that will be written to
+     * this record. The actual number of characters may differ.
+     */
+    public abstract java.io.Writer writeClobRecord(long len)
+        throws IOException;
+
+    /**
+     * Report the current position in the output file.
+     * @return the number of bytes written through this Writer.
+     */
+    public abstract long tell() throws IOException;
+
+    /**
+     * Checks whether an underlying stream is present or null.
+     * @param out the stream to check for null-ness.
+     * @throws IOException if out is null.
+     */
+    protected void checkForNull(OutputStream out) throws IOException {
+      if (null == out) {
+        throw new IOException("Writer has been closed.");
+      }
+    }
+  }
+
+  /**
+   * Class that can read a LobFile. Create with LobFile.open().
+   */
+  public abstract static class Reader implements Closeable {
+    /**
+     * If this Reader is reading from a physical LobFile, then this returns
+     * the file path it is reading from. Otherwise it returns null.
+     * @return the fully-qualified path being read by this reader.
+     */
+    public abstract Path getPath();
+
+    /**
+     * Report the current position in the file. Note that the internal
+     * cursor may move in an unpredictable fashion; e.g., to fetch
+     * additional data from the index stored at the end of the file.
+     * Clients may be more interested in the getRecordOffset() method
+     * which returns the starting offset of the current record.
+     * @return the current offset from the start of the file in bytes.
+     */
+    public abstract long tell() throws IOException;
+
+    /**
+     * Move the file pointer to the first available full record beginning at
+     * position 'pos', relative to the start of the file.  After calling
+     * seek(), you will need to call next() to move to the record itself.
+     * @param pos the position to seek to or past.
+     */
+    public abstract void seek(long pos) throws IOException;
+
+    /**
+     * Advances to the next record in the file.
+     * @return true if another record exists, or false if the
+     * end of the file has been reached.
+     */
+    public abstract boolean next() throws IOException;
+
+    /**
+     * @return true if we have aligned the Reader (through a call to next())
+     * onto a record.
+     */
+    public abstract boolean isRecordAvailable();
+
+    /**
+     * Reports the length of the record to the user.
+     * If next() has not been called, or seek() has been called without
+     * a subsequent call to next(), or next() returned false, the return
+     * value of this method is undefined.
+     * @return the 'claimedLen' field of the current record. For
+     * character-based records, this is often in characters, not bytes.
+     * Records may have more bytes associated with them than are reported
+     * by this method, but never fewer.
+     */
+    public abstract long getRecordLen();
+
+    /**
+     * Return the entryId of the current record to the user.
+     * If next() has not been called, or seek() has been called without
+     * a subsequent call to next(), or next() returned false, the return
+     * value of this method is undefined.
+     * @return the 'entryId' field of the current record.
+     */
+    public abstract long getRecordId();
+
+    /**
+     * Return the byte offset at which the current record starts.
+     * If next() has not been called, or seek() has been called without
+     * a subsequent call to next(), or next() returned false, the return
+     * value of this method is undefined.
+     * @return the byte offset of the beginning of the current record.
+     */
+    public abstract long getRecordOffset();
+
+    /**
+     * @return an InputStream allowing the user to read the next binary
+     * record from the file.
+     */
+    public abstract InputStream readBlobRecord() throws IOException;
+
+    /**
+     * @return a java.io.Reader allowing the user to read the next character
+     * record from the file.
+     */
+    public abstract java.io.Reader readClobRecord() throws IOException;
+
+    /**
+     * Closes the reader.
+     */
+    public abstract void close() throws IOException;
+
+    /**
+     * Checks whether an underlying stream is present or null.
+     * @param in the stream to check for null-ness.
+     * @throws IOException if in is null.
+     */
+    protected void checkForNull(InputStream in) throws IOException {
+      if (null == in) {
+        throw new IOException("Reader has been closed.");
+      }
+    }
+
+    /**
+     * @return true if the Reader.close() method has been called.
+     */
+    public abstract boolean isClosed();
+
+    @Override
+    protected synchronized void finalize() throws Throwable {
+      close();
+      super.finalize();
+    }
+  }
+
+  /**
+   * Represents a header block in a LobFile. Can write a new header
+   * block (and generate a record start mark), or read an existing
+   * header block.
+   */
+  private static class LobFileHeader implements Writable {
+
+    private int version;
+    private RecordStartMark startMark;
+    private MetaBlock metaBlock;
+
+    /**
+     * Create a new LobFileHeader.
+     */
+    public LobFileHeader() {
+      this.version = LATEST_LOB_VERSION;
+      this.startMark = new RecordStartMark();
+      this.metaBlock = new MetaBlock();
+    }
+
+    /**
+     * Read a LobFileHeader from an existing file.
+     */
+    public LobFileHeader(DataInput in) throws IOException {
+      readFields(in);
+    }
+
+    /**
+     * Write a LobFile header to an output sink.
+     */
+    public void write(DataOutput out) throws IOException {
+      // Start with the file type identification.
+      for (char c : HEADER_ID_STR) {
+        out.writeByte((int) c);
+      }
+
+      // Write the format version
+      WritableUtils.writeVInt(out, this.version);
+
+      startMark.write(out);
+      metaBlock.write(out);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      char [] chars = new char[3];
+      for (int i = 0; i < 3; i++) {
+        chars[i] = (char) in.readByte();
+      }
+
+      // Check that these match what we expect. Throws IOE if not.
+      checkHeaderChars(chars);
+
+      this.version = WritableUtils.readVInt(in);
+      if (this.version != LATEST_LOB_VERSION) {
+        // Right now we only have one version we can handle.
+        throw new IOException("Unexpected LobFile version " + this.version);
+      }
+
+      this.startMark = new RecordStartMark(in);
+      this.metaBlock = new MetaBlock(in);
+    }
+
+    /**
+     * Checks that a header array matches the standard LobFile header.
+     * @param headerStamp the header bytes received from the file.
+     * @throws IOException if the header stamp does not match the
+     * expected LobFile header.
+     */
+    private void checkHeaderChars(char [] headerStamp) throws IOException {
+      if (headerStamp.length != HEADER_ID_STR.length) {
+        throw new IOException("Invalid LobFile header stamp: expected length "
+            + HEADER_ID_STR.length);
+      }
+      for (int i = 0; i < HEADER_ID_STR.length; i++) {
+        if (headerStamp[i] != HEADER_ID_STR[i]) {
+          throw new IOException("Invalid LobFile header stamp");
+        }
+      }
+    }
+
+    /**
+     * @return the format version number for this LobFile
+     */
+    public int getVersion() {
+      return version;
+    }
+
+    /**
+     * @return the RecordStartMark for this LobFile.
+     */
+    public RecordStartMark getStartMark() {
+      return startMark;
+    }
+
+    /**
+     * @return the MetaBlock for this LobFile.
+     */
+    public MetaBlock getMetaBlock() {
+      return metaBlock;
+    }
+  }
+
+  /**
+   * Holds a RecordStartMark -- a 16-byte randomly-generated
+   * sync token. Can read an RSM from an input source, or can
+   * generate a new one.
+   */
+  private static class RecordStartMark implements Writable {
+
+    // This is a 16-byte array.
+    public static final int START_MARK_LENGTH = 16;
+
+    private byte [] startBytes;
+
+    public RecordStartMark() {
+      generateStartMark();
+    }
+
+    public RecordStartMark(DataInput in) throws IOException {
+      readFields(in);
+    }
+
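+    /**
+     * @return a copy of the start mark bytes.
+     */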
+    public byte [] getBytes() {
+      byte [] out = new byte[START_MARK_LENGTH];
+      System.arraycopy(this.startBytes, 0, out, 0, START_MARK_LENGTH);
+      return out;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      this.startBytes = new byte[START_MARK_LENGTH];
+      in.readFully(this.startBytes);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.write(this.startBytes);
+    }
+
+    /**
+     * Generate a new random RecordStartMark.
+     */
+    private void generateStartMark() {
+      this.startBytes = RandomHash.generateMD5Bytes();
+    }
+  }
+
+  /**
+   * Represents the metadata block stored in the header of a LobFile.
+   */
+  private static class MetaBlock extends AbstractMap<String, BytesWritable>
+      implements Writable {
+
+    // Strings which typically appear in the metablock have canonical names.
+    public static final String ENTRY_ENCODING_KEY = "EntryEncoding";
+    public static final String COMPRESSION_CODEC_KEY = "CompressionCodec";
+    public static final String ENTRIES_PER_SEGMENT_KEY = "EntriesPerSegment";
+
+    // Standard entry encodings.
+    public static final String CLOB_ENCODING = "CLOB";
+    public static final String BLOB_ENCODING = "BLOB";
+
+    private Map<String, BytesWritable> entries;
+
+    public MetaBlock() {
+      entries = new TreeMap<String, BytesWritable>();
+    }
+
+    public MetaBlock(DataInput in) throws IOException {
+      entries = new TreeMap<String, BytesWritable>();
+      readFields(in);
+    }
+
+    public MetaBlock(Map<String, BytesWritable> map) {
+      entries = new TreeMap<String, BytesWritable>();
+      for (Map.Entry<String, BytesWritable> entry : map.entrySet()) {
+        entries.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    @Override
+    public Set<Map.Entry<String, BytesWritable>> entrySet() {
+      return entries.entrySet();
+    }
+
+    @Override
+    public BytesWritable put(String k, BytesWritable v) {
+      BytesWritable old = entries.get(k);
+      entries.put(k, v);
+      return old;
+    }
+
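+    /**
+     * Stores a string value in the metablock, encoded as UTF-8.
+     */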
+    public BytesWritable put(String k, String v) {
+      try {
+        return put(k, new BytesWritable(v.getBytes("UTF-8")));
+      } catch (UnsupportedEncodingException uee) {
+        // Shouldn't happen; UTF-8 is always supported.
+        throw new RuntimeException(uee);
+      }
+    }
+
+    @Override
+    public BytesWritable get(Object k) {
+      return entries.get(k);
+    }
+
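+    /**
+     * Retrieves the value for a key and decodes it as a UTF-8 string;
+     * returns null if the key is not present.
+     */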
+    public String getString(Object k) {
+      BytesWritable bytes = get(k);
+      if (null == bytes) {
+        return null;
+      } else {
+        try {
+          return new String(bytes.getBytes(), 0, bytes.getLength(), "UTF-8");
+        } catch (UnsupportedEncodingException uee) {
+          // Shouldn't happen; UTF-8 is always supported.
+          throw new RuntimeException(uee);
+        }
+      }
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      int numEntries = WritableUtils.readVInt(in);
+      entries.clear();
+      for (int i = 0; i < numEntries; i++) {
+        String key = Text.readString(in);
+        BytesWritable val = new BytesWritable();
+        val.readFields(in);
+        entries.put(key, val);
+      }
+    }
+
+    public void write(DataOutput out) throws IOException {
+      int numEntries = entries.size();
+      WritableUtils.writeVInt(out, numEntries);
+      for (Map.Entry<String, BytesWritable> entry : entries.entrySet()) {
+        Text.writeString(out, entry.getKey());
+        entry.getValue().write(out);
+      }
+    }
+  }
+
+
+  /**
+   * Describes an IndexSegment. This is one entry in the IndexTable. It
+   * holds the physical location of the IndexSegment in the file, as well
+   * as the range of entryIds and byte ranges corresponding to records
+   * described by the index subset in the IndexSegment.
+   */
+  private static class IndexTableEntry implements Writable {
+    private long segmentOffset;
+    private long firstIndexId;
+    private long firstIndexOffset;
+    private long lastIndexOffset;
+
+    public IndexTableEntry() {
+    }
+
+    public IndexTableEntry(DataInput in) throws IOException {
+      readFields(in);
+    }
+
+    private void setSegmentOffset(long offset) {
+      this.segmentOffset = offset;
+    }
+
+    private void setFirstIndexId(long id) {
+      this.firstIndexId = id;
+    }
+
+    private void setFirstIndexOffset(long offset) {
+      this.firstIndexOffset = offset;
+    }
+
+    private void setLastIndexOffset(long offset) {
+      this.lastIndexOffset = offset;
+    }
+
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVLong(out, segmentOffset);
+      WritableUtils.writeVLong(out, firstIndexId);
+      WritableUtils.writeVLong(out, firstIndexOffset);
+      WritableUtils.writeVLong(out, lastIndexOffset);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      segmentOffset = WritableUtils.readVLong(in);
+      firstIndexId = WritableUtils.readVLong(in);
+      firstIndexOffset = WritableUtils.readVLong(in);
+      lastIndexOffset = WritableUtils.readVLong(in);
+    }
+
+    /**
+     * @return the entryId of the first record indexed by this segment.
+     */
+    public long getFirstIndexId() {
+      return this.firstIndexId;
+    }
+
+    /**
+     * @return the offset of the first record indexed by this segment.
+     */
+    public long getFirstIndexOffset() {
+      return this.firstIndexOffset;
+    }
+
+    /**
+     * @return the offset of the last record indexed by this segment.
+     */
+    public long getLastIndexOffset() {
+      return this.lastIndexOffset;
+    }
+
+    /**
+     * @return the offset from the start of the file of the IndexSegment
+     * data itself.
+     */
+    public long getSegmentOffset() {
+      return this.segmentOffset;
+    }
+
+    /**
+     * Reports whether the user's requested offset corresponds
+     * to a record that starts in this IndexSegment or an earlier
+     * one. If this returns true, the requested offset may actually
+     * lie in a previous IndexSegment.
+     * @param off the offset of the start of a record to test.
+     * @return true if the user's requested offset is in this
+     * or a previous IndexSegment.
+     */
+    public boolean containsOffset(long off) {
+      return off <= getLastIndexOffset();
+    }
+  }
+
+  /**
+   * Class that represents the IndexSegment entries in a LobIndex.
+   */
+  private static class IndexSegment implements Writable {
+
+    // The main body of the IndexSegment: the record lengths
+    // of all the records in the IndexSegment.
+    private BytesWritable recordLenBytes;
+
+    // The length of the previously recorded field (used when
+    // generating an index). Intermediate state used in calculation
+    // of the lastIndexOffset.
+    private long prevLength;
+
+    // Used to write VLong-encoded lengths into a temp
+    // array, which are then copied into recordLenBytes.
+    private DataOutputBuffer outputBuffer;
+
+    // The IndexTableEntry that describes this IndexSegment in the IndexTable.
+    private IndexTableEntry tableEntry;
+
+    public IndexSegment(IndexTableEntry tableEntry) {
+      this.recordLenBytes = new BytesWritable();
+      this.outputBuffer = new DataOutputBuffer(10); // max VLong size.
+      this.tableEntry = tableEntry;
+    }
+
+    /**
+     * Read an IndexSegment from an existing file.
+     */
+    public IndexSegment(IndexTableEntry tableEntry, DataInput in)
+        throws IOException {
+      this.recordLenBytes = new BytesWritable();
+      this.outputBuffer = new DataOutputBuffer(10);
+      this.tableEntry = tableEntry;
+      readFields(in);
+    }
+
+    /**
+     * @return the IndexTableEntry describing this IndexSegment in the
+     * IndexTable.
+     */
+    public IndexTableEntry getTableEntry() {
+      return tableEntry;
+    }
+
+    /**
+     * Add a recordLength to the recordLenBytes array.
+     */
+    public void addRecordLen(long recordLen) throws IOException {
+      // Allocate space for the new bytes.
+      int numBytes = WritableUtils.getVIntSize(recordLen);
+      recordLenBytes.setSize(recordLenBytes.getLength() + numBytes);
+
+      // Write the new bytes into a temporary buffer wrapped in a DataOutput.
+      outputBuffer.reset();
+      WritableUtils.writeVLong(outputBuffer, recordLen);
+
+      // Then copy those new bytes into the end of the recordLenBytes array.
+      System.arraycopy(outputBuffer.getData(), 0, recordLenBytes.getBytes(),
+          recordLenBytes.getLength() - numBytes, numBytes);
+
+      // Now that we've added a new recordLength to the array,
+      // it's the last index. We need to calculate its offset.
+      // This is based on how long the previous record was.
+      this.tableEntry.setLastIndexOffset(
+          this.tableEntry.getLastIndexOffset() + this.prevLength);
+
+      // Save this record's length (unserialized) for calculating
+      // lastIndexOffset for the next record.
+      this.prevLength = recordLen;
+    }
+
+    public void write(DataOutput out) throws IOException {
+      // Write the SEGMENT_HEADER_ID to distinguish this from a LobRecord.
+      WritableUtils.writeVLong(out, SEGMENT_HEADER_ID);
+
+      // The length of the main body of the segment is the length of the
+      // data byte array.
+      int segmentBytesLen = recordLenBytes.getLength();
+      WritableUtils.writeVLong(out, segmentBytesLen);
+
+      // Write the body of the segment.
+      out.write(recordLenBytes.getBytes(), 0, segmentBytesLen);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      // After the RecordStartMark, we expect to get a SEGMENT_HEADER_ID (-1).
+      long segmentId = WritableUtils.readVLong(in);
+      if (SEGMENT_HEADER_ID != segmentId) {
+        throw new IOException("Expected segment header id " + SEGMENT_HEADER_ID
+            + "; got " + segmentId);
+      }
+
+      // Get the length of the rest of the segment, in bytes.
+      long length = WritableUtils.readVLong(in);
+
+      // Now read the actual main byte array.
+      if (length > Integer.MAX_VALUE) {
+        throw new IOException("Unexpected oversize data array length: "
+            + length);
+      } else if (length < 0) {
+        throw new IOException("Unexpected undersize data array length: "
+            + length);
+      }
+      byte [] segmentData = new byte[(int) length];
+      in.readFully(segmentData);
+      recordLenBytes = new BytesWritable(segmentData);
+
+      reset(); // Reset the iterator allowing the user to yield offset/lengths.
+    }
+
+
+    // The following methods are used by a Reader to walk through the index
+    // segment and get data about the records described in this segment of
+    // the index.
+
+    private DataInputBuffer dataInputBuf;
+
+    // The following two fields are advanced by the next() method.
+    private long curOffset; // offset into the file of the current record.
+    private long curLen; // length of the current record in bytes.
+
+    // Used to allow rewindOnce() to go backwards a single position in the
+    // iterator.
+    private int prevInputBufPos; // prev offset into dataInputBuf.
+    private long prevOffset;
+    private long prevLen;
+
+    /**
+     * Resets the record index iterator.
+     */
+    public void reset() {
+      this.dataInputBuf = null;
+    }
+
+    /**
+     * Advances the iterator to describe the next record in the
+     * IndexSegment. Must be called before information about the
+     * first record is read.
+     * @return true if there is another record described in this IndexSegment.
+     */
+    public boolean next() {
+      this.prevOffset = this.curOffset;
+      if (null == dataInputBuf) {
+        // We need to set up the iterator; this is the first use.
+        if (null == recordLenBytes) {
+          return false; // We don't have any records?
+        }
+
+        this.dataInputBuf = new DataInputBuffer();
+        this.dataInputBuf.reset(recordLenBytes.getBytes(),
+            0, recordLenBytes.getLength());
+
+        this.curOffset = this.tableEntry.getFirstIndexOffset();
+        this.prevOffset = 0;
+      } else {
+        this.curOffset += this.curLen;
+      }
+
+      boolean available = dataInputBuf.getPosition() < dataInputBuf.getLength();
+      if (available) {
+        this.prevInputBufPos = dataInputBuf.getPosition();
+        // Then read out the next record length.
+        try {
+          this.prevLen = this.curLen;
+          this.curLen = WritableUtils.readVLong(dataInputBuf);
+        } catch (IOException ioe) {
+          // Shouldn't happen; data in DataInputBuffer is materialized.
+          throw new RuntimeException(ioe);
+        }
+      }
+
+      return available;
+    }
+
+    /**
+     * Undoes a single call to next(). This cannot be called twice in a row;
+     * before calling this again, next() must be called in the interim. This
+     * makes a subsequent call to next() yield the same iterated values as the
+     * previous call.
+     */
+    public void rewindOnce() {
+      // Move the buffer backwards so we deserialize the same VLong with
+      // the next call.
+      if (prevInputBufPos == 0) {
+        // We actually rewound the first next() in the iterator.
+        // Just reset the iterator to the beginning. Otherwise we'll
+        // backfill it with bogus data.
+        reset();
+      } else {
+        // Use the normal codepath; move the serialization buffer
+        // backwards and restore the previously yielded values.
+        dataInputBuf.reset(recordLenBytes.getBytes(), prevInputBufPos,
+            recordLenBytes.getLength() - prevInputBufPos);
+
+        // And restore the previously-yielded values.
+        this.curLen = this.prevLen;
+        this.curOffset = this.prevOffset;
+      }
+    }
+
+    /**
+     * Returns the length of the current record.
+     * You must call next() and it must return true before calling this method.
+     * @return the length in bytes of the current record.
+     */
+    public long getCurRecordLen() {
+      return curLen;
+    }
+
+    /**
+     * Returns the offset of the current record from the beginning of the file.
+     * You must call next() and it must return true before calling this method.
+     * @return the offset in bytes from the beginning of the file for the
+     * current record.
+     */
+    public long getCurRecordStart() {
+      return curOffset;
+    }
+  }
+
+  /**
+   * Stores the locations and ranges indexed by each IndexSegment.
+   */
+  private static class IndexTable
+      implements Iterable<IndexTableEntry>, Writable {
+    private List<IndexTableEntry> tableEntries;
+
+    public IndexTable() {
+      tableEntries = new ArrayList<IndexTableEntry>();
+    }
+
+    public IndexTable(DataInput in) throws IOException {
+      readFields(in);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      long recordTypeId = WritableUtils.readVLong(in);
+      if (recordTypeId != INDEX_TABLE_ID) {
+        // We expected to read an IndexTable.
+        throw new IOException("Expected IndexTable; got record with typeId="
+            + recordTypeId);
+      }
+
+      int tableCount = WritableUtils.readVInt(in);
+
+      tableEntries = new ArrayList<IndexTableEntry>(tableCount);
+      for (int i = 0; i < tableCount; i++) {
+        tableEntries.add(new IndexTableEntry(in));
+      }
+    }
+
+    public void write(DataOutput out) throws IOException {
+      // Start with the record type id.
+      WritableUtils.writeVLong(out, INDEX_TABLE_ID);
+
+      // Then the count of the records.
+      WritableUtils.writeVInt(out, tableEntries.size());
+
+      // Followed by the table itself.
+      for (IndexTableEntry entry : tableEntries) {
+        entry.write(out);
+      }
+    }
+
+    public void add(IndexTableEntry entry) {
+      tableEntries.add(entry);
+    }
+
+    public IndexTableEntry get(int i) {
+      return tableEntries.get(i);
+    }
+
+    public int size() {
+      return tableEntries.size();
+    }
+
+    public Iterator<IndexTableEntry> iterator() {
+      return tableEntries.iterator();
+    }
+  }
+
+  /**
+   * Reader implementation for LobFile format version 0. Acquire with
+   * LobFile.open().
+   */
+  private static class V0Reader extends com.cloudera.sqoop.io.LobFile.Reader {
+    public static final Log LOG = LogFactory.getLog(
+        V0Reader.class.getName());
+
+    // Forward seeks of up to this size are performed by reading, not seeking.
+    private static final long MAX_CONSUMPTION_WIDTH = 512 * 1024;
+
+    private LobFileHeader header;
+
+    private Configuration conf;
+
+    // Codec to use to decompress the file.
+    private CompressionCodec codec;
+    private Decompressor decompressor;
+
+    // Length of the entire file.
+    private long fileLen;
+
+    // State bit set to true after we've called next() and successfully
+    // aligned on a record. If true, we can hand an InputStream back to
+    // the user.
+    private boolean isAligned;
+
+    // After we've aligned on a record, this contains the record's
+    // reported length. In the presence of compression, etc, this may
+    // not represent its true length in the file.
+    private long claimedRecordLen;
+
+    // After we've aligned on a record, this contains its entryId.
+    private long curEntryId;
+
+    // After we've aligned on a record, this contains the offset of the
+    // beginning of its RSM from the start of the file.
+    private long curRecordOffset;
+
+    // After we've aligned on a record, this contains the record's
+    // true length from the index.
+    private long indexRecordLen;
+
+    // tmp buffer used to consume RecordStartMarks during alignment.
+    private byte [] tmpRsmBuf;
+
+    // The actual file stream itself, which we can move around (e.g. with
+    // seeking).
+    private FSDataInputStream underlyingInput;
+
+    // The data deserializer we typically place on top of this.
+    // If we use underlyingInput.seek(), then we instantiate a new
+    // dataIn on top of it.
+    private DataInputStream dataIn;
+
+    // The user accesses the current record through a stream memoized here.
+    // We retain a pointer here so that we can forcibly close the old
+    // userInputStream when they want to align on the next record.
+    private InputStream userInputStream;
+
+    // The current index segment to read record lengths from.
+    private IndexSegment curIndexSegment;
+
+    // The offset into the indexTable of the curIndexSegment.
+    private int curIndexSegmentId;
+
+    // The IndexTable that provides fast pointers to the IndexSegments.
+    private IndexTable indexTable;
+
+    // The path being opened.
+    private Path path;
+
+    // Users should use LobFile.open() instead of directly calling this.
+    V0Reader(Path path, Configuration conf, LobFileHeader header,
+        DataInputStream dis, FSDataInputStream stream, long fileLen)
+        throws IOException {
+      this.path = LobReaderCache.qualify(path, conf);
+      this.conf = conf;
+      this.header = header;
+      this.dataIn = dis;
+      this.underlyingInput = stream;
+      this.isAligned = false;
+      this.tmpRsmBuf = new byte[RecordStartMark.START_MARK_LENGTH];
+      this.fileLen = fileLen;
+      LOG.debug("Opening LobFile path: " + path);
+      openCodec();
+      openIndex();
+    }
+
+    /**
+     * If the user has specified a compression codec in the header metadata,
+     * create an instance of it.
+     */
+    private void openCodec() throws IOException {
+      String codecName = header.getMetaBlock().getString(
+          MetaBlock.COMPRESSION_CODEC_KEY);
+      if (null != codecName) {
+        LOG.debug("Decompressing file with codec: " + codecName);
+        this.codec = CodecMap.getCodec(codecName, conf);
+        if (null != this.codec) {
+          this.decompressor = codec.createDecompressor();
+        }
+      }
+    }
+
+    /**
+     * Get the first index segment out of the file; determine
+     * where that is by loading the index locator at the end of
+     * the file.
+     */
+    private void openIndex() throws IOException {
+      // Jump to the end of the file.
+      // At the end of the file is a RSM followed by two VLongs;
+      // the first of these is the value -2 (one byte) and the
+      // second of these is the offset of the beginning of the index (up to
+      // 9 bytes).
+      internalSeek(fileLen - RecordStartMark.START_MARK_LENGTH - 10);
+
+      byte [] finaleBuffer = new byte[RecordStartMark.START_MARK_LENGTH + 10];
+      this.dataIn.readFully(finaleBuffer);
+
+      // Figure out where in the finaleBuffer the RSM actually starts,
+      // as the finale might not fully fill the finaleBuffer.
+      int rsmStart = findRecordStartMark(finaleBuffer);
+      if (-1 == rsmStart) {
+        throw new IOException(
+            "Corrupt file index; could not find index start offset.");
+      }
+
+      // Wrap a buffer around those two vlongs.
+      int vlongStart = rsmStart + RecordStartMark.START_MARK_LENGTH;
+      DataInputBuffer inBuf = new DataInputBuffer();
+      inBuf.reset(finaleBuffer, vlongStart, finaleBuffer.length - vlongStart);
+
+      long offsetMarker = WritableUtils.readVLong(inBuf);
+      if (SEGMENT_OFFSET_ID != offsetMarker) {
+        // This isn't the correct signature; we got an RSM ahead of some
+        // other data.
+        throw new IOException("Invalid segment offset id: " + offsetMarker);
+      }
+
+      // This will contain the position of the IndexTable.
+      long indexTableStart = WritableUtils.readVLong(inBuf);
+      LOG.debug("IndexTable begins at " + indexTableStart);
+
+      readIndexTable(indexTableStart);
+
+      // Set up to read records from the beginning of the file. This
+      // starts with the first IndexSegment.
+      curIndexSegmentId = 0;
+      loadIndexSegment();
+
+      // This has moved the file pointer all over but we don't need to
+      // worry about resetting it now. The next() method will seek the
+      // file pointer to the first record when the user is ready to
+      // consume it.
+    }
+
+    /**
+     * Load the entire IndexTable into memory and decode it.
+     */
+    private void readIndexTable(long indexTableOffset) throws IOException {
+      internalSeek(indexTableOffset);
+
+      // Read the RecordStartMark ahead of the IndexTable.
+      this.dataIn.readFully(tmpRsmBuf);
+      if (!matchesRsm(tmpRsmBuf)) {
+        throw new IOException("Expected record start mark before IndexTable");
+      }
+
+      this.indexTable = new IndexTable(dataIn);
+    }
+
+    /**
+     * Ingest the next IndexSegment.
+     */
+    private void readNextIndexSegment() throws IOException {
+      this.curIndexSegmentId++;
+      loadIndexSegment();
+    }
+
+    /**
+     * Load curIndexSegment with the segment specified by curIndexSegmentId.
+     * The file pointer will be moved to the position after this segment.
+     * If the segment id does not exist, then the curIndexSegment will be
+     * set to null.
+     */
+    private void loadIndexSegment() throws IOException {
+      if (indexTable.size() <= curIndexSegmentId || curIndexSegmentId < 0) {
+        // We've iterated past the last IndexSegment. Set this to null
+        // and return; the next() method will then return false.
+        this.curIndexSegment = null;
+        return;
+      }
+
+      // Otherwise, seek to the segment and load it.
+      IndexTableEntry tableEntry = indexTable.get(curIndexSegmentId);
+      long segmentOffset = tableEntry.getSegmentOffset();
+      internalSeek(segmentOffset);
+      readPositionedIndexSegment();
+    }
+
+    /**
+     * When the underlying stream is aligned on the RecordStartMark
+     * ahead of an IndexSegment, read in the next IndexSegment.
+     * After this method the curIndexSegment contains the next
+     * IndexSegment to read in the file; if the entire index has been
+     * read in this fashion, curIndexSegment will be null.
+     */
+    private void readPositionedIndexSegment() throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reading index segment at " + tell());
+      }
+
+      // Read the RecordStartMark ahead of the IndexSegment.
+      this.dataIn.readFully(tmpRsmBuf);
+      if (!matchesRsm(tmpRsmBuf)) {
+        throw new IOException("Expected record start mark before IndexSegment");
+      }
+
+      // Read the IndexSegment proper.
+      this.curIndexSegment = new IndexSegment(
+          this.indexTable.get(curIndexSegmentId), this.dataIn);
+    }
+
+    /**
+     * @return true if the bytes in 'buf' starting at 'offset' match
+     * the RecordStartMark.
+     * @param rsm the RecordStartMark
+     * @param buf the buffer to check
+     * @param offset the offset into buf to begin checking.
+     */
+    private boolean matchesRsm(byte [] rsm, byte [] buf, int offset) {
+      for (int i = 0; i < RecordStartMark.START_MARK_LENGTH; i++) {
+        if (buf[i + offset] != rsm[i]) {
+          return false; // Mismatch at position i.
+        }
+      }
+
+      return true; // Matched the whole thing.
+    }
+
+    private boolean matchesRsm(byte [] buf, int offset) {
+      return matchesRsm(this.header.getStartMark().getBytes(),
+          buf, offset);
+    }
+
+    private boolean matchesRsm(byte [] buf) {
+      return matchesRsm(buf, 0);
+    }
+
+    /**
+     * @return the offset in 'buf' where a RecordStartMark begins, or -1
+     * if the RecordStartMark is not present in the buffer.
+     */
+    private int findRecordStartMark(byte [] buf) {
+      byte [] rsm = this.header.getStartMark().getBytes();
+
+      for (int i = 0; i < buf.length; i++) {
+        if (matchesRsm(rsm, buf, i)) {
+          return i;
+        }
+      }
+
+      return -1; // couldn't find it.
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public Path getPath() {
+      return this.path;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public long tell() throws IOException {
+      checkForNull(this.underlyingInput);
+      return this.underlyingInput.getPos();
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void seek(long pos) throws IOException {
+      closeUserStream();
+      checkForNull(this.underlyingInput);
+      this.isAligned = false;
+      searchForRecord(pos);
+    }
+
+    /**
+     * Search the index for the first record starting on or after 'start'.
+     * @param start the offset in the file where we should start looking
+     * for a record.
+     */
+    private void searchForRecord(long start) throws IOException {
+      LOG.debug("Looking for the first record at/after offset " + start);
+
+      // Scan through the IndexTable until we find the IndexSegment
+      // that contains the offset.
+      for (int i = 0; i < indexTable.size(); i++) {
+        IndexTableEntry tableEntry = indexTable.get(i);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Checking index table entry for range: "
+              + tableEntry.getFirstIndexOffset() + ", "
+              + tableEntry.getLastIndexOffset());
+        }
+
+        if (tableEntry.containsOffset(start)) {
+          // Seek to the IndexSegment associated with this tableEntry.
+          curIndexSegmentId = i;
+          loadIndexSegment();
+
+          // Use this index segment. The record index iterator
+          // is at the beginning of the IndexSegment, since we just
+          // read it in.
+          LOG.debug("Found matching index segment.");
+          while (this.curIndexSegment.next()) {
+            long curStart = this.curIndexSegment.getCurRecordStart();
+            if (curStart >= start) {
+              LOG.debug("Found seek target record with offset " + curStart);
+              // This is the first record to meet this criterion.
+              // Rewind the index iterator by one so that the next()
+              // method will do the right thing. next() will also
+              // take care of actually seeking to the correct position
+              // in the file to read the record proper.
+              this.curIndexSegment.rewindOnce();
+              return;
+            }
+          }
+
+          // If it wasn't actually in this IndexSegment, then we've
+          // got a corrupt IndexTableEntry; the entry represented that
+          // the segment ran longer than it actually does.
+          throw new IOException("IndexTableEntry claims last offset of "
+              + tableEntry.getLastIndexOffset()
+              + " but IndexSegment ends early."
+              + " The IndexTable appears corrupt.");
+        }
+      }
+
+      // If we didn't return inside the loop, then we've searched the entire
+      // file and it's not there. Advance the IndexSegment iterator to
+      // the end of the road so that next() returns false.
+      this.curIndexSegmentId = indexTable.size();
+      loadIndexSegment();
+    }
+
+    /**
+     * Read data from the stream and discard it.
+     * @param numBytes number of bytes to read and discard.
+     */
+    private void consumeBytes(int numBytes) throws IOException {
+      int remaining = numBytes;
+      while (remaining > 0) {
+        int skipped = dataIn.skipBytes(remaining);
+        if (skipped < 1) {
+          throw new IOException("Could not consume additional bytes");
+        }
+        remaining -= skipped;
+      }
+    }
+
+    /**
+     * Seek to position 'targetPos' (offset from start of file). If the
+     * target is nearby, just consume data from the underlying
+     * stream rather than doing a real seek.
+     * @param targetPos the position to seek to, expressed as an offset
+     * from the start of the file.
+     */
+    private void internalSeek(long targetPos) throws IOException {
+      long curPos = this.underlyingInput.getPos();
+      LOG.debug("Internal seek: target=" + targetPos + "; cur=" + curPos);
+      long distance = targetPos - curPos;
+      if (targetPos == curPos) {
+        LOG.debug("(no motion required)");
+        return; // We're already there!
+      } else if (targetPos > curPos && distance < MAX_CONSUMPTION_WIDTH) {
+        // We're "close enough" that we should just read it.
+        LOG.debug("Advancing by " + distance + " bytes.");
+        consumeBytes((int) distance);
+      } else {
+        LOG.debug("Direct seek to target");
+        this.underlyingInput.seek(targetPos);
+        this.dataIn = new DataInputStream(this.underlyingInput);
+      }
+    }
+
+    /**
+     * Close any record stream previously opened by the user.
+     */
+    private void closeUserStream() throws IOException {
+      if (this.userInputStream != null) {
+        this.userInputStream.close();
+        this.userInputStream = null;
+      }
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public boolean next() throws IOException {
+      LOG.debug("Checking for next record");
+      checkForNull(this.underlyingInput);
+      // If the user has opened a record stream, it is now void.
+      closeUserStream();
+      this.isAligned = false; // false until proven true.
+
+      // Get the position of the next record start.
+      // Check the index: is there another record?
+      if (null == curIndexSegment) {
+        LOG.debug("Index is finished; false");
+        return false; // No index remains. Ergo, no more records.
+      }
+      boolean moreInSegment = curIndexSegment.next();
+      if (!moreInSegment) {
+        // The current IndexSegment has been exhausted. Move to the next.
+        LOG.debug("Loading next index segment.");
+        readNextIndexSegment();
+        if (null == curIndexSegment) {
+          LOG.debug("Index is finished; false");
+          return false; // No index; no records.
+        }
+
+        // Try again with the next IndexSegment.
+        moreInSegment = curIndexSegment.next();
+      }
+
+      if (!moreInSegment) {
+        // Nothing left in the last IndexSegment.
+        LOG.debug("Last index segment is finished; false.");
+        this.curIndexSegment = null;
+        return false;
+      }
+
+      // Determine where the next record starts.
+      this.indexRecordLen = this.curIndexSegment.getCurRecordLen();
+      this.curRecordOffset = this.curIndexSegment.getCurRecordStart();
+
+      LOG.debug("Next record starts at position: " + this.curRecordOffset
+          + "; indexedLen=" + this.indexRecordLen);
+
+      // Make sure we're at the target position.
+      internalSeek(this.curRecordOffset);
+
+      // We are now on top of the next record's RecordStartMark.
+      // Consume the RSM and the record header.
+      this.dataIn.readFully(this.tmpRsmBuf);
+      if (!matchesRsm(tmpRsmBuf)) {
+        // No rsm? No dice.
+        throw new IOException("Index contains bogus offset.");
+      }
+
+      this.curEntryId = WritableUtils.readVLong(this.dataIn);
+      if (this.curEntryId < 0) {
+        // We've moved past the end of the records and started
+        // trying to consume the index. This is the EOF from
+        // the client's perspective.
+        LOG.debug("Indexed position is itself an IndexSegment; false.");
+        return false;
+      }
+      LOG.debug("Aligned on record id=" + this.curEntryId);
+
+      this.claimedRecordLen = WritableUtils.readVLong(this.dataIn);
+      LOG.debug("Record has claimed length " + this.claimedRecordLen);
+      // We are now aligned on the start of the user's data.
+      this.isAligned = true;
+      return true;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public boolean isRecordAvailable() {
+      return this.isAligned;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public long getRecordLen() {
+      return this.claimedRecordLen;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public long getRecordId() {
+      return this.curEntryId;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public long getRecordOffset() {
+      return this.curRecordOffset;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public InputStream readBlobRecord() throws IOException {
+      if (!isRecordAvailable()) {
+        // we're not currently aligned on a record-start.
+        // Try to get the next one.
+        if (!next()) {
+          // No more records available.
+          throw new EOFException("End of file reached.");
+        }
+      }
+
+      // Ensure any previously-open user record stream is closed.
+      closeUserStream();
+
+      // Mark this record as consumed.
+      this.isAligned = false;
+
+      // The length of the stream we can return to the user is
+      // the indexRecordLen minus the length of any per-record headers.
+      // That includes the RecordStartMark, the entryId, and the claimedLen.
+      long streamLen = this.indexRecordLen - RecordStartMark.START_MARK_LENGTH
+          - WritableUtils.getVIntSize(this.curEntryId)
+          - WritableUtils.getVIntSize(this.claimedRecordLen);
+      LOG.debug("Yielding stream to user with length " + streamLen);
+      this.userInputStream = new FixedLengthInputStream(this.dataIn, streamLen);
+      if (this.codec != null) {
+        // The user needs to decompress the data; wrap the InputStream.
+        decompressor.reset();
+        this.userInputStream = new DecompressorStream(
+            this.userInputStream, decompressor);
+      }
+      return this.userInputStream;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public java.io.Reader readClobRecord() throws IOException {
+      // Get a handle to the binary reader and then wrap it.
+      InputStream is = readBlobRecord();
+      return new InputStreamReader(is);
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void close() throws IOException {
+      closeUserStream();
+
+      if (null != dataIn) {
+        dataIn.close();
+        dataIn = null;
+      }
+
+      if (null != underlyingInput) {
+        underlyingInput.close();
+        underlyingInput = null;
+      }
+
+      this.isAligned = false;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public boolean isClosed() {
+      return this.underlyingInput == null;
+    }
+  }
+
+
+  /**
+   * Concrete writer implementation for LobFile format version 0.
+   * Instantiate via LobFile.create().
+   */
+  private static class V0Writer extends com.cloudera.sqoop.io.LobFile.Writer {
+    public static final Log LOG = LogFactory.getLog(
+        V0Writer.class.getName());
+
+    private Configuration conf;
+    private Path path;
+    private boolean isCharData;
+    private LobFileHeader header;
+
+    private String codecName;
+    private CompressionCodec codec;
+    private Compressor compressor;
+
+    // The LobIndex we are constructing.
+    private LinkedList<IndexSegment> indexSegments;
+    // Number of entries in the current IndexSegment.
+    private int entriesInSegment;
+    private IndexTable indexTable;
+
+    // Number of entries that can be written to a single IndexSegment.
+    private int maxEntriesPerSegment;
+
+    // By default we write this many entries per IndexSegment.
+    static final int DEFAULT_MAX_SEGMENT_ENTRIES = 4096;
+
+    // Our OutputStream to the underlying file.
+    private DataOutputStream out;
+
+    // 'out' is layered on top of this stream, which gives us a count
+    // of how much data we've written so far.
+    private CountingOutputStream countingOut;
+
+    // State regarding the current record being written.
+    private long curEntryId; // entryId of the current LOB being written.
+    private long curClaimedLen; // The user claims a length for a record.
+
+    // The user's OutputStream and/or Writer that writes to us.
+    private OutputStream userOutputStream;
+    private java.io.Writer userWriter;
+
+    // The userCountingOutputStream may be the same as userOutputStream;
+    // but if the user is writing through a compressor, it actually sits
+    // underneath the compressor. This tells us how many compressed bytes
+    // were really written.
+    private CountingOutputStream userCountingOutputStream;
+
+    /**
+     * Creates a LobFile Writer for file format version 0.
+     * @param p the path to create.
+     * @param conf the configuration to use to interact with the filesystem.
+     * @param isCharData true if this is for CLOBs, false for BLOBs.
+     * @param codecName the compression codec to use (or null for none).
+     * @param entriesPerSegment the number of index entries per IndexSegment.
+     */
+    V0Writer(Path p, Configuration conf, boolean isCharData,
+        String codecName, int entriesPerSegment) throws IOException {
+
+      this.path = LobReaderCache.qualify(p, conf);
+      this.conf = conf;
+      this.isCharData = isCharData;
+      this.header = new LobFileHeader();
+      this.indexSegments = new LinkedList<IndexSegment>();
+      this.indexTable = new IndexTable();
+      this.maxEntriesPerSegment = entriesPerSegment;
+
+      this.codecName = codecName;
+      if (this.codecName != null) {
+        this.codec = CodecMap.getCodec(codecName, conf);
+        if (null != this.codec) {
+          this.compressor = codec.createCompressor();
+        }
+      }
+
+      init();
+    }
+
+    /**
+     * Open the file and write its header.
+     */
+    private void init() throws IOException {
+      FileSystem fs = this.path.getFileSystem(conf);
+      FSDataOutputStream fsOut = fs.create(this.path);
+      this.countingOut = new CountingOutputStream(
+          new BufferedOutputStream(fsOut));
+      this.out = new DataOutputStream(this.countingOut);
+
+      // put any necessary config strings into the header.
+      MetaBlock m = this.header.getMetaBlock();
+      if (isCharData) {
+        m.put(MetaBlock.ENTRY_ENCODING_KEY, MetaBlock.CLOB_ENCODING);
+      } else {
+        m.put(MetaBlock.ENTRY_ENCODING_KEY, MetaBlock.BLOB_ENCODING);
+      }
+
+      if (null != codec) {
+        m.put(MetaBlock.COMPRESSION_CODEC_KEY, this.codecName);
+      }
+
+      // Serialize the value of maxEntriesPerSegment as a VInt in a byte array
+      // and put that into the metablock as ENTRIES_PER_SEGMENT_KEY.
+      int segmentBufLen = WritableUtils.getVIntSize(this.maxEntriesPerSegment);
+      DataOutputBuffer entriesPerSegBuf = new DataOutputBuffer(segmentBufLen);
+      WritableUtils.writeVInt(entriesPerSegBuf, this.maxEntriesPerSegment);
+      byte [] entriesPerSegArray =
+          Arrays.copyOf(entriesPerSegBuf.getData(), segmentBufLen);
+      m.put(MetaBlock.ENTRIES_PER_SEGMENT_KEY,
+          new BytesWritable(entriesPerSegArray));
+
+      // Write the file header to the file.
+      this.header.write(out);
+
+      // Now we're ready to accept record data from the user.
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public Path getPath() {
+      return this.path;
+    }
+
+    @Override
+    /**
+     * {@inheritDoc}
+     */
+    public long tell() throws IOException {
+      checkForNull(this.out);
+      this.out.flush();
+      return this.countingOut.getByteCount();
+    }
+
+    @Override
+    /**
+     * {@inheritDoc}
+     */
+    public void close() throws IOException {
+      finishRecord();
+      writeIndex();
+      if (this.out != null) {
+        this.out.close();
+        this.out = null;
+      }
+
+      if (this.countingOut != null) {
+        this.countingOut.close();
+        this.countingOut = null;
+      }
+    }
+
+    @Override
+    /**
+     * {@inheritDoc}
+     */
+    public void finishRecord() throws IOException {
+      if (null != this.userWriter) {
+        this.userWriter.close();
+        this.userWriter = null;
+      }
+
+      if (null != this.userCountingOutputStream) {
+
+        // If there is a wrapping stream for compression,
+        // close this first.
+        if (null != this.userOutputStream
+            && this.userOutputStream != this.userCountingOutputStream) {
+          this.userOutputStream.close();
+        }
+
+        // Now close the "main" stream.
+        this.userCountingOutputStream.close();
+
+        // Write the true length of the current record to the index.
+        updateIndex(this.userCountingOutputStream.getByteCount()
+            + RecordStartMark.START_MARK_LENGTH
+            + WritableUtils.getVIntSize(curEntryId)
+            + WritableUtils.getVIntSize(curClaimedLen));
+
+        this.userOutputStream = null;
+        this.userCountingOutputStream = null;
+      }
+
+      if (null != this.out) {
+        out.flush();
+      }
+    }
+
+    /**
+     * Record in the current IndexSegment the true compressed length of
+     * the record we just finished writing.
+     * @param curRecordLen the true length in bytes of the compressed record.
+     */
+    private void updateIndex(long curRecordLen) throws IOException {
+      LOG.debug("Adding index entry: id=" + curEntryId
+          + "; len=" + curRecordLen);
+      indexSegments.getLast().addRecordLen(curRecordLen);
+      entriesInSegment++;
+      curEntryId++;
+    }
+
+    /**
+     * Write the index itself to the file.
+     */
+    private void writeIndex() throws IOException {
+
+      // Write out all the segments in turn.
+      // As we do so, reify their offsets into the IndexTable.
+      for (IndexSegment segment : indexSegments) {
+        long segmentOffset = tell();
+        segment.getTableEntry().setSegmentOffset(segmentOffset);
+
+        header.getStartMark().write(out);
+        segment.write(out);
+      }
+
+      long indexTableStartPos = tell(); // Save for the end of the file.
+      LOG.debug("IndexTable offset: " + indexTableStartPos);
+
+      header.getStartMark().write(out);
+      indexTable.write(out); // write the IndexTable record.
+
+      // Write the finale that tells us where the IndexTable begins.
+      header.getStartMark().write(out);
+      WritableUtils.writeVLong(out, SEGMENT_OFFSET_ID);
+      WritableUtils.writeVLong(out, indexTableStartPos);
+    }
+
+    /**
+     * Prepare to index a new record that will soon be written to the file.
+     * If this is the first record in the current IndexSegment, we need
+     * to record its entryId and the current file position.
+     */
+    private void startRecordIndex() throws IOException {
+      if (entriesInSegment == maxEntriesPerSegment
+          || indexSegments.size() == 0) {
+        // The current segment is full (or none exists yet). Start a new one.
+        this.entriesInSegment = 0;
+        IndexTableEntry tableEntry = new IndexTableEntry();
+        IndexSegment curSegment = new IndexSegment(tableEntry);
+        this.indexSegments.add(curSegment);
+
+        long filePos = tell();
+        LOG.debug("Starting IndexSegment; first id=" + curEntryId
+            + "; off=" + filePos);
+        tableEntry.setFirstIndexId(curEntryId);
+        tableEntry.setFirstIndexOffset(filePos);
+        tableEntry.setLastIndexOffset(filePos);
+        this.indexTable.add(tableEntry);
+      }
+    }
+
+    @Override
+    /**
+     * {@inheritDoc}
+     */
+    public OutputStream writeBlobRecord(long claimedLen) throws IOException {
+      finishRecord(); // finish any previous record.
+      checkForNull(this.out);
+      startRecordIndex();
+      this.header.getStartMark().write(out);
+      LOG.debug("Starting new record; id=" + curEntryId
+          + "; claimedLen=" + claimedLen);
+      WritableUtils.writeVLong(out, curEntryId);
+      WritableUtils.writeVLong(out, claimedLen);
+      this.curClaimedLen = claimedLen;
+      this.userCountingOutputStream = new CountingOutputStream(
+          new CloseShieldOutputStream(out));
+      if (null == this.codec) {
+        // No codec; pass thru the same OutputStream to the user.
+        this.userOutputStream = this.userCountingOutputStream;
+      } else {
+        // Wrap our CountingOutputStream in a compressing OutputStream to
+        // give to the user.
+        this.compressor.reset();
+        this.userOutputStream = new CompressorStream(
+            this.userCountingOutputStream, compressor);
+      }
+
+      return this.userOutputStream;
+    }
+
+    @Override
+    /**
+     * {@inheritDoc}
+     */
+    public java.io.Writer writeClobRecord(long len) throws IOException {
+      if (!isCharData) {
+        throw new IOException(
+            "Can only write CLOB data to a Clob-specific LobFile");
+      }
+
+      // Get a binary handle to the record and wrap it in a java.io.Writer.
+      writeBlobRecord(len);
+      this.userWriter = new OutputStreamWriter(userOutputStream);
+      return this.userWriter;
+    }
+  }
+}
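
For illustration, a minimal round-trip against the Writer/Reader API added
above. This is a sketch, not part of the commit: the LobFile.create()
overload is assumed from the V0Writer javadoc ("Instantiate via
LobFile.create()") and its constructor parameters, and the path is
hypothetical.

    Configuration conf = new Configuration();
    Path p = new Path("/tmp/example.lob");       // hypothetical path

    // Write a single BLOB record. writeBlobRecord() hands back a stream
    // scoped to that record; finishRecord() seals it into the index, and
    // close() writes the IndexSegments, the IndexTable, and the finale.
    LobFile.Writer w = LobFile.create(p, conf, false);  // false => BLOB data
    OutputStream os = w.writeBlobRecord(3);      // claimed length: 3 bytes
    os.write(new byte[] { 1, 2, 3 });
    w.finishRecord();
    w.close();

    // Read it back. next() aligns on the next record via the index;
    // readBlobRecord() returns a stream bounded by the indexed length.
    LobFile.Reader r = LobFile.open(p, conf);
    while (r.next()) {
      InputStream is = r.readBlobRecord();
      // consume up to r.getRecordLen() bytes here...
      is.close();
    }
    r.close();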

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobFile.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobReaderCache.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobReaderCache.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobReaderCache.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobReaderCache.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.cloudera.sqoop.io.LobFile;
+
+/**
+ * A cache of open LobFile.Reader objects.
+ * This maps from filenames to the open Reader, if any. While nothing
+ * prevents multiple LobReaderCache instances, the class is intended to
+ * be used as a singleton; a single global cache is most useful. The
+ * cache is internally synchronized; only one thread can insert or
+ * retrieve a reader from the cache at a time.
+ */
+public class LobReaderCache {
+
+  public static final Log LOG =
+      LogFactory.getLog(LobReaderCache.class.getName());
+
+  private Map<Path, LobFile.Reader> readerMap;
+
+  /**
+   * Open a LobFile for read access, returning a cached reader if one is
+   * available, or a new reader otherwise.
+   * @param path the path to the LobFile to open.
+   * @param conf the configuration to use to access the FS.
+   * @return an open LobFile.Reader for the file at 'path'.
+   * @throws IOException if there's an error opening the file.
+   */
+  public LobFile.Reader get(Path path, Configuration conf)
+      throws IOException {
+
+    LobFile.Reader reader = null;
+    Path canonicalPath = qualify(path, conf);
+    // Look up an entry in the cache.
+    synchronized(this) {
+      reader = readerMap.remove(canonicalPath);
+    }
+
+    if (null != reader && !reader.isClosed()) {
+      // Cache hit. return it.
+      LOG.debug("Using cached reader for " + canonicalPath);
+      return reader;
+    }
+
+    // Cache miss; open the file.
+    LOG.debug("No cached reader available for " + canonicalPath);
+    return LobFile.open(path, conf);
+  }
+
+  /**
+   * Return a reader back to the cache. If a reader for this path is
+   * already cached, the reader passed in is closed instead.
+   * @param reader the opened reader. Any record-specific subreaders should be
+   * closed.
+   * @throws IOException if there's an error accessing the path's filesystem.
+   */
+  public void recycle(LobFile.Reader reader) throws IOException {
+    Path canonicalPath = reader.getPath();
+
+    // Check if the cache has a reader for this path already. If not, add this.
+    boolean cached = false;
+    synchronized(this) {
+      if (readerMap.get(canonicalPath) == null) {
+        LOG.debug("Caching reader for path: " + canonicalPath);
+        readerMap.put(canonicalPath, reader);
+        cached = true;
+      }
+    }
+
+    if (!cached) {
+      LOG.debug("Reader already present for path: " + canonicalPath
+          + "; closing.");
+      reader.close();
+    }
+  }
+
+  @Override
+  protected synchronized void finalize() throws Throwable {
+    for (LobFile.Reader r : readerMap.values()) {
+      r.close();
+    }
+
+    super.finalize();
+  }
+
+  protected LobReaderCache() {
+    this.readerMap = new TreeMap<Path, LobFile.Reader>();
+  }
+
+  /**
+   * Create a fully-qualified path object.
+   * @param path the path to fully-qualify with its fs URI.
+   * @param conf the current Hadoop FS configuration.
+   * @return a new path representing the same location as the input 'path',
+   * but with a fully-qualified URI.
+   */
+  public static Path qualify(Path path, Configuration conf)
+      throws IOException {
+    if (null == path) {
+      return null;
+    }
+
+    FileSystem fs = path.getFileSystem(conf);
+    if (null == fs) {
+      fs = FileSystem.get(conf);
+    }
+    return path.makeQualified(fs);
+  }
+}
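
A usage sketch for the cache. Note that the constructor is protected, so a
concrete accessor must expose an instance; the getCache() accessor on the
com.cloudera.sqoop.io.LobReaderCache shim is assumed here, and the path is
hypothetical.

    Configuration conf = new Configuration();
    Path p = new Path("/tmp/example.lob");     // hypothetical path

    // Assumed accessor; this file only defines the protected constructor.
    LobReaderCache cache = com.cloudera.sqoop.io.LobReaderCache.getCache();

    LobFile.Reader reader = cache.get(p, conf); // cached or newly opened
    try {
      while (reader.next()) {
        // process each record...
      }
    } finally {
      // Re-cache the reader; if another reader for the same path was
      // cached in the meantime, recycle() closes this one instead.
      cache.recycle(reader);
    }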

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/LobReaderCache.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/NamedFifo.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/NamedFifo.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/NamedFifo.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/NamedFifo.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.log4j.Logger;
+
+/**
+ * A named FIFO channel.
+ */
+public class NamedFifo {
+
+  private static final Logger LOG = Logger.getLogger(NamedFifo.class);
+
+  private File fifoFile;
+
+  /** Create a named FIFO object at the local fs path given by 'pathname'. */
+  public NamedFifo(String pathname) {
+    this.fifoFile = new File(pathname);
+  }
+
+  /** Create a named FIFO object at the local fs path given by the 'fifo' File
+   * object. */
+  public NamedFifo(File fifo) {
+    this.fifoFile = fifo;
+  }
+
+  /**
+   * Return the File object representing the FIFO.
+   */
+  public File getFile() {
+    return this.fifoFile;
+  }
+
+  /**
+   * Create a named FIFO object.
+   * The pipe will be created with permissions 0600.
+   * @throws IOException on failure.
+   */
+  public void create() throws IOException {
+    create(0600);
+  }
+
+  /**
+   * Create a named FIFO object with the specified fs permissions.
+   * This depends on the 'mknod' (Linux; e.g. provided by coreutils) or
+   * 'mkfifo' (Mac OS X) system utility being available. The FIFO will be
+   * deleted when the process exits.
+   * @param permissions the permission bits to set, e.g. 0600 (octal).
+   * @throws IOException on failure.
+   */
+  public void create(int permissions) throws IOException {
+    String filename = fifoFile.toString();
+
+    // Format permissions as a mode string in base 8.
+    String modeStr = Integer.toString(permissions, 8);
+
+    // Create the FIFO itself.
+    try {
+      String output = Shell.execCommand("mknod", "--mode=0" + modeStr,
+          filename, "p");
+      LOG.info("mknod output:\n"+output);
+    } catch (IOException ex) {
+      LOG.info("IO error running mknod: " + ex.getMessage());
+      LOG.debug("IO error running mknod", ex);
+    }
+    if (!this.fifoFile.exists()) {
+      LOG.info("mknod failed, falling back to mkfifo");
+      String output = Shell.execCommand("mkfifo", "-m", "0" + modeStr,
+          filename);
+      LOG.info("mkfifo output:\n"+output);
+    }
+
+    // Schedule the FIFO to be cleaned up when we exit.
+    this.fifoFile.deleteOnExit();
+  }
+}
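
A short usage sketch (hypothetical path; on platforms where neither mknod
nor mkfifo is available, create() will fail):

    NamedFifo fifo = new NamedFifo("/tmp/example.fifo"); // hypothetical path
    fifo.create(0600);            // octal literal: owner read/write only
    File f = fifo.getFile();
    // One process opens the FIFO for writing while another opens it for
    // reading, using ordinary java.io file streams:
    //   OutputStream out = new FileOutputStream(f);
    //   InputStream in = new FileInputStream(f);
    // The node is removed automatically at JVM exit via deleteOnExit().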

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/NamedFifo.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittableBufferedWriter.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittableBufferedWriter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittableBufferedWriter.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A BufferedWriter implementation that wraps around a SplittingOutputStream
+ * and allows splitting of the underlying stream.
+ * Splits occur at allowSplit() calls, or newLine() calls.
+ */
+public class SplittableBufferedWriter extends BufferedWriter {
+
+  public static final Log LOG = LogFactory.getLog(
+      SplittableBufferedWriter.class.getName());
+
+  private SplittingOutputStream splitOutputStream;
+  private boolean alwaysFlush;
+
+  public SplittableBufferedWriter(
+      final SplittingOutputStream splitOutputStream) {
+    super(new OutputStreamWriter(splitOutputStream));
+
+    this.splitOutputStream = splitOutputStream;
+    this.alwaysFlush = false;
+  }
+
+  /** For testing. */
+  protected SplittableBufferedWriter(
+    final SplittingOutputStream splitOutputStream, final boolean alwaysFlush) {
+    super(new OutputStreamWriter(splitOutputStream));
+
+    this.splitOutputStream = splitOutputStream;
+    this.alwaysFlush = alwaysFlush;
+  }
+
+  @Override
+  public void newLine() throws IOException {
+    super.newLine();
+    this.allowSplit();
+  }
+
+  public void allowSplit() throws IOException {
+    if (alwaysFlush) {
+      this.flush();
+    }
+    if (this.splitOutputStream.wouldSplit()) {
+      LOG.debug("Starting new split");
+      this.flush();
+      this.splitOutputStream.allowSplit();
+    }
+  }
+}
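
A usage sketch; SplittingOutputStream is not shown in this part of the
commit, and the constructor arguments below (conf, destination directory,
file prefix, byte cutoff per split, optional codec) are an assumption made
for illustration only.

    Configuration conf = new Configuration();
    // Assumed constructor; see SplittingOutputStream for the actual API.
    SplittingOutputStream sos = new SplittingOutputStream(
        conf, new Path("/tmp/out"), "data-", 1024L * 1024L, null);
    SplittableBufferedWriter w = new SplittableBufferedWriter(sos);
    w.write("first record");
    w.newLine();     // newLine() is also a legal split point: if the
                     // cutoff has been reached, the underlying stream
                     // rolls over to a new file here.
    w.write("second record");
    w.allowSplit();  // explicit split point without emitting a newline
    w.close();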

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittableBufferedWriter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


