sqoop-commits mailing list archives

From jar...@apache.org
Subject git commit: SQOOP-927: Sqoop2: Integration: Mapreduce specific tests should be running on MiniCluster
Date Tue, 22 Oct 2013 23:42:58 GMT
Updated Branches:
  refs/heads/sqoop2 9addddfe3 -> 39eb1e56d


SQOOP-927: Sqoop2: Integration: Mapreduce specific tests should be running on MiniCluster

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/39eb1e56
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/39eb1e56
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/39eb1e56

Branch: refs/heads/sqoop2
Commit: 39eb1e56d3898ed5663a58b319e5055109d3b7d2
Parents: 9addddf
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Tue Oct 22 16:41:34 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Tue Oct 22 16:41:34 2013 -0700

----------------------------------------------------------------------
 pom.xml                                         |   7 +-
 .../apache/sqoop/test/asserts/HdfsAsserts.java  |  20 ++-
 .../sqoop/test/hadoop/HadoopLocalRunner.java    |  50 ++++++++
 .../test/hadoop/HadoopMiniClusterRunner.java    |  93 ++++++++++++++
 .../apache/sqoop/test/hadoop/HadoopRunner.java  | 128 +++++++++++++++++++
 .../sqoop/test/hadoop/HadoopRunnerFactory.java  |  38 ++++++
 .../sqoop/test/testcases/ConnectorTestCase.java |  17 +++
 .../sqoop/test/testcases/TomcatTestCase.java    |  63 +++++++--
 .../org/apache/sqoop/test/utils/HdfsUtils.java  |  58 ++++++---
 9 files changed, 432 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 20d5e4e..4470331 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,6 +164,11 @@ limitations under the License.
             <version>${hadoop.1.version}</version>
           </dependency>
 
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.1.version}</version>
+          </dependency>
         </dependencies>
       </dependencyManagement>
     </profile>
@@ -485,7 +490,7 @@ limitations under the License.
           <version>2.12</version>
           <configuration>
             <forkMode>always</forkMode>
-            <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
+            <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
             <redirectTestOutputToFile>true</redirectTestOutputToFile>
             <argLine>-Xms256m -Xmx1g</argLine>
           </configuration>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
index c175272..d8f2b8d 100644
--- a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
+++ b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
@@ -18,12 +18,14 @@
 package org.apache.sqoop.test.asserts;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.test.utils.HdfsUtils;
 
 import java.io.BufferedReader;
-import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -35,8 +37,6 @@ import static org.junit.Assert.fail;
 
 /**
  * Assert methods suitable for checking HDFS files and directories.
- *
- * TODO: This module will require clean up to work on MiniCluster/Real cluster.
  */
 public class HdfsAsserts {
 
@@ -49,15 +49,13 @@ public class HdfsAsserts {
    * @param lines Expected lines
    * @throws IOException
    */
-  public static void assertMapreduceOutput(String directory, String... lines) throws IOException {
+  public static void assertMapreduceOutput(FileSystem fs, String directory, String... lines) throws IOException {
     Set<String> setLines = new HashSet<String>(Arrays.asList(lines));
     List<String> notFound = new LinkedList<String>();
 
-    String []files = HdfsUtils.getOutputMapreduceFiles(directory);
-
-    for(String file : files) {
-      String filePath = directory + "/" + file;
-      BufferedReader br = new BufferedReader(new FileReader((filePath)));
+    Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory);
+    for(Path file : files) {
+      BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file)));
 
       String line;
       while ((line = br.readLine()) != null) {
@@ -83,8 +81,8 @@ public class HdfsAsserts {
    * @param directory Mapreduce output directory
    * @param expectedFiles Expected number of files
    */
-  public static void assertMapreduceOutputFiles(String directory, int expectedFiles) {
-    String []files = HdfsUtils.getOutputMapreduceFiles(directory);
+  public static void assertMapreduceOutputFiles(FileSystem fs, String directory, int expectedFiles) throws IOException {
+    Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory);
     assertEquals(expectedFiles, files.length);
   }
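
A minimal sketch of how the reworked asserts are meant to be called now that they operate through a FileSystem handle instead of java.io; the directory and expected lines are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.sqoop.test.asserts.HdfsAsserts;

    public class HdfsAssertsSketch {
      public static void main(String[] args) throws Exception {
        // Works against whatever filesystem the configuration points to:
        // local FS for HadoopLocalRunner, HDFS for HadoopMiniClusterRunner.
        FileSystem fs = FileSystem.get(new Configuration());

        // Check that the part-* files under the directory contain exactly
        // these lines (order-independent) and that exactly one file exists.
        HdfsAsserts.assertMapreduceOutput(fs, "/mapreduce-job-io/example", "line 1", "line 2");
        HdfsAsserts.assertMapreduceOutputFiles(fs, "/mapreduce-job-io/example", 1);
      }
    }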
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java
new file mode 100644
index 0000000..44465b4
--- /dev/null
+++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopLocalRunner.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.test.utils.HdfsUtils;
+
+/**
+ * Represents a local cluster.
+ * It uses an unchanged Configuration object.
+ * HadoopRunner implementation that uses LocalJobRunner to execute mapreduce jobs and the local filesystem instead of HDFS.
+ */
+public class HadoopLocalRunner extends HadoopRunner {
+
+  @Override
+  public Configuration prepareConfiguration(Configuration conf) {
+    return conf;
+  }
+
+  @Override
+  public void start() throws Exception {
+    // Do nothing!
+  }
+
+  @Override
+  public void stop() throws Exception {
+    // Do nothing!
+  }
+
+  @Override
+  public String getTestDirectory() {
+    return HdfsUtils.joinPathFragments(getTemporaryPath(), "/mapreduce-job-io");
+  }
+
+}
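
A sketch of the local runner's lifecycle, mirroring what TomcatTestCase.startHadoop below does; start() and stop() are deliberately no-ops and the resulting FileSystem resolves to the local filesystem (the temporary path is illustrative):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.sqoop.test.hadoop.HadoopLocalRunner;
    import org.apache.sqoop.test.hadoop.HadoopRunner;

    public class LocalRunnerSketch {
      public static void main(String[] args) throws Exception {
        HadoopRunner runner = new HadoopLocalRunner();
        runner.setTemporaryPath("/tmp/sqoop-cargo-tests/"); // illustrative path
        // The local runner returns the configuration unchanged.
        runner.setConfiguration(runner.prepareConfiguration(new JobConf()));
        runner.start(); // no-op

        FileSystem fs = FileSystem.get(runner.getConfiguration()); // local filesystem

        runner.stop(); // no-op
      }
    }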

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java
new file mode 100644
index 0000000..b06dcab
--- /dev/null
+++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopMiniClusterRunner.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.log4j.Logger;
+
+/**
+ * Represents a minicluster setup.
+ * It creates a configuration object and mutates it.
+ * Clients that need to connect to the miniclusters should use
+ * the provided configuration object.
+ */
+public class HadoopMiniClusterRunner extends HadoopRunner {
+  private static final Logger LOG = Logger.getLogger(HadoopMiniClusterRunner.class);
+
+  /**
+   * Hadoop HDFS cluster
+   */
+  protected MiniDFSCluster dfsCluster;
+
+  /**
+   * Hadoop MR cluster
+   */
+  protected MiniMRCluster mrCluster;
+
+  @Override
+  public Configuration prepareConfiguration(Configuration config) {
+    config.set("dfs.block.access.token.enable", "false");
+    config.set("dfs.permissions", "true");
+    config.set("hadoop.security.authentication", "simple");
+    config.set("mapred.tasktracker.map.tasks.maximum", "1");
+    config.set("mapred.tasktracker.reduce.tasks.maximum", "1");
+    config.set("mapred.submit.replication", "1");
+    config.set("yarn.resourcemanager.scheduler.class", "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    config.set("yarn.application.classpath", System.getProperty("java.class.path"));
+    return config;
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public void start() throws Exception {
+    System.setProperty("test.build.data", getDataDir());
+    LOG.info("test.build.data set to: " + getDataDir());
+
+    System.setProperty("hadoop.log.dir", getLogDir());
+    LOG.info("log dir set to: " + getLogDir());
+
+    // Start DFS server
+    LOG.info("Starting DFS cluster...");
+    dfsCluster = new MiniDFSCluster(config, 1, true, null);
+    if (dfsCluster.isClusterUp()) {
+      LOG.info("Started DFS cluster on port: " + dfsCluster.getNameNodePort());
+    } else {
+      LOG.error("Could not start DFS cluster");
+    }
+
+    // Start MR server
+    LOG.info("Starting MR cluster");
+    mrCluster = new MiniMRCluster(0, 0, 1, dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, new JobConf(config));
+    LOG.info("Started MR cluster");
+    config = prepareConfiguration(mrCluster.createJobConf());
+  }
+
+  @Override
+  public void stop() throws Exception {
+    LOG.info("Stopping MR cluster");
+    mrCluster.shutdown();
+    LOG.info("Stopped MR cluster");
+
+    LOG.info("Stopping DFS cluster");
+    dfsCluster.shutdown();
+    LOG.info("Stopped DFS cluster");
+  }
+}
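
The same lifecycle against the minicluster runner; the key difference is that start() replaces the configuration with one derived from the MR cluster, so clients transparently talk to the in-process HDFS and JobTracker (a sketch, with an illustrative temporary path):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;
    import org.apache.sqoop.test.hadoop.HadoopRunner;

    public class MiniClusterSketch {
      public static void main(String[] args) throws Exception {
        HadoopRunner runner = new HadoopMiniClusterRunner();
        runner.setTemporaryPath("/tmp/sqoop-cargo-tests/"); // illustrative path
        runner.setConfiguration(runner.prepareConfiguration(new JobConf()));
        runner.start(); // boots MiniDFSCluster + MiniMRCluster

        // The configuration now points at the in-process cluster, so this
        // returns an HDFS-backed FileSystem rather than the local one.
        FileSystem fs = FileSystem.get(runner.getConfiguration());

        runner.stop(); // shuts both miniclusters down
      }
    }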

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java
new file mode 100644
index 0000000..2516ff1
--- /dev/null
+++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunner.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.test.utils.HdfsUtils;
+
+/**
+ * Hadoop cluster runner for testing purposes.
+ *
+ * The runner provides methods for bootstrapping and using a Hadoop cluster.
+ * This abstract implementation is agnostic about the mode in which Hadoop is running.
+ * Each mode has its own concrete implementation (for example LocalJobRunner, MiniCluster or a real existing cluster).
+ */
+public abstract class HadoopRunner {
+
+  /**
+   * Temporary path that can be used as a root for other directories storing various data like logs or stored HDFS files.
+   */
+  private String temporaryPath;
+
+  /**
+   * Configuration object for Hadoop.
+   */
+  protected Configuration config = null;
+
+  /**
+   * Prepare configuration object.
+   * This method should be called once before the start method is called.
+   *
+   * @param config is the configuration object to prepare.
+   */
+  abstract public Configuration prepareConfiguration(Configuration config);
+
+  /**
+   * Start hadoop cluster.
+   *
+   * @throws Exception
+   */
+  abstract public void start() throws Exception;
+
+  /**
+   * Stop hadoop cluster.
+   *
+   * @throws Exception
+   */
+  abstract public void stop() throws Exception;
+
+  /**
+   * Return the working directory on the HDFS instance that this HadoopRunner is using.
+   *
+   * This directory might be on the local filesystem when running in local mode.
+   */
+  public String getTestDirectory() {
+    return "/mapreduce-job-io";
+  }
+
+  /**
+   * Get temporary path.
+   *
+   * @return
+   */
+  public String getTemporaryPath() {
+    return temporaryPath;
+  }
+
+  /**
+   * Set temporary path.
+   *
+   * @param temporaryPath
+   */
+  public void setTemporaryPath(String temporaryPath) {
+    this.temporaryPath = temporaryPath;
+  }
+
+  /**
+   * Return directory on local filesystem where data generated by
+   * the Hadoop cluster should be stored.
+   *
+   * @return
+   */
+  public String getDataDir() {
+    return HdfsUtils.joinPathFragments(temporaryPath, "data");
+  }
+
+  /**
+   * Return directory on local filesystem where logs generated by
+   * the Hadoop cluster should be stored.
+   *
+   * @return
+   */
+  public String getLogDir() {
+    return HdfsUtils.joinPathFragments(temporaryPath, "log");
+  }
+
+  /**
+   * Get hadoop configuration.
+   *
+   * @return
+   */
+  public Configuration getConfiguration() {
+    return config;
+  }
+
+  /**
+   * Set the configuration object that should be used with Miniclusters.
+   *
+   * @param config
+   */
+  public void setConfiguration(Configuration config) {
+    this.config = config;
+  }
+}
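
The javadoc above mentions a third mode, a real existing cluster, which this commit does not implement; a hypothetical subclass would only need to point the configuration at the already-running services (the namenode address below is an assumption):

    package org.apache.sqoop.test.hadoop;

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical runner for a pre-existing cluster: no lifecycle to manage.
    public class HadoopRealClusterRunner extends HadoopRunner {
      @Override
      public Configuration prepareConfiguration(Configuration conf) {
        conf.set("fs.default.name", "hdfs://namenode.example.com:8020"); // assumed address
        return conf;
      }

      @Override
      public void start() throws Exception {
        // Nothing to start -- the cluster is already running.
      }

      @Override
      public void stop() throws Exception {
        // Nothing to stop.
      }
    }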

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java
new file mode 100644
index 0000000..020fa3f
--- /dev/null
+++ b/test/src/main/java/org/apache/sqoop/test/hadoop/HadoopRunnerFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.test.hadoop;
+
+import java.util.Properties;
+
+/**
+ * Factory for creating HadoopRunner instances.
+ */
+public class HadoopRunnerFactory {
+
+  public static final String CLUSTER_CLASS_PROPERTY = "sqoop.hadoop.runner.class";
+
+  public static HadoopRunner getHadoopCluster(Properties properties, Class<? extends HadoopRunner> defaultClusterClass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+    String className = properties.getProperty(CLUSTER_CLASS_PROPERTY);
+    if(className == null) {
+      return defaultClusterClass.newInstance();
+    }
+
+    Class<?> klass = Class.forName(className);
+    return (HadoopRunner)klass.newInstance();
+  }
+}
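
Usage mirrors the test cases below: the runner class is resolved from the sqoop.hadoop.runner.class system property, falling back to the supplied default:

    import org.apache.sqoop.test.hadoop.HadoopLocalRunner;
    import org.apache.sqoop.test.hadoop.HadoopRunner;
    import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;

    public class FactorySketch {
      public static void main(String[] args) throws Exception {
        // Pass -Dsqoop.hadoop.runner.class=org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner
        // to the JVM to override the default runner class given here.
        HadoopRunner runner = HadoopRunnerFactory.getHadoopCluster(
            System.getProperties(), HadoopLocalRunner.class);
      }
    }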

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
index d10b942..5ec4fa4 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.test.testcases;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.client.SubmissionCallback;
 import org.apache.sqoop.framework.configuration.OutputFormat;
@@ -31,6 +33,8 @@ import org.apache.sqoop.test.data.Cities;
 import org.apache.sqoop.test.data.UbuntuReleases;
 import org.apache.sqoop.test.db.DatabaseProvider;
 import org.apache.sqoop.test.db.DatabaseProviderFactory;
+import org.apache.sqoop.test.hadoop.HadoopMiniClusterRunner;
+import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
 import org.apache.sqoop.validation.Status;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -71,6 +75,19 @@ abstract public class ConnectorTestCase extends TomcatTestCase {
   };
 
   @BeforeClass
+  public static void startHadoop() throws Exception {
+    // Start Hadoop Clusters
+    hadoopCluster = HadoopRunnerFactory.getHadoopCluster(System.getProperties(), HadoopMiniClusterRunner.class);
+    hadoopCluster.setTemporaryPath(TMP_PATH_BASE);
+    hadoopCluster.setConfiguration( hadoopCluster.prepareConfiguration(new JobConf()) );
+    hadoopCluster.start();
+
+    // Initialize Hdfs Client
+    hdfsClient = FileSystem.get(hadoopCluster.getConfiguration());
+    LOG.debug("HDFS Client: " + hdfsClient);
+  }
+
+  @BeforeClass
   public static void startProvider() throws Exception {
     provider = DatabaseProviderFactory.getProvider(System.getProperties());
     LOG.info("Starting database provider: " + provider.getClass().getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java
index efdfed4..7e2558f 100644
--- a/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java
+++ b/test/src/main/java/org/apache/sqoop/test/testcases/TomcatTestCase.java
@@ -17,24 +17,30 @@
  */
 package org.apache.sqoop.test.testcases;
 
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.client.SqoopClient;
 import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.hadoop.HadoopRunner;
+import org.apache.sqoop.test.hadoop.HadoopRunnerFactory;
+import org.apache.sqoop.test.hadoop.HadoopLocalRunner;
 import org.apache.sqoop.test.minicluster.TomcatSqoopMiniCluster;
 import org.apache.sqoop.test.utils.HdfsUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 
-import java.io.IOException;
-
 /**
  * Basic test case that will bootstrap Sqoop server running in external Tomcat
  * process.
  */
 abstract public class TomcatTestCase {
-
   private static final Logger LOG = Logger.getLogger(TomcatTestCase.class);
 
   @Rule public TestName name = new TestName();
@@ -47,7 +53,7 @@ abstract public class TomcatTestCase {
   * pick up the configured java.io.tmpdir value. The last resort is the /tmp/
   * directory in case no property is set.
    */
-  private static final String TMP_PATH_BASE =
+  protected static final String TMP_PATH_BASE =
     System.getProperty("sqoop.integration.tmpdir", System.getProperty("java.io.tmpdir", "/tmp"))
+ "/sqoop-cargo-tests/";
 
   /**
@@ -62,6 +68,16 @@ abstract public class TomcatTestCase {
   private String tmpPath;
 
   /**
+   * Hadoop cluster
+   */
+  protected static HadoopRunner hadoopCluster;
+
+  /**
+   * Hadoop client
+   */
+  protected static FileSystem hdfsClient;
+
+  /**
    * Tomcat based Sqoop mini cluster
    */
   private TomcatSqoopMiniCluster cluster;
@@ -71,13 +87,27 @@ abstract public class TomcatTestCase {
    */
   private SqoopClient client;
 
+  @BeforeClass
+  public static void startHadoop() throws Exception {
+    // Start Hadoop Clusters
+    hadoopCluster = HadoopRunnerFactory.getHadoopCluster(System.getProperties(), HadoopLocalRunner.class);
+    hadoopCluster.setTemporaryPath(TMP_PATH_BASE);
+    hadoopCluster.setConfiguration( hadoopCluster.prepareConfiguration(new JobConf()) );
+    hadoopCluster.start();
+
+    // Initialize Hdfs Client
+    hdfsClient = FileSystem.get(hadoopCluster.getConfiguration());
+    LOG.debug("HDFS Client: " + hdfsClient);
+  }
+
   @Before
   public void startServer() throws Exception {
-    // Set up the temporary path
-    tmpPath = TMP_PATH_BASE + getClass().getName() + "/" + name.getMethodName() + "/";
+    // Get and set temporary path in hadoop cluster.
+    tmpPath = HdfsUtils.joinPathFragments(TMP_PATH_BASE, getClass().getName(), name.getMethodName());
+    LOG.debug("Temporary Directory: " + tmpPath);
 
-    // Set up and start server
-    cluster = new TomcatSqoopMiniCluster(getTemporaryPath());
+    // Start server
+    cluster = new TomcatSqoopMiniCluster(tmpPath, hadoopCluster.getConfiguration());
     cluster.start();
 
     // Initialize Sqoop Client API
@@ -89,6 +119,11 @@ abstract public class TomcatTestCase {
     cluster.stop();
   }
 
+  @AfterClass
+  public static void stopHadoop() throws Exception {
+    hadoopCluster.stop();
+  }
+
   /**
    * Return SqoopClient configured to talk to testing server.
    *
@@ -112,12 +147,12 @@ abstract public class TomcatTestCase {
   }
 
   /**
-   * Get input/output directory for mapreduce job.
+   * Return mapreduce base directory.
    *
    * @return
    */
   public String getMapreduceDirectory() {
-    return getTemporaryPath() + "/mapreduce-job-io";
+    return HdfsUtils.joinPathFragments(hadoopCluster.getTestDirectory(), getClass().getName(), name.getMethodName());
   }
 
   /**
@@ -130,7 +165,7 @@ abstract public class TomcatTestCase {
    * @throws IOException
    */
   protected void assertMapreduceOutput(String... lines) throws IOException {
-    HdfsAsserts.assertMapreduceOutput(getMapreduceDirectory(), lines);
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, getMapreduceDirectory(), lines);
   }
 
   /**
@@ -138,8 +173,8 @@ abstract public class TomcatTestCase {
    *
    * @param expectedFiles Expected number of files
    */
-  protected void assertMapreduceOutputFiles(int expectedFiles) {
-    HdfsAsserts.assertMapreduceOutputFiles(getMapreduceDirectory(), expectedFiles);
+  protected void assertMapreduceOutputFiles(int expectedFiles) throws IOException {
+    HdfsAsserts.assertMapreduceOutputFiles(hdfsClient, getMapreduceDirectory(), expectedFiles);
   }
 
   /**
@@ -150,6 +185,6 @@ abstract public class TomcatTestCase {
    * @throws IOException
    */
   protected void createInputMapreduceFile(String filename, String...lines) throws IOException {
-    HdfsUtils.createFile(getMapreduceDirectory(), filename, lines);
+    HdfsUtils.createFile(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), filename), lines);
   }
 }
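
A minimal sketch of a test written against the reworked base class; the file name and data are illustrative, and the Sqoop job setup between the two helpers is elided:

    import org.apache.sqoop.test.testcases.TomcatTestCase;
    import org.junit.Test;

    public class ExampleConnectorTest extends TomcatTestCase {
      @Test
      public void testRoundTrip() throws Exception {
        // Stage input under the per-test mapreduce directory on the cluster.
        createInputMapreduceFile("input-0001", "line 1", "line 2");

        // ... configure and run a Sqoop job against getMapreduceDirectory() ...

        // Verify the job's part-* output through the shared HDFS client.
        assertMapreduceOutput("line 1", "line 2");
        assertMapreduceOutputFiles(1);
      }
    }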

http://git-wip-us.apache.org/repos/asf/sqoop/blob/39eb1e56/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
index 95dd177..59c5f15 100644
--- a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
+++ b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
@@ -17,50 +17,76 @@
  */
 package org.apache.sqoop.test.utils;
 
-import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
-import java.io.File;
-import java.io.FilenameFilter;
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Arrays;
+import java.io.OutputStreamWriter;
+import java.util.LinkedList;
 
 /**
  * Handy utilities to work with HDFS
- *
- * TODO: This module will require clean up to work on MiniCluster/Real cluster.
  */
 public class HdfsUtils {
 
+  @SuppressWarnings("unused")
   private static final Logger LOG = Logger.getLogger(HdfsUtils.class);
 
+  private static final char PATH_SEPARATOR = '/';
+
   /**
    * Get list of mapreduce output files from given directory.
    *
    * @param directory Directory to be searched for files generated by MR
    * @return
+   * @throws IOException
+   * @throws FileNotFoundException
    */
-  public static String [] getOutputMapreduceFiles(String directory) {
-    File dir = new File(directory);
-    return dir.list(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.startsWith("part-");
+  public static Path [] getOutputMapreduceFiles(FileSystem fs, String directory) throws FileNotFoundException, IOException {
+    LinkedList<Path> files = new LinkedList<Path>();
+    for (FileStatus fileStatus : fs.listStatus(new Path(directory))) {
+      if (fileStatus.getPath().getName().startsWith("part-")) {
+        files.add(fileStatus.getPath());
       }
-    });
+    }
+    return files.toArray(new Path[files.size()]);
   }
 
   /**
    * Create HDFS file with given content.
    *
+   * @param fs filesystem object
    * @param directory Directory where the file should be created
    * @param filename File name
    * @param lines Individual lines that should be written into the file
    * @throws IOException
    */
-  public static void createFile(String directory, String filename, String ...lines) throws IOException {
-    File outputFile = new File(directory, filename);
-    FileUtils.writeLines(outputFile, Arrays.asList(lines));
+  public static void createFile(FileSystem fs, String path, String ...lines) throws IOException {
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path), true)));
+    for (String line : lines) {
+      writer.write(line);
+      writer.newLine();
+    }
+    writer.close();
+  }
+
+  /**
+   * Join several path fragments together.
+   * @param paths
+   */
+  public static String joinPathFragments(String ...paths){
+    StringBuilder builder = new StringBuilder();
+    for (String path : paths) {
+      builder.append(path);
+      if (path.charAt(path.length() - 1) != PATH_SEPARATOR) {
+        builder.append(PATH_SEPARATOR);
+      }
+    }
+    return builder.toString();
   }
 
   private HdfsUtils() {
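
One behavioral note on the new path helper, worth knowing when comparing paths in tests: it appends a separator after every fragment that does not already end with one, including the last, so results carry a trailing slash. A small sketch:

    import org.apache.sqoop.test.utils.HdfsUtils;

    public class JoinPathSketch {
      public static void main(String[] args) {
        // Prints "/tmp/sqoop-cargo-tests/data/" -- note the trailing separator.
        System.out.println(HdfsUtils.joinPathFragments("/tmp/sqoop-cargo-tests", "data"));
      }
    }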

