sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: QOOP-705: Framework-defined text/sequence partitioner/extractor for HDFS
Date Tue, 04 Dec 2012 20:09:01 GMT
Updated Branches:
  refs/heads/sqoop2 cf3d71049 -> c4f9ef846


QOOP-705: Framework-defined text/sequence partitioner/extractor for HDFS

(Bilung Lee via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c4f9ef84
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c4f9ef84
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c4f9ef84

Branch: refs/heads/sqoop2
Commit: c4f9ef846cc142f3a292c255db370a9a267cfade
Parents: cf3d710
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Tue Dec 4 12:08:20 2012 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Tue Dec 4 12:08:20 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/sqoop/job/PrefixContext.java   |    8 +
 .../apache/sqoop/job/etl/HdfsExportPartition.java  |  150 ++++
 .../sqoop/job/etl/HdfsExportPartitioner.java       |  554 +++++++++++++++
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java |   88 +++
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |  140 ++++
 .../java/org/apache/sqoop/job/TestHdfsExtract.java |  253 +++++++
 6 files changed, 1193 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4f9ef84/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
index 5488b46..c3beed7 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
@@ -59,4 +59,12 @@ public class PrefixContext implements ImmutableContext {
   public boolean getBoolean(String key, boolean defaultValue) {
     return configuration.getBoolean(prefix + key, defaultValue);
   }
+
+  /*
+   * TODO: Use getter methods for retrieval instead of
+   * exposing configuration directly.
+   */
+  public Configuration getConfiguration() {
+    return configuration;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4f9ef84/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
new file mode 100644
index 0000000..0e0e53f
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.etl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class derives mostly from CombineFileSplit of Hadoop, i.e.
+ * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
+ */
+public class HdfsExportPartition extends Partition {
+
+  private long lenFiles;
+  private int numFiles;
+  private Path[] files;
+  private long[] offsets;
+  private long[] lengths;
+  private String[] locations;
+
+  public HdfsExportPartition() {}
+
+  public HdfsExportPartition(Path[] files, long[] offsets,
+      long[] lengths, String[] locations) {
+    for(long length : lengths) {
+      this.lenFiles += length;
+    }
+    this.numFiles = files.length;
+    this.files = files;
+    this.offsets = offsets;
+    this.lengths = lengths;
+    this.locations = locations;
+  }
+
+  public long getLengthOfFiles() {
+    return lenFiles;
+  }
+
+  public int getNumberOfFiles() {
+    return numFiles;
+  }
+
+  public Path getFile(int i) {
+    return files[i];
+  }
+
+  public long getOffset(int i) {
+    return offsets[i];
+  }
+
+  public long getLength(int i) {
+    return lengths[i];
+  }
+
+  public String[] getLocations() {
+    return locations;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    numFiles = in.readInt();
+
+    files = new Path[numFiles];
+    for(int i=0; i<numFiles; i++) {
+      files[i] = new Path(in.readUTF());
+    }
+
+    offsets = new long[numFiles];
+    for(int i=0; i<numFiles; i++) {
+      offsets[i] = in.readLong();
+    }
+
+    lengths = new long[numFiles];
+    for(int i=0; i<numFiles; i++) {
+      lengths[i] = in.readLong();
+    }
+
+    for(long length : lengths) {
+      lenFiles += length;
+    }
+
+    int numLocations = in.readInt();
+    if (numLocations == 0) {
+      locations = null;
+    } else {
+      locations = new String[numLocations];
+      for(int i=0; i<numLocations; i++) {
+        locations[i] = in.readUTF();
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(numFiles);
+
+    for(Path file : files) {
+      out.writeUTF(file.toString());
+    }
+
+    for(long offset : offsets) {
+      out.writeLong(offset);
+    }
+
+    for(long length : lengths) {
+      out.writeLong(length);
+    }
+
+    if (locations == null || locations.length == 0) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(locations.length);
+      for(String location : locations) {
+        out.writeUTF(location);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    sb.append(files[0]);
+    for(int i = 1; i < files.length; i++) {
+      sb.append(", " + files[i]);
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4f9ef84/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
new file mode 100644
index 0000000..abe986e
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.job.Constants;
+import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.PrefixContext;
+
+/**
+ * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
+ * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
+ */
+public class HdfsExportPartitioner extends Partitioner {
+
+  public static final String SPLIT_MINSIZE_PERNODE =
+      "mapreduce.input.fileinputformat.split.minsize.per.node";
+  public static final String SPLIT_MINSIZE_PERRACK =
+      "mapreduce.input.fileinputformat.split.minsize.per.rack";
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // mapping from a rack name to the set of Nodes in the rack
+  private HashMap<String, Set<String>> rackToNodes =
+      new HashMap<String, Set<String>>();
+
+  @Override
+  public List<Partition> getPartitions(ImmutableContext context,
+      Object connectionConfiguration, Object jobConfiguration) {
+    Configuration conf = ((PrefixContext)context).getConfiguration();
+
+    try {
+      int numTasks = Integer.parseInt(conf.get(
+          Constants.JOB_ETL_NUMBER_PARTITIONS));
+      long numInputBytes = getInputSize(conf);
+      maxSplitSize = numInputBytes / numTasks;
+
+      long minSizeNode = 0;
+      long minSizeRack = 0;
+      long maxSize = 0;
+
+      // the values specified by setxxxSplitSize() takes precedence over the
+      // values that might have been specified in the config
+      if (minSplitSizeNode != 0) {
+        minSizeNode = minSplitSizeNode;
+      } else {
+        minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
+      }
+      if (minSplitSizeRack != 0) {
+        minSizeRack = minSplitSizeRack;
+      } else {
+        minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
+      }
+      if (maxSplitSize != 0) {
+        maxSize = maxSplitSize;
+      } else {
+        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+      }
+      if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+        throw new IOException("Minimum split size pernode " + minSizeNode +
+                              " cannot be larger than maximum split size " +
+                              maxSize);
+      }
+      if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+        throw new IOException("Minimum split size per rack" + minSizeRack +
+                              " cannot be larger than maximum split size " +
+                              maxSize);
+      }
+      if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+        throw new IOException("Minimum split size per node" + minSizeNode +
+                              " cannot be smaller than minimum split " +
+                              "size per rack " + minSizeRack);
+      }
+
+      // all the files in input set
+      String indir = conf.get(FileInputFormat.INPUT_DIR);
+      FileSystem fs = FileSystem.get(conf);
+      Path[] paths = FileUtil.stat2Paths(fs.listStatus(new Path(indir)));
+      List<Partition> partitions = new ArrayList<Partition>();
+      if (paths.length == 0) {
+        return partitions;
+      }
+
+      // Convert them to Paths first. This is a costly operation and
+      // we should do it first, otherwise we will incur doing it multiple
+      // times, one time each for each pool in the next loop.
+      List<Path> newpaths = new ArrayList<Path>();
+      for (int i = 0; i < paths.length; i++) {
+        Path p = new Path(paths[i].toUri().getPath());
+        newpaths.add(p);
+      }
+      paths = null;
+
+      // create splits for all files that are not in any pool.
+      getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
+                    maxSize, minSizeNode, minSizeRack, partitions);
+
+      // free up rackToNodes map
+      rackToNodes.clear();
+
+      return partitions;
+
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e);
+    }
+  }
+
+  private long getInputSize(Configuration conf) throws IOException {
+    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] files = fs.listStatus(new Path(indir));
+    long count = 0;
+    for (FileStatus file : files) {
+      count += file.getLen();
+    }
+    return count;
+  }
+
+  /**
+   * Return all the splits in the specified set of paths
+   */
+  private void getMoreSplits(Configuration conf, Path[] paths,
+      long maxSize, long minSizeNode, long minSizeRack,
+      List<Partition> partitions) throws IOException {
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks =
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes =
+                              new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return;
+    }
+
+    // populate all the blocks for all files
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], conf, isSplitable(conf, paths[i]),
+                                 rackToBlocks, blockToNodes, nodeToBlocks,
+                                 rackToNodes, maxSize);
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    Set<String> nodes = new HashSet<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node.
+    for (Iterator<Map.Entry<String,
+         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
+         iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from
+      // blockToNodes so that the same block does not appear in
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(partitions, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSplitNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely
+      // that they will be combined with other blocks from the
+      // same rack later on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(partitions, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these
+    // overflow blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    Set<String> racks = new HashSet<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack.
+      // Come back to this rack after creating a single split for each of the
+      // remaining racks.
+      // Process one rack location at a time, Combine all possible blocks that
+      // reside on this rack as one split. (constrained by minimum and maximum
+      // split size).
+
+      // iterate over all racks
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
+           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(partitions, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split
+            // otherwise, store these blocks into overflow data structure
+            addCreatedSplit(partitions, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that
+            // remained to be processed. Keep them in 'overflow' block list.
+            // These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an exiting rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(partitions, getHosts(racks), validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks, if any.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(partitions, getHosts(racks), validBlocks);
+    }
+  }
+
+  private boolean isSplitable(Configuration conf, Path file) {
+    final CompressionCodec codec =
+        new CompressionCodecFactory(conf).getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks
+   * Add this new split into list.
+   */
+  private void addCreatedSplit(List<Partition> partitions,
+                               Collection<String> locations,
+                               ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] files = new Path[validBlocks.size()];
+    long[] offsets = new long[validBlocks.size()];
+    long[] lengths = new long[validBlocks.size()];
+    for (int i = 0; i < validBlocks.size(); i++) {
+      files[i] = validBlocks.get(i).onepath;
+      offsets[i] = validBlocks.get(i).offset;
+      lengths[i] = validBlocks.get(i).length;
+    }
+
+     // add this split to the list that is returned
+    HdfsExportPartition partition = new HdfsExportPartition(
+        files, offsets, lengths, locations.toArray(new String[0]));
+    partitions.add(partition);
+  }
+
+  private Set<String> getHosts(Set<String> racks) {
+    Set<String> hosts = new HashSet<String>();
+    for (String rack : racks) {
+      if (rackToNodes.containsKey(rack)) {
+        hosts.addAll(rackToNodes.get(rack));
+      }
+    }
+    return hosts;
+  }
+
+  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+      String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+
+  /**
+   * information about one file from the File System
+   */
+  private static class OneFileInfo {
+    private long fileSize;               // size of the file
+    private OneBlockInfo[] blocks;       // all blocks in this file
+
+    OneFileInfo(Path path, Configuration conf,
+                boolean isSplitable,
+                HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                HashMap<OneBlockInfo, String[]> blockToNodes,
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<String>> rackToNodes,
+                long maxSize)
+                throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
+                                                           stat.getLen());
+      // create a list of all block and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        if (!isSplitable) {
+          // if the file is not splitable, just create the one block with
+          // full file length
+          blocks = new OneBlockInfo[1];
+          fileSize = stat.getLen();
+          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+              .getHosts(), locations[0].getTopologyPaths());
+        } else {
+          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+              locations.length);
+          for (int i = 0; i < locations.length; i++) {
+            fileSize += locations[i].getLength();
+
+            // each split can be a maximum of maxSize
+            long left = locations[i].getLength();
+            long myOffset = locations[i].getOffset();
+            long myLength = 0;
+            do {
+              if (maxSize == 0) {
+                myLength = left;
+              } else {
+                if (left > maxSize && left < 2 * maxSize) {
+                  // if remainder is between max and 2*max - then
+                  // instead of creating splits of size max, left-max we
+                  // create splits of size left/2 and left/2. This is
+                  // a heuristic to avoid creating really really small
+                  // splits.
+                  myLength = left / 2;
+                } else {
+                  myLength = Math.min(maxSize, left);
+                }
+              }
+              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+                  myLength, locations[i].getHosts(), locations[i]
+                      .getTopologyPaths());
+              left -= myLength;
+              myOffset += myLength;
+
+              blocksList.add(oneblock);
+            } while (left > 0);
+          }
+          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+        }
+
+        for (OneBlockInfo oneblock : blocks) {
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // For blocks that do not have host/rack information,
+          // assign to default  rack.
+          String[] racks = null;
+          if (oneblock.hosts.length == 0) {
+            racks = new String[]{NetworkTopology.DEFAULT_RACK};
+          } else {
+            racks = oneblock.racks;
+          }
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < racks.length; j++) {
+            String rack = racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+            }
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * information about one block from the File System
+   */
+  private static class OneBlockInfo {
+    Path onepath;                // name of this file
+    long offset;                 // offset in file
+    long length;                 // length of this block
+    String[] hosts;              // nodes on which this block resides
+    String[] racks;              // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len,
+                 String[] hosts, String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length ||
+              topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i],
+                              NetworkTopology.DEFAULT_RACK)).toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4f9ef84/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
new file mode 100644
index 0000000..1f6714d
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+
+public class HdfsSequenceExportExtractor extends Extractor {
+
+  private Configuration conf;
+  private DataWriter datawriter;
+
+  private final char fieldDelimiter;
+
+  public HdfsSequenceExportExtractor() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+  }
+
+  @Override
+  public void run(ImmutableContext context, Object connectionConfiguration,
+      Object jobConfiguration, Partition partition, DataWriter writer) {
+    writer.setFieldDelimiter(fieldDelimiter);
+
+    conf = ((PrefixContext)context).getConfiguration();
+    datawriter = writer;
+
+    try {
+      HdfsExportPartition p = (HdfsExportPartition)partition;
+      int numFiles = p.getNumberOfFiles();
+      for (int i=0; i<numFiles; i++) {
+        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+      }
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+    }
+  }
+
+  private void extractFile(Path file, long offset, long length)
+      throws IOException {
+    long start = offset;
+
+    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
+        SequenceFile.Reader.file(file));
+
+    if (start > filereader.getPosition()) {
+      filereader.sync(start); // sync to start
+    }
+
+    Text line = new Text();
+    boolean hasNext = filereader.next(line);
+    while (hasNext) {
+      datawriter.writeCsvRecord(line.toString());
+      hasNext = filereader.next(line);
+    }
+  }
+
+  @Override
+  public long getRowsRead() {
+    // TODO need to return the rows read
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4f9ef84/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
new file mode 100644
index 0000000..7f1b144
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.util.LineReader;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+
+public class HdfsTextExportExtractor extends Extractor {
+
+  private Configuration conf;
+  private DataWriter datawriter;
+
+  private final char fieldDelimiter;
+
+  public HdfsTextExportExtractor() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+  }
+
+  @Override
+  public void run(ImmutableContext context, Object connectionConfiguration,
+      Object jobConfiguration, Partition partition, DataWriter writer) {
+    writer.setFieldDelimiter(fieldDelimiter);
+
+    conf = ((PrefixContext)context).getConfiguration();
+    datawriter = writer;
+
+    try {
+      HdfsExportPartition p = (HdfsExportPartition)partition;
+      int numFiles = p.getNumberOfFiles();
+      for (int i=0; i<numFiles; i++) {
+        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+      }
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+    }
+  }
+
+  private void extractFile(Path file, long offset, long length)
+      throws IOException {
+    long start = offset;
+    long end = start + length;
+
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataInputStream filestream = fs.open(file);
+    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+    LineReader filereader;
+    Seekable fileseeker;
+
+    if (codec == null) {
+      filestream.seek(start);
+      byte[] recordDelimiterBytes = String.valueOf(
+          Data.DEFAULT_RECORD_DELIMITER).getBytes(
+              Charset.forName(Data.CHARSET_NAME));
+      filereader = new LineReader(filestream, conf,
+          recordDelimiterBytes);
+      fileseeker = filestream;
+
+    } else if (codec instanceof SplittableCompressionCodec) {
+      SplitCompressionInputStream compressionstream =
+          ((SplittableCompressionCodec)codec).createInputStream(
+              filestream, codec.createDecompressor(), start, end,
+              SplittableCompressionCodec.READ_MODE.BYBLOCK);
+      byte[] recordDelimiterBytes = String.valueOf(
+          Data.DEFAULT_RECORD_DELIMITER).getBytes(
+              Charset.forName(Data.CHARSET_NAME));
+      filereader = new LineReader(compressionstream,
+          conf, recordDelimiterBytes);
+      fileseeker = compressionstream;
+
+      start = compressionstream.getAdjustedStart();
+      end = compressionstream.getAdjustedEnd();
+
+    } else {
+      byte[] recordDelimiterBytes = String.valueOf(
+          Data.DEFAULT_RECORD_DELIMITER).getBytes(
+              Charset.forName(Data.CHARSET_NAME));
+      filereader = new LineReader(
+          codec.createInputStream(filestream, codec.createDecompressor()),
+          conf, recordDelimiterBytes);
+      fileseeker = filestream;
+    }
+
+    if (start != 0) {
+      // always throw away first record because
+      // one extra line is read in previous split
+      start += filereader.readLine(new Text(), 0);
+    }
+
+    Text line = new Text();
+    int size;
+    while (fileseeker.getPos() <= end) {
+      size = filereader.readLine(line, Integer.MAX_VALUE);
+      if (size == 0) {
+        break;
+      }
+
+      datawriter.writeCsvRecord(line.toString());
+    }
+  }
+
+  @Override
+  public long getRowsRead() {
+    // TODO need to return the rows read
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4f9ef84/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
new file mode 100644
index 0000000..585fac7
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.job.etl.HdfsExportPartitioner;
+import org.apache.sqoop.job.etl.HdfsSequenceExportExtractor;
+import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
+import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.junit.Test;
+
+public class TestHdfsExtract extends TestCase {
+
+  private static final String INPUT_ROOT = "/tmp/sqoop/warehouse/";
+  private static final int NUMBER_OF_FILES = 5;
+  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+  private String indir;
+
+  public TestHdfsExtract() {
+    indir = INPUT_ROOT + getClass().getSimpleName();
+  }
+
+  @Test
+  public void testUncompressedText() throws Exception {
+    FileUtils.delete(indir);
+    FileUtils.mkdirs(indir);
+    createTextInput(null);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+        HdfsExportPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+        HdfsTextExportExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+    conf.set(FileInputFormat.INPUT_DIR, indir);
+    JobUtils.runJob(conf);
+  }
+
+  @Test
+  public void testCompressedText() throws Exception {
+    FileUtils.delete(indir);
+    FileUtils.mkdirs(indir);
+    createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+        HdfsExportPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+        HdfsTextExportExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+    conf.set(FileInputFormat.INPUT_DIR, indir);
+    JobUtils.runJob(conf);
+
+    FileUtils.delete(indir);
+    FileUtils.mkdirs(indir);
+    createTextInput(BZip2Codec.class);
+
+    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+        HdfsExportPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+        HdfsTextExportExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+    conf.set(FileInputFormat.INPUT_DIR, indir);
+    JobUtils.runJob(conf);
+  }
+
+  @Test
+  public void testUncompressedSequence() throws Exception {
+    FileUtils.delete(indir);
+    FileUtils.mkdirs(indir);
+    createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+        HdfsExportPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+        HdfsSequenceExportExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+    conf.set(FileInputFormat.INPUT_DIR, indir);
+    JobUtils.runJob(conf);
+  }
+
+  @Test
+  public void testCompressedSequence() throws Exception {
+    FileUtils.delete(indir);
+    FileUtils.mkdirs(indir);
+    createSequenceInput(null);
+
+    Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+        HdfsExportPartitioner.class.getName());
+    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+        HdfsSequenceExportExtractor.class.getName());
+    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+    conf.set(FileInputFormat.INPUT_DIR, indir);
+    JobUtils.runJob(conf);
+  }
+
+  private void createTextInput(Class<? extends CompressionCodec> clz)
+      throws IOException, InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+
+    CompressionCodec codec = null;
+    String extension = "";
+    if (clz != null) {
+      codec = clz.newInstance();
+      if (codec instanceof Configurable) {
+        ((Configurable) codec).setConf(conf);
+      }
+      extension = codec.getDefaultExtension();
+    }
+
+    int index = 1;
+    for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
+      String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
+      OutputStream filestream = FileUtils.create(fileName);
+      BufferedWriter filewriter;
+      if (codec != null) {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            codec.createOutputStream(filestream, codec.createCompressor()),
+            Data.CHARSET_NAME));
+      } else {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            filestream, Data.CHARSET_NAME));
+      }
+
+      for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
+        String row = index + "," + (double)index + ",'" + index + "'";
+        filewriter.write(row + Data.DEFAULT_RECORD_DELIMITER);
+        index++;
+      }
+
+      filewriter.close();
+    }
+  }
+
+  private void createSequenceInput(Class<? extends CompressionCodec> clz)
+      throws IOException, InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+
+    CompressionCodec codec = null;
+    if (clz != null) {
+      codec = clz.newInstance();
+      if (codec instanceof Configurable) {
+        ((Configurable) codec).setConf(conf);
+      }
+    }
+
+    int index = 1;
+    for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
+      Path filepath = new Path(indir,
+          "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
+      SequenceFile.Writer filewriter;
+      if (codec != null) {
+        filewriter = SequenceFile.createWriter(conf,
+            SequenceFile.Writer.file(filepath),
+            SequenceFile.Writer.keyClass(Text.class),
+            SequenceFile.Writer.valueClass(NullWritable.class),
+            SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+      } else {
+        filewriter = SequenceFile.createWriter(conf,
+          SequenceFile.Writer.file(filepath),
+          SequenceFile.Writer.keyClass(Text.class),
+          SequenceFile.Writer.valueClass(NullWritable.class),
+          SequenceFile.Writer.compression(CompressionType.NONE));
+      }
+
+      Text text = new Text();
+      for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
+        String row = index + "," + (double)index + ",'" + index + "'";
+        text.set(row);
+        filewriter.append(text, NullWritable.get());
+        index++;
+      }
+
+      filewriter.close();
+    }
+  }
+
+  private String padZeros(int number, int digits) {
+    String string = String.valueOf(number);
+    for (int i=(digits-string.length()); i>0; i--) {
+      string = "0" + string;
+    }
+    return string;
+  }
+
+  public static class DummyLoader extends Loader {
+    @Override
+    public void run(ImmutableContext context, DataReader reader)
+        throws Exception {
+      int index = 1;
+      int sum = 0;
+      Object[] array;
+      while ((array = reader.readArrayRecord()) != null) {
+        sum += Integer.valueOf(array[0].toString());
+        index++;
+      };
+
+      int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
+      assertEquals((1+numbers)*numbers/2, sum);
+
+      assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
+    }
+  }
+
+}


Mime
View raw message