sqoop-commits mailing list archives

From b...@apache.org
Subject svn commit: r1190441 [2/3] - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/mapreduce/db/ java/org/apache/sqoop/mapreduce/ java/org/apache/sqoop/mapreduce/db/ test/com/cloudera/sqoop/mapreduce/db/ test/org/apache/sqoop/ test/org/apache/sqoop/ma...
Date Fri, 28 Oct 2011 16:50:41 GMT
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,29 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.lang.reflect.Method;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * A RecordReader that reads records from an Oracle SQL table.
+ * @deprecated use org.apache.sqoop.mapreduce.db.OracleDBRecordReader instead.
+ * @see org.apache.sqoop.mapreduce.db.OracleDBRecordReader
  */
-public class OracleDBRecordReader<T extends DBWritable>
-    extends DBRecordReader<T> {
+public class OracleDBRecordReader<T extends DBWritable> extends
+  org.apache.sqoop.mapreduce.db.OracleDBRecordReader<T> {
 
   /** Configuration key to set to a timezone string. */
-  public static final String SESSION_TIMEZONE_KEY = "oracle.sessionTimeZone";
-
-  private static final Log LOG = LogFactory.getLog(OracleDBRecordReader.class);
+  public static final String SESSION_TIMEZONE_KEY =
+      org.apache.sqoop.mapreduce.db.OracleDBRecordReader.SESSION_TIMEZONE_KEY;
 
   // CHECKSTYLE:OFF
   public OracleDBRecordReader(DBInputFormat.DBInputSplit split,
@@ -47,61 +41,9 @@ public class OracleDBRecordReader<T exte
       DBConfiguration dbConfig, String cond, String [] fields,
       String table) throws SQLException {
     super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
-    setSessionTimeZone(conf, conn);
   }
   // CHECKSTYLE:ON
 
-  /** Returns the query for selecting the records from an Oracle DB. */
-  protected String getSelectQuery() {
-    StringBuilder query = new StringBuilder();
-    DBConfiguration dbConf = getDBConf();
-    String conditions = getConditions();
-    String tableName = getTableName();
-    String [] fieldNames = getFieldNames();
-
-    // Oracle-specific codepath to use rownum instead of LIMIT/OFFSET.
-    if(dbConf.getInputQuery() == null) {
-      query.append("SELECT ");
-
-      for (int i = 0; i < fieldNames.length; i++) {
-        query.append(fieldNames[i]);
-        if (i != fieldNames.length -1) {
-          query.append(", ");
-        }
-      }
-
-      query.append(" FROM ").append(tableName);
-      if (conditions != null && conditions.length() > 0) {
-        query.append(" WHERE ").append(conditions);
-      }
-      String orderBy = dbConf.getInputOrderBy();
-      if (orderBy != null && orderBy.length() > 0) {
-        query.append(" ORDER BY ").append(orderBy);
-      }
-    } else {
-      //PREBUILT QUERY
-      query.append(dbConf.getInputQuery());
-    }
-
-    try {
-      DBInputFormat.DBInputSplit split = getSplit();
-      if (split.getLength() > 0 && split.getStart() > 0) {
-        String querystring = query.toString();
-
-        query = new StringBuilder();
-        query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
-        query.append(querystring);
-        query.append(" ) a WHERE rownum <= ").append(split.getStart());
-        query.append(" + ").append(split.getLength());
-        query.append(" ) WHERE dbif_rno >= ").append(split.getStart());
-      }
-    } catch (IOException ex) {
-      // ignore, will not throw.
-    }
-
-    return query.toString();
-  }
-
   /**
    * Set session time zone.
    * @param conf The current configuration.
@@ -110,41 +52,7 @@ public class OracleDBRecordReader<T exte
    */
   public static void setSessionTimeZone(Configuration conf,
       Connection conn) throws SQLException {
-    // need to use reflection to call the method setSessionTimeZone on
-    // the OracleConnection class because oracle specific java libraries are
-    // not accessible in this context.
-    Method method;
-    try {
-      method = conn.getClass().getMethod(
-              "setSessionTimeZone", new Class [] {String.class});
-    } catch (Exception ex) {
-      LOG.error("Could not find method setSessionTimeZone in "
-          + conn.getClass().getName(), ex);
-      // rethrow SQLException
-      throw new SQLException(ex);
-    }
-
-    // Need to set the time zone in order for Java
-    // to correctly access the column "TIMESTAMP WITH LOCAL TIME ZONE".
-    // We can't easily get the correct Oracle-specific timezone string
-    // from Java; just let the user set the timezone in a property.
-    String clientTimeZone = conf.get(SESSION_TIMEZONE_KEY, "GMT");
-    try {
-      method.setAccessible(true);
-      method.invoke(conn, clientTimeZone);
-      LOG.info("Time zone has been set to " + clientTimeZone);
-    } catch (Exception ex) {
-      LOG.warn("Time zone " + clientTimeZone
-               + " could not be set on Oracle database.");
-      LOG.warn("Setting default time zone: GMT");
-      try {
-        // "GMT" timezone is guaranteed to exist.
-        method.invoke(conn, "GMT");
-      } catch (Exception ex2) {
-        LOG.error("Could not set time zone for oracle connection", ex2);
-        // rethrow SQLException
-        throw new SQLException(ex);
-      }
-    }
+    org.apache.sqoop.mapreduce.db.OracleDBRecordReader.setSessionTimeZone(
+        conf, conn);
   }
 }
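
For readers tracking the refactor: the rownum-based pagination removed above now lives in org.apache.sqoop.mapreduce.db.OracleDBRecordReader, and this class only delegates to it. A minimal method-body sketch of the wrapped query that pagination builds; the inner query, start, and length values are illustrative, not taken from the patch (in the real reader they come from split.getStart() and split.getLength()):

    String innerQuery = "SELECT id, name FROM employees ORDER BY id"; // hypothetical inner query
    long start = 1000L;   // would come from split.getStart()
    long length = 500L;   // would come from split.getLength()
    String pagedQuery =
        "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( "
        + innerQuery
        + " ) a WHERE rownum <= " + start + " + " + length
        + " ) WHERE dbif_rno >= " + start;

Oracle has no LIMIT/OFFSET, so the split range is expressed by wrapping the user query and filtering on ROWNUM instead.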

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,57 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.io.IOException;
-import java.sql.SQLException;
-import java.sql.Types;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * A InputFormat that reads input data from an SQL table in an Oracle db.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat
+ *   instead.
+ * @see org.apache.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat
  */
 public class OracleDataDrivenDBInputFormat<T extends DBWritable>
-    extends DataDrivenDBInputFormat<T> implements Configurable {
-
-  /**
-   * @return the DBSplitter implementation to use to divide the table/query
-   * into InputSplits.
-   */
-  @Override
-  protected DBSplitter getSplitter(int sqlDataType) {
-    switch (sqlDataType) {
-    case Types.DATE:
-    case Types.TIME:
-    case Types.TIMESTAMP:
-      return new OracleDateSplitter();
-
-    default:
-      return super.getSplitter(sqlDataType);
-    }
-  }
-
-  @Override
-  protected RecordReader<LongWritable, T> createDBRecordReader(
-      DBInputSplit split, Configuration conf) throws IOException {
-
-    DBConfiguration dbConf = getDBConf();
-    @SuppressWarnings("unchecked")
-    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
-
-    try {
-      // Use Oracle-specific db reader
-      return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
-          conf, getConnection(), dbConf, dbConf.getInputConditions(),
-          dbConf.getInputFieldNames(), dbConf.getInputTableName());
-    } catch (SQLException ex) {
-      throw new IOException(ex);
-    }
-  }
+    extends org.apache.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat<T> {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
 import java.sql.Connection;
@@ -29,9 +26,13 @@ import org.apache.hadoop.mapreduce.lib.d
 /**
  * A RecordReader that reads records from a Oracle table
  * via DataDrivenDBRecordReader.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.OracleDataDrivenDBRecordReader
+ *   instead.
+ * @see org.apache.sqoop.mapreduce.db.OracleDataDrivenDBRecordReader
  */
 public class OracleDataDrivenDBRecordReader<T extends DBWritable>
-    extends DataDrivenDBRecordReader<T> {
+    extends org.apache.sqoop.mapreduce.db.OracleDataDrivenDBRecordReader<T> {
 
   // CHECKSTYLE:OFF
   // TODO(aaron): Enable checkstyle after refactoring DBRecordReader c'tor.
@@ -40,11 +41,7 @@ public class OracleDataDrivenDBRecordRea
       DBConfiguration dbConfig, String cond, String [] fields,
       String table) throws SQLException {
 
-    super(split, inputClass, conf, conn, dbConfig, cond, fields, table,
-        "ORACLE");
-
-    // Must initialize the tz used by the connection for Oracle.
-    OracleDBRecordReader.setSessionTimeZone(conf, conn);
+    super(split, inputClass, conf, conn, dbConfig, cond, fields, table);
   }
   // CHECKSTYLE:ON
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDateSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDateSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDateSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDateSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,24 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.util.Date;
-
-
 /**
  * Implement DBSplitter over date/time values returned by an Oracle db.
  * Make use of logic from DateSplitter, since this just needs to use
  * some Oracle-specific functions on the formatting end when generating
  * InputSplits.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.OracleDateSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.OracleDateSplitter
  */
-public class OracleDateSplitter extends DateSplitter {
+public class OracleDateSplitter
+    extends org.apache.sqoop.mapreduce.db.OracleDateSplitter {
 
-  @SuppressWarnings("unchecked")
-  @Override
-  protected String dateToString(Date d) {
-    // Oracle Data objects are always actually Timestamps
-    return "TO_TIMESTAMP('" + d.toString() + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
-  }
 }
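
The removed dateToString moved to org.apache.sqoop.mapreduce.db.OracleDateSplitter. As a method-body sketch of the boundary literal it emits (the timestamp value here is illustrative):

    java.sql.Timestamp d = java.sql.Timestamp.valueOf("2011-10-28 16:50:39.0");
    String boundary = "TO_TIMESTAMP('" + d + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
    // boundary == "TO_TIMESTAMP('2011-10-28 16:50:39.0', 'YYYY-MM-DD HH24:MI:SS.FF')"

Formatting split boundaries as TO_TIMESTAMP literals lets the generated WHERE clauses compare correctly against Oracle date/time columns.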

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/TextSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/TextSplitter.java?rev=1190441&r1=1190440&r2=1190441&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/TextSplitter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/TextSplitter.java Fri Oct 28 16:50:39 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,213 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.cloudera.sqoop.mapreduce.db;
 
-import java.math.BigDecimal;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import com.cloudera.sqoop.config.ConfigurationHelper;
 
 /**
  * Implement DBSplitter over text strings.
+ *
+ * @deprecated use org.apache.sqoop.mapreduce.db.TextSplitter instead.
+ * @see org.apache.sqoop.mapreduce.db.TextSplitter
  */
-public class TextSplitter extends BigDecimalSplitter {
-
-  private static final Log LOG = LogFactory.getLog(TextSplitter.class);
-
-  /**
-   * This method needs to determine the splits between two user-provided
-   * strings.  In the case where the user's strings are 'A' and 'Z', this is
-   * not hard; we could create two splits from ['A', 'M') and ['M', 'Z'], 26
-   * splits for strings beginning with each letter, etc.
-   *
-   * If a user has provided us with the strings "Ham" and "Haze", however, we
-   * need to create splits that differ in the third letter.
-   *
-   * The algorithm used is as follows:
-   * Since there are 2**16 unicode characters, we interpret characters as
-   * digits in base 65536. Given a string 's' containing characters s_0, s_1
-   * .. s_n, we interpret the string as the number: 0.s_0 s_1 s_2.. s_n in
-   * base 65536. Having mapped the low and high strings into floating-point
-   * values, we then use the BigDecimalSplitter to establish the even split
-   * points, then map the resulting floating point values back into strings.
-   */
-  public List<InputSplit> split(Configuration conf, ResultSet results,
-      String colName) throws SQLException {
-
-    LOG.warn("Generating splits for a textual index column.");
-    LOG.warn("If your database sorts in a case-insensitive order, "
-        + "this may result in a partial import or duplicate records.");
-    LOG.warn("You are strongly encouraged to choose an integral split column.");
-
-    String minString = results.getString(1);
-    String maxString = results.getString(2);
-
-    boolean minIsNull = false;
-
-    // If the min value is null, switch it to an empty string instead for
-    // purposes of interpolation. Then add [null, null] as a special case
-    // split.
-    if (null == minString) {
-      minString = "";
-      minIsNull = true;
-    }
-
-    if (null == maxString) {
-      // If the max string is null, then the min string has to be null too.
-      // Just return a special split for this case.
-      List<InputSplit> splits = new ArrayList<InputSplit>();
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-      return splits;
-    }
-
-    // Use this as a hint. May need an extra task if the size doesn't
-    // divide cleanly.
-    int numSplits = ConfigurationHelper.getConfNumMaps(conf);
-
-    String lowClausePrefix = colName + " >= '";
-    String highClausePrefix = colName + " < '";
-
-    // If there is a common prefix between minString and maxString, establish
-    // it and pull it out of minString and maxString.
-    int maxPrefixLen = Math.min(minString.length(), maxString.length());
-    int sharedLen;
-    for (sharedLen = 0; sharedLen < maxPrefixLen; sharedLen++) {
-      char c1 = minString.charAt(sharedLen);
-      char c2 = maxString.charAt(sharedLen);
-      if (c1 != c2) {
-        break;
-      }
-    }
-
-    // The common prefix has length 'sharedLen'. Extract it from both.
-    String commonPrefix = minString.substring(0, sharedLen);
-    minString = minString.substring(sharedLen);
-    maxString = maxString.substring(sharedLen);
-
-    List<String> splitStrings = split(numSplits, minString, maxString,
-        commonPrefix);
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-
-    // Convert the list of split point strings into an actual set of
-    // InputSplits.
-    String start = splitStrings.get(0);
-    for (int i = 1; i < splitStrings.size(); i++) {
-      String end = splitStrings.get(i);
-
-      if (i == splitStrings.size() - 1) {
-        // This is the last one; use a closed interval.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + start + "'", colName + " <= '" + end + "'"));
-      } else {
-        // Normal open-interval case.
-        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-            lowClausePrefix + start + "'", highClausePrefix + end + "'"));
-      }
-
-      start = end;
-    }
-
-    if (minIsNull) {
-      // Add the special null split at the end.
-      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
-          colName + " IS NULL", colName + " IS NULL"));
-    }
-
-    return splits;
-  }
-
-  List<String> split(int numSplits, String minString, String maxString,
-      String commonPrefix) throws SQLException {
-
-    BigDecimal minVal = stringToBigDecimal(minString);
-    BigDecimal maxVal = stringToBigDecimal(maxString);
-
-    List<BigDecimal> splitPoints = split(
-        new BigDecimal(numSplits), minVal, maxVal);
-    List<String> splitStrings = new ArrayList<String>();
-
-    // Convert the BigDecimal splitPoints into their string representations.
-    for (BigDecimal bd : splitPoints) {
-      splitStrings.add(commonPrefix + bigDecimalToString(bd));
-    }
-
-    // Make sure that our user-specified boundaries are the first and last
-    // entries in the array.
-    if (splitStrings.size() == 0
-        || !splitStrings.get(0).equals(commonPrefix + minString)) {
-      splitStrings.add(0, commonPrefix + minString);
-    }
-    if (splitStrings.size() == 1
-        || !splitStrings.get(splitStrings.size() - 1).equals(
-        commonPrefix + maxString)) {
-      splitStrings.add(commonPrefix + maxString);
-    }
-
-    return splitStrings;
-  }
-
-  private static final BigDecimal ONE_PLACE = new BigDecimal(65536);
-
-  // Maximum number of characters to convert. This is to prevent rounding
-  // errors or repeating fractions near the very bottom from getting out of
-  // control. Note that this still gives us a huge number of possible splits.
-  private static final int MAX_CHARS = 8;
-
-  /**
-   * Return a BigDecimal representation of string 'str' suitable for use in a
-   * numerically-sorting order.
-   */
-  BigDecimal stringToBigDecimal(String str) {
-    // Start with 1/65536 to compute the first digit.
-    BigDecimal curPlace = ONE_PLACE;
-    BigDecimal result = BigDecimal.ZERO;
-
-    int len = Math.min(str.length(), MAX_CHARS);
-
-    for (int i = 0; i < len; i++) {
-      int codePoint = str.codePointAt(i);
-      result = result.add(tryDivide(new BigDecimal(codePoint), curPlace));
-      // advance to the next less significant place. e.g., 1/(65536^2) for the
-      // second char.
-      curPlace = curPlace.multiply(ONE_PLACE);
-    }
-
-    return result;
-  }
-
-  /**
-   * Return the string encoded in a BigDecimal.
-   * Repeatedly multiply the input value by 65536; the integer portion after
-   * such a multiplication represents a single character in base 65536.
-   * Convert that back into a char and create a string out of these until we
-   * have no data left.
-   */
-  String bigDecimalToString(BigDecimal bd) {
-    BigDecimal cur = bd.stripTrailingZeros();
-    StringBuilder sb = new StringBuilder();
-
-    for (int numConverted = 0; numConverted < MAX_CHARS; numConverted++) {
-      cur = cur.multiply(ONE_PLACE);
-      int curCodePoint = cur.intValue();
-      if (0 == curCodePoint) {
-        break;
-      }
-
-      cur = cur.subtract(new BigDecimal(curCodePoint));
-      sb.append(Character.toChars(curCodePoint));
-    }
+public class TextSplitter extends org.apache.sqoop.mapreduce.db.TextSplitter {
 
-    return sb.toString();
-  }
 }
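
The string-splitting algorithm described in the removed javadoc (interpret each character as a digit in base 65536 and treat the string as a fraction) now lives in org.apache.sqoop.mapreduce.db.TextSplitter. A minimal method-body sketch of that mapping, using the example string from the javadoc and a fixed division scale instead of the splitter's tryDivide helper (assumes java.math.BigDecimal and java.math.RoundingMode are imported):

    BigDecimal onePlace = new BigDecimal(65536);
    BigDecimal curPlace = onePlace;
    BigDecimal result = BigDecimal.ZERO;
    String str = "Ham";                               // example string from the removed javadoc
    int len = Math.min(str.length(), 8);              // MAX_CHARS guard against runaway fractions
    for (int i = 0; i < len; i++) {
      BigDecimal digit = new BigDecimal(str.codePointAt(i));
      result = result.add(digit.divide(curPlace, 40, java.math.RoundingMode.HALF_UP));
      curPlace = curPlace.multiply(onePlace);         // advance to the next, less significant place
    }
    // result now encodes "Ham" as 0.s0 s1 s2 in base 65536, so numeric ordering tracks string ordering.

Once low and high strings are mapped to decimals this way, the even split points can be computed numerically and mapped back to strings.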

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BigDecimalSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BigDecimalSplitter.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BigDecimalSplitter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BigDecimalSplitter.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.mapreduce.db.DBSplitter;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Implement DBSplitter over BigDecimal values.
+ */
+public class BigDecimalSplitter implements DBSplitter  {
+  private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);
+
+  public List<InputSplit> split(Configuration conf, ResultSet results,
+      String colName) throws SQLException {
+
+    BigDecimal minVal = results.getBigDecimal(1);
+    BigDecimal maxVal = results.getBigDecimal(2);
+
+    String lowClausePrefix = colName + " >= ";
+    String highClausePrefix = colName + " < ";
+
+    BigDecimal numSplits = new BigDecimal(
+        ConfigurationHelper.getConfNumMaps(conf));
+
+    if (minVal == null && maxVal == null) {
+      // Range is null to null. Return a null split accordingly.
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    if (minVal == null || maxVal == null) {
+      // Don't know what is a reasonable min/max value for interpolation. Fail.
+      LOG.error("Cannot find a range for NUMERIC or DECIMAL "
+          + "fields with one end NULL.");
+      return null;
+    }
+
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    // Turn the split points into a set of intervals.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+
+      if (i == splitPoints.size() - 1) {
+        // This is the last one; use a closed interval.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start.toString(),
+            colName + " <= " + end.toString()));
+      } else {
+        // Normal open-interval case.
+        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+            lowClausePrefix + start.toString(),
+            highClausePrefix + end.toString()));
+      }
+
+      start = end;
+    }
+
+    return splits;
+  }
+
+  private static final BigDecimal MIN_INCREMENT =
+      new BigDecimal(10000 * Double.MIN_VALUE);
+
+  /**
+   * Divide numerator by denominator. If impossible in exact mode, use rounding.
+   */
+  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
+    try {
+      return numerator.divide(denominator);
+    } catch (ArithmeticException ae) {
+      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
+    }
+  }
+
+  /**
+   * Returns a list of BigDecimals one element longer than the list of input
+   * splits.  This represents the boundaries between input splits.  All splits
+   * are open on the top end, except the last one.
+   *
+   * So the list [0, 5, 8, 12, 18] would represent splits capturing the
+   * intervals:
+   *
+   * [0, 5)
+   * [5, 8)
+   * [8, 12)
+   * [12, 18] note the closed interval for the last split.
+   */
+  protected List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal,
+      BigDecimal maxVal) throws SQLException {
+
+    List<BigDecimal> splits = new ArrayList<BigDecimal>();
+
+    // Use numSplits as a hint. May need an extra task if the size doesn't
+    // divide cleanly.
+
+    BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), (numSplits));
+    if (splitSize.compareTo(MIN_INCREMENT) < 0) {
+      splitSize = MIN_INCREMENT;
+      LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
+    }
+
+    BigDecimal curVal = minVal;
+
+    while (curVal.compareTo(maxVal) <= 0) {
+      splits.add(curVal);
+      curVal = curVal.add(splitSize);
+    }
+
+    if (splits.get(splits.size() - 1).compareTo(maxVal) != 0
+        || splits.size() == 1) {
+      // We didn't end on the maxVal. Add that to the end of the list.
+      splits.add(maxVal);
+    }
+
+    return splits;
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BigDecimalSplitter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
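
As a worked illustration of the boundary list documented above ([0, 5, 8, 12, 18] yielding half-open intervals with a closed last interval), here is a method-body sketch of how split(Configuration, ResultSet, String) turns those points into WHERE-clause pairs; the column name is illustrative:

    java.util.List<java.math.BigDecimal> points = java.util.Arrays.asList(
        java.math.BigDecimal.valueOf(0), java.math.BigDecimal.valueOf(5),
        java.math.BigDecimal.valueOf(8), java.math.BigDecimal.valueOf(12),
        java.math.BigDecimal.valueOf(18));
    String colName = "id";                             // hypothetical split column
    for (int i = 1; i < points.size(); i++) {
      String lower = colName + " >= " + points.get(i - 1);
      String upper = (i == points.size() - 1)
          ? colName + " <= " + points.get(i)           // last interval is closed
          : colName + " < " + points.get(i);
      System.out.println(lower + " AND " + upper);     // e.g. "id >= 0 AND id < 5"
    }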

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BooleanSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BooleanSplitter.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BooleanSplitter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BooleanSplitter.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import com.cloudera.sqoop.mapreduce.db.DBSplitter;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Implement DBSplitter over boolean values.
+ */
+public class BooleanSplitter implements DBSplitter {
+  public List<InputSplit> split(Configuration conf, ResultSet results,
+      String colName) throws SQLException {
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    if (results.getString(1) == null && results.getString(2) == null) {
+      // Range is null to null. Return a null split accordingly.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+      return splits;
+    }
+
+    boolean minVal = results.getBoolean(1);
+    boolean maxVal = results.getBoolean(2);
+
+    // Use one or two splits.
+    if (!minVal) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " = FALSE", colName + " = FALSE"));
+    }
+
+    if (maxVal) {
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " = TRUE", colName + " = TRUE"));
+    }
+
+    if (results.getString(1) == null || results.getString(2) == null) {
+      // Include a null value.
+      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
+          colName + " IS NULL", colName + " IS NULL"));
+    }
+
+    return splits;
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/BooleanSplitter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
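
For a boolean column whose observed range is FALSE..TRUE and which also contains NULLs, the splitter above emits at most three single-value splits. A small illustration of the generated condition pairs, with a hypothetical column name:

    String[][] expectedConditions = {
        {"flag = FALSE", "flag = FALSE"},   // emitted because minVal is false
        {"flag = TRUE",  "flag = TRUE"},    // emitted because maxVal is true
        {"flag IS NULL", "flag IS NULL"},   // emitted because a bound read back as NULL
    };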

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat.NullDBWritable;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
+/**
+ * A container for configuration property names for jobs with DB input/output.
+ *
+ * The job can be configured using the static methods in this class,
+ * {@link DBInputFormat}, and {@link DBOutputFormat}.
+ * Alternatively, the properties can be set in the configuration with proper
+ * values.
+ *
+ * @see DBConfiguration#configureDB(Configuration, String, String, String,
+ * String)
+ * @see DBInputFormat#setInput(Job, Class, String, String)
+ * @see DBInputFormat#setInput(Job, Class, String, String, String, String...)
+ * @see DBOutputFormat#setOutput(Job, String, String...)
+ */
+public class DBConfiguration {
+
+  /** The JDBC Driver class name. */
+  public static final String DRIVER_CLASS_PROPERTY =
+    "mapreduce.jdbc.driver.class";
+
+  /** JDBC Database access URL. */
+  public static final String URL_PROPERTY = "mapreduce.jdbc.url";
+
+  /** User name to access the database. */
+  public static final String USERNAME_PROPERTY = "mapreduce.jdbc.username";
+
+  /** Password to access the database. */
+  public static final String PASSWORD_PROPERTY = "mapreduce.jdbc.password";
+
+  /** Fetch size. */
+  public static final String FETCH_SIZE = "mapreduce.jdbc.fetchsize";
+
+  /** Input table name. */
+  public static final String INPUT_TABLE_NAME_PROPERTY =
+    "mapreduce.jdbc.input.table.name";
+
+  /** Field names in the Input table. */
+  public static final String INPUT_FIELD_NAMES_PROPERTY =
+    "mapreduce.jdbc.input.field.names";
+
+  /** WHERE clause in the input SELECT statement. */
+  public static final String INPUT_CONDITIONS_PROPERTY =
+    "mapreduce.jdbc.input.conditions";
+
+  /** ORDER BY clause in the input SELECT statement. */
+  public static final String INPUT_ORDER_BY_PROPERTY =
+    "mapreduce.jdbc.input.orderby";
+
+  /** Whole input query, excluding LIMIT...OFFSET. */
+  public static final String INPUT_QUERY = "mapreduce.jdbc.input.query";
+
+  /** Input query to get the count of records. */
+  public static final String INPUT_COUNT_QUERY =
+    "mapreduce.jdbc.input.count.query";
+
+  /** Input query to get the max and min values of the jdbc.input.query. */
+  public static final String INPUT_BOUNDING_QUERY =
+      "mapred.jdbc.input.bounding.query";
+
+  /** Class name implementing DBWritable which will hold input tuples. */
+  public static final String INPUT_CLASS_PROPERTY =
+    "mapreduce.jdbc.input.class";
+
+  /** Output table name. */
+  public static final String OUTPUT_TABLE_NAME_PROPERTY =
+    "mapreduce.jdbc.output.table.name";
+
+  /** Field names in the Output table. */
+  public static final String OUTPUT_FIELD_NAMES_PROPERTY =
+    "mapreduce.jdbc.output.field.names";
+
+  /** Number of fields in the Output table. */
+  public static final String OUTPUT_FIELD_COUNT_PROPERTY =
+    "mapreduce.jdbc.output.field.count";
+
+  /**
+   * Sets the DB access related fields in the {@link Configuration}.
+   * @param conf the configuration
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL
+   * @param userName DB access username
+   * @param passwd DB access passwd
+   * @param fetchSize DB fetch size
+   */
+  public static void configureDB(Configuration conf, String driverClass,
+      String dbUrl, String userName, String passwd, Integer fetchSize) {
+
+    conf.set(DRIVER_CLASS_PROPERTY, driverClass);
+    conf.set(URL_PROPERTY, dbUrl);
+    if (userName != null) {
+      conf.set(USERNAME_PROPERTY, userName);
+    }
+    if (passwd != null) {
+      conf.set(PASSWORD_PROPERTY, passwd);
+    }
+    if (fetchSize != null) {
+      conf.setInt(FETCH_SIZE, fetchSize);
+    }
+  }
+
+  /**
+   * Sets the DB access related fields in the JobConf.
+   * @param job the job
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL
+   * @param fetchSize DB fetch size
+   */
+  public static void configureDB(Configuration job, String driverClass,
+      String dbUrl, Integer fetchSize) {
+    configureDB(job, driverClass, dbUrl, null, null, fetchSize);
+  }
+
+  /**
+   * Sets the DB access related fields in the {@link Configuration}.
+   * @param conf the configuration
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL
+   * @param userName DB access username
+   * @param passwd DB access passwd
+   */
+  public static void configureDB(Configuration conf, String driverClass,
+      String dbUrl, String userName, String passwd) {
+    configureDB(conf, driverClass, dbUrl, userName, passwd, null);
+  }
+
+  /**
+   * Sets the DB access related fields in the JobConf.
+   * @param job the job
+   * @param driverClass JDBC Driver class name
+   * @param dbUrl JDBC DB access URL.
+   */
+  public static void configureDB(Configuration job, String driverClass,
+      String dbUrl) {
+    configureDB(job, driverClass, dbUrl, null);
+  }
+
+
+  private Configuration conf;
+
+  public DBConfiguration(Configuration job) {
+    this.conf = job;
+  }
+
+  /** Returns a connection object to the DB.
+   * @throws ClassNotFoundException
+   * @throws SQLException */
+  public Connection getConnection()
+      throws ClassNotFoundException, SQLException {
+
+    Class.forName(conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY));
+
+    if(conf.get(DBConfiguration.USERNAME_PROPERTY) == null) {
+      return DriverManager.getConnection(
+               conf.get(DBConfiguration.URL_PROPERTY));
+    } else {
+      return DriverManager.getConnection(
+          conf.get(DBConfiguration.URL_PROPERTY),
+          conf.get(DBConfiguration.USERNAME_PROPERTY),
+          conf.get(DBConfiguration.PASSWORD_PROPERTY));
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public Integer getFetchSize() {
+    if (conf.get(DBConfiguration.FETCH_SIZE) == null) {
+      return null;
+    }
+    return conf.getInt(DBConfiguration.FETCH_SIZE, 0);
+  }
+
+  public void setFetchSize(Integer fetchSize) {
+    if (fetchSize != null) {
+      conf.setInt(DBConfiguration.FETCH_SIZE, fetchSize);
+    } else {
+      conf.set(FETCH_SIZE, null);
+    }
+  }
+  public String getInputTableName() {
+    return conf.get(DBConfiguration.INPUT_TABLE_NAME_PROPERTY);
+  }
+
+  public void setInputTableName(String tableName) {
+    conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+  }
+
+  public String[] getInputFieldNames() {
+    return conf.getStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
+  }
+
+  public void setInputFieldNames(String... fieldNames) {
+    conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
+  }
+
+  public String getInputConditions() {
+    return conf.get(DBConfiguration.INPUT_CONDITIONS_PROPERTY);
+  }
+
+  public void setInputConditions(String conditions) {
+    if (conditions != null && conditions.length() > 0) {
+      conf.set(DBConfiguration.INPUT_CONDITIONS_PROPERTY, conditions);
+    }
+  }
+
+  public String getInputOrderBy() {
+    return conf.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY);
+  }
+
+  public void setInputOrderBy(String orderby) {
+    if(orderby != null && orderby.length() >0) {
+      conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, orderby);
+    }
+  }
+
+  public String getInputQuery() {
+    return conf.get(DBConfiguration.INPUT_QUERY);
+  }
+
+  public void setInputQuery(String query) {
+    if(query != null && query.length() >0) {
+      conf.set(DBConfiguration.INPUT_QUERY, query);
+    }
+  }
+
+  public String getInputCountQuery() {
+    return conf.get(DBConfiguration.INPUT_COUNT_QUERY);
+  }
+
+  public void setInputCountQuery(String query) {
+    if(query != null && query.length() > 0) {
+      conf.set(DBConfiguration.INPUT_COUNT_QUERY, query);
+    }
+  }
+
+  public void setInputBoundingQuery(String query) {
+    if (query != null && query.length() > 0) {
+      conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
+    }
+  }
+
+  public String getInputBoundingQuery() {
+    return conf.get(DBConfiguration.INPUT_BOUNDING_QUERY);
+  }
+
+  public Class<?> getInputClass() {
+    return conf.getClass(DBConfiguration.INPUT_CLASS_PROPERTY,
+                         NullDBWritable.class);
+  }
+
+  public void setInputClass(Class<? extends DBWritable> inputClass) {
+    conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass,
+                  DBWritable.class);
+  }
+
+  public String getOutputTableName() {
+    return conf.get(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY);
+  }
+
+  public void setOutputTableName(String tableName) {
+    conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
+  }
+
+  public String[] getOutputFieldNames() {
+    return conf.getStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY);
+  }
+
+  public void setOutputFieldNames(String... fieldNames) {
+    conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, fieldNames);
+  }
+
+  public void setOutputFieldCount(int fieldCount) {
+    conf.setInt(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, fieldCount);
+  }
+
+  public int getOutputFieldCount() {
+    return conf.getInt(OUTPUT_FIELD_COUNT_PROPERTY, 0);
+  }
+
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
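
A hedged usage sketch of the class added above; the driver class, JDBC URL, credentials, table, and column names are illustrative placeholders, not values from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.mapreduce.db.DBConfiguration;

    public class DBConfigurationExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf,
            "com.mysql.jdbc.Driver",                  // hypothetical JDBC driver
            "jdbc:mysql://db.example.com/sales",      // hypothetical URL
            "reporting", "secret");                   // hypothetical credentials
        DBConfiguration dbConf = new DBConfiguration(conf);
        dbConf.setInputTableName("orders");           // mapreduce.jdbc.input.table.name
        dbConf.setInputFieldNames("id", "total");     // mapreduce.jdbc.input.field.names
        dbConf.setInputConditions("total > 0");       // mapreduce.jdbc.input.conditions
        // dbConf.getConnection() would load the driver and open the JDBC connection.
      }
    }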

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBRecordReader;
+import com.cloudera.sqoop.mapreduce.db.OracleDBRecordReader;
+
+/**
+ * An InputFormat that reads input data from an SQL table.
+ * <p>
+ * DBInputFormat emits LongWritables containing the record number as
+ * key and DBWritables as value.
+ *
+ * The SQL query and input class can be specified using one of the two
+ * setInput methods.
+ */
+public class DBInputFormat<T extends DBWritable>
+extends InputFormat<LongWritable, T> implements Configurable  {
+
+
+  private String dbProductName = "DEFAULT";
+
+  /**
+   * A Class that does nothing, implementing DBWritable.
+   */
+  public static class NullDBWritable implements DBWritable, Writable {
+    @Override
+    public void readFields(DataInput in) throws IOException { }
+    @Override
+    public void readFields(ResultSet arg0) throws SQLException { }
+    @Override
+    public void write(DataOutput out) throws IOException { }
+    @Override
+    public void write(PreparedStatement arg0) throws SQLException { }
+  }
+
+  /**
+   * An InputSplit that spans a set of rows.
+   */
+  public static class DBInputSplit extends InputSplit implements Writable {
+
+    private long end = 0;
+    private long start = 0;
+
+    /**
+     * Default Constructor.
+     */
+    public DBInputSplit() {
+    }
+
+    /**
+     * Convenience Constructor.
+     * @param start the index of the first row to select
+     * @param end the index of the last row to select
+     */
+    public DBInputSplit(long start, long end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public String[] getLocations() throws IOException {
+      // TODO Add a layer to enable SQL "sharding" and support locality
+      return new String[] {};
+    }
+
+    /**
+     * @return The index of the first row to select
+     */
+    public long getStart() {
+      return start;
+    }
+
+    /**
+     * @return The index of the last row to select
+     */
+    public long getEnd() {
+      return end;
+    }
+
+    /**
+     * @return The total row count in this split
+     */
+    public long getLength() throws IOException {
+      return end - start;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void readFields(DataInput input) throws IOException {
+      start = input.readLong();
+      end = input.readLong();
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void write(DataOutput output) throws IOException {
+      output.writeLong(start);
+      output.writeLong(end);
+    }
+  }
+
+  private String conditions;
+
+  private Connection connection;
+
+  private String tableName;
+
+  private String[] fieldNames;
+
+  private DBConfiguration dbConf;
+
+  @Override
+  /** {@inheritDoc} */
+  public void setConf(Configuration conf) {
+
+    dbConf = new DBConfiguration(conf);
+
+    try {
+      getConnection();
+
+      DatabaseMetaData dbMeta = connection.getMetaData();
+      this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    tableName = dbConf.getInputTableName();
+    fieldNames = dbConf.getInputFieldNames();
+    conditions = dbConf.getInputConditions();
+  }
+
+  public Configuration getConf() {
+    return dbConf.getConf();
+  }
+
+  public DBConfiguration getDBConf() {
+    return dbConf;
+  }
+
+  public Connection getConnection() {
+    try {
+      if (null == this.connection) {
+        // The connection was closed; reinstantiate it.
+        this.connection = dbConf.getConnection();
+        this.connection.setAutoCommit(false);
+        this.connection.setTransactionIsolation(
+            Connection.TRANSACTION_READ_COMMITTED);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return connection;
+  }
+
+  public String getDBProductName() {
+    return dbProductName;
+  }
+
+  protected RecordReader<LongWritable, T> createDBRecordReader(
+      com.cloudera.sqoop.mapreduce.db.DBInputFormat.DBInputSplit split,
+      Configuration conf) throws IOException {
+
+    @SuppressWarnings("unchecked")
+    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
+    try {
+      // use database product name to determine appropriate record reader.
+      if (dbProductName.startsWith("ORACLE")) {
+        // use Oracle-specific db reader.
+        return new OracleDBRecordReader<T>(split, inputClass,
+            conf, getConnection(), getDBConf(), conditions, fieldNames,
+            tableName);
+      } else {
+        // Generic reader.
+        return new DBRecordReader<T>(split, inputClass,
+            conf, getConnection(), getDBConf(), conditions, fieldNames,
+            tableName);
+      }
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+
+    return createDBRecordReader(
+        (com.cloudera.sqoop.mapreduce.db.DBInputFormat.DBInputSplit) split,
+        context.getConfiguration());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+
+    ResultSet results = null;
+    Statement statement = null;
+    try {
+      statement = connection.createStatement();
+
+      results = statement.executeQuery(getCountQuery());
+      results.next();
+
+      long count = results.getLong(1);
+      int chunks = ConfigurationHelper.getJobNumMaps(job);
+      long chunkSize = (count / chunks);
+
+      results.close();
+      statement.close();
+
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+
+      // Split the rows into n-number of chunks and adjust the last chunk
+      // accordingly
+      for (int i = 0; i < chunks; i++) {
+        DBInputSplit split;
+
+        if ((i + 1) == chunks) {
+          split = new DBInputSplit(i * chunkSize, count);
+        } else {
+          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
+              + chunkSize);
+        }
+
+        splits.add(split);
+      }
+
+      connection.commit();
+      return splits;
+    } catch (SQLException e) {
+      throw new IOException("Got SQLException", e);
+    } finally {
+      try {
+        if (results != null) { results.close(); }
+      } catch (SQLException e1) { /* ignored */ }
+      try {
+        if (statement != null) { statement.close(); }
+      } catch (SQLException e1) { /* ignored */ }
+
+      closeConnection();
+    }
+  }
+
+  /** Returns the query for getting the total number of rows,
+   * subclasses can override this for custom behaviour.*/
+  protected String getCountQuery() {
+
+    if(dbConf.getInputCountQuery() != null) {
+      return dbConf.getInputCountQuery();
+    }
+
+    StringBuilder query = new StringBuilder();
+    query.append("SELECT COUNT(*) FROM " + tableName);
+
+    if (conditions != null && conditions.length() > 0) {
+      query.append(" WHERE " + conditions);
+    }
+    return query.toString();
+  }
+
+  /**
+   * Initializes the map-part of the job with the appropriate input settings.
+   *
+   * @param job The map-reduce job
+   * @param inputClass the class object implementing DBWritable, which is the
+   * Java object holding tuple fields.
+   * @param tableName The table to read data from
+   * @param conditions The condition which to select data with,
+   * eg. '(updated &gt; 20070101 AND length &gt; 0)'
+   * @param orderBy the fieldNames in the orderBy clause.
+   * @param fieldNames The field names in the table
+   * @see #setInput(Job, Class, String, String)
+   */
+  public static void setInput(Job job,
+      Class<? extends DBWritable> inputClass,
+      String tableName, String conditions,
+      String orderBy, String... fieldNames) {
+    job.setInputFormatClass(DBInputFormat.class);
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+    dbConf.setInputClass(inputClass);
+    dbConf.setInputTableName(tableName);
+    dbConf.setInputFieldNames(fieldNames);
+    dbConf.setInputConditions(conditions);
+    dbConf.setInputOrderBy(orderBy);
+  }
+
+  /**
+   * Initializes the map-part of the job with the appropriate input settings.
+   *
+   * @param job The map-reduce job
+   * @param inputClass the class object implementing DBWritable, which is the
+   * Java object holding tuple fields.
+   * @param inputQuery the input query to select fields. Example :
+   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
+   * @param inputCountQuery the input query that returns
+   * the number of records in the table.
+   * Example : "SELECT COUNT(f1) FROM Mytable"
+   * @see #setInput(Job, Class, String, String, String, String...)
+   */
+  public static void setInput(Job job,
+      Class<? extends DBWritable> inputClass,
+      String inputQuery, String inputCountQuery) {
+    job.setInputFormatClass(DBInputFormat.class);
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+    dbConf.setInputClass(inputClass);
+    dbConf.setInputQuery(inputQuery);
+    dbConf.setInputCountQuery(inputCountQuery);
+  }
+
+  protected void closeConnection() {
+    try {
+      if (null != this.connection) {
+        this.connection.close();
+        this.connection = null;
+      }
+    } catch (SQLException sqlE) { /* ignore exception on close. */ }
+  }
+
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
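
A hedged sketch of wiring a job to the input format added above through its table-based setInput overload; the driver, URL, table, and field names are illustrative. (Job.getInstance is the newer factory method; Hadoop releases contemporary with this commit used new Job(conf).)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.sqoop.mapreduce.db.DBConfiguration;
    import org.apache.sqoop.mapreduce.db.DBInputFormat;

    public class DBInputFormatExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf,
            "org.hsqldb.jdbcDriver",                  // hypothetical driver
            "jdbc:hsqldb:hsql://localhost/testdb");   // hypothetical URL
        Job job = Job.getInstance(conf);
        DBInputFormat.setInput(job, DBInputFormat.NullDBWritable.class,
            "employees",                              // table to read
            "salary > 0",                             // WHERE conditions
            "id",                                     // ORDER BY column
            "id", "name", "salary");                  // fields to select
      }
    }

At runtime getSplits() runs the count query, divides the row count by the configured number of maps, and hands each mapper a DBInputSplit covering one chunk of rows.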

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat.DBRecordWriter;
+
+/**
+ * An OutputFormat that sends the reduce output to a SQL table.
+ * <p>
+ * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where
+ * key has a type extending DBWritable. Returned {@link RecordWriter}
+ * writes <b>only the key</b> to the database with a batch SQL query.
+ *
+ */
+public class DBOutputFormat<K extends DBWritable, V>
+    extends OutputFormat<K, V> {
+
+
+  private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {}
+
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
+                                   context);
+  }
+
+  /**
+   * Constructs the query used as the prepared statement to insert data.
+   *
+   * @param table
+   *          the table to insert into
+   * @param fieldNames
+   *          the fields to insert into. If field names are unknown, supply an
+   *          array of nulls.
+   */
+  public String constructQuery(String table, String[] fieldNames) {
+    if(fieldNames == null) {
+      throw new IllegalArgumentException("Field names may not be null");
+    }
+
+    StringBuilder query = new StringBuilder();
+    query.append("INSERT INTO ").append(table);
+
+    if (fieldNames.length > 0 && fieldNames[0] != null) {
+      query.append(" (");
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length - 1) {
+          query.append(",");
+        }
+      }
+      query.append(")");
+    }
+    query.append(" VALUES (");
+
+    for (int i = 0; i < fieldNames.length; i++) {
+      query.append("?");
+      if(i != fieldNames.length - 1) {
+        query.append(",");
+      }
+    }
+    query.append(");");
+
+    return query.toString();
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
+    String tableName = dbConf.getOutputTableName();
+    String[] fieldNames = dbConf.getOutputFieldNames();
+
+    if(fieldNames == null) {
+      fieldNames = new String[dbConf.getOutputFieldCount()];
+    }
+
+    try {
+      Connection connection = dbConf.getConnection();
+      PreparedStatement statement = null;
+
+      statement = connection.prepareStatement(
+                    constructQuery(tableName, fieldNames));
+      return new DBRecordWriter(connection, statement);
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Initializes the reduce-part of the job with
+   * the appropriate output settings.
+   *
+   * @param job The job
+   * @param tableName The table to insert data into
+   * @param fieldNames The field names in the table.
+   */
+  public static void setOutput(Job job, String tableName,
+      String... fieldNames) throws IOException {
+    if(fieldNames.length > 0 && fieldNames[0] != null) {
+      DBConfiguration dbConf = setOutput(job, tableName);
+      dbConf.setOutputFieldNames(fieldNames);
+    } else {
+      if (fieldNames.length > 0) {
+        setOutput(job, tableName, fieldNames.length);
+      } else {
+        throw new IllegalArgumentException(
+            "At least one field name must be provided");
+      }
+    }
+  }
+
+  /**
+   * Initializes the reduce-part of the job
+   * with the appropriate output settings.
+   *
+   * @param job The job
+   * @param tableName The table to insert data into
+   * @param fieldCount the number of fields in the table.
+   */
+  public static void setOutput(Job job, String tableName,
+      int fieldCount) throws IOException {
+    DBConfiguration dbConf = setOutput(job, tableName);
+    dbConf.setOutputFieldCount(fieldCount);
+  }
+
+  private static DBConfiguration setOutput(Job job,
+      String tableName) throws IOException {
+    job.setOutputFormatClass(DBOutputFormat.class);
+    ConfigurationHelper.setJobReduceSpeculativeExecution(job, false);
+
+    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
+
+    dbConf.setOutputTableName(tableName);
+    return dbConf;
+  }
+
+  /**
+   * A RecordWriter that writes the reduce output to a SQL table.
+   */
+  public static class DBRecordWriter<K extends DBWritable, V>
+      extends RecordWriter<K, V> {
+
+    private Connection connection;
+    private PreparedStatement statement;
+
+    public DBRecordWriter() throws SQLException {
+    }
+
+    public DBRecordWriter(Connection connection,
+        PreparedStatement statement) throws SQLException {
+      this.connection = connection;
+      this.statement = statement;
+      this.connection.setAutoCommit(false);
+    }
+
+    public Connection getConnection() {
+      return connection;
+    }
+
+    public PreparedStatement getStatement() {
+      return statement;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void close(TaskAttemptContext context) throws IOException {
+      try {
+        statement.executeBatch();
+        connection.commit();
+      } catch (SQLException e) {
+        try {
+          connection.rollback();
+        } catch (SQLException ex) {
+          LOG.warn(StringUtils.stringifyException(ex));
+        }
+        throw new IOException(e);
+      } finally {
+        try {
+          statement.close();
+          connection.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close connection", ex);
+        }
+      }
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void write(K key, V value) throws IOException {
+      try {
+        key.write(statement);
+        statement.addBatch();
+      } catch (SQLException e) {
+        LOG.error("Exception encountered", e);
+      }
+    }
+  }
+
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
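
As a quick illustration of the constructQuery()/setOutput() pair above, a small sketch that
only prints the generated INSERT statements; the table and column names are made up.

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.sqoop.mapreduce.db.DBOutputFormat;

    public class InsertQueryDemo {
      public static void main(String[] args) {
        DBOutputFormat<DBWritable, NullWritable> fmt =
            new DBOutputFormat<DBWritable, NullWritable>();

        // Known field names: the column list is emitted explicitly.
        System.out.println(fmt.constructQuery("employees",
            new String[] { "id", "name" }));
        // Prints: INSERT INTO employees (id,name) VALUES (?,?);

        // Unknown field names: pass an array of nulls sized to the column
        // count; the column list is omitted, only placeholders remain.
        System.out.println(fmt.constructQuery("employees", new String[2]));
        // Prints: INSERT INTO employees VALUES (?,?);
      }
    }

In a job driver, the corresponding configuration calls would be
DBOutputFormat.setOutput(job, "employees", "id", "name"), or
DBOutputFormat.setOutput(job, "employees", 2) when only the column count is known.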

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+import com.cloudera.sqoop.util.LoggingUtils;
+
+/**
+ * A RecordReader that reads records from a SQL table.
+ * Emits LongWritables containing the record number as
+ * key and DBWritables as value.
+ */
+public class DBRecordReader<T extends DBWritable> extends
+  RecordReader<LongWritable, T> {
+
+  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);
+
+  private ResultSet results = null;
+
+  private Class<T> inputClass;
+
+  private Configuration conf;
+
+  private DBInputFormat.DBInputSplit split;
+
+  private long pos = 0;
+
+  private LongWritable key = null;
+
+  private T value = null;
+
+  private Connection connection;
+
+  protected PreparedStatement statement;
+
+  private DBConfiguration dbConf;
+
+  private String conditions;
+
+  private String [] fieldNames;
+
+  private String tableName;
+
+  /**
+   * @param split The InputSplit to read data for
+   * @throws SQLException
+   */
+  // CHECKSTYLE:OFF
+  // TODO (aaron): Refactor constructor to take fewer arguments
+  public DBRecordReader(DBInputFormat.DBInputSplit split,
+      Class<T> inputClass, Configuration conf, Connection conn,
+      DBConfiguration dbConfig, String cond, String [] fields, String table)
+      throws SQLException {
+    this.inputClass = inputClass;
+    this.split = split;
+    this.conf = conf;
+    this.connection = conn;
+    this.dbConf = dbConfig;
+    this.conditions = cond;
+    if (fields != null) {
+      this.fieldNames = Arrays.copyOf(fields, fields.length);
+    }
+    this.tableName = table;
+  }
+  // CHECKSTYLE:ON
+
+  protected ResultSet executeQuery(String query) throws SQLException {
+    this.statement = connection.prepareStatement(query,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+
+    Integer fetchSize = dbConf.getFetchSize();
+    if (fetchSize != null) {
+      LOG.debug("Using fetchSize for next query: " + fetchSize);
+      statement.setFetchSize(fetchSize);
+    }
+
+    LOG.debug("Executing query: " + query);
+    return statement.executeQuery();
+  }
+
+  /** Returns the query for selecting the records;
+   * subclasses can override this for custom behaviour. */
+  protected String getSelectQuery() {
+    StringBuilder query = new StringBuilder();
+
+    // Default codepath for MySQL, HSQLDB, etc.
+    // Relies on LIMIT/OFFSET for splits.
+    if(dbConf.getInputQuery() == null) {
+      query.append("SELECT ");
+
+      for (int i = 0; i < fieldNames.length; i++) {
+        query.append(fieldNames[i]);
+        if (i != fieldNames.length -1) {
+          query.append(", ");
+        }
+      }
+
+      query.append(" FROM ").append(tableName);
+      query.append(" AS ").append(tableName); //in hsqldb this is necessary
+      if (conditions != null && conditions.length() > 0) {
+        query.append(" WHERE (").append(conditions).append(")");
+      }
+
+      String orderBy = dbConf.getInputOrderBy();
+      if (orderBy != null && orderBy.length() > 0) {
+        query.append(" ORDER BY ").append(orderBy);
+      }
+    } else {
+      //PREBUILT QUERY
+      query.append(dbConf.getInputQuery());
+    }
+
+    try {
+      query.append(" LIMIT ").append(split.getLength());
+      query.append(" OFFSET ").append(split.getStart());
+    } catch (IOException ex) {
+      // Ignore, will not throw.
+    }
+
+    return query.toString();
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public void close() throws IOException {
+    try {
+      if (null != results) {
+        results.close();
+      }
+      if (null != statement) {
+        statement.close();
+      }
+      if (null != connection) {
+        connection.commit();
+        connection.close();
+      }
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    //do nothing
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public LongWritable getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public T getCurrentValue() {
+    return value;
+  }
+
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  public T createValue() {
+    return ReflectionUtils.newInstance(inputClass, conf);
+  }
+
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  public long getPos() throws IOException {
+    return pos;
+  }
+
+  /**
+   * @deprecated Use {@link #nextKeyValue()}
+   */
+  @Deprecated
+  public boolean next(LongWritable k, T v) throws IOException {
+    this.key = k;
+    this.value = v;
+    return nextKeyValue();
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public float getProgress() throws IOException {
+    return pos / (float)split.getLength();
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public boolean nextKeyValue() throws IOException {
+    try {
+      if (key == null) {
+        key = new LongWritable();
+      }
+      if (value == null) {
+        value = createValue();
+      }
+      if (null == this.results) {
+        // First time into this method, run the query.
+        this.results = executeQuery(getSelectQuery());
+      }
+      if (!results.next()) {
+        return false;
+      }
+
+      // Set the key field value as the output key value
+      key.set(pos + split.getStart());
+
+      value.readFields(results);
+
+      pos++;
+    } catch (SQLException e) {
+      LoggingUtils.logAll(LOG, e);
+      throw new IOException("SQLException in nextKeyValue", e);
+    }
+    return true;
+  }
+
+  /**
+   * @return true if nextKeyValue() would return false.
+   */
+  protected boolean isDone() {
+    try {
+      return this.results != null
+          && (results.isLast() || results.isAfterLast());
+    } catch (SQLException sqlE) {
+      return true;
+    }
+  }
+
+  protected DBInputFormat.DBInputSplit getSplit() {
+    return split;
+  }
+
+  protected String [] getFieldNames() {
+    return fieldNames;
+  }
+
+  protected String getTableName() {
+    return tableName;
+  }
+
+  protected String getConditions() {
+    return conditions;
+  }
+
+  protected DBConfiguration getDBConf() {
+    return dbConf;
+  }
+
+  protected Connection getConnection() {
+    return connection;
+  }
+
+  protected PreparedStatement getStatement() {
+    return statement;
+  }
+
+  protected void setStatement(PreparedStatement stmt) {
+    this.statement = stmt;
+  }
+
+  /**
+   * @return the configuration. Allows subclasses to access the configuration
+   */
+  protected Configuration getConf(){
+    return conf;
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
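
For orientation, the SELECT that getSelectQuery() above builds on the default LIMIT/OFFSET
codepath, for a hypothetical table "employees" with fields id and name and a split of 1000
rows starting at offset 1000, is:

    SELECT id, name FROM employees AS employees LIMIT 1000 OFFSET 1000

and, once a WHERE condition ("salary > 0") and an order-by column ("id") are configured, it
becomes:

    SELECT id, name FROM employees AS employees WHERE (salary > 0) ORDER BY id LIMIT 1000 OFFSET 1000

The table, fields, and bounds are invented; the LIMIT value comes from the split length and
the OFFSET from the split start.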

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBSplitter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBSplitter.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBSplitter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBSplitter.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * DBSplitter will generate DBInputSplits to use with DataDrivenDBInputFormat.
+ * DataDrivenDBInputFormat needs to interpolate between two values that
+ * represent the lowest and highest valued records to import. Depending
+ * on the data-type of the column, this requires different behavior.
+ * DBSplitter implementations should perform this for a data type or family
+ * of data types.
+ */
+public interface DBSplitter {
+
+  /**
+   * Given a ResultSet containing one record (and already advanced to that
+   * record) with two columns (a low value, and a high value, both of the same
+   * type), determine a set of splits that span the given values.
+   */
+  List<InputSplit> split(Configuration conf, ResultSet results, String colName)
+      throws SQLException;
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBSplitter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
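
A minimal sketch of a DBSplitter implementation for long-valued columns, illustrating the
contract described above. The class name, the use of the "mapred.map.tasks" key to discover
the mapper count, and the choice of the org.apache.sqoop DataDrivenDBInputSplit (rather than
the com.cloudera.sqoop shim) are all assumptions made for the example; the splitters shipped
with Sqoop handle the range arithmetic more carefully.

    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.sqoop.mapreduce.db.DBSplitter;
    import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;

    // Evenly chunks a long-valued [min, max] range into bound-clause splits.
    public class SimpleLongSplitter implements DBSplitter {

      public List<InputSplit> split(Configuration conf, ResultSet results,
          String colName) throws SQLException {
        long min = results.getLong(1);   // first column: MIN(splitCol)
        long max = results.getLong(2);   // second column: MAX(splitCol)

        // Assumed period-appropriate key for the requested mapper count.
        int numSplits = Math.max(1, conf.getInt("mapred.map.tasks", 1));
        long step = Math.max(1L, (max - min + 1) / numSplits);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (long lo = min; lo <= max; lo += step) {
          long hi = Math.min(max, lo + step - 1);
          // Each pair of bound clauses later lands in a split's WHERE clause
          // (or replaces $CONDITIONS in a user-supplied query).
          splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
              colName + " >= " + lo, colName + " <= " + hi));
        }
        return splits;
      }
    }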

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java?rev=1190441&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java Fri Oct 28 16:50:39 2011
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.mapreduce.db.BigDecimalSplitter;
+import com.cloudera.sqoop.mapreduce.db.BooleanSplitter;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat;
+import com.cloudera.sqoop.mapreduce.db.DBSplitter;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBRecordReader;
+import com.cloudera.sqoop.mapreduce.db.DateSplitter;
+import com.cloudera.sqoop.mapreduce.db.FloatSplitter;
+import com.cloudera.sqoop.mapreduce.db.IntegerSplitter;
+import com.cloudera.sqoop.mapreduce.db.TextSplitter;
+import com.cloudera.sqoop.mapreduce.db.DBInputFormat.DBInputSplit;
+
+/**
+ * An InputFormat that reads input data from an SQL table.
+ * Operates like DBInputFormat, but instead of using LIMIT and OFFSET to
+ * demarcate splits, it tries to generate WHERE clauses which separate the
+ * data into roughly equivalent shards.
+ */
+public class DataDrivenDBInputFormat<T extends DBWritable>
+      extends DBInputFormat<T> implements Configurable  {
+
+  private static final Log LOG =
+      LogFactory.getLog(DataDrivenDBInputFormat.class);
+
+  /**
+   * If users provide their own query, the following token is expected to
+   * appear in its WHERE clause; it is substituted with a pair of conditions
+   * on the split column so that input splits can parallelize the import.
+   */
+  public static final String SUBSTITUTE_TOKEN = "$CONDITIONS";
+
+  /**
+   * @return the DBSplitter implementation to use to divide the table/query
+   * into InputSplits.
+   */
+  protected DBSplitter getSplitter(int sqlDataType) {
+    switch (sqlDataType) {
+    case Types.NUMERIC:
+    case Types.DECIMAL:
+      return new BigDecimalSplitter();
+
+    case Types.BIT:
+    case Types.BOOLEAN:
+      return new BooleanSplitter();
+
+    case Types.INTEGER:
+    case Types.TINYINT:
+    case Types.SMALLINT:
+    case Types.BIGINT:
+      return new IntegerSplitter();
+
+    case Types.REAL:
+    case Types.FLOAT:
+    case Types.DOUBLE:
+      return new FloatSplitter();
+
+    case Types.CHAR:
+    case Types.VARCHAR:
+    case Types.LONGVARCHAR:
+      return new TextSplitter();
+
+    case Types.DATE:
+    case Types.TIME:
+    case Types.TIMESTAMP:
+      return new DateSplitter();
+
+    default:
+      // TODO: Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT, CLOB,
+      // BLOB, ARRAY, STRUCT, REF, DATALINK, and JAVA_OBJECT.
+      return null;
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+
+    int targetNumTasks = ConfigurationHelper.getJobNumMaps(job);
+    String boundaryQuery = getDBConf().getInputBoundingQuery();
+
+    // If the user has not forced a boundary query on us, and we do not need
+    // one because there is only one mapper, return a single split that
+    // constrains nothing. This can be considerably more efficient for a
+    // large table with no index.
+    if (1 == targetNumTasks
+            && (boundaryQuery == null || boundaryQuery.isEmpty())) {
+      List<InputSplit> singletonSplit = new ArrayList<InputSplit>();
+      singletonSplit.add(new com.cloudera.sqoop.mapreduce.db.
+          DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1"));
+      return singletonSplit;
+    }
+
+    ResultSet results = null;
+    Statement statement = null;
+    Connection connection = getConnection();
+    try {
+      statement = connection.createStatement();
+
+      String query = getBoundingValsQuery();
+      LOG.info("BoundingValsQuery: " + query);
+
+      results = statement.executeQuery(query);
+      results.next();
+
+      // Based on the type of the results, use a different mechanism
+      // for interpolating split points (i.e., numeric splits, text splits,
+      // dates, etc.)
+      int sqlDataType = results.getMetaData().getColumnType(1);
+      boolean isSigned = results.getMetaData().isSigned(1);
+
+      // MySQL's unsigned INTEGER can exceed the signed range, so treat it as BIGINT.
+      if (sqlDataType == Types.INTEGER && !isSigned){
+          sqlDataType = Types.BIGINT;
+      }
+
+      DBSplitter splitter = getSplitter(sqlDataType);
+      if (null == splitter) {
+        throw new IOException("Unknown SQL data type: " + sqlDataType);
+      }
+
+      return splitter.split(job.getConfiguration(), results,
+          getDBConf().getInputOrderBy());
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      // More-or-less ignore SQL exceptions here, but log in case we need it.
+      try {
+        if (null != results) {
+          results.close();
+        }
+      } catch (SQLException se) {
+        LOG.debug("SQLException closing resultset: " + se.toString());
+      }
+
+      try {
+        if (null != statement) {
+          statement.close();
+        }
+      } catch (SQLException se) {
+        LOG.debug("SQLException closing statement: " + se.toString());
+      }
+
+      try {
+        connection.commit();
+        closeConnection();
+      } catch (SQLException se) {
+        LOG.debug("SQLException committing split transaction: "
+            + se.toString());
+      }
+    }
+  }
+
+  /**
+   * @return a query which returns the minimum and maximum values for
+   * the order-by column.
+   *
+   * The min value should be in the first column, and the
+   * max value should be in the second column of the results.
+   */
+  protected String getBoundingValsQuery() {
+    // If the user has provided a query, use that instead.
+    String userQuery = getDBConf().getInputBoundingQuery();
+    if (null != userQuery) {
+      return userQuery;
+    }
+
+    // Auto-generate one based on the table name we've been provided with.
+    StringBuilder query = new StringBuilder();
+
+    String splitCol = getDBConf().getInputOrderBy();
+    query.append("SELECT MIN(").append(splitCol).append("), ");
+    query.append("MAX(").append(splitCol).append(") FROM ");
+    query.append(getDBConf().getInputTableName());
+    String conditions = getDBConf().getInputConditions();
+    if (null != conditions) {
+      query.append(" WHERE ( " + conditions + " )");
+    }
+
+    return query.toString();
+  }
+
+  protected RecordReader<LongWritable, T> createDBRecordReader(
+      DBInputSplit split, Configuration conf) throws IOException {
+
+    DBConfiguration dbConf = getDBConf();
+    @SuppressWarnings("unchecked")
+    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
+    String dbProductName = getDBProductName();
+
+    LOG.debug("Creating db record reader for db product: " + dbProductName);
+
+    try {
+      return new DataDrivenDBRecordReader<T>(split, inputClass,
+          conf, getConnection(), dbConf, dbConf.getInputConditions(),
+          dbConf.getInputFieldNames(), dbConf.getInputTableName(),
+          dbProductName);
+    } catch (SQLException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+
+  /**
+   * Set the user-defined bounding query to use with a user-defined query.
+   * This *must* include the substring "$CONDITIONS"
+   * (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN) inside the WHERE clause,
+   * so that DataDrivenDBInputFormat knows where to insert split clauses.
+   * e.g., "SELECT foo FROM mytable WHERE $CONDITIONS"
+   * This will be expanded to something like:
+   * SELECT foo FROM mytable WHERE (id &gt; 100) AND (id &lt; 250)
+   * inside each split.
+   */
+  public static void setBoundingQuery(Configuration conf, String query) {
+    if (null != query) {
+      // If the user is setting a query, warn if it does not allow conditions.
+      if (query.indexOf(SUBSTITUTE_TOKEN) == -1) {
+        LOG.warn("Could not find " + SUBSTITUTE_TOKEN + " token in query: "
+            + query + "; splits may not partition data.");
+      }
+    }
+
+    conf.set(DBConfiguration.INPUT_BOUNDING_QUERY, query);
+  }
+
+  // Configuration methods override superclass to ensure that the proper
+  // DataDrivenDBInputFormat gets used.
+
+  /** Note that the "orderBy" column is called the "splitBy" in this version.
+    * We reuse the same field, but it's not strictly ordering it
+    * -- just partitioning the results.
+    */
+  public static void setInput(Job job,
+      Class<? extends DBWritable> inputClass,
+      String tableName, String conditions,
+      String splitBy, String... fieldNames) {
+    DBInputFormat.setInput(job, inputClass, tableName, conditions,
+        splitBy, fieldNames);
+    job.setInputFormatClass(DataDrivenDBInputFormat.class);
+  }
+
+  /** setInput() takes a custom query and a separate "bounding query" to use
+      instead of the custom "count query" used by DBInputFormat.
+    */
+  public static void setInput(Job job,
+      Class<? extends DBWritable> inputClass,
+      String inputQuery, String inputBoundingQuery) {
+    DBInputFormat.setInput(job, inputClass, inputQuery, "");
+    job.getConfiguration().set(DBConfiguration.INPUT_BOUNDING_QUERY,
+        inputBoundingQuery);
+    job.setInputFormatClass(DataDrivenDBInputFormat.class);
+  }
+
+
+  /**
+   * An InputSplit that spans a set of rows.
+   */
+  public static class DataDrivenDBInputSplit
+      extends DBInputFormat.DBInputSplit {
+
+    private String lowerBoundClause;
+    private String upperBoundClause;
+
+    /**
+     * Default Constructor.
+     */
+    public DataDrivenDBInputSplit() {
+    }
+
+    /**
+     * Convenience Constructor.
+     * @param lower the string to be put in the WHERE clause to guard
+     * on the 'lower' end.
+     * @param upper the string to be put in the WHERE clause to guard
+     * on the 'upper' end.
+     */
+    public DataDrivenDBInputSplit(final String lower, final String upper) {
+      this.lowerBoundClause = lower;
+      this.upperBoundClause = upper;
+    }
+
+    /**
+     * @return The total row count in this split; not known here, so 0.
+     */
+    public long getLength() throws IOException {
+      return 0; // unfortunately, we don't know this.
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void readFields(DataInput input) throws IOException {
+      this.lowerBoundClause = Text.readString(input);
+      this.upperBoundClause = Text.readString(input);
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    public void write(DataOutput output) throws IOException {
+      Text.writeString(output, this.lowerBoundClause);
+      Text.writeString(output, this.upperBoundClause);
+    }
+
+    public String getLowerClause() {
+      return lowerBoundClause;
+    }
+
+    public String getUpperClause() {
+      return upperBoundClause;
+    }
+  }
+
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
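
Finally, a hedged sketch of wiring a free-form query into DataDrivenDBInputFormat via the
query/bounding-query overload of setInput() above. The table, column, and MyRecord class
(the hypothetical DBWritable from the earlier DBInputFormat sketch) are assumptions, and the
split-by column configuration is omitted.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;

    public class DataDrivenQuerySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Connection settings omitted; see the DBConfiguration sketch above.
        Job job = new Job(conf, "data-driven-query-import");

        // The row query carries the $CONDITIONS token; each split replaces it
        // with its own lower/upper bound clause pair. The bounding query puts
        // MIN and MAX of the split column in its first and second columns.
        DataDrivenDBInputFormat.setInput(job, QueryImportSketch.MyRecord.class,
            "SELECT id, name FROM employees WHERE $CONDITIONS",
            "SELECT MIN(id), MAX(id) FROM employees");
        // ... set the mapper and output, then job.waitForCompletion(true).
      }
    }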


