sqoop-commits mailing list archives

From a..@apache.org
Subject sqoop git commit: SQOOP-1738: Sqoop2: HDFS Connector : Check for output directory
Date Tue, 17 Mar 2015 19:40:14 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 6ca31c505 -> f98fc2885


SQOOP-1738: Sqoop2: HDFS Connector : Check for output directory

(Jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/f98fc288
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/f98fc288
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/f98fc288

Branch: refs/heads/sqoop2
Commit: f98fc28859fa58037f44017fb9934a403567442d
Parents: 6ca31c5
Author: Abraham Elmahrek <abe@apache.org>
Authored: Tue Mar 17 12:39:30 2015 -0700
Committer: Abraham Elmahrek <abe@apache.org>
Committed: Tue Mar 17 12:39:30 2015 -0700

----------------------------------------------------------------------
 .../sqoop/error/code/HdfsConnectorError.java    |   4 +-
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  28 +++
 .../sqoop/connector/hdfs/TestToInitializer.java |  67 +++++++
 .../connector/hdfs/OutputDirectoryTest.java     | 177 +++++++++++++++++++
 4 files changed, 275 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
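For orientation before the raw diff: the heart of this change is a pre-flight check in HdfsToInitializer that rejects an output directory which already exists as a file or as a non-empty directory. The standalone sketch below distills that logic against the Hadoop FileSystem API; the class name OutputDirectoryCheck and the use of IllegalStateException are illustrative stand-ins for the connector's SqoopException plumbing, not part of this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputDirectoryCheck {
  /** Fails unless the target path is absent or an existing empty directory. */
  public static void ensureEmptyOrAbsent(Configuration conf, String outputDirectory)
      throws IOException {
    // Resolve the filesystem from the configuration (fs.defaultFS).
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(outputDirectory);

    if (!fs.exists(path)) {
      return; // Nothing at the target path; the job is free to create it.
    }
    if (fs.isFile(path)) {
      // A regular file can never serve as an output directory.
      throw new IllegalStateException("Output directory already exists and is a file");
    }
    // The path is an existing directory at this point; it must be empty.
    FileStatus[] children = fs.listStatus(path);
    if (children.length != 0) {
      throw new IllegalStateException("Output directory is not empty");
    }
  }
}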


http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
index 8514541..c85e7fc 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/HdfsConnectorError.java
@@ -32,7 +32,9 @@ public enum HdfsConnectorError implements ErrorCode{
   GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
   /** Error occurs during loader run */
   GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run"),
-  GENERIC_HDFS_CONNECTOR_0006("Unknown job type")
+  GENERIC_HDFS_CONNECTOR_0006("Unknown job type"),
+
+  GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
 
   ;
 

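The new constant follows the existing convention of this enum: every value carries a human-readable message, and the constant name itself serves as the error code. Assuming the usual shape of Sqoop2's ErrorCode contract (a sketch inferred from the `implements ErrorCode` context line above, not quoted from the repository), the enum body looks roughly like this:

public enum HdfsConnectorError implements ErrorCode {
  // ... existing constants elided ...
  GENERIC_HDFS_CONNECTOR_0007("Invalid output directory"),
  ;

  private final String message;

  private HdfsConnectorError(String message) {
    this.message = message;
  }

  @Override
  public String getCode() {
    return name(); // the constant name doubles as the error code
  }

  @Override
  public String getMessage() {
    return message;
  }
}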
http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index ad500c2..83bac27 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -18,11 +18,18 @@
 package org.apache.sqoop.connector.hdfs;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 
+import java.io.IOException;
+
 public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
@@ -37,5 +44,26 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
   public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.configurationToContext(configuration, context.getContext());
+
+    // Verify that the given HDFS directory either does not exist or is empty
+    try {
+      FileSystem fs = FileSystem.get(configuration);
+      Path path = new Path(jobConfig.toJobConfig.outputDirectory);
+
+      if(fs.exists(path)) {
+        if(fs.isFile(path)) {
+          throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file");
+        }
+
+        if(fs.isDirectory(path)) {
+          FileStatus[] fileStatuses = fs.listStatus(path);
+          if(fileStatuses.length != 0) {
+            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
+    }
   }
 }
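
A note on the placement of this check: initialize() runs at job submission time, so a bad output directory now fails fast instead of surfacing only after a MapReduce job has been launched. The check also goes through FileSystem.get(configuration), which resolves the target filesystem from fs.defaultFS; with an otherwise empty Configuration this falls back to the local filesystem (file:///), which is what lets the unit tests below drive this exact code path with ordinary temp files. Roughly:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// With no fs.defaultFS configured, Hadoop resolves file:/// (the local
// filesystem), so the checked paths land on local disk.
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(configuration);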

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
new file mode 100644
index 0000000..1daa25a
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import com.google.common.io.Files;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+/**
+ * Unit tests for the output directory validation in HdfsToInitializer.
+ */
+public class TestToInitializer extends TestHdfsBase {
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testOutputDirectoryIsAFile() throws Exception {
+    File file = File.createTempFile("MastersOfOrion", ".txt");
+    file.createNewFile();
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+    jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();
+
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+    Initializer initializer = new HdfsToInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testOutputDirectoryIsNotEmpty() throws Exception {
+    File dir = Files.createTempDir();
+    File file = File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+    Initializer initializer = new HdfsToInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+  }
+}
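
One caveat about the TestNG style above: expectedExceptions only pins the exception type, not which error code was raised. If that distinction ever matters, a stricter variant along these lines (hypothetical, not part of this commit, and assuming the usual static imports of org.testng.Assert.fail and org.testng.Assert.assertEquals) could assert the code as well:

try {
  initializer.initialize(initializerContext, linkConfig, jobConfig);
  fail("Expected SqoopException for an invalid output directory");
} catch (SqoopException ex) {
  // SqoopException exposes its ErrorCode directly on the server side.
  assertEquals(ex.getErrorCode(), HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007);
}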

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f98fc288/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
new file mode 100644
index 0000000..b454263
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/OutputDirectoryTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.client.ClientError;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.error.code.HdfsConnectorError;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Integration tests for the output directory checks in the HDFS connector.
+ */
+public class OutputDirectoryTest extends ConnectorTestCase {
+  @Test
+  public void testOutputDirectoryIsAFile() throws Exception {
+    createAndLoadTableCities();
+
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory()));
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+
+    // fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    assertJobSubmissionFailure(job,
+      HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
+      "is a file"
+    );
+
+    dropTable();
+  }
+
+  @Test
+  public void testOutputDirectoryIsNotEmpty() throws Exception {
+    createAndLoadTableCities();
+
+    hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
+    hdfsClient.createNewFile(new Path(getMapreduceDirectory() + "/x"));
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+
+    // fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    assertJobSubmissionFailure(job,
+      HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007.toString(),
+      "is not empty"
+    );
+
+    dropTable();
+  }
+
+  @Test
+  public void testOutputDirectoryIsEmpty() throws Exception {
+    createAndLoadTableCities();
+
+    hdfsClient.mkdirs(new Path(getMapreduceDirectory()));
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+
+    // fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+
+    dropTable();
+  }
+
+  public void assertJobSubmissionFailure(MJob job, String ...fragments) throws Exception {
+    // Try to execute the job and verify that it was not successful
+    try {
+      executeJob(job);
+      fail("Expected failure in the job submission.");
+    } catch (SqoopException ex) {
+      // Top level exception should be CLIENT_0001
+      assertEquals(ClientError.CLIENT_0001, ex.getErrorCode());
+
+      // We can't directly verify the ErrorCode from the SqoopException as the
+      // client side does not rebuild SqoopExceptions for missing ErrorCodes; the
+      // cause will be a generic Throwable and not a SqoopException instance.
+      Throwable cause = ex.getCause();
+      assertNotNull(cause);
+
+      for(String fragment : fragments) {
+        assertTrue(cause.getMessage().contains(fragment), "Expected fragment " + fragment + " in error message " + cause.getMessage());
+      }
+    }
+  }
+}

