sqoop-commits mailing list archives

From arv...@apache.org
Subject svn commit: r1221843 - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/mapreduce/ java/com/cloudera/sqoop/mapreduce/db/ java/org/apache/sqoop/config/ java/org/apache/sqoop/lib/ java/org/apache/sqoop/mapreduce/ java/org/apache/sqoop/mapreduce/db/...
Date Wed, 21 Dec 2011 19:25:53 GMT
Author: arvind
Date: Wed Dec 21 19:25:52 2011
New Revision: 1221843

URL: http://svn.apache.org/viewvc?rev=1221843&view=rev
Log:
SQOOP-413. Port files into Sqoop that are not found in prior versions of Hadoop.

(Jarek Jarcec Cecho via Arvind Prabhakar)

Added:
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java   (with props)
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DBWritable.java   (with props)
Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/config/ConfigurationHelper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/SqoopRecord.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDBRecordReader.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java
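
The unifying change across the modified files is an import swap: code that
previously used org.apache.hadoop.mapreduce.lib.db.DBWritable and the
org.apache.hadoop.mapreduce.lib.input.CombineFile* classes now uses the
copies ported into org.apache.sqoop.mapreduce, so Sqoop no longer requires a
Hadoop release that ships those classes. A minimal sketch of a record class
written against the ported interface (WidgetRecord and its single column are
illustrative, not part of this commit):

    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    // Previously: import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.sqoop.mapreduce.DBWritable;

    public class WidgetRecord implements DBWritable {
      private int id;

      // Bind this record's fields to an INSERT/UPDATE statement.
      public void write(PreparedStatement stmt) throws SQLException {
        stmt.setInt(1, id);
      }

      // Populate this record's fields from a query result row.
      public void readFields(ResultSet rs) throws SQLException {
        id = rs.getInt(1);
      }
    }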

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java Wed Dec 21 19:25:52 2011
@@ -20,7 +20,7 @@ package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.sqoop.mapreduce.CombineFileSplit;
 
 /**
  * @deprecated Moving to use org.apache.sqoop namespace.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBInputFormat.java Wed Dec 21 19:25:52 2011
@@ -17,7 +17,7 @@
  */
 package com.cloudera.sqoop.mapreduce.db;
 
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * An InputFormat that reads input data from an SQL table.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBOutputFormat.java Wed Dec 21 19:25:52 2011
@@ -21,7 +21,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * An OutputFormat that sends the reduce output to a SQL table.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -21,7 +21,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * A RecordReader that reads records from a SQL table.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBInputFormat.java Wed Dec 21 19:25:52 2011
@@ -19,7 +19,7 @@ package com.cloudera.sqoop.mapreduce.db;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * An InputFormat that reads input data from an SQL table.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/DataDrivenDBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -21,7 +21,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * A RecordReader that reads records from a SQL table,

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -21,7 +21,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * A RecordReader that reads records from an Oracle SQL table.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java Wed Dec 21 19:25:52 2011
@@ -17,7 +17,7 @@
  */
 package com.cloudera.sqoop.mapreduce.db;
 
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * An InputFormat that reads input data from an SQL table in an Oracle db.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -21,7 +21,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * A RecordReader that reads records from an Oracle table

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/config/ConfigurationHelper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/config/ConfigurationHelper.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/config/ConfigurationHelper.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/config/ConfigurationHelper.java Wed Dec 21 19:25:52 2011
@@ -180,7 +180,8 @@ public final class ConfigurationHelper {
    * @return a <code>List</code> of objects implementing <code>xface</code>.
    */
   @SuppressWarnings("unchecked")
-  public static <U> List<U> getInstances(Configuration conf, String name, Class<U> xface) {
+  public static <U> List<U> getInstances(Configuration conf,
+                                              String name, Class<U> xface) {
     List<U> ret = new ArrayList<U>();
     Class<?>[] classes = conf.getClasses(name);
     for (Class<?> cl: classes) {
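
For context, getInstances resolves a comma-separated list of class names
stored under a configuration key (via conf.getClasses above) and instantiates
each one. An illustrative sketch, with the key name and Plugin types invented
for the example, and assuming each listed class has a no-arg constructor:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.config.ConfigurationHelper;

    public class GetInstancesExample {
      public interface Plugin { String name(); }

      public static class HelloPlugin implements Plugin {
        public String name() { return "hello"; }
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // conf.getClasses() splits this value on commas.
        conf.set("example.plugin.classes", HelloPlugin.class.getName());
        List<Plugin> plugins = ConfigurationHelper.getInstances(
            conf, "example.plugin.classes", Plugin.class);
        for (Plugin p : plugins) {
          System.out.println(p.name()); // prints "hello"
        }
      }
    }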

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/SqoopRecord.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/SqoopRecord.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/SqoopRecord.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/lib/SqoopRecord.java Wed Dec 21 19:25:52 2011
@@ -26,7 +26,7 @@ import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * Interface implemented by the classes generated by sqoop's orm.ClassWriter.

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java?rev=1221843&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java Wed Dec 21 19:25:52 2011
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+
+/**
+ * An abstract {@link InputFormat} that returns {@link CombineFileSplit}s in
+ * the {@link InputFormat#getSplits(JobContext)} method.
+ *
+ * Splits are constructed from the files under the input paths.
+ * A split cannot have files from different pools.
+ * Each split returned may contain blocks from different files.
+ * If a maxSplitSize is specified, then blocks on the same node are
+ * combined to form a single split. Blocks that are left over are
+ * then combined with other blocks in the same rack.
+ * If maxSplitSize is not specified, then blocks from the same rack
+ * are combined in a single split; no attempt is made to create
+ * node-local splits.
+ * If the maxSplitSize is equal to the block size, then this class
+ * is similar to the default splitting behavior in Hadoop: each
+ * block is a locally processed split.
+ * Subclasses implement
+ * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
+ * to construct <code>RecordReader</code>s for
+ * <code>CombineFileSplit</code>s.
+ *
+ * @see CombineFileSplit
+ */
+public abstract class CombineFileInputFormat<K, V>
+  extends FileInputFormat<K, V> {
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // A pool of input paths filters. A split cannot have blocks from files
+  // across multiple pools.
+  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
+
+  // mapping from a rack name to the set of Nodes in the rack
+  private HashMap<String, Set<String>> rackToNodes =
+                            new HashMap<String, Set<String>>();
+  /**
+   * Specify the maximum size (in bytes) of each split. Each split is
+   * approximately equal to the specified size.
+   */
+  protected void setMaxSplitSize(long val) {
+    this.maxSplitSize = val;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per node.
+   * This applies to data that is left over after combining data on a single
+   * node into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeNode.
+   */
+  protected void setMinSplitSizeNode(long val) {
+    this.minSplitSizeNode = val;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per rack.
+   * This applies to data that is left over after combining data on a single
+   * rack into splits that are of maximum size specified by maxSplitSize.
+   * This leftover data will be combined into its own split if its size
+   * exceeds minSplitSizeRack.
+   */
+  protected void setMinSplitSizeRack(long val) {
+    this.minSplitSizeRack = val;
+  }
+
+  /**
+   * Create a new pool and add the filters to it.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(List<PathFilter> filters) {
+    pools.add(new MultiPathFilter(filters));
+  }
+
+  /**
+   * Create a new pool and add the filters to it.
+   * A pathname can satisfy any one of the specified filters.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(PathFilter... filters) {
+    MultiPathFilter multi = new MultiPathFilter();
+    for (PathFilter f: filters) {
+      multi.add(f);
+    }
+    pools.add(multi);
+  }
+
+  /**
+   * Default constructor.
+   */
+  public CombineFileInputFormat() {
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job)
+    throws IOException {
+
+    long minSizeNode = 0;
+    long minSizeRack = 0;
+    long maxSize = 0;
+    Configuration conf = job.getConfiguration();
+
+    // the values specified by setXxxSplitSize() take precedence over the
+    // values that might have been specified in the config
+    if (minSplitSizeNode != 0) {
+      minSizeNode = minSplitSizeNode;
+    } else {
+      minSizeNode = conf.getLong("mapred.min.split.size.per.node", 0);
+    }
+    if (minSplitSizeRack != 0) {
+      minSizeRack = minSplitSizeRack;
+    } else {
+      minSizeRack = conf.getLong("mapred.min.split.size.per.rack", 0);
+    }
+    if (maxSplitSize != 0) {
+      maxSize = maxSplitSize;
+    } else {
+      maxSize = conf.getLong("mapred.max.split.size", 0);
+    }
+    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+      throw new IOException("Minimum split size per node " + minSizeNode
+                            + " cannot be larger than maximum split size "
+                            + maxSize);
+    }
+    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+      throw new IOException("Minimum split size per rack " + minSizeRack
+                            + " cannot be larger than maximum split size "
+                            + maxSize);
+    }
+    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+      throw new IOException("Minimum split size per node " + minSizeNode
+                            + " cannot be smaller than minimum split "
+                            + "size per rack " + minSizeRack);
+    }
+
+    // all the files in input set
+    Path[] paths = FileUtil.stat2Paths(
+                     listStatus(job).toArray(new FileStatus[0]));
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    if (paths.length == 0) {
+      return splits;
+    }
+
+    // Convert them to Paths first. This is a costly operation and
+    // we should do it first; otherwise we would incur the cost
+    // multiple times, once per pool in the next loop.
+    List<Path> newpaths = new LinkedList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      Path p = new Path(paths[i].toUri().getPath());
+      newpaths.add(p);
+    }
+    paths = null;
+
+    // In one single iteration, process all the paths in a single pool.
+    // Processing one pool at a time ensures that a split contains paths
+    // from a single pool only.
+    for (MultiPathFilter onepool : pools) {
+      ArrayList<Path> myPaths = new ArrayList<Path>();
+
+      // pick one input path. If it matches any filter in the pool,
+      // add it to the output set
+      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
+        Path p = iter.next();
+        if (onepool.accept(p)) {
+          myPaths.add(p); // add it to my output set
+          iter.remove();
+        }
+      }
+      // create splits for all files in this pool.
+      getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]),
+                    maxSize, minSizeNode, minSizeRack, splits);
+    }
+
+    // create splits for all files that are not in any pool.
+    getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
+                  maxSize, minSizeNode, minSizeRack, splits);
+
+    // free up rackToNodes map
+    rackToNodes.clear();
+    return splits;
+  }
+
+  /**
+   * Return all the splits in the specified set of paths.
+   */
+  // CHECKSTYLE:OFF
+  private void getMoreSplits(Configuration conf, Path[] paths,
+                             long maxSize, long minSizeNode, long minSizeRack,
+                             List<InputSplit> splits)
+    throws IOException {
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks =
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes =
+                              new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return;
+    }
+
+    // populate all the blocks for all files
+    long totLength = 0;
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], conf, rackToBlocks, blockToNodes,
+                                      nodeToBlocks, rackToNodes);
+      totLength += files[i].getLength();
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> nodes = new ArrayList<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node.
+    for (Iterator<Map.Entry<String,
+         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
+         iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from
+      // blockToNodes so that the same block does not appear in
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSizeNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely
+      // that they will be combined with other blocks from the
+      // same rack later on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these
+    // overflow blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> racks = new ArrayList<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack.
+      // Come back to this rack after creating a single split for each of the
+      // remaining racks.
+      // Process one rack location at a time, combining all possible blocks
+      // that reside on this rack into one split (constrained by the minimum
+      // and maximum split size).
+
+      // iterate over all racks
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
+           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if the minimum rack size is specified and met, create a
+            // single split; otherwise store these blocks in the overflow
+            // list below.
+            addCreatedSplit(splits, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that
+            // remained to be processed. Keep them in 'overflow' block list.
+            // These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(splits, getHosts(racks), validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(splits, getHosts(racks), validBlocks);
+    }
+  }
+  // CHECKSTYLE:ON
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks.
+   * Add this new split into splitList.
+   */
+  private void addCreatedSplit(List<InputSplit> splitList,
+                               List<String> locations,
+                               ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] fl = new Path[validBlocks.size()];
+    long[] offset = new long[validBlocks.size()];
+    long[] length = new long[validBlocks.size()];
+    for (int i = 0; i < validBlocks.size(); i++) {
+      fl[i] = validBlocks.get(i).onepath;
+      offset[i] = validBlocks.get(i).offset;
+      length[i] = validBlocks.get(i).length;
+    }
+
+    // add this split to the list that is returned
+    CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
+                                   length, locations.toArray(new String[0]));
+    splitList.add(thissplit);
+  }
+
+  /**
+   * Subclasses implement this to create a RecordReader for a
+   * CombineFileSplit.
+   */
+  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
+  /**
+   * Information about one file from the file system.
+   */
+  private static class OneFileInfo {
+    private long fileSize;               // size of the file
+    private OneBlockInfo[] blocks;       // all blocks in this file
+
+    OneFileInfo(Path path, Configuration conf,
+                HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                HashMap<OneBlockInfo, String[]> blockToNodes,
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<String>> rackToNodes)
+                throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
+                                                           stat.getLen());
+      // create a list of all blocks and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        blocks = new OneBlockInfo[locations.length];
+        for (int i = 0; i < locations.length; i++) {
+
+          fileSize += locations[i].getLength();
+          OneBlockInfo oneblock =  new OneBlockInfo(path,
+                                       locations[i].getOffset(),
+                                       locations[i].getLength(),
+                                       locations[i].getHosts(),
+                                       locations[i].getTopologyPaths());
+          blocks[i] = oneblock;
+
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < oneblock.racks.length; j++) {
+            String rack = oneblock.racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            // Add this host to rackToNodes map
+            addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+    long getLength() {
+      return fileSize;
+    }
+
+    OneBlockInfo[] getBlocks() {
+      return blocks;
+    }
+  }
+
+  /**
+   * Information about one block from the file system.
+   */
+  private static class OneBlockInfo {
+    // CHECKSTYLE:OFF
+    Path onepath;                // name of this file
+    long offset;                 // offset in file
+    long length;                 // length of this block
+    String[] hosts;              // nodes on which this block resides
+    String[] racks;              // network topology of hosts
+    // CHECKSTYLE:ON
+
+    OneBlockInfo(Path path, long offset, long len,
+                 String[] hosts, String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length
+               || topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i],
+                              NetworkTopology.DEFAULT_RACK)).toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+                                    String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+
+  private List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
+    for (String rack : racks) {
+      hosts.addAll(rackToNodes.get(rack));
+    }
+    return hosts;
+  }
+
+  /**
+   * Accept a path only if any one of the filters given in the
+   * constructor does.
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter() {
+      this.filters = new ArrayList<PathFilter>();
+    }
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public void add(PathFilter one) {
+      filters.add(one);
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (filter.accept(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("[");
+      for (PathFilter f: filters) {
+        buf.append(f);
+        buf.append(",");
+      }
+      buf.append("]");
+      return buf.toString();
+    }
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
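
The ported CombineFileInputFormat is meant to be subclassed, roughly the way
ExportInputFormat (modified later in this commit) consumes it. A condensed,
illustrative sketch, not the actual ExportInputFormat code: the class name
and split size are invented, while CombineShimRecordReader is the per-chunk
shim reader that Sqoop ships:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.sqoop.mapreduce.CombineFileInputFormat;
    import org.apache.sqoop.mapreduce.CombineFileRecordReader;
    import org.apache.sqoop.mapreduce.CombineFileSplit;
    import org.apache.sqoop.mapreduce.CombineShimRecordReader;

    public class ExampleCombineFormat
        extends CombineFileInputFormat<LongWritable, Object> {

      public ExampleCombineFormat() {
        // Cap each combined split at 256 MB; values set this way take
        // precedence over mapred.max.split.size in the configuration.
        setMaxSplitSize(256L * 1024 * 1024);
      }

      @Override
      public RecordReader<LongWritable, Object> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException {
        // Hand each file chunk in the combined split to a shim reader,
        // instantiated reflectively once per chunk.
        return new CombineFileRecordReader<LongWritable, Object>(
            (CombineFileSplit) split, context, CombineShimRecordReader.class);
      }
    }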

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java?rev=1221843&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java Wed Dec 21 19:25:52 2011
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A generic RecordReader that can hand out different RecordReaders
+ * for each chunk in a {@link CombineFileSplit}.
+ * A CombineFileSplit can combine data chunks from multiple files.
+ * This class allows using different RecordReaders for processing
+ * these data chunks from different files.
+ * @see CombineFileSplit
+ */
+public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
+
+  // CHECKSTYLE:OFF
+  static final Class [] constructorSignature = new Class []
+                                         {CombineFileSplit.class,
+                                          TaskAttemptContext.class,
+                                          Integer.class};
+  // CHECKSTYLE:ON
+
+  protected CombineFileSplit split;
+  protected Class<? extends RecordReader<K, V>> rrClass;
+  protected Constructor<? extends RecordReader<K, V>> rrConstructor;
+  protected FileSystem fs;
+  protected TaskAttemptContext context;
+
+  protected int idx;
+  protected long progress;
+  protected RecordReader<K, V> curReader;
+
+  public void initialize(InputSplit psplit,
+      TaskAttemptContext pcontext) throws IOException, InterruptedException {
+    this.split = (CombineFileSplit)psplit;
+    this.context = pcontext;
+    if (null != this.curReader) {
+      this.curReader.initialize(split, context);
+    }
+  }
+
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+
+    while ((curReader == null) || !curReader.nextKeyValue()) {
+      if (!initNextRecordReader()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return curReader.getCurrentKey();
+  }
+
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return curReader.getCurrentValue();
+  }
+
+  public void close() throws IOException {
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+    }
+  }
+
+  /**
+   * Return progress based on the amount of data processed so far.
+   */
+  public float getProgress() throws IOException, InterruptedException {
+    long subprogress = 0;    // bytes processed in current split
+    if (null != curReader) {
+      // idx is always one past the current subsplit's true index.
+      subprogress = (long)(curReader.getProgress() * split.getLength(idx - 1));
+    }
+    return Math.min(1.0f,  (progress + subprogress)/(float)(split.getLength()));
+  }
+
+  /**
+   * Construct a reader that hands each chunk of the CombineFileSplit
+   * to a new instance of rrClass, created via reflection.
+   */
+  public CombineFileRecordReader(CombineFileSplit split,
+                                 TaskAttemptContext context,
+                                 Class<? extends RecordReader<K, V>> rrClass)
+    throws IOException {
+    this.split = split;
+    this.context = context;
+    this.rrClass = rrClass;
+    this.idx = 0;
+    this.curReader = null;
+    this.progress = 0;
+
+    try {
+      rrConstructor = rrClass.getDeclaredConstructor(constructorSignature);
+      rrConstructor.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException(rrClass.getName()
+                                 + " does not have valid constructor", e);
+    }
+    initNextRecordReader();
+  }
+
+  /**
+   * Get the record reader for the next chunk in this CombineFileSplit.
+   */
+  protected boolean initNextRecordReader() throws IOException {
+
+    if (curReader != null) {
+      curReader.close();
+      curReader = null;
+      if (idx > 0) {
+        progress += split.getLength(idx-1);    // done processing so far
+      }
+    }
+
+    // if all chunks have been processed, nothing more to do.
+    if (idx == split.getNumPaths()) {
+      return false;
+    }
+
+    // get a record reader for the idx-th chunk
+    try {
+      Configuration conf = context.getConfiguration();
+      // setup some helper config variables.
+      conf.set("map.input.file", split.getPath(idx).toString());
+      conf.setLong("map.input.start", split.getOffset(idx));
+      conf.setLong("map.input.length", split.getLength(idx));
+
+      // instantiate the reader for this chunk via the reflected constructor.
+      curReader = rrConstructor.newInstance(new Object []
+                            {split, context, Integer.valueOf(idx)});
+
+      if (idx > 0) {
+        // initialize() for the first RecordReader will be called by MapTask;
+        // we're responsible for initializing subsequent RecordReaders.
+        curReader.initialize(split, context);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    idx++;
+    return true;
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
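
The constructorSignature lookup above fixes the contract for per-chunk
readers: whatever class is handed to CombineFileRecordReader must expose a
(CombineFileSplit, TaskAttemptContext, Integer) constructor. A deliberately
tiny, illustrative reader that satisfies the contract by emitting a single
(offset, path) record for its chunk (SingleChunkReader is invented for the
example):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.sqoop.mapreduce.CombineFileSplit;

    public class SingleChunkReader extends RecordReader<LongWritable, Text> {
      private final CombineFileSplit split;
      private final int chunk;        // index of the chunk this reader owns
      private boolean consumed = false;

      // The exact signature CombineFileRecordReader looks up reflectively.
      public SingleChunkReader(CombineFileSplit split,
          TaskAttemptContext context, Integer chunk) {
        this.split = split;
        this.chunk = chunk.intValue();
      }

      @Override
      public void initialize(InputSplit s, TaskAttemptContext c) { }

      @Override
      public boolean nextKeyValue() {
        if (consumed) {
          return false;
        }
        consumed = true;
        return true;
      }

      @Override
      public LongWritable getCurrentKey() {
        return new LongWritable(split.getOffset(chunk));
      }

      @Override
      public Text getCurrentValue() {
        return new Text(split.getPath(chunk).toString());
      }

      @Override
      public float getProgress() {
        return consumed ? 1.0f : 0.0f;
      }

      @Override
      public void close() throws IOException { }
    }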

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java?rev=1221843&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java Wed Dec 21 19:25:52 2011
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * A sub-collection of input files.
+ *
+ * Unlike {@link FileSplit}, the CombineFileSplit class does not represent
+ * a split of a file, but a split of input files into smaller sets.
+ * A split may contain blocks from different files, but all
+ * the blocks in the same split are probably local to some rack.<br>
+ * CombineFileSplit can be used to implement a {@link RecordReader}
+ * that reads one record per file.
+ *
+ * @see FileSplit
+ * @see CombineFileInputFormat
+ */
+public class CombineFileSplit extends InputSplit implements Writable {
+
+  private Path[] paths;
+  private long[] startoffset;
+  private long[] lengths;
+  private String[] locations;
+  private long totLength;
+
+  /**
+   * Default constructor.
+   */
+  public CombineFileSplit() {}
+
+  public CombineFileSplit(Path[] files, long[] start,
+                          long[] lengths, String[] locations) {
+    initSplit(files, start, lengths, locations);
+  }
+
+  public CombineFileSplit(Path[] files, long[] lengths) {
+    long[] pstartoffset = new long[files.length];
+    for (int i = 0; i < pstartoffset.length; i++) {
+      pstartoffset[i] = 0;
+    }
+    String[] plocations = new String[files.length];
+    for (int i = 0; i < plocations.length; i++) {
+      plocations[i] = "";
+    }
+    initSplit(files, pstartoffset, lengths, plocations);
+  }
+
+  private void initSplit(Path[] files, long[] start,
+                         long[] plengths, String[] plocations) {
+    this.startoffset = start;
+    this.lengths = plengths;
+    this.paths = files;
+    this.totLength = 0;
+    this.locations = plocations;
+    for(long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  /**
+   * Copy constructor.
+   */
+  public CombineFileSplit(CombineFileSplit old) throws IOException {
+    this(old.getPaths(), old.getStartOffsets(),
+         old.getLengths(), old.getLocations());
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+
+  /** Returns an array containing the start offsets of the files in the split.*/
+  public long[] getStartOffsets() {
+    return startoffset;
+  }
+
+  /** Returns an array containing the lengths of the files in the split.*/
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  /** Returns the start offset of the i<sup>th</sup> Path.*/
+  public long getOffset(int i) {
+    return startoffset[i];
+  }
+
+  /** Returns the length of the i<sup>th</sup> Path. */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+
+  /** Returns the number of Paths in the split.*/
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the i<sup>th</sup> Path. */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+
+  /** Returns all the Paths in the split. */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  /** Returns the hosts on which this input split resides. */
+  public String[] getLocations() throws IOException {
+    return locations;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    totLength = in.readLong();
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    for(int i=0; i<arrLength; i++) {
+      lengths[i] = in.readLong();
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for(int i=0; i<filesLength; i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+    arrLength = in.readInt();
+    startoffset = new long[arrLength];
+    for(int i=0; i<arrLength; i++) {
+      startoffset[i] = in.readLong();
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(totLength);
+    out.writeInt(lengths.length);
+    for(long length : lengths) {
+      out.writeLong(length);
+    }
+    out.writeInt(paths.length);
+    for(Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+    out.writeInt(startoffset.length);
+    for(long length : startoffset) {
+      out.writeLong(length);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < paths.length; i++) {
+      if (i == 0) {
+        sb.append("Paths:");
+      }
+      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i]
+                + "+" + lengths[i]);
+      if (i < paths.length -1) {
+        sb.append(",");
+      }
+    }
+    if (locations != null) {
+      String locs = "";
+      StringBuffer locsb = new StringBuffer();
+      for (int i = 0; i < locations.length; i++) {
+        locsb.append(locations[i] + ":");
+      }
+      locs = locsb.toString();
+      sb.append(" Locations:" + locs + "; ");
+    }
+    return sb.toString();
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineFileSplit.java
------------------------------------------------------------------------------
    svn:eol-style = native
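
To make the split's bookkeeping concrete, the following illustrative snippet
(paths and host names are invented) builds a two-chunk split and round-trips
it through the Writable methods above. Note that write()/readFields() do not
carry the locations array, so a deserialized split keeps paths, offsets, and
lengths but not the locality hints:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.sqoop.mapreduce.CombineFileSplit;

    public class CombineFileSplitDemo {
      public static void main(String[] args) throws IOException {
        // Two chunks from different files, grouped into one split.
        Path[] files = { new Path("/data/part-0"), new Path("/data/part-1") };
        long[] offsets = { 0L, 512L };
        long[] lengths = { 1024L, 256L };
        String[] hosts = { "node1", "node2" };
        CombineFileSplit split =
            new CombineFileSplit(files, offsets, lengths, hosts);

        System.out.println(split.getLength()); // 1280, the sum of lengths
        System.out.println(split);             // "Paths:/data/part-0:0+1024,..."

        // Serialize and deserialize, as the framework does when shipping
        // the split to a task.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        split.write(new DataOutputStream(buf));
        CombineFileSplit copy = new CombineFileSplit();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.getNumPaths()); // 2
      }
    }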

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java Wed Dec 21 19:25:52 2011
@@ -26,7 +26,6 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DBWritable.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DBWritable.java?rev=1221843&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DBWritable.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DBWritable.java Wed Dec 21 19:25:52 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Objects that are read from/written to a database should implement
+ * <code>DBWritable</code>. DBWritable is similar to {@link Writable},
+ * except that the {@link #write(PreparedStatement)} method takes a
+ * {@link PreparedStatement}, and {@link #readFields(ResultSet)}
+ * takes a {@link ResultSet}.
+ * <p>
+ * Implementations are responsible for writing the fields of the object
+ * to PreparedStatement, and reading the fields of the object from the
+ * ResultSet.
+ *
+ * <p>Example:</p>
+ * If we have the following table in the database:
+ * <pre>
+ * CREATE TABLE MyTable (
+ *   counter        INTEGER NOT NULL,
+ *   timestamp      BIGINT  NOT NULL
+ * );
+ * </pre>
+ * then we can read/write the tuples from/to the table with:
+ * <p><pre>
+ * public class MyWritable implements Writable, DBWritable {
+ *   // Some data
+ *   private int counter;
+ *   private long timestamp;
+ *
+ *   //Writable#write() implementation
+ *   public void write(DataOutput out) throws IOException {
+ *     out.writeInt(counter);
+ *     out.writeLong(timestamp);
+ *   }
+ *
+ *   //Writable#readFields() implementation
+ *   public void readFields(DataInput in) throws IOException {
+ *     counter = in.readInt();
+ *     timestamp = in.readLong();
+ *   }
+ *
+ *   public void write(PreparedStatement statement) throws SQLException {
+ *     statement.setInt(1, counter);
+ *     statement.setLong(2, timestamp);
+ *   }
+ *
+ *   public void readFields(ResultSet resultSet) throws SQLException {
+ *     counter = resultSet.getInt(1);
+ *     timestamp = resultSet.getLong(2);
+ *   }
+ * }
+ * </pre></p>
+ */
+public interface DBWritable {
+
+  /**
+   * Sets the fields of the object in the {@link PreparedStatement}.
+   * @param statement the statement that the fields are put into.
+   * @throws SQLException
+   */
+  void write(PreparedStatement statement) throws SQLException;
+
+  /**
+   * Reads the fields of the object from the {@link ResultSet}.
+   * @param resultSet the {@link ResultSet} to get the fields from.
+   * @throws SQLException
+   */
+  void readFields(ResultSet resultSet) throws SQLException;
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DBWritable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java Wed Dec 21 19:25:52 2011
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.config.ConfigurationHelper;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java Wed Dec 21 19:25:52 2011
@@ -28,9 +28,6 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 
 /**
  * InputFormat that generates a user-defined number of splits to inject data

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java Wed Dec 21 19:25:52 2011
@@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ImportJobContext;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java Wed Dec 21 19:25:52 2011
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
 import com.cloudera.sqoop.manager.MySQLUtils;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBConfiguration.java Wed Dec 21 19:25:52 2011
@@ -22,7 +22,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 import com.cloudera.sqoop.mapreduce.db.DBInputFormat.NullDBWritable;
 

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java Wed Dec 21 19:25:52 2011
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 import com.cloudera.sqoop.config.ConfigurationHelper;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBOutputFormat.java Wed Dec 21 19:25:52 2011
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.StringUtils;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBInputFormat.java Wed Dec 21 19:25:52 2011
@@ -38,7 +38,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 import com.cloudera.sqoop.config.ConfigurationHelper;
 import com.cloudera.sqoop.mapreduce.db.BigDecimalSplitter;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/DataDrivenDBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -25,7 +25,7 @@ import java.sql.SQLException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBInputFormat;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -25,7 +25,7 @@ import java.sql.SQLException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBInputFormat;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBInputFormat.java Wed Dec 21 19:25:52 2011
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBSplitter;

Modified: incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/db/OracleDataDrivenDBRecordReader.java Wed Dec 21 19:25:52 2011
@@ -21,7 +21,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DBInputFormat;

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java?rev=1221843&r1=1221842&r2=1221843&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/mapreduce/db/TestDataDrivenDBInputFormat.java Wed Dec 21 19:25:52 2011
@@ -31,10 +31,12 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.db.*;
-import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.mapreduce.DBWritable;
 
 /**
  * Test aspects of DataDrivenDBInputFormat.
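
As a usage note, a rough sketch of wiring DataDrivenDBInputFormat to a
DBWritable record class follows. It assumes the ported classes keep the
signatures of their Hadoop originals (DBConfiguration.configureDB and
DataDrivenDBInputFormat.setInput); the driver, URL, table, and column names
are made up, and MyRecord is the hypothetical class sketched earlier.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

public class ImportJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical in-memory HSQLDB connection settings.
    DBConfiguration.configureDB(conf, "org.hsqldb.jdbcDriver",
        "jdbc:hsqldb:mem:sqooptest", "sa", "");

    Job job = new Job(conf, "data-driven import sketch");
    job.setInputFormatClass(DataDrivenDBInputFormat.class);

    // Identity mapper: keys are LongWritable record ids, values are
    // MyRecord instances read from the hypothetical table "my_table";
    // input splits are computed over the "counter" column.
    DataDrivenDBInputFormat.setInput(job, MyRecord.class,
        "my_table", null, "counter", "counter", "timestamp");
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(MyRecord.class);
    job.setNumReduceTasks(0);

    FileOutputFormat.setOutputPath(job, new Path("/tmp/import-out"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}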


