sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [02/50] [abbrv] git commit: SQOOP-1487: Sqoop2: From/To: Refactor/Create HDFS connector test cases
Date Fri, 10 Oct 2014 03:06:54 GMT
SQOOP-1487: Sqoop2: From/To: Refactor/Create HDFS connector test cases

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c0b22b1d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c0b22b1d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c0b22b1d

Branch: refs/heads/sqoop2
Commit: c0b22b1d62221f2a24294520b8ff3851c06d06c3
Parents: 8fee134
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Wed Sep 3 09:40:11 2014 +0200
Committer: Abraham Elmahrek <abraham@elmahrek.com>
Committed: Thu Oct 9 17:58:17 2014 -0700

----------------------------------------------------------------------
 connector/connector-hdfs/pom.xml                |   5 +
 .../sqoop/connector/hdfs/HdfsExtractor.java     |   2 -
 .../apache/sqoop/connector/hdfs/FileUtils.java  |  82 ++++++
 .../sqoop/connector/hdfs/TestExtractor.java     | 125 ++++++++++
 .../sqoop/connector/hdfs/TestHdfsBase.java      | 139 +++++++++++
 .../apache/sqoop/connector/hdfs/TestLoader.java | 213 ++++++++++++++++
 .../sqoop/connector/hdfs/TestPartitioner.java   | 113 +++++++++
 .../src/test/resources/log4j.properties         |  24 ++
 .../java/org/apache/sqoop/job/FileUtils.java    |  69 -----
 .../org/apache/sqoop/job/TestHdfsExtract.java   | 241 ------------------
 .../java/org/apache/sqoop/job/TestHdfsLoad.java | 250 -------------------
 11 files changed, 701 insertions(+), 562 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index 8df9f11..fa4330a 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -35,6 +35,11 @@ limitations under the License.
 
   <dependencies>
     <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-spi</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index fc12381..7447071 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -42,8 +42,6 @@ import java.io.IOException;
  * Extract from HDFS.
  * Default field delimiter of a record is comma.
  */
-
-
 public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> {
 
   public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
new file mode 100644
index 0000000..8c19d01
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/FileUtils.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+public class FileUtils {
+
+  public static boolean exists(String file) throws IOException {
+    Path path = new Path(file);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    return fs.exists(path);
+  }
+
+  public static void delete(String file) throws IOException {
+    Path path = new Path(file);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+  }
+
+  public static void mkdirs(String directory) throws IOException {
+    Path path = new Path(directory);
+    FileSystem fs = path.getFileSystem(new Configuration());
+    if (!fs.exists(path)) {
+      fs.mkdirs(path);
+    }
+  }
+
+  public static InputStream open(String fileName)
+    throws IOException, ClassNotFoundException {
+    Path filepath = new Path(fileName);
+    FileSystem fs = filepath.getFileSystem(new Configuration());
+    return fs.open(filepath);
+  }
+
+  public static OutputStream create(String fileName) throws IOException {
+    Path filepath = new Path(fileName);
+    FileSystem fs = filepath.getFileSystem(new Configuration());
+    return fs.create(filepath, false);
+  }
+
+  public static Path[] listDir(String directory) throws IOException {
+    Path dirpath = new Path(directory);
+    FileSystem fs = dirpath.getFileSystem(new Configuration());
+    List<Path> paths = new LinkedList<Path>();
+    for (FileStatus fileStatus : fs.listStatus(dirpath)) {
+      paths.add(fileStatus.getPath());
+    }
+    return paths.toArray(new Path[paths.size()]);
+  }
+
+  private FileUtils() {
+    // Disable explicit object creation
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
new file mode 100644
index 0000000..6ed4087
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
+
+@RunWith(Parameterized.class)
+public class TestExtractor extends TestHdfsBase {
+  private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+  private static final int NUMBER_OF_FILES = 5;
+  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+  private OutputFormat outputFileType;
+  private Class<? extends CompressionCodec> compressionClass;
+  private final String inputDirectory;
+  private Extractor extractor;
+
+  public TestExtractor(OutputFormat outputFileType,
+                       Class<? extends CompressionCodec> compressionClass)
+      throws Exception {
+    this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
+    this.outputFileType = outputFileType;
+    this.compressionClass = compressionClass;
+    this.extractor = new HdfsExtractor();
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (Class<?> compressionClass : new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class}) {
+      for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+        parameters.add(new Object[]{outputFileType, compressionClass});
+      }
+    }
+    return parameters;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    FileUtils.mkdirs(inputDirectory);
+    switch (this.outputFileType) {
+      case TEXT_FILE:
+        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+
+      case SEQUENCE_FILE:
+        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.delete(inputDirectory);
+  }
+
+  @Test
+  public void testExtractor() throws Exception {
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    ExtractorContext context = new ExtractorContext(prefixContext, new DataWriter() {
+      private long index = 1L;
+
+      @Override
+      public void writeArrayRecord(Object[] array) {
+        throw new AssertionError("Should not be writing array.");
+      }
+
+      @Override
+      public void writeStringRecord(String text) {
+        Assert.assertEquals(index + "," + index + ".0,'" + index++ + "'", text);
+      }
+
+      @Override
+      public void writeRecord(Object obj) {
+        throw new AssertionError("Should not be writing object.");
+      }
+    }, null);
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    FromJobConfiguration jobConf = new FromJobConfiguration();
+
+    HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
+
+    extractor.extract(context, connConf, jobConf, partition);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
new file mode 100644
index 0000000..0cc2b8b
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+public class TestHdfsBase {
+
+  protected HdfsPartition createPartition(Path[] paths) throws IOException {
+    long[] offsets = new long[paths.length];
+    long[] lengths = new long[paths.length];
+    String[] locations = new String[paths.length];
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    for (int i = 0; i < offsets.length; ++i) {
+      locations[i] = paths[i].getName();
+      lengths[i] = fs.getFileStatus(paths[i]).getLen();
+    }
+
+    return new HdfsPartition(paths, offsets, lengths, locations);
+  }
+
+  protected void createTextInput(String indir,
+                                Class<? extends CompressionCodec> clz,
+                                int numberOfFiles,
+                                int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+
+    CompressionCodec codec = null;
+    String extension = "";
+    if (clz != null) {
+      codec = clz.newInstance();
+      if (codec instanceof Configurable) {
+        ((Configurable) codec).setConf(conf);
+      }
+      extension = codec.getDefaultExtension();
+    }
+
+    int index = 1;
+    for (int fi = 0; fi < numberOfFiles; fi++) {
+      String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
+      OutputStream filestream = FileUtils.create(fileName);
+      BufferedWriter filewriter;
+      if (codec != null) {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            codec.createOutputStream(filestream, codec.createCompressor()),
+            "UTF-8"));
+      } else {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            filestream, "UTF-8"));
+      }
+
+      for (int ri = 0; ri < numberOfRows; ri++) {
+        String row = index + "," + (double)index + ",'" + index + "'";
+        filewriter.write(row + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+        index++;
+      }
+
+      filewriter.close();
+    }
+  }
+
+  protected void createSequenceInput(String indir,
+                                    Class<? extends CompressionCodec> clz,
+                                    int numberOfFiles,
+                                    int numberOfRows)
+      throws IOException, InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+
+    CompressionCodec codec = null;
+    if (clz != null) {
+      codec = clz.newInstance();
+      if (codec instanceof Configurable) {
+        ((Configurable) codec).setConf(conf);
+      }
+    }
+
+    int index = 1;
+    for (int fi = 0; fi < numberOfFiles; fi++) {
+      Path filepath = new Path(indir,
+          "part-r-" + padZeros(fi, 5) + ".seq");
+      SequenceFile.Writer filewriter;
+      if (codec != null) {
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+            conf, filepath, Text.class, NullWritable.class,
+            SequenceFile.CompressionType.BLOCK, codec);
+      } else {
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+            conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
+      }
+
+      Text text = new Text();
+      for (int ri = 0; ri < numberOfRows; ri++) {
+        String row = index + "," + (double)index + ",'" + index + "'";
+        text.set(row);
+        filewriter.append(text, NullWritable.get());
+        index++;
+      }
+
+      filewriter.close();
+    }
+  }
+
+  private String padZeros(int number, int digits) {
+    String string = String.valueOf(number);
+    for (int i = (digits - string.length()); i > 0; i--) {
+      string = "0" + string;
+    }
+    return string;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
new file mode 100644
index 0000000..79cf1f1
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputCompression;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.SEQUENCE_FILE;
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.TEXT_FILE;
+
+@RunWith(Parameterized.class)
+public class TestLoader extends TestHdfsBase {
+  private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+  private OutputFormat outputFormat;
+  private OutputCompression compression;
+  private final String outputDirectory;
+  private Loader loader;
+
+  public TestLoader(OutputFormat outputFormat,
+                    OutputCompression compression)
+      throws Exception {
+    this.outputDirectory = INPUT_ROOT + getClass().getSimpleName();
+    this.outputFormat = outputFormat;
+    this.compression = compression;
+    this.loader = new HdfsLoader();
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (OutputCompression compression : new OutputCompression[]{
+        OutputCompression.DEFAULT,
+        OutputCompression.BZIP2,
+        OutputCompression.NONE
+    }) {
+      for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
+        parameters.add(new Object[]{outputFileType, compression});
+      }
+    }
+    return parameters;
+  }
+
+  @Before
+  public void setUp() throws Exception {}
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.delete(outputDirectory);
+  }
+
+  @Test
+  public void testLoader() throws Exception {
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    Configuration conf = new Configuration();
+    PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
+    LoaderContext context = new LoaderContext(prefixContext, new DataReader() {
+      private long index = 0L;
+
+      @Override
+      public Object[] readArrayRecord() {
+        return null;
+      }
+
+      @Override
+      public String readTextRecord() {
+        if (index++ < NUMBER_OF_ROWS_PER_FILE) {
+          return index + "," + (double)index + ",'" + index + "'";
+        } else {
+          return null;
+        }
+      }
+
+      @Override
+      public Object readContent() {
+        return null;
+      }
+    }, null);
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ToJobConfiguration jobConf = new ToJobConfiguration();
+    jobConf.output.outputDirectory = outputDirectory;
+    jobConf.output.compression = compression;
+    jobConf.output.outputFormat = outputFormat;
+    Path outputPath = new Path(outputDirectory);
+
+    loader.load(context, connConf, jobConf);
+    Assert.assertEquals(1, fs.listStatus(outputPath).length);
+
+    for (FileStatus status : fs.listStatus(outputPath)) {
+      verifyOutput(fs, status.getPath());
+    }
+
+    loader.load(context, connConf, jobConf);
+    Assert.assertEquals(2, fs.listStatus(outputPath).length);
+    loader.load(context, connConf, jobConf);
+    loader.load(context, connConf, jobConf);
+    loader.load(context, connConf, jobConf);
+    Assert.assertEquals(5, fs.listStatus(outputPath).length);
+  }
+
+  private void verifyOutput(FileSystem fs, Path file) throws IOException {
+    Configuration conf = new Configuration();
+    FSDataInputStream fsin = fs.open(file);
+    CompressionCodec codec;
+
+    switch(outputFormat) {
+      case TEXT_FILE:
+        codec = (new CompressionCodecFactory(conf)).getCodec(file);
+
+        // Verify compression
+        switch(compression) {
+          case BZIP2:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
+            break;
+
+          case DEFAULT:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Deflate") != -1);
+            break;
+
+          case NONE:
+          default:
+            Assert.assertNull(codec);
+            break;
+        }
+
+        InputStreamReader in;
+        if (codec == null) {
+          in = new InputStreamReader(fsin);
+        } else {
+          in = new InputStreamReader(codec.createInputStream(fsin, codec.createDecompressor()));
+        }
+        BufferedReader textReader = new BufferedReader(in);
+
+        for (int i = 1; i <= NUMBER_OF_ROWS_PER_FILE; ++i) {
+          Assert.assertEquals(i + "," + (double)i + ",'" + i + "'", textReader.readLine());
+        }
+        break;
+
+      case SEQUENCE_FILE:
+        SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, file, conf);
+        codec = sequenceReader.getCompressionCodec();
+
+        // Verify compression
+        switch(compression) {
+          case BZIP2:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("BZip2") != -1);
+            break;
+
+          case DEFAULT:
+            Assert.assertTrue(codec.getClass().getCanonicalName().indexOf("Default") != -1);
+            break;
+
+          case NONE:
+          default:
+            Assert.assertNull(codec);
+            break;
+        }
+
+        Text line = new Text();
+        int index = 1;
+        while (sequenceReader.next(line)) {
+          Assert.assertEquals(index + "," + (double)index + ",'" + index++ + "'", line.toString());
+          line = new Text();
+        }
+        break;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
new file mode 100644
index 0000000..ae93b0a
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.sqoop.common.PrefixContext;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.sqoop.connector.hdfs.configuration.OutputFormat.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks how many partitions {@link HdfsPartitioner} produces for a fixed set
+ * of generated input files, across file formats and compression codecs.
+ */
+@RunWith(Parameterized.class)
+public class TestPartitioner extends TestHdfsBase {
+  private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
+  private static final int NUMBER_OF_FILES = 5;
+  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+
+  private final String inputDirectory;
+  private OutputFormat outputFileType;
+  private Class<? extends CompressionCodec> compressionClass;
+  private Partitioner partitioner;
+
+  /**
+   * Enumerates every (file format, codec) pair, with codecs in the outer
+   * loop. A {@code null} codec stands for uncompressed input.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    Class<?>[] codecs = new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class};
+    Object[] fileTypes = new Object[]{TEXT_FILE, SEQUENCE_FILE};
+
+    List<Object[]> combinations = new ArrayList<Object[]>();
+    for (Class<?> codec : codecs) {
+      for (Object fileType : fileTypes) {
+        combinations.add(new Object[]{fileType, codec});
+      }
+    }
+    return combinations;
+  }
+
+  public TestPartitioner(OutputFormat outputFileType, Class<? extends CompressionCodec> compressionClass) {
+    this.outputFileType = outputFileType;
+    this.compressionClass = compressionClass;
+    this.inputDirectory = INPUT_ROOT + getClass().getSimpleName();
+  }
+
+  /** Creates a fresh partitioner and the input files for this parameter set. */
+  @Before
+  public void setUp() throws Exception {
+    partitioner = new HdfsPartitioner();
+    FileUtils.mkdirs(inputDirectory);
+
+    switch (outputFileType) {
+      case SEQUENCE_FILE:
+        createSequenceInput(inputDirectory, compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+
+      case TEXT_FILE:
+        createTextInput(inputDirectory, compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE);
+        break;
+    }
+  }
+
+  /** Removes the generated input directory. */
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.delete(inputDirectory);
+  }
+
+  /**
+   * Partitions the input directory and checks the count: 5 for uncompressed
+   * input, 3 when a codec is used. The value 5 passed to PartitionerContext is
+   * presumably the maximum partition count (confirm against its definition).
+   */
+  @Test
+  public void testPartitioner() {
+    PrefixContext prefixContext = new PrefixContext(new Configuration(),
+        "org.apache.sqoop.job.connector.from.context.");
+    PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    FromJobConfiguration jobConf = new FromJobConfiguration();
+    jobConf.input.inputDirectory = inputDirectory;
+
+    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+
+    int expectedPartitions = (compressionClass == null) ? 5 : 3;
+    assertEquals(expectedPartitions, partitions.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/connector/connector-hdfs/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/resources/log4j.properties b/connector/connector-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..44ffced
--- /dev/null
+++ b/connector/connector-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
deleted file mode 100644
index e685883..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/FileUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class FileUtils {
-
-  public static boolean exists(String file) throws IOException {
-    Path path = new Path(file);
-    FileSystem fs = path.getFileSystem(new Configuration());
-    return fs.exists(path);
-  }
-
-  public static void delete(String file) throws IOException {
-    Path path = new Path(file);
-    FileSystem fs = path.getFileSystem(new Configuration());
-    if (fs.exists(path)) {
-      fs.delete(path, true);
-    }
-  }
-
-  public static void mkdirs(String directory) throws IOException {
-    Path path = new Path(directory);
-    FileSystem fs = path.getFileSystem(new Configuration());
-    if (!fs.exists(path)) {
-      fs.mkdirs(path);
-    }
-  }
-
-  public static InputStream open(String fileName)
-    throws IOException, ClassNotFoundException {
-    Path filepath = new Path(fileName);
-    FileSystem fs = filepath.getFileSystem(new Configuration());
-    return fs.open(filepath);
-  }
-
-  public static OutputStream create(String fileName) throws IOException {
-    Path filepath = new Path(fileName);
-    FileSystem fs = filepath.getFileSystem(new Configuration());
-    return fs.create(filepath, false);
-  }
-
-  private FileUtils() {
-    // Disable explicit object creation
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
deleted file mode 100644
index 2accf77..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import junit.framework.TestCase;
-
-//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-//import org.apache.sqoop.job.etl.HdfsExportExtractor;
-//import org.apache.sqoop.job.etl.HdfsExportPartitioner;
-
-public class TestHdfsExtract extends TestCase {
-
-//  private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
-//  private static final int NUMBER_OF_FILES = 5;
-//  private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
-//
-//  private final String indir;
-//
-//  public TestHdfsExtract() {
-//    indir = INPUT_ROOT + getClass().getSimpleName();
-//  }
-//
-//  @Override
-//  public void setUp() throws IOException {
-//    FileUtils.mkdirs(indir);
-//  }
-//
-//  @Override
-//  public void tearDown() throws IOException {
-//    FileUtils.delete(indir);
-//  }
-//
-//  /**
-//   * Test case for validating the number of partitions creation
-//   * based on input.
-//   * Success if the partitions list size is less or equal to
-//   * given max partition.
-//   * @throws Exception
-//   */
-//  @Test
-//  public void testHdfsExportPartitioner() throws Exception {
-//    createTextInput(null);
-//    Configuration conf = new Configuration();
-//    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-//
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
-//    PrefixContext prefixContext = new PrefixContext(conf, "");
-//    int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
-//
-//    for(int maxPartitions : partitionValues) {
-//      PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions, null);
-//      List<Partition> partitionList = partitioner.getPartitions(partCont, null, null);
-//      assertTrue(partitionList.size()<=maxPartitions);
-//    }
-//  }
-//
-//  @Test
-//  public void testUncompressedText() throws Exception {
-//    createTextInput(null);
-//
-//    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-//  }
-//
-//  @Test
-//  public void testDefaultCompressedText() throws Exception {
-//    createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
-//
-//    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-//  }
-//
-//  @Test
-//  public void testBZip2CompressedText() throws Exception {
-//    createTextInput(BZip2Codec.class);
-//
-//    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-//  }
-//
-//  @Test
-//  public void testDefaultCompressedSequence() throws Exception {
-//    createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
-//
-//    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-//  }
-//
-//  @Test
-//  public void testUncompressedSequence() throws Exception {
-//    createSequenceInput(null);
-//
-//    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
-//  }
-//
-//  private Schema createSchema() {
-//    Schema schema = new Schema("Test");
-//    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-//    .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//    return schema;
-//  }
-//
-//  private Configuration createConf() {
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-//    conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER,
-//        HdfsExportPartitioner.class.getName());
-//    conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR,
-//        HdfsExportExtractor.class.getName());
-//    conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-//    conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-//    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//        CSVIntermediateDataFormat.class.getName());
-//    conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir);
-//    return conf;
-//  }
-//
-//  private Job createJob(Configuration conf, Schema schema) throws Exception {
-//    Job job = new Job(conf);
-//    ConfigurationUtils.setConnectorSchema(job, schema);
-//    job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//        CSVIntermediateDataFormat.class.getName());
-//    return job;
-//  }
-//
-//  private void createTextInput(Class<? extends CompressionCodec> clz)
-//      throws IOException, InstantiationException, IllegalAccessException {
-//    Configuration conf = new Configuration();
-//
-//    CompressionCodec codec = null;
-//    String extension = "";
-//    if (clz != null) {
-//      codec = clz.newInstance();
-//      if (codec instanceof Configurable) {
-//        ((Configurable) codec).setConf(conf);
-//      }
-//      extension = codec.getDefaultExtension();
-//    }
-//
-//    int index = 1;
-//    for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
-//      String fileName = indir + "/" + "part-r-" + padZeros(fi, 5) + extension;
-//      OutputStream filestream = FileUtils.create(fileName);
-//      BufferedWriter filewriter;
-//      if (codec != null) {
-//        filewriter = new BufferedWriter(new OutputStreamWriter(
-//            codec.createOutputStream(filestream, codec.createCompressor()),
-//            Data.CHARSET_NAME));
-//      } else {
-//        filewriter = new BufferedWriter(new OutputStreamWriter(
-//            filestream, Data.CHARSET_NAME));
-//      }
-//
-//      for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
-//        String row = index + "," + (double)index + ",'" + index + "'";
-//        filewriter.write(row + Data.DEFAULT_RECORD_DELIMITER);
-//        index++;
-//      }
-//
-//      filewriter.close();
-//    }
-//  }
-//
-//  private void createSequenceInput(Class<? extends CompressionCodec> clz)
-//      throws IOException, InstantiationException, IllegalAccessException {
-//    Configuration conf = new Configuration();
-//
-//    CompressionCodec codec = null;
-//    if (clz != null) {
-//      codec = clz.newInstance();
-//      if (codec instanceof Configurable) {
-//        ((Configurable) codec).setConf(conf);
-//      }
-//    }
-//
-//    int index = 1;
-//    for (int fi=0; fi<NUMBER_OF_FILES; fi++) {
-//      Path filepath = new Path(indir,
-//          "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
-//      SequenceFile.Writer filewriter;
-//      if (codec != null) {
-//        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-//            conf, filepath, Text.class, NullWritable.class,
-//            CompressionType.BLOCK, codec);
-//      } else {
-//        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-//            conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
-//      }
-//
-//      Text text = new Text();
-//      for (int ri=0; ri<NUMBER_OF_ROWS_PER_FILE; ri++) {
-//        String row = index + "," + (double)index + ",'" + index + "'";
-//        text.set(row);
-//        filewriter.append(text, NullWritable.get());
-//        index++;
-//      }
-//
-//      filewriter.close();
-//    }
-//  }
-//
-//  private String padZeros(int number, int digits) {
-//    String string = String.valueOf(number);
-//    for (int i=(digits-string.length()); i>0; i--) {
-//      string = "0" + string;
-//    }
-//    return string;
-//  }
-//
-//  public static class DummyLoader extends Loader {
-//    @Override
-//    public void load(LoaderContext context, Object oc, Object oj) throws Exception {
-//      int index = 1;
-//      int sum = 0;
-//      Object[] array;
-//      while ((array = context.getDataReader().readArrayRecord()) != null) {
-//        sum += Integer.valueOf(array[0].toString());
-//        index++;
-//      };
-//
-//      int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
-//      assertEquals((1+numbers)*numbers/2, sum);
-//
-//      assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c0b22b1d/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
deleted file mode 100644
index 8eba049..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import junit.framework.TestCase;
-
-//import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-//import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-//import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-
-public class TestHdfsLoad extends TestCase {
-
-//  private static final String OUTPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
-//  private static final String OUTPUT_FILE = "part-r-00000";
-//  private static final int START_ID = 1;
-//  private static final int NUMBER_OF_IDS = 9;
-//  private static final int NUMBER_OF_ROWS_PER_ID = 10;
-//
-//  private String outdir;
-//
-//  public TestHdfsLoad() {
-//    outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName();
-//  }
-//
-//  public void testUncompressedText() throws Exception {
-//    FileUtils.delete(outdir);
-//
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-//    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-//    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-//    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-//    Schema schema = new Schema("Test");
-//    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-//      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-//    Job job = new Job(conf);
-//    ConfigurationUtils.setConnectorSchema(job, schema);
-//    JobUtils.runJob(job.getConfiguration());
-//
-//    String fileName = outdir + "/" +  OUTPUT_FILE;
-//    InputStream filestream = FileUtils.open(fileName);
-//    BufferedReader filereader = new BufferedReader(new InputStreamReader(
-//        filestream, Charsets.UTF_8));
-//    verifyOutputText(filereader);
-//  }
-//
-//  public void testCompressedText() throws Exception {
-//    FileUtils.delete(outdir);
-//
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-//    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-//    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-//    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-//    conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-//
-//    Schema schema = new Schema("Test");
-//    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-//      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-//    Job job = new Job(conf);
-//    ConfigurationUtils.setConnectorSchema(job, schema);
-//    JobUtils.runJob(job.getConfiguration());
-//
-//    Class<? extends CompressionCodec> codecClass = conf.getClass(
-//        JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
-//        .asSubclass(CompressionCodec.class);
-//    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
-//    String fileName = outdir + "/" +  OUTPUT_FILE + codec.getDefaultExtension();
-//    InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
-//    BufferedReader filereader = new BufferedReader(new InputStreamReader(
-//        filestream, Charsets.UTF_8));
-//    verifyOutputText(filereader);
-//  }
-//
-//  private void verifyOutputText(BufferedReader reader) throws IOException {
-//    String actual = null;
-//    String expected;
-//    Data data = new Data();
-//    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
-//    while ((actual = reader.readLine()) != null){
-//      data.setContent(new Object[] {
-//        index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
-//          Data.ARRAY_RECORD);
-//      expected = data.toString();
-//      index++;
-//
-//      assertEquals(expected, actual);
-//    }
-//    reader.close();
-//
-//    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
-//        index-START_ID*NUMBER_OF_ROWS_PER_ID);
-//  }
-//
-//  public void testUncompressedSequence() throws Exception {
-//    FileUtils.delete(outdir);
-//
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-//    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-//    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-//    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-//
-//    Schema schema = new Schema("Test");
-//    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-//      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-//    Job job = new Job(conf);
-//    ConfigurationUtils.setConnectorSchema(job, schema);
-//    JobUtils.runJob(job.getConfiguration());
-//
-//    Path filepath = new Path(outdir,
-//        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-//    SequenceFile.Reader filereader = new SequenceFile.Reader(
-//      filepath.getFileSystem(conf), filepath, conf);
-//    verifyOutputSequence(filereader);
-//  }
-//
-//  public void testCompressedSequence() throws Exception {
-//    FileUtils.delete(outdir);
-//
-//    Configuration conf = new Configuration();
-//    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
-//    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-//    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-//    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-//    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
-//      CSVIntermediateDataFormat.class.getName());
-//    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-//    conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-//
-//    Schema schema = new Schema("Test");
-//    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-//      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-//
-//    Job job = new Job(conf);
-//    ConfigurationUtils.setConnectorSchema(job, schema);
-//    JobUtils.runJob(job.getConfiguration());
-//    Path filepath = new Path(outdir,
-//        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-//    SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf);
-//    verifyOutputSequence(filereader);
-//  }
-//
-//  private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
-//    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
-//    Text actual = new Text();
-//    Text expected = new Text();
-//    Data data = new Data();
-//    while (reader.next(actual)){
-//      data.setContent(new Object[] {
-//          index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
-//          Data.ARRAY_RECORD);
-//      expected.set(data.toString());
-//      index++;
-//
-//      assertEquals(expected.toString(), actual.toString());
-//    }
-//    reader.close();
-//
-//    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
-//        index-START_ID*NUMBER_OF_ROWS_PER_ID);
-//  }
-//
-//  public static class DummyPartition extends Partition {
-//    private int id;
-//
-//    public void setId(int id) {
-//      this.id = id;
-//    }
-//
-//    public int getId() {
-//      return id;
-//    }
-//
-//    @Override
-//    public void readFields(DataInput in) throws IOException {
-//      id = in.readInt();
-//    }
-//
-//    @Override
-//    public void write(DataOutput out) throws IOException {
-//      out.writeInt(id);
-//    }
-//
-//    @Override
-//    public String toString() {
-//      return Integer.toString(id);
-//    }
-//  }
-//
-//  public static class DummyPartitioner extends Partitioner {
-//    @Override
-//    public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
-//      List<Partition> partitions = new LinkedList<Partition>();
-//      for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
-//        DummyPartition partition = new DummyPartition();
-//        partition.setId(id);
-//        partitions.add(partition);
-//      }
-//      return partitions;
-//    }
-//  }
-//
-//  public static class DummyExtractor extends Extractor {
-//    @Override
-//    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
-//      int id = ((DummyPartition)partition).getId();
-//      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
-//        Object[] array = new Object[] {
-//          id * NUMBER_OF_ROWS_PER_ID + row,
-//          (double) (id * NUMBER_OF_ROWS_PER_ID + row),
-//          new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1)
-//        };
-//        context.getDataWriter().writeArrayRecord(array);
-//      }
-//    }
-//
-//    @Override
-//    public long getRowsRead() {
-//      return NUMBER_OF_ROWS_PER_ID;
-//    }
-//  }
-}


Mime
View raw message