sqoop-commits mailing list archives

From b...@apache.org
Subject svn commit: r1190430 [4/5] - in /incubator/sqoop/trunk: ./ src/java/com/cloudera/sqoop/io/ src/java/com/cloudera/sqoop/lib/ src/java/org/apache/sqoop/io/ src/java/org/apache/sqoop/lib/
Date Fri, 28 Oct 2011 16:32:45 GMT
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittingOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittingOutputStream.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittingOutputStream.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittingOutputStream.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Formatter;
+
+import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/**
+ * An output stream that writes to an underlying filesystem, opening
+ * a new file after a specified number of bytes have been written to the
+ * current one.
+ */
+public class SplittingOutputStream extends OutputStream {
+
+  public static final Log LOG = LogFactory.getLog(
+      SplittingOutputStream.class.getName());
+
+  private OutputStream writeStream;
+  private CountingOutputStream countingFilterStream;
+  private Configuration conf;
+  private Path destDir;
+  private String filePrefix;
+  private long cutoffBytes;
+  private CompressionCodec codec;
+  private int fileNum;
+
+  /**
+   * Create a new SplittingOutputStream.
+   * @param conf the Configuration to use to interface with HDFS
+   * @param destDir the directory where the files will go (should already
+   *     exist).
+   * @param filePrefix the first part of the filename, to which a sequence
+   *    number is appended. This file will be placed inside destDir.
+   * @param cutoff the approximate number of bytes to use per file
+   * @param codec the CompressionCodec used to compress output files, or
+   *   null to leave output uncompressed.
+   */
+  public SplittingOutputStream(final Configuration conf, final Path destDir,
+      final String filePrefix, final long cutoff, final CompressionCodec codec)
+      throws IOException {
+
+    this.conf = conf;
+    this.destDir = destDir;
+    this.filePrefix = filePrefix;
+    this.cutoffBytes = cutoff;
+    if (this.cutoffBytes < 0) {
+      this.cutoffBytes = 0; // splitting disabled.
+    }
+    this.codec = codec;
+    this.fileNum = 0;
+
+    openNextFile();
+  }
+
+  /** Initialize the OutputStream to the next file to write to.
+   */
+  private void openNextFile() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    StringBuffer sb = new StringBuffer();
+    Formatter fmt = new Formatter(sb);
+    fmt.format("%05d", this.fileNum++);
+    String filename = filePrefix + fmt.toString();
+    if (codec != null) {
+      filename = filename + codec.getDefaultExtension();
+    }
+    Path destFile = new Path(destDir, filename);
+    LOG.debug("Opening next output file: " + destFile);
+    if (fs.exists(destFile)) {
+      Path canonicalDest = destFile.makeQualified(fs);
+      throw new IOException("Destination file " + canonicalDest
+          + " already exists");
+    }
+
+    OutputStream fsOut = fs.create(destFile);
+
+    // Count how many actual bytes hit HDFS.
+    this.countingFilterStream = new CountingOutputStream(fsOut);
+
+    if (codec != null) {
+      // Wrap that in a compressing stream.
+      this.writeStream = codec.createOutputStream(this.countingFilterStream);
+    } else {
+      // Write to the counting stream directly.
+      this.writeStream = this.countingFilterStream;
+    }
+  }
+
+  /**
+   * @return true if allowSplit() would actually cause a split.
+   */
+  public boolean wouldSplit() {
+    return this.cutoffBytes > 0
+        && this.countingFilterStream.getByteCount() >= this.cutoffBytes;
+  }
+
+  /** If we've written more to the disk than the user's split size,
+   * open the next file.
+   */
+  private void checkForNextFile() throws IOException {
+    if (wouldSplit()) {
+      LOG.debug("Starting new split");
+      this.writeStream.flush();
+      this.writeStream.close();
+      openNextFile();
+    }
+  }
+
+  /** Defines a point in the stream at which it is acceptable to split to
+   * a new file; e.g., the end of a record.
+   */
+  public void allowSplit() throws IOException {
+    checkForNextFile();
+  }
+
+  public void close() throws IOException {
+    this.writeStream.close();
+  }
+
+  public void flush() throws IOException {
+    this.writeStream.flush();
+  }
+
+  public void write(byte [] b) throws IOException {
+    this.writeStream.write(b);
+  }
+
+  public void write(byte [] b, int off, int len) throws IOException {
+    this.writeStream.write(b, off, len);
+  }
+
+  public void write(int b) throws IOException {
+    this.writeStream.write(b);
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/SplittingOutputStream.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
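
A minimal usage sketch of the class above, assuming the destination
directory already exists (all names and sizes here are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.io.SplittingOutputStream;

public class SplitWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Roll to a new "part-NNNNN" file roughly every 1 MB, uncompressed.
    SplittingOutputStream out = new SplittingOutputStream(
        conf, new Path("/tmp/split-demo"), "part-", 1024 * 1024, null);
    try {
      for (int i = 0; i < 100000; i++) {
        out.write(("record " + i + "\n").getBytes("UTF-8"));
        out.allowSplit(); // record boundary: a legal place to split
      }
    } finally {
      out.close();
    }
  }
}

Note that splits only ever happen at allowSplit() calls, so a single
record is never divided across two output files.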

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/UnsupportedCodecException.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/io/UnsupportedCodecException.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/io/UnsupportedCodecException.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/io/UnsupportedCodecException.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.io;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a compression codec cannot be recognized.
+ */
+public class UnsupportedCodecException extends IOException {
+
+  public UnsupportedCodecException() {
+    super("UnsupportedCodecException");
+  }
+
+  public UnsupportedCodecException(String msg) {
+    super(msg);
+  }
+
+  public UnsupportedCodecException(Throwable cause) {
+    super(cause);
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/io/UnsupportedCodecException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BigDecimalSerializer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BigDecimalSerializer.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BigDecimalSerializer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BigDecimalSerializer.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Serialize BigDecimal classes to/from DataInput and DataOutput objects.
+ *
+ * A BigDecimal comprises a BigInteger unscaled value and an integer
+ * 'scale' field. The unscaled value can also be returned as a 'long'
+ * when it is small enough to fit.
+ *
+ * We serialize in one of two formats:
+ *
+ *  First, check whether the BigInt can fit in a long:
+ *  boolean b = BigIntegerPart &gt; LONG_MAX || BigIntegerPart &lt; LONG_MIN
+ *
+ *  [int: scale][boolean: b == false][long: BigInt-part]
+ *  [int: scale][boolean: b == true][string: BigInt-part.toString()]
+ *
+ * TODO(aaron): Get this to work with Hadoop's Serializations framework.
+ */
+public final class BigDecimalSerializer {
+
+  private BigDecimalSerializer() { }
+
+  public static final BigInteger LONG_MAX_AS_BIGINT =
+      BigInteger.valueOf(Long.MAX_VALUE);
+  public static final BigInteger LONG_MIN_AS_BIGINT =
+      BigInteger.valueOf(Long.MIN_VALUE);
+
+  public static void write(BigDecimal d, DataOutput out) throws IOException {
+    int scale = d.scale();
+    BigInteger bigIntPart = d.unscaledValue();
+    boolean fastpath = bigIntPart.compareTo(LONG_MAX_AS_BIGINT) < 0
+        && bigIntPart.compareTo(LONG_MIN_AS_BIGINT) > 0;
+
+    out.writeInt(scale);
+    out.writeBoolean(fastpath);
+    if (fastpath) {
+      out.writeLong(bigIntPart.longValue());
+    } else {
+      Text.writeString(out, bigIntPart.toString());
+    }
+  }
+
+  public static BigDecimal readFields(DataInput in) throws IOException {
+    int scale = in.readInt();
+    boolean fastpath = in.readBoolean();
+    BigInteger unscaledIntPart;
+    if (fastpath) {
+      long unscaledValue = in.readLong();
+      unscaledIntPart = BigInteger.valueOf(unscaledValue);
+    } else {
+      String unscaledValueStr = Text.readString(in);
+      unscaledIntPart = new BigInteger(unscaledValueStr);
+    }
+
+    return new BigDecimal(unscaledIntPart, scale);
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BigDecimalSerializer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
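
A round-trip sketch of the format described above (illustrative only):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import org.apache.sqoop.lib.BigDecimalSerializer;

public class BigDecimalRoundTrip {
  public static void main(String[] args) throws IOException {
    BigDecimal original = new BigDecimal("12345.6789"); // fits in a long

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    BigDecimalSerializer.write(original, new DataOutputStream(buf));

    BigDecimal copy = BigDecimalSerializer.readFields(
        new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

    // Scale and unscaled value are both preserved, so equals() holds.
    System.out.println(original.equals(copy)); // true
  }
}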

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BlobRef.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BlobRef.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BlobRef.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BlobRef.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
+
+import com.cloudera.sqoop.io.LobFile;
+
+/**
+ * BlobRef is a wrapper that holds a BLOB either directly, or a
+ * reference to a file that holds the BLOB data.
+ */
+public class BlobRef extends
+  com.cloudera.sqoop.lib.LobRef<byte[], BytesWritable, InputStream> {
+
+  public static final Log LOG = LogFactory.getLog(BlobRef.class.getName());
+
+  public BlobRef() {
+    super();
+  }
+
+  public BlobRef(byte [] bytes) {
+    super(new BytesWritable(bytes));
+  }
+
+  /**
+   * Initialize a BlobRef to an external BLOB.
+   * @param file the filename of the BLOB. May be relative to the job dir.
+   * @param offset the offset (in bytes) into the LobFile for this record.
+   * @param length the length of the record in bytes.
+   */
+  public BlobRef(String file, long offset, long length) {
+    super(file, offset, length);
+  }
+
+  @Override
+  protected InputStream getExternalSource(LobFile.Reader reader)
+      throws IOException {
+    return reader.readBlobRecord();
+  }
+
+  @Override
+  protected InputStream getInternalSource(BytesWritable data) {
+    return new ByteArrayInputStream(data.getBytes(), 0, data.getLength());
+  }
+
+  @Override
+  protected byte [] getInternalData(BytesWritable data) {
+    return Arrays.copyOf(data.getBytes(), data.getLength());
+  }
+
+  @Override
+  protected BytesWritable deepCopyData(BytesWritable data) {
+    return new BytesWritable(Arrays.copyOf(data.getBytes(), data.getLength()));
+  }
+
+  @Override
+  public void readFieldsInternal(DataInput in) throws IOException {
+    // For internally-stored BLOBs, the data is a BytesWritable
+    // containing the actual data.
+
+    BytesWritable data = getDataObj();
+
+    if (null == data) {
+      data = new BytesWritable();
+    }
+    data.readFields(in);
+    setDataObj(data);
+  }
+
+  @Override
+  public void writeInternal(DataOutput out) throws IOException {
+    getDataObj().write(out);
+  }
+
+  /**
+   * Create a BlobRef based on parsed data from a line of text.
+   * This only operates correctly on external blobs; inline BLOB data
+   * cannot be reparsed and yields an empty BlobRef. You should store
+   * BLOB data in SequenceFile format if reparsing is necessary.
+   * @param inputString the text-based input data to parse.
+   * @return a new BlobRef containing a reference to an external BLOB, or
+   * an empty BlobRef if the data to be parsed is actually inline.
+   */
+  public static com.cloudera.sqoop.lib.BlobRef parse(String inputString) {
+    // If inputString is of the form 'externalLob(lf,%s,%d,%d)', then this is
+    // an external BLOB stored at the LobFile indicated by '%s' with the next
+    // two arguments representing its offset and length in the file.
+    // Otherwise, it is an inline BLOB, which we don't support parsing of.
+
+    Matcher m = org.apache.sqoop.lib.LobRef.EXTERNAL_MATCHER.get();
+    m.reset(inputString);
+    if (m.matches()) {
+      // This is a LobFile. Extract the filename, offset and len from the
+      // matcher.
+      return new com.cloudera.sqoop.lib.BlobRef(m.group(1),
+          Long.valueOf(m.group(2)), Long.valueOf(m.group(3)));
+    } else {
+      // This is inline BLOB string data.
+      LOG.warn(
+          "Reparsing inline BLOB data is not supported; use SequenceFiles.");
+      return new com.cloudera.sqoop.lib.BlobRef();
+    }
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BlobRef.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
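
A sketch of the two parse() outcomes (the LobFile path below is
hypothetical):

public class BlobRefParseSketch {
  public static void main(String[] args) {
    // External form 'externalLob(lf,%s,%d,%d)': filename, offset, length.
    com.cloudera.sqoop.lib.BlobRef external =
        org.apache.sqoop.lib.BlobRef.parse(
            "externalLob(lf,_lob/large_obj_0.lob,100,5242880)");

    // Any other input is inline BLOB data, which cannot be reparsed
    // from text; a warning is logged and an empty BlobRef is returned.
    com.cloudera.sqoop.lib.BlobRef inline =
        org.apache.sqoop.lib.BlobRef.parse("deadbeef");
  }
}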

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BooleanParser.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BooleanParser.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BooleanParser.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BooleanParser.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+/**
+ * Parse string representations of boolean values into boolean
+ * scalar types.
+ */
+public final class BooleanParser {
+
+  /**
+   * Return a boolean based on the value contained in the string.
+   *
+   * <p>The following values are considered true:
+   * "true", "t", "yes", "on", "1".</p>
+   * <p>All other values, including 'null', are false.</p>
+   * <p>All comparisons are case-insensitive.</p>
+   */
+  public static boolean valueOf(final String s) {
+    return s != null && ("true".equalsIgnoreCase(s) || "t".equalsIgnoreCase(s)
+        || "1".equals(s) || "on".equalsIgnoreCase(s)
+        || "yes".equalsIgnoreCase(s));
+  }
+
+  private BooleanParser() { }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/BooleanParser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
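
A quick illustration of the accepted forms:

import org.apache.sqoop.lib.BooleanParser;

public class BooleanParserSketch {
  public static void main(String[] args) {
    System.out.println(BooleanParser.valueOf("TRUE")); // true (case-insensitive)
    System.out.println(BooleanParser.valueOf("yes"));  // true
    System.out.println(BooleanParser.valueOf("1"));    // true
    System.out.println(BooleanParser.valueOf("0"));    // false
    System.out.println(BooleanParser.valueOf(null));   // false, no NPE
  }
}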

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ClobRef.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ClobRef.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ClobRef.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ClobRef.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.io.Text;
+
+import com.cloudera.sqoop.io.LobFile;
+
+/**
+ * ClobRef is a wrapper that holds a CLOB either directly, or a
+ * reference to a file that holds the CLOB data.
+ */
+public class ClobRef
+  extends com.cloudera.sqoop.lib.LobRef<String, String, Reader> {
+
+  public ClobRef() {
+    super();
+  }
+
+  public ClobRef(String chars) {
+    super(chars);
+  }
+
+  /**
+   * Initialize a ClobRef to an external CLOB.
+   * @param file the filename of the CLOB. May be relative to the job dir.
+   * @param offset the offset (in bytes) into the LobFile for this record.
+   * @param length the length of the record in characters.
+   */
+  public ClobRef(String file, long offset, long length) {
+    super(file, offset, length);
+  }
+
+  @Override
+  protected Reader getExternalSource(LobFile.Reader reader)
+      throws IOException {
+    return reader.readClobRecord();
+  }
+
+  @Override
+  protected Reader getInternalSource(String data) {
+    return new StringReader(data);
+  }
+
+  @Override
+  protected String deepCopyData(String data) {
+    return data;
+  }
+
+  @Override
+  protected String getInternalData(String data) {
+    return data;
+  }
+
+  @Override
+  public void readFieldsInternal(DataInput in) throws IOException {
+    // For internally-stored clobs, the data is written as UTF8 Text.
+    setDataObj(Text.readString(in));
+  }
+
+  @Override
+  public void writeInternal(DataOutput out) throws IOException {
+    Text.writeString(out, getDataObj());
+  }
+
+  /**
+   * Create a ClobRef based on parsed data from a line of text.
+   * @param inputString the text-based input data to parse.
+   * @return a ClobRef to the given data.
+   */
+  public static com.cloudera.sqoop.lib.ClobRef parse(String inputString) {
+    // If inputString is of the form 'externalLob(lf,%s,%d,%d)', then this is
+    // an external CLOB stored at the LobFile indicated by '%s' with the next
+    // two arguments representing its offset and length in the file.
+    // Otherwise, it is an inline CLOB, which we read as-is.
+
+    Matcher m = EXTERNAL_MATCHER.get();
+    m.reset(inputString);
+    if (m.matches()) {
+      // This is a LobFile. Extract the filename, offset and len from the
+      // matcher.
+      return new com.cloudera.sqoop.lib.ClobRef(m.group(1),
+          Long.valueOf(m.group(2)), Long.valueOf(m.group(3)));
+    } else {
+      // This is inline CLOB string data.
+      return new com.cloudera.sqoop.lib.ClobRef(inputString);
+    }
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ClobRef.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
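
A sketch contrasting ClobRef.parse with BlobRef.parse: inline CLOB text
survives a text round trip, unlike inline BLOB data (the LobFile path is
hypothetical):

public class ClobRefParseSketch {
  public static void main(String[] args) {
    com.cloudera.sqoop.lib.ClobRef external =
        org.apache.sqoop.lib.ClobRef.parse(
            "externalLob(lf,_lob/large_obj_0.lob,100,2048)");

    // Inline case: the parsed ClobRef simply wraps the string itself.
    com.cloudera.sqoop.lib.ClobRef inline =
        org.apache.sqoop.lib.ClobRef.parse("hello, world");
  }
}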

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/DelimiterSet.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/DelimiterSet.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/DelimiterSet.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/DelimiterSet.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+
+/**
+ * Encapsulates a set of delimiters used to encode a record.
+ */
+public class DelimiterSet implements Cloneable {
+
+  public static final char NULL_CHAR = '\000';
+
+  private char fieldDelim; // fields terminated by this.
+  private char recordDelim; // records terminated by this.
+
+  // If these next two fields are '\000', then they are ignored.
+  private char enclosedBy;
+  private char escapedBy;
+
+  // If true, then the enclosed-by character is applied to every
+  // field, not just ones containing embedded delimiters.
+  private boolean encloseRequired;
+
+  /**
+   * Create a delimiter set with the default delimiters
+   * (comma for fields, newline for records).
+   */
+  public DelimiterSet() {
+    this(',', '\n', NULL_CHAR, NULL_CHAR, false);
+  }
+
+  /**
+   * Create a delimiter set with the specified delimiters.
+   * @param field the fields-terminated-by delimiter
+   * @param record the lines-terminated-by delimiter
+   * @param enclose the enclosed-by character
+   * @param escape the escaped-by character
+   * @param isEncloseRequired If true, enclosed-by is applied to all
+   * fields. If false, only applied to fields that embed delimiters.
+   */
+  public DelimiterSet(char field, char record, char enclose, char escape,
+      boolean isEncloseRequired) {
+    this.fieldDelim = field;
+    this.recordDelim = record;
+    this.enclosedBy = enclose;
+    this.escapedBy = escape;
+    this.encloseRequired = isEncloseRequired;
+  }
+
+  /**
+   * Sets the fields-terminated-by character.
+   */
+  public void setFieldsTerminatedBy(char f) {
+    this.fieldDelim = f;
+  }
+
+  /**
+   * @return the fields-terminated-by character.
+   */
+  public char getFieldsTerminatedBy() {
+    return this.fieldDelim;
+  }
+
+  /**
+   * Sets the end-of-record lines-terminated-by character.
+   */
+  public void setLinesTerminatedBy(char r) {
+    this.recordDelim = r;
+  }
+
+  /**
+   * @return the end-of-record (lines-terminated-by) character.
+   */
+  public char getLinesTerminatedBy() {
+    return this.recordDelim;
+  }
+
+  /**
+   * Sets the enclosed-by character.
+   * @param e the enclosed-by character, or '\000' for no enclosing character.
+   */
+  public void setEnclosedBy(char e) {
+    this.enclosedBy = e;
+  }
+
+  /**
+   * @return the enclosed-by character, or '\000' for none.
+   */
+  public char getEnclosedBy() {
+    return this.enclosedBy;
+  }
+
+  /**
+   * Sets the escaped-by character.
+   * @param e the escaped-by character, or '\000' for no escape character.
+   */
+  public void setEscapedBy(char e) {
+    this.escapedBy = e;
+  }
+
+  /**
+   * @return the escaped-by character, or '\000' for none.
+   */
+  public char getEscapedBy() {
+    return this.escapedBy;
+  }
+
+  /**
+   * Set whether the enclosed-by character must be applied to all fields,
+   * or only fields with embedded delimiters.
+   */
+  public void setEncloseRequired(boolean required) {
+    this.encloseRequired = required;
+  }
+
+  /**
+   * @return true if the enclosed-by character must be applied to all fields,
+   * or false if it's only used for fields with embedded delimiters.
+   */
+  public boolean isEncloseRequired() {
+    return this.encloseRequired;
+  }
+
+  /**
+   * @return a string representation of the delimiters.
+   */
+  @Override
+  public String toString() {
+    return "fields=" + this.fieldDelim
+        + " records=" + this.recordDelim
+        + " escape=" + this.escapedBy
+        + " enclose=" + this.enclosedBy
+        + " required=" + this.encloseRequired;
+  }
+
+  /**
+   * Format this set of delimiters as a call to the constructor for
+   * this object, that would generate identical delimiters.
+   * @return a String that can be embedded in generated code that
+   * provides this set of delimiters.
+   */
+  public String formatConstructor() {
+    return "new DelimiterSet((char) " + (int) this.fieldDelim + ", "
+        + "(char) " + (int) this.recordDelim + ", "
+        + "(char) " + (int) this.enclosedBy + ", "
+        + "(char) " + (int) this.escapedBy + ", "
+        + this.encloseRequired + ")";
+  }
+
+  /**
+   * @return a hash code for this set of delimiters.
+   */
+  @Override
+  public int hashCode() {
+    return (int) this.fieldDelim
+        + (((int) this.recordDelim) << 4)
+        + (((int) this.escapedBy) << 8)
+        + (((int) this.enclosedBy) << 12)
+        + (((int) this.recordDelim) << 16)
+        + (this.encloseRequired ? 0xFEFE : 0x7070);
+  }
+
+  /**
+   * @return true if this delimiter set is the same as another set of
+   * delimiters.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (null == other) {
+      return false;
+    } else if (!other.getClass().equals(getClass())) {
+      return false;
+    }
+
+    DelimiterSet set = (DelimiterSet) other;
+    return this.fieldDelim == set.fieldDelim
+        && this.recordDelim == set.recordDelim
+        && this.escapedBy == set.escapedBy
+        && this.enclosedBy == set.enclosedBy
+        && this.encloseRequired == set.encloseRequired;
+  }
+
+  /**
+   * @return a new copy of this same set of delimiters.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/DelimiterSet.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
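
A construction sketch: CSV-style delimiters with quoting applied only to
fields that embed a delimiter.

import org.apache.sqoop.lib.DelimiterSet;

public class DelimiterSetSketch {
  public static void main(String[] args) {
    DelimiterSet csv = new DelimiterSet(',', '\n', '"', '\\', false);
    System.out.println(csv);
    // Emits source text reproducing these delimiters in generated code:
    // new DelimiterSet((char) 44, (char) 10, (char) 34, (char) 92, false)
    System.out.println(csv.formatConstructor());
  }
}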

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldFormatter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldFormatter.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldFormatter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldFormatter.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+/**
+ * Static helper class that will help format data with quotes and escape chars.
+ */
+public final class FieldFormatter {
+
+  /**
+   * Drop Hive delimiters from a string field; used when the
+   * --hive-drop-delims option is on.
+   * @param str the field value to process
+   * @param delimiters the DelimiterSet governing escaping and enclosing
+   * @return the field with Hive delimiters removed, escaped and enclosed
+   */
+  public static String hiveStringDropDelims(String str,
+      com.cloudera.sqoop.lib.DelimiterSet delimiters) {
+    return hiveStringReplaceDelims(str, "", delimiters);
+  }
+
+  /**
+   * Replace Hive delimiters with a user-defined string passed to the
+   * --hive-delims-replacement option.
+   * @param str the field value to process
+   * @param replacement the string substituted for each Hive delimiter
+   * @param delimiters the DelimiterSet governing escaping and enclosing
+   * @return the processed field, escaped and enclosed
+   */
+  public static String hiveStringReplaceDelims(String str, String replacement,
+      com.cloudera.sqoop.lib.DelimiterSet delimiters) {
+    String droppedDelims = str.replaceAll("\\n|\\r|\01", replacement);
+    return escapeAndEnclose(droppedDelims, delimiters);
+  }
+
+  /**
+   * Takes an input string representing the value of a field, encloses it in
+   * enclosing chars, and escapes any occurrences of such characters in the
+   * middle.  The escape character itself is also escaped if it appears in the
+   * text of the field.  If there is no enclosing character, then any
+   * delimiters present in the field body are escaped instead.
+   *
+   * The field is enclosed only if:
+   *   enclose != '\000', and:
+   *     encloseRequired is true, or
+   *     one of the fields-terminated-by or lines-terminated-by characters is
+   *     present in the string.
+   *
+   * Escaping is not performed if the escape char is '\000'.
+   *
+   * @param str - The user's string to escape and enclose
+   * @param delimiters - The DelimiterSet to use identifying the escape and
+   * enclose semantics. If the specified escape or enclose characters are
+   * '\000', those operations are not performed.
+   * @return the escaped, enclosed version of 'str'.
+   */
+  public static String escapeAndEnclose(String str,
+      com.cloudera.sqoop.lib.DelimiterSet delimiters) {
+
+    char escape = delimiters.getEscapedBy();
+    char enclose = delimiters.getEnclosedBy();
+    boolean encloseRequired = delimiters.isEncloseRequired();
+
+    // true if we can use an escape character.
+    boolean escapingLegal =
+        com.cloudera.sqoop.lib.DelimiterSet.NULL_CHAR != escape;
+    String withEscapes;
+
+    if (null == str) {
+      return null;
+    }
+
+    if (escapingLegal) {
+      // escaping is legal. Escape any instances of the escape char itself.
+      withEscapes = str.replace("" + escape, "" + escape + escape);
+    } else {
+      // no need to double-escape
+      withEscapes = str;
+    }
+
+    if (com.cloudera.sqoop.lib.DelimiterSet.NULL_CHAR == enclose) {
+      // The enclose-with character was left unset, so we can't enclose items.
+
+      if (escapingLegal) {
+        // If the user has used the fields-terminated-by or
+        // lines-terminated-by characters in the string, escape them if we
+        // have an escape character.
+        String fields = "" + delimiters.getFieldsTerminatedBy();
+        String lines = "" + delimiters.getLinesTerminatedBy();
+        withEscapes = withEscapes.replace(fields, "" + escape + fields);
+        withEscapes = withEscapes.replace(lines, "" + escape + lines);
+      }
+
+      // No enclosing possible, so now return this.
+      return withEscapes;
+    }
+
+    // if we have an enclosing character, and escaping is legal, then the
+    // encloser must always be escaped.
+    if (escapingLegal) {
+      withEscapes = withEscapes.replace("" + enclose, "" + escape + enclose);
+    }
+
+    boolean actuallyDoEnclose = encloseRequired;
+    if (!actuallyDoEnclose) {
+      // check if the string requires enclosing.
+      char [] mustEncloseFor = new char[2];
+      mustEncloseFor[0] = delimiters.getFieldsTerminatedBy();
+      mustEncloseFor[1] = delimiters.getLinesTerminatedBy();
+      for (char reason : mustEncloseFor) {
+        if (str.indexOf(reason) != -1) {
+          actuallyDoEnclose = true;
+          break;
+        }
+      }
+    }
+
+    if (actuallyDoEnclose) {
+      return "" + enclose + withEscapes + enclose;
+    } else {
+      return withEscapes;
+    }
+  }
+
+  private FieldFormatter() { }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldFormatter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
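
An illustrative sketch of escapeAndEnclose, assuming the
com.cloudera.sqoop.lib.DelimiterSet shim exposes the same five-argument
constructor as the class above:

import org.apache.sqoop.lib.FieldFormatter;

public class FieldFormatterSketch {
  public static void main(String[] args) {
    com.cloudera.sqoop.lib.DelimiterSet delims =
        new com.cloudera.sqoop.lib.DelimiterSet(',', '\n', '"', '\\', false);

    // No embedded delimiter and enclosing not required: returned as-is.
    System.out.println(FieldFormatter.escapeAndEnclose("plain", delims));
    // Embedded field delimiter: the result is enclosed, e.g. "a,b".
    System.out.println(FieldFormatter.escapeAndEnclose("a,b", delims));
  }
}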

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMapProcessor.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMapProcessor.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMapProcessor.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.io.IOException;
+
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.ProcessingException;
+
+/**
+ * Interface implemented by classes that process FieldMappable objects.
+ */
+public interface FieldMapProcessor {
+
+  /**
+   * Allow arbitrary processing of a FieldMappable object.
+   * @param record an object which can emit a map of its field names to values.
+   * @throws IOException if the processor encounters an IO error when
+   * operating on this object.
+   * @throws ProcessingException if the FieldMapProcessor encounters
+   * a general processing error when operating on this object.
+   */
+  void accept(FieldMappable record) throws IOException, ProcessingException;
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMapProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMappable.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMappable.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMappable.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMappable.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.util.Map;
+
+/**
+ * Interface describing a class capable of returning a map of the fields
+ * of the object to their values.
+ */
+public interface FieldMappable {
+
+  /**
+   * Returns a map containing all fields of this record.
+   * @return a map from column names to the object-based values for
+   * this record. The map may not be null, though it may be empty.
+   */
+  Map<String, Object> getFieldMap();
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/FieldMappable.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
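
A toy sketch showing how the two interfaces above fit together: a
processor consumes any record that can expose its fields as a
name-to-value map.

import java.io.IOException;
import java.util.Map;

import com.cloudera.sqoop.lib.FieldMappable;
import com.cloudera.sqoop.lib.ProcessingException;

public class PrintingProcessor
    implements org.apache.sqoop.lib.FieldMapProcessor {
  @Override
  public void accept(FieldMappable record)
      throws IOException, ProcessingException {
    for (Map.Entry<String, Object> e : record.getFieldMap().entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
  }
}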

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/JdbcWritableBridge.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/JdbcWritableBridge.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/JdbcWritableBridge.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/JdbcWritableBridge.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import org.apache.hadoop.io.BytesWritable;
+
+import com.cloudera.sqoop.lib.BlobRef;
+import com.cloudera.sqoop.lib.ClobRef;
+
+/**
+ * Contains a set of methods which can read db columns from a ResultSet into
+ * Java types, and write those Java types back to a PreparedStatement, for
+ * use with Hadoop's Writable implementations. This supports null values
+ * for all types.
+ */
+public final class JdbcWritableBridge {
+
+  // Currently, cap BLOB/CLOB objects at 16 MB until we can use external
+  // storage.
+  public static final long MAX_BLOB_LENGTH = 16 * 1024 * 1024;
+  public static final long MAX_CLOB_LENGTH = 16 * 1024 * 1024;
+
+  private JdbcWritableBridge() {
+  }
+
+  public static Integer readInteger(int colNum, ResultSet r)
+      throws SQLException {
+    int val;
+    val = r.getInt(colNum);
+    if (r.wasNull()) {
+      return null;
+    } else {
+      return Integer.valueOf(val);
+    }
+  }
+
+  public static Long readLong(int colNum, ResultSet r) throws SQLException {
+    long val;
+    val = r.getLong(colNum);
+    if (r.wasNull()) {
+      return null;
+    } else {
+      return Long.valueOf(val);
+    }
+  }
+
+  public static String readString(int colNum, ResultSet r) throws SQLException {
+    return r.getString(colNum);
+  }
+
+  public static Float readFloat(int colNum, ResultSet r) throws SQLException {
+    float val;
+    val = r.getFloat(colNum);
+    if (r.wasNull()) {
+      return null;
+    } else {
+      return Float.valueOf(val);
+    }
+  }
+
+  public static Double readDouble(int colNum, ResultSet r) throws SQLException {
+    double val;
+    val = r.getDouble(colNum);
+    if (r.wasNull()) {
+      return null;
+    } else {
+      return Double.valueOf(val);
+    }
+  }
+
+  public static Boolean readBoolean(int colNum, ResultSet r)
+      throws SQLException {
+    boolean val;
+    val = r.getBoolean(colNum);
+    if (r.wasNull()) {
+      return null;
+    } else {
+      return Boolean.valueOf(val);
+    }
+  }
+
+  public static Time readTime(int colNum, ResultSet r) throws SQLException {
+    return r.getTime(colNum);
+  }
+
+  public static Timestamp readTimestamp(int colNum, ResultSet r)
+      throws SQLException {
+    return r.getTimestamp(colNum);
+  }
+
+  public static Date readDate(int colNum, ResultSet r) throws SQLException {
+    return r.getDate(colNum);
+  }
+
+  public static BytesWritable readBytesWritable(int colNum, ResultSet r)
+      throws SQLException {
+    byte [] bytes = r.getBytes(colNum);
+    return bytes == null ? null : new BytesWritable(bytes);
+  }
+
+  public static BigDecimal readBigDecimal(int colNum, ResultSet r)
+      throws SQLException {
+    return r.getBigDecimal(colNum);
+  }
+
+  public static BlobRef readBlobRef(int colNum, ResultSet r)
+      throws SQLException {
+    // Loading of BLOBs is delayed; handled by LargeObjectLoader.
+    return null;
+  }
+
+  public static ClobRef readClobRef(int colNum, ResultSet r)
+      throws SQLException {
+    // Loading of CLOBs is delayed; handled by LargeObjectLoader.
+    return null;
+  }
+
+  public static void writeInteger(Integer val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setInt(paramIdx, val);
+    }
+  }
+
+  public static void writeLong(Long val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setLong(paramIdx, val);
+    }
+  }
+
+  public static void writeDouble(Double val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setDouble(paramIdx, val);
+    }
+  }
+
+  public static void writeBoolean(Boolean val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setBoolean(paramIdx, val);
+    }
+  }
+
+  public static void writeFloat(Float val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setFloat(paramIdx, val);
+    }
+  }
+
+  public static void writeString(String val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setString(paramIdx, val);
+    }
+  }
+
+  public static void writeTimestamp(Timestamp val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setTimestamp(paramIdx, val);
+    }
+  }
+
+  public static void writeTime(Time val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setTime(paramIdx, val);
+    }
+  }
+
+  public static void writeDate(Date val, int paramIdx, int sqlType,
+      PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setDate(paramIdx, val);
+    }
+  }
+
+  public static void writeBytesWritable(BytesWritable val, int paramIdx,
+      int sqlType, PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      // val.getBytes() is only valid in [0, len)
+      byte [] rawBytes = val.getBytes();
+      int len = val.getLength();
+      byte [] outBytes = new byte[len];
+      System.arraycopy(rawBytes, 0, outBytes, 0, len);
+      s.setBytes(paramIdx, outBytes);
+    }
+  }
+
+  public static void writeBigDecimal(BigDecimal val, int paramIdx,
+      int sqlType, PreparedStatement s) throws SQLException {
+    if (null == val) {
+      s.setNull(paramIdx, sqlType);
+    } else {
+      s.setBigDecimal(paramIdx, val);
+    }
+  }
+
+  public static void writeBlobRef(com.cloudera.sqoop.lib.BlobRef val,
+      int paramIdx, int sqlType, PreparedStatement s) throws SQLException {
+    // TODO: support this.
+    throw new RuntimeException("Unsupported: Cannot export BLOB data");
+  }
+
+  public static void writeClobRef(com.cloudera.sqoop.lib.ClobRef val,
+      int paramIdx, int sqlType, PreparedStatement s) throws SQLException {
+    // TODO: support this.
+    throw new RuntimeException("Unsupported: Cannot export CLOB data");
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/JdbcWritableBridge.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
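
A sketch of null-safe reads and writes through the bridge (column
positions and SQL types here are hypothetical):

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import org.apache.sqoop.lib.JdbcWritableBridge;

public class BridgeSketch {
  public static void copyRow(ResultSet r, PreparedStatement s)
      throws SQLException {
    Integer id = JdbcWritableBridge.readInteger(1, r);  // null if SQL NULL
    String name = JdbcWritableBridge.readString(2, r);
    JdbcWritableBridge.writeInteger(id, 1, Types.INTEGER, s);
    JdbcWritableBridge.writeString(name, 2, Types.VARCHAR, s);
  }
}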

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LargeObjectLoader.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LargeObjectLoader.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LargeObjectLoader.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.io.Writer;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.cloudera.sqoop.io.LobFile;
+import com.cloudera.sqoop.util.TaskId;
+
+/**
+ * Reads large object (BLOB and CLOB) columns from a ResultSet,
+ * materializing each value either inline or, above a configurable size
+ * threshold, into an external LobFile on the FileSystem.
+ *
+ * This is a singleton instance class; only one may exist at a time.
+ * However, its lifetime is limited to the current TaskInputOutputContext's
+ * life.
+ */
+public class LargeObjectLoader implements Closeable  {
+
+  // Spill to external storage for BLOB/CLOB objects > 16 MB.
+  public static final long DEFAULT_MAX_LOB_LENGTH = 16 * 1024 * 1024;
+
+  public static final String MAX_INLINE_LOB_LEN_KEY =
+      "sqoop.inline.lob.length.max";
+
+  private Configuration conf;
+  private Path workPath;
+  private FileSystem fs;
+
+  // Handles to the open BLOB / CLOB file writers.
+  private LobFile.Writer curBlobWriter;
+  private LobFile.Writer curClobWriter;
+
+  // Counter that is used with the current task attempt id to
+  // generate unique LOB file names.
+  private long nextLobFileId = 0;
+
+  /**
+   * Create a new LargeObjectLoader.
+   * @param conf the Configuration to use
+   * @param workPath the HDFS working directory for this task.
+   */
+  public LargeObjectLoader(Configuration conf, Path workPath)
+      throws IOException {
+    this.conf = conf;
+    this.workPath = workPath;
+    this.fs = FileSystem.get(conf);
+    this.curBlobWriter = null;
+    this.curClobWriter = null;
+  }
+
+  @Override
+  protected synchronized void finalize() throws Throwable {
+    close();
+    super.finalize();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (null != curBlobWriter) {
+      curBlobWriter.close();
+      curBlobWriter = null;
+    }
+
+    if (null != curClobWriter) {
+      curClobWriter.close();
+      curClobWriter = null;
+    }
+  }
+
+  /**
+   * @return a filename to use to put an external LOB in.
+   */
+  private String getNextLobFileName() {
+    String file = "_lob/large_obj_" + TaskId.get(conf, "unknown_task_id")
+        + nextLobFileId + ".lob";
+    nextLobFileId++;
+
+    return file;
+  }
+
+  /**
+   * Calculates a path to a new LobFile object, creating any
+   * missing directories.
+   * @return a Path to a LobFile to write
+   */
+  private Path getNextLobFilePath() throws IOException {
+    Path p = new Path(workPath, getNextLobFileName());
+    Path parent = p.getParent();
+    if (!fs.exists(parent)) {
+      fs.mkdirs(parent);
+    }
+
+    return p;
+  }
+
+  /**
+   * @return the current LobFile writer for BLOBs, creating one if necessary.
+   */
+  private LobFile.Writer getBlobWriter() throws IOException {
+    if (null == this.curBlobWriter) {
+      this.curBlobWriter = LobFile.create(getNextLobFilePath(), conf, false);
+    }
+
+    return this.curBlobWriter;
+  }
+
+  /**
+   * @return the current LobFile writer for CLOBs, creating one if necessary.
+   */
+  private LobFile.Writer getClobWriter() throws IOException {
+    if (null == this.curClobWriter) {
+      this.curClobWriter = LobFile.create(getNextLobFilePath(), conf, true);
+    }
+
+    return this.curClobWriter;
+  }
+
+  /**
+   * Returns the path being written to by a given LobFile.Writer, relative
+   * to the working directory of this LargeObjectLoader.
+   * @param w the LobFile.Writer whose path should be examined.
+   * @return the path this is writing to, relative to the current working dir.
+   */
+  private String getRelativePath(LobFile.Writer w) {
+    Path writerPath = w.getPath();
+
+    String writerPathStr = writerPath.toString();
+    String workPathStr = workPath.toString();
+    if (!workPathStr.endsWith(File.separator)) {
+      workPathStr = workPathStr + File.separator;
+    }
+
+    if (writerPathStr.startsWith(workPathStr)) {
+      return writerPathStr.substring(workPathStr.length());
+    }
+
+    // Outside the working dir; return the whole thing.
+    return writerPathStr;
+  }
+
+  /**
+   * Copies all character data from the provided Reader to the provided
+   * Writer. Does not close handles when it's done.
+   * @param reader data source
+   * @param writer data sink
+   * @throws IOException if an I/O error occurs either reading or writing.
+   */
+  private void copyAll(Reader reader, Writer writer) throws IOException {
+    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    char [] buf = new char[bufferSize];
+
+    while (true) {
+      int charsRead = reader.read(buf);
+      if (-1 == charsRead) {
+        break; // no more stream to read.
+      }
+      writer.write(buf, 0, charsRead);
+    }
+  }
+
+  /**
+   * Copies all byte data from the provided InputStream to the provided
+   * OutputStream. Does not close handles when it's done.
+   * @param input data source
+   * @param output data sink
+   * @throws IOException if an I/O error occurs either reading or writing.
+   */
+  private void copyAll(InputStream input, OutputStream output)
+      throws IOException {
+    int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    byte [] buf = new byte[bufferSize];
+
+    while (true) {
+      int bytesRead = input.read(buf, 0, bufferSize);
+      if (-1 == bytesRead) {
+        break; // no more stream to read.
+      }
+      output.write(buf, 0, bytesRead);
+    }
+  }
+
+  /**
+   * Actually read a BlobRef instance from the ResultSet and materialize
+   * the data either inline or to a file.
+   *
+   * @param colNum the column of the ResultSet's current row to read.
+   * @param r the ResultSet to read from.
+   * @return a BlobRef encapsulating the data in this field.
+   * @throws IOException if an error occurs writing to the FileSystem.
+   * @throws SQLException if an error occurs reading from the database.
+   */
+  public com.cloudera.sqoop.lib.BlobRef readBlobRef(int colNum, ResultSet r)
+      throws IOException, InterruptedException, SQLException {
+
+    long maxInlineLobLen = conf.getLong(
+        MAX_INLINE_LOB_LEN_KEY,
+        DEFAULT_MAX_LOB_LENGTH);
+
+    Blob b = r.getBlob(colNum);
+    if (null == b) {
+      return null;
+    } else if (b.length() > maxInlineLobLen) {
+      // Deserialize very large BLOBs into separate files.
+      long len = b.length();
+      LobFile.Writer lobWriter = getBlobWriter();
+
+      long recordOffset = lobWriter.tell();
+      InputStream is = null;
+      OutputStream os = lobWriter.writeBlobRecord(len);
+      try {
+        is = b.getBinaryStream();
+        copyAll(is, os);
+      } finally {
+        if (null != os) {
+          os.close();
+        }
+
+        if (null != is) {
+          is.close();
+        }
+
+        // Mark the record as finished.
+        lobWriter.finishRecord();
+      }
+
+      return new com.cloudera.sqoop.lib.BlobRef(
+          getRelativePath(lobWriter), recordOffset, len);
+    } else {
+      // Blob.getBytes() uses 1-based positions.
+      return new com.cloudera.sqoop.lib.BlobRef(
+          b.getBytes(1, (int) b.length()));
+    }
+  }
+
+  /**
+   * Actually read a ClobRef instance from the ResultSet and materialize
+   * the data either inline or to a file.
+   *
+   * @param colNum the column of the ResultSet's current row to read.
+   * @param r the ResultSet to read from.
+   * @return a ClobRef encapsulating the data in this field.
+   * @throws IOException if an error occurs writing to the FileSystem.
+   * @throws SQLException if an error occurs reading from the database.
+   */
+  public com.cloudera.sqoop.lib.ClobRef readClobRef(int colNum, ResultSet r)
+      throws IOException, InterruptedException, SQLException {
+
+    long maxInlineLobLen = conf.getLong(
+        MAX_INLINE_LOB_LEN_KEY,
+        DEFAULT_MAX_LOB_LENGTH);
+
+    Clob c = r.getClob(colNum);
+    if (null == c) {
+      return null;
+    } else if (c.length() > maxInlineLobLen) {
+      // Deserialize large CLOB into separate file.
+      long len = c.length();
+      LobFile.Writer lobWriter = getClobWriter();
+
+      long recordOffset = lobWriter.tell();
+      Reader reader = null;
+      Writer w = lobWriter.writeClobRecord(len);
+      try {
+        reader = c.getCharacterStream();
+        copyAll(reader, w);
+      } finally {
+        if (null != w) {
+          w.close();
+        }
+
+        if (null != reader) {
+          reader.close();
+        }
+
+        // Mark the record as finished.
+        lobWriter.finishRecord();
+      }
+
+      return new com.cloudera.sqoop.lib.ClobRef(
+          getRelativePath(lobWriter), recordOffset, len);
+    } else {
+      // Clob.getSubString() uses 1-based positions.
+      return new com.cloudera.sqoop.lib.ClobRef(
+          c.getSubString(1, (int) c.length()));
+    }
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LargeObjectLoader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
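
A minimal usage sketch for the loader above (illustrative only; the
loadLargeObjects() method and dataBlob field below are hypothetical
stand-ins for generated record code, and the loader is assumed to have
been constructed with the task's Configuration and a work directory):

    public void loadLargeObjects(LargeObjectLoader loader, ResultSet results)
        throws SQLException, IOException, InterruptedException {
      // Column 1 of the current row holds a BLOB. Values longer than the
      // MAX_INLINE_LOB_LEN_KEY threshold spill to an external LobFile;
      // shorter values are materialized inline in the returned BlobRef.
      this.dataBlob = loader.readBlobRef(1, results);
    }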

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobRef.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobRef.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobRef.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobRef.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.cloudera.sqoop.io.LobFile;
+import com.cloudera.sqoop.io.LobReaderCache;
+
+/**
+ * Abstract base class that holds a reference to a Blob or a Clob.
+ * DATATYPE is the type being held (e.g., a byte array).
+ * CONTAINERTYPE is the type used to hold this data (e.g., BytesWritable).
+ * ACCESSORTYPE is the type used to access this data in a streaming fashion
+ *   (either an InputStream or a Reader).
+ */
+public abstract class LobRef<DATATYPE, CONTAINERTYPE, ACCESSORTYPE>
+    implements Closeable, Writable {
+
+  public static final Log LOG = LogFactory.getLog(LobRef.class.getName());
+
+  protected LobRef() {
+    this.fileName = null;
+    this.offset = 0;
+    this.length = 0;
+
+    this.realData = null;
+  }
+
+  protected LobRef(CONTAINERTYPE container) {
+    this.fileName = null;
+    this.offset = 0;
+    this.length = 0;
+
+    this.realData = container;
+  }
+
+  protected LobRef(String file, long offset, long length) {
+    this.fileName = file;
+    this.offset = offset;
+    this.length = length;
+
+    this.realData = null;
+  }
+
+  // If the data is 'small', it's held directly, here.
+  private CONTAINERTYPE realData;
+
+  /** Internal API to retrieve the data object. */
+  protected CONTAINERTYPE getDataObj() {
+    return realData;
+  }
+
+  /** Internal API to set the data object. */
+  protected void setDataObj(CONTAINERTYPE data) {
+    this.realData = data;
+  }
+
+  // If the data is too large to materialize fully, it's written into a file
+  // whose path (relative to the rest of the dataset) is recorded here. This
+  // takes precedence if the value of fileName is non-null. These records are
+  // currently written into LobFile-formatted files, which hold multiple
+  // records. The starting offset and length of the record are recorded here
+  // as well.
+  private String fileName;
+  private long offset;
+  private long length;
+
+  // If we've opened a LobFile object, track our reference to it here.
+  private LobFile.Reader lobReader;
+
+  /**
+   * Clone the current reference object. Data is deep-copied; any open
+   * file handle remains with the original only.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object clone() throws CloneNotSupportedException {
+    LobRef<DATATYPE, CONTAINERTYPE, ACCESSORTYPE> r =
+        (LobRef<DATATYPE, CONTAINERTYPE, ACCESSORTYPE>) super.clone();
+
+    r.lobReader = null; // Reference to opened reader is not duplicated.
+    if (null != realData) {
+      r.realData = deepCopyData(realData);
+    }
+
+    return r;
+  }
+
+  @Override
+  protected synchronized void finalize() throws Throwable {
+    close();
+    super.finalize();
+  }
+
+  public void close() throws IOException {
+    // Return any open LobReader to the cache, at most once; clear the
+    // reference so a second close() (e.g., via finalize()) is a no-op.
+    if (null != this.lobReader) {
+      LobReaderCache.getCache().recycle(this.lobReader);
+      this.lobReader = null;
+    }
+  }
+
+  /**
+   * @return true if the LOB data is in an external file; false if
+   * it is materialized inline.
+   */
+  public boolean isExternal() {
+    return fileName != null;
+  }
+
+  /**
+   * Convenience method to access {@link #getDataStream(Configuration, Path)}
+   * from within a map task that read this LobRef from a file-based
+   * InputSplit.
+   * @param mapContext the Mapper.Context instance that encapsulates
+   * the current map task.
+   * @return an object that lazily streams the record to the client.
+   * @throws IllegalArgumentException if it cannot find the source
+   * path for this LOB based on the MapContext.
+   * @throws IOException if it could not read the LOB from external storage.
+   */
+  public ACCESSORTYPE getDataStream(Mapper.Context mapContext)
+      throws IOException {
+    InputSplit split = mapContext.getInputSplit();
+    if (split instanceof FileSplit) {
+      Path basePath = ((FileSplit) split).getPath().getParent();
+      return getDataStream(mapContext.getConfiguration(),
+        basePath);
+    } else {
+      throw new IllegalArgumentException(
+          "Could not ascertain LOB base path from MapContext.");
+    }
+  }
+
+  /**
+   * Get access to the LOB data itself.
+   * This method returns a lazy reader of the LOB data, accessing the
+   * filesystem for external LOB storage as necessary.
+   * @param conf the Configuration used to access the filesystem
+   * @param basePath the base directory where the table records are
+   * stored.
+   * @return an object that lazily streams the record to the client.
+   * @throws IOException if it could not read the LOB from external storage.
+   */
+  public ACCESSORTYPE getDataStream(Configuration conf, Path basePath)
+      throws IOException {
+    if (isExternal()) {
+      // Read from external storage.
+      Path pathToRead = LobReaderCache.qualify(
+          new Path(basePath, fileName), conf);
+      LOG.debug("Retreving data stream from external path: " + pathToRead);
+      if (lobReader != null) {
+        // We already have a reader open to a LobFile. Is it the correct file?
+        if (!pathToRead.equals(lobReader.getPath())) {
+          // No. Close this.lobReader and get the correct one.
+          LOG.debug("Releasing previous external reader for "
+              + lobReader.getPath());
+          LobReaderCache.getCache().recycle(lobReader);
+          lobReader = LobReaderCache.getCache().get(pathToRead, conf);
+        }
+      } else {
+        lobReader = LobReaderCache.getCache().get(pathToRead, conf);
+      }
+
+      // We now have a LobFile.Reader associated with the correct file. Get to
+      // the correct offset and return an InputStream/Reader to the user.
+      if (lobReader.tell() != offset) {
+        LOG.debug("Seeking to record start offset " + offset);
+        lobReader.seek(offset);
+      }
+
+      if (!lobReader.next()) {
+        throw new IOException("Could not locate record at " + pathToRead
+            + ":" + offset);
+      }
+
+      return getExternalSource(lobReader);
+    } else {
+      // This data is already materialized in memory; wrap it and return.
+      return getInternalSource(realData);
+    }
+  }
+
+  /**
+   * Using the LobFile reader, get an accessor InputStream or Reader to the
+   * underlying data.
+   */
+  protected abstract ACCESSORTYPE getExternalSource(LobFile.Reader reader)
+      throws IOException;
+
+  /**
+   * Wrap the materialized data in an InputStream or Reader.
+   */
+  protected abstract ACCESSORTYPE getInternalSource(CONTAINERTYPE data);
+
+  /**
+   * @return the materialized data itself.
+   */
+  protected abstract DATATYPE getInternalData(CONTAINERTYPE data);
+
+  /**
+   * Make a copy of the materialized data.
+   */
+  protected abstract CONTAINERTYPE deepCopyData(CONTAINERTYPE data);
+
+  public DATATYPE getData() {
+    if (isExternal()) {
+      throw new RuntimeException(
+          "External LOBs must be read via getDataStream()");
+    }
+
+    return getInternalData(realData);
+  }
+
+  @Override
+  public String toString() {
+    if (isExternal()) {
+      return "externalLob(lf," + fileName + "," + Long.toString(offset)
+          + "," + Long.toString(length) + ")";
+    } else {
+      return realData.toString();
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // The serialization format for this object is:
+    // boolean isExternal
+    // if true, then:
+    //   a string identifying the external storage type
+    //   and external-storage-specific data.
+    // if false, then we use readFieldsInternal() to allow BlobRef/ClobRef
+    // to serialize as it sees fit.
+    //
+    // Currently the only external storage supported is LobFile, identified
+    // by the string "lf". This serializes with the filename (as a string),
+    // followed by a long-valued offset and a long-valued length.
+
+    boolean isExternal = in.readBoolean();
+    if (isExternal) {
+      this.realData = null;
+
+      String storageType = Text.readString(in);
+      if (!storageType.equals("lf")) {
+        throw new IOException("Unsupported external LOB storage code: "
+            + storageType);
+      }
+
+      // Storage type "lf" is LobFile: filename, offset, length.
+      this.fileName = Text.readString(in);
+      this.offset = in.readLong();
+      this.length = in.readLong();
+    } else {
+      readFieldsInternal(in);
+
+      this.fileName = null;
+      this.offset = 0;
+      this.length = 0;
+    }
+  }
+
+  /**
+   * Perform the readFields() operation on a fully-materializable record.
+   * @param in the DataInput to deserialize from.
+   */
+  protected abstract void readFieldsInternal(DataInput in) throws IOException;
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isExternal());
+    if (isExternal()) {
+      Text.writeString(out, "lf"); // storage type "lf" for LobFile.
+      Text.writeString(out, fileName);
+      out.writeLong(offset);
+      out.writeLong(length);
+    } else {
+      writeInternal(out);
+    }
+  }
+
+  /**
+   * Perform the write() operation on a fully-materializable record.
+   * @param out the DataOutput to serialize to.
+   */
+  protected abstract void writeInternal(DataOutput out) throws IOException;
+
+  protected static final ThreadLocal<Matcher> EXTERNAL_MATCHER =
+      new ThreadLocal<Matcher>() {
+        @Override protected Matcher initialValue() {
+          Pattern externalPattern = Pattern.compile(
+              "externalLob\\(lf,(.*),([0-9]+),([0-9]+)\\)");
+          return externalPattern.matcher("");
+        }
+      };
+
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobRef.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
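
A sketch of consuming one of these references from a map task, using the
BlobRef subclass (which binds the accessor type to InputStream). The
record type and its getDataBlob() accessor are hypothetical:

    public void map(LongWritable key, MyRecord val, Context context)
        throws IOException, InterruptedException {
      BlobRef blob = val.getDataBlob();
      if (blob != null) {
        // Lazily opens (or reuses) a cached LobFile.Reader and seeks to
        // the record when the data is external; wraps the in-memory
        // bytes otherwise.
        InputStream is = blob.getDataStream(context);
        try {
          // ... consume the stream ...
        } finally {
          is.close();
          blob.close(); // returns any cached LobFile.Reader to the cache
        }
      }
    }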

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobSerializer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobSerializer.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobSerializer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobSerializer.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Serialize LOB classes to/from DataInput and DataOutput objects.
+ */
+public final class LobSerializer {
+
+  private LobSerializer() { }
+
+  public static void writeClob(
+      com.cloudera.sqoop.lib.ClobRef clob, DataOutput out) throws IOException {
+    clob.write(out);
+  }
+
+  public static void writeBlob(
+      com.cloudera.sqoop.lib.BlobRef blob, DataOutput out) throws IOException {
+    blob.write(out);
+  }
+
+  public static com.cloudera.sqoop.lib.ClobRef readClobFields(
+      DataInput in) throws IOException {
+    com.cloudera.sqoop.lib.ClobRef clob = new com.cloudera.sqoop.lib.ClobRef();
+    clob.readFields(in);
+    return clob;
+  }
+
+  public static com.cloudera.sqoop.lib.BlobRef readBlobFields(
+      DataInput in) throws IOException {
+    com.cloudera.sqoop.lib.BlobRef blob = new com.cloudera.sqoop.lib.BlobRef();
+    blob.readFields(in);
+    return blob;
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/LobSerializer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
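
A round-trip sketch for the serializer above, using plain java.io streams
and ClobRef's inline-data constructor:

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ClobRef original = new ClobRef("some character data");
    LobSerializer.writeClob(original, new DataOutputStream(bytes));

    DataInput in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    ClobRef copy = LobSerializer.readClobFields(in);
    // copy.getData() now returns "some character data".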

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ProcessingException.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ProcessingException.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ProcessingException.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ProcessingException.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+/**
+ * General error during processing of a SqoopRecord.
+ */
+@SuppressWarnings("serial")
+public class ProcessingException extends Exception {
+
+  public ProcessingException() {
+    super("ProcessingException");
+  }
+
+  public ProcessingException(final String message) {
+    super(message);
+  }
+
+  public ProcessingException(final Throwable cause) {
+    super(cause);
+  }
+
+  public ProcessingException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+
+  @Override
+  public String toString() {
+    String msg = getMessage();
+    return (null == msg) ? "ProcessingException" : msg;
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/ProcessingException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
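
A short sketch of the intended use: wrapping a lower-level failure raised
while processing a record (record.parse() here is a hypothetical caller):

    try {
      record.parse(line);
    } catch (RecordParser.ParseError pe) {
      throw new ProcessingException("Could not parse record", pe);
    }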

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/RecordParser.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/RecordParser.java?rev=1190430&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/RecordParser.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/RecordParser.java Fri Oct 28 16:32:43 2011
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.lib;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Parses a record containing one or more fields. Fields are separated
+ * by some FIELD_DELIMITER character, e.g. a comma or a ^A character.
+ * Records are terminated by a RECORD_DELIMITER character, e.g., a newline.
+ *
+ * Fields may be (optionally or mandatorily) enclosed by a quoting char
+ * e.g., '\"'
+ *
+ * Fields may contain escaped characters, e.g., with the escape character
+ * '\\'. Any character following the escape character is treated literally;
+ * e.g., '\n' is recorded as the letter 'n', not as a newline.
+ *
+ * Unexpected results may occur if the enclosing character is used to
+ * escape itself; e.g., this cannot parse SQL-style quoting, where the
+ * single quote ['] is escaped by doubling it to [''].
+ *
+ * This class is not synchronized. Multiple threads must use separate
+ * instances of RecordParser.
+ *
+ * The fields parsed by RecordParser are backed by an internal buffer
+ * which is cleared when the next call to parseRecord() is made. If
+ * the buffer is required to be preserved, you must copy it yourself.
+ */
+public class RecordParser {
+
+  public static final Log LOG = LogFactory.getLog(RecordParser.class.getName());
+
+  private enum ParseState {
+    FIELD_START,
+    ENCLOSED_FIELD,
+    UNENCLOSED_FIELD,
+    ENCLOSED_ESCAPE,
+    ENCLOSED_EXPECT_DELIMITER,
+    UNENCLOSED_ESCAPE
+  }
+
+  /**
+   * An error thrown when parsing fails.
+   */
+  public static class ParseError extends Exception {
+    public ParseError() {
+      super("ParseError");
+    }
+
+    public ParseError(final String msg) {
+      super(msg);
+    }
+
+    public ParseError(final String msg, final Throwable cause) {
+      super(msg, cause);
+    }
+
+    public ParseError(final Throwable cause) {
+      super(cause);
+    }
+  }
+
+  private com.cloudera.sqoop.lib.DelimiterSet delimiters;
+  private ArrayList<String> outputs;
+
+  public RecordParser(final com.cloudera.sqoop.lib.DelimiterSet delimitersIn) {
+    this.delimiters = delimitersIn.copy();
+    this.outputs = new ArrayList<String>();
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(CharSequence input)
+      throws com.cloudera.sqoop.lib.RecordParser.ParseError {
+    if (null == input) {
+      throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+          "null input string");
+    }
+
+    return parseRecord(CharBuffer.wrap(input));
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(Text input)
+      throws com.cloudera.sqoop.lib.RecordParser.ParseError {
+    if (null == input) {
+      throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+          "null input string");
+    }
+
+    // TODO(aaron): The parser should be able to handle UTF-8 strings
+    // as well, to avoid this transcode operation.
+    return parseRecord(input.toString());
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(byte [] input)
+      throws com.cloudera.sqoop.lib.RecordParser.ParseError {
+    if (null == input) {
+      throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+          "null input string");
+    }
+
+    // Note: asCharBuffer() interprets the bytes as (big-endian) UTF-16
+    // code units; UTF-8 input should be decoded before calling this.
+    return parseRecord(ByteBuffer.wrap(input).asCharBuffer());
+  }
+
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(char [] input)
+      throws com.cloudera.sqoop.lib.RecordParser.ParseError {
+    if (null == input) {
+      throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+          "null input string");
+    }
+
+    return parseRecord(CharBuffer.wrap(input));
+  }
+
+  public List<String> parseRecord(ByteBuffer input)
+      throws com.cloudera.sqoop.lib.RecordParser.ParseError {
+    if (null == input) {
+      throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+          "null input string");
+    }
+
+    return parseRecord(input.asCharBuffer());
+  }
+
+  // TODO(aaron): Refactor this method to be much shorter.
+  // CHECKSTYLE:OFF
+  /**
+   * Return a list of strings representing the fields of the input line.
+   * This list is backed by an internal buffer which is cleared by the
+   * next call to parseRecord().
+   */
+  public List<String> parseRecord(CharBuffer input)
+      throws com.cloudera.sqoop.lib.RecordParser.ParseError {
+    if (null == input) {
+      throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+          "null input string");
+    }
+
+    /*
+      This method implements the following state machine to perform
+      parsing.
+
+      Note that there are no restrictions on whether particular characters
+      (e.g., field-sep, record-sep, etc) are distinct or the same. The
+      state transitions are processed in the order seen in this comment.
+
+      Starting state is FIELD_START
+        encloser -> ENCLOSED_FIELD
+        escape char -> UNENCLOSED_ESCAPE
+        field delim -> FIELD_START (for a new field)
+        record delim -> stops processing
+        all other characters get added to current field -> UNENCLOSED_FIELD
+
+      ENCLOSED_FIELD state:
+        escape char goes to ENCLOSED_ESCAPE
+        encloser goes to ENCLOSED_EXPECT_DELIMITER
+        field sep or record sep gets added to the current string
+        normal letters get added to the current string
+
+      ENCLOSED_ESCAPE state:
+        any character seen here is added literally, back to ENCLOSED_FIELD
+
+      ENCLOSED_EXPECT_DELIMITER state:
+        field sep goes to FIELD_START
+        record sep halts processing.
+        all other characters are errors.
+
+      UNENCLOSED_FIELD state:
+        ESCAPE char goes to UNENCLOSED_ESCAPE
+        FIELD_SEP char goes to FIELD_START
+        RECORD_SEP char halts processing
+        normal chars or the enclosing char get added to the current string
+
+      UNENCLOSED_ESCAPE:
+        add character literally to current string, return to UNENCLOSED_FIELD
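+
+      Worked example (fields ',', records '\n', enclosed-by '"',
+      escaped-by '\'): the input  "a,b",c\,d\n  parses to the two
+      fields  a,b  and  c,d.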
+    */
+
+    char curChar = com.cloudera.sqoop.lib.DelimiterSet.NULL_CHAR;
+    ParseState state = ParseState.FIELD_START;
+    int len = input.length();
+    StringBuilder sb = null;
+
+    outputs.clear();
+
+    char enclosingChar = delimiters.getEnclosedBy();
+    char fieldDelim = delimiters.getFieldsTerminatedBy();
+    char recordDelim = delimiters.getLinesTerminatedBy();
+    char escapeChar = delimiters.getEscapedBy();
+    boolean enclosingRequired = delimiters.isEncloseRequired();
+
+    for (int pos = 0; pos < len; pos++) {
+      curChar = input.get();
+      switch (state) {
+      case FIELD_START:
+        // ready to start processing a new field.
+        if (null != sb) {
+          // We finished processing a previous field. Add to the list.
+          outputs.add(sb.toString());
+        }
+
+        sb = new StringBuilder();
+        if (enclosingChar == curChar) {
+          // got an opening encloser.
+          state = ParseState.ENCLOSED_FIELD;
+        } else if (escapeChar == curChar) {
+          state = ParseState.UNENCLOSED_ESCAPE;
+        } else if (fieldDelim == curChar) {
+          // we have a zero-length field; the empty StringBuilder is
+          // recorded when the next field begins (or after the loop).
+          continue;
+        } else if (recordDelim == curChar) {
+          // a zero-length final field; stop processing.
+          pos = len;
+        } else {
+          // current char is part of the field.
+          state = ParseState.UNENCLOSED_FIELD;
+          sb.append(curChar);
+
+          if (enclosingRequired) {
+            throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+                "Opening field-encloser expected at position " + pos);
+          }
+        }
+
+        break;
+
+      case ENCLOSED_FIELD:
+        if (escapeChar == curChar) {
+          // the next character is escaped. Treat it literally.
+          state = ParseState.ENCLOSED_ESCAPE;
+        } else if (enclosingChar == curChar) {
+          // end of the enclosed field; expect a field or record
+          // delimiter next.
+          state = ParseState.ENCLOSED_EXPECT_DELIMITER;
+        } else {
+          // this is a regular char, or a field/record delimiter inside
+          // the enclosure. Add to the current field string, and remain
+          // in this state.
+          sb.append(curChar);
+        }
+
+        break;
+
+      case UNENCLOSED_FIELD:
+        if (escapeChar == curChar) {
+          // the next character is escaped. Treat it literally.
+          state = ParseState.UNENCLOSED_ESCAPE;
+        } else if (fieldDelim == curChar) {
+          // we're at the end of this field; may be the start of another one.
+          state = ParseState.FIELD_START;
+        } else if (recordDelim == curChar) {
+          pos = len; // terminate processing immediately.
+        } else {
+          // this is a regular char. Add to the current field string,
+          // and remain in this state.
+          sb.append(curChar);
+        }
+
+        break;
+
+      case ENCLOSED_ESCAPE:
+        // Treat this character literally, whatever it is, and return to
+        // enclosed field processing.
+        sb.append(curChar);
+        state = ParseState.ENCLOSED_FIELD;
+        break;
+
+      case ENCLOSED_EXPECT_DELIMITER:
+        // We were in an enclosed field, but got the final encloser. Now we
+        // expect either an end-of-field or an end-of-record.
+        if (fieldDelim == curChar) {
+          // end of one field is the beginning of the next.
+          state = ParseState.FIELD_START;
+        } else if (recordDelim == curChar) {
+          // stop processing.
+          pos = len;
+        } else {
+          // Don't know what to do with this character.
+          throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+              "Expected delimiter at position " + pos);
+        }
+
+        break;
+
+      case UNENCLOSED_ESCAPE:
+        // Treat this character literally, whatever it is, and return to
+        // non-enclosed field processing.
+        sb.append(curChar);
+        state = ParseState.UNENCLOSED_FIELD;
+        break;
+
+      default:
+        throw new com.cloudera.sqoop.lib.RecordParser.ParseError(
+            "Unexpected parser state: " + state);
+      }
+    }
+
+    if (state == ParseState.FIELD_START && curChar == fieldDelim) {
+      // The input ended right after a field delimiter: record the
+      // completed field now; the fresh StringBuilder below then becomes
+      // the trailing zero-length field.
+      if (null != sb) {
+        outputs.add(sb.toString());
+        sb = new StringBuilder();
+      }
+    }
+
+    if (null != sb) {
+      // The final field ended at end-of-input or at a record delimiter.
+      // Add it to the list.
+      outputs.add(sb.toString());
+    }
+
+    return outputs;
+  }
+  // CHECKSTYLE:ON
+
+  public boolean isEnclosingRequired() {
+    return delimiters.isEncloseRequired();
+  }
+
+  @Override
+  public String toString() {
+    return "RecordParser[" + delimiters.toString() + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    return this.delimiters.hashCode();
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/RecordParser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
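
A minimal usage sketch for the parser above, assuming the
(field, record, enclose, escape, encloseRequired) argument order of the
DelimiterSet constructor from this change set:

    DelimiterSet delims =
        new DelimiterSet(',', '\n', '\"', '\\', false);
    RecordParser parser = new RecordParser(delims);

    // parseRecord() throws the checked RecordParser.ParseError.
    List<String> fields = parser.parseRecord("\"a,b\",c\\,d\n");
    // fields now holds [a,b] and [c,d]. Copy the list if it must
    // survive the next parseRecord() call; the parser reuses an
    // internal buffer.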


