sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject sqoop git commit: SQOOP-1949: Sqoop2: HDFS append only support
Date Thu, 19 Mar 2015 17:04:30 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 24feea185 -> a63da71ad


SQOOP-1949: Sqoop2: HDFS append only support

(Jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a63da71a
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a63da71a
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a63da71a

Branch: refs/heads/sqoop2
Commit: a63da71adfe3e26b0d3a0c2d162782f835edf0da
Parents: 24feea1
Author: Abraham Elmahrek <abe@apache.org>
Authored: Wed Mar 18 23:43:00 2015 -0700
Committer: Abraham Elmahrek <abe@apache.org>
Committed: Wed Mar 18 23:43:00 2015 -0700

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  3 +-
 .../hdfs/configuration/ToJobConfig.java         |  2 +
 .../resources/hdfs-connector-config.properties  |  4 +-
 .../sqoop/connector/hdfs/TestToInitializer.java | 21 +++++
 .../apache/sqoop/test/asserts/HdfsAsserts.java  |  4 +-
 .../connector/hdfs/AppendModeTest.java          | 87 ++++++++++++++++++++
 6 files changed, 117 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 05ceb23..234bb71 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -48,6 +48,7 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
 
     Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.configurationToContext(configuration, context.getContext());
+    boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
 
     // Verification that given HDFS directory either don't exists or is empty
     try {
@@ -59,7 +60,7 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
          throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file");
         }
 
-        if(fs.isDirectory(path)) {
+        if(fs.isDirectory(path) && !appendMode) {
           FileStatus[] fileStatuses = fs.listStatus(path);
           if(fileStatuses.length != 0) {
            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
index 6fc894b..d76ba5f 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfig.java
@@ -41,6 +41,8 @@ public class ToJobConfig {
 
   @Input(size = 255, validators = { @Validator(NotEmpty.class)}) public String outputDirectory;
 
+  @Input public Boolean appendMode;
+
   public static class ToJobConfigValidator extends AbstractValidator<ToJobConfig> {
     @Override
     public void validate(ToJobConfig conf) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 8d5a562..eb9c000 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -48,8 +48,8 @@ toJobConfig.customCompression.help = Full class name of the custom compression
 toJobConfig.outputDirectory.label = Output directory
 toJobConfig.outputDirectory.help = Output directory for final data
 
-toJobConfig.ignored.label = Ignored
-toJobConfig.ignored.help = This value is ignored
+toJobConfig.appendMode.label = Append mode
+toJobConfig.appendMode.help = Append new files to existing directory if the output directory already exists
 
 toJobConfig.overrideNullValue.label = Override null value
 toJobConfig.overrideNullValue.help = If set to true, then the null value will \

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index aa267a7..a98a46a 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -88,4 +88,25 @@ public class TestToInitializer extends TestHdfsBase {
     Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
   }
+
+  @Test
+  public void testOutputDirectoryIsNotEmptyWithIncremental() throws Exception {
+    File dir = Files.createTempDir();
+    File file = File.createTempFile("MastersOfOrion", ".txt", dir);
+
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration jobConfig = new ToJobConfiguration();
+
+    linkConfig.linkConfig.uri = "file:///";
+    jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
+    jobConfig.toJobConfig.appendMode = true;
+
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+
+    Initializer initializer = new HdfsToInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    assertNotNull(initializerContext.getString(HdfsConstants.WORK_DIRECTORY));
+    assertTrue(initializerContext.getString(HdfsConstants.WORK_DIRECTORY).startsWith(dir.getAbsolutePath()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
index b115723..8d548ad 100644
--- a/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
+++ b/test/src/main/java/org/apache/sqoop/test/asserts/HdfsAsserts.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.test.asserts;
 
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,7 +52,7 @@ public class HdfsAsserts {
    * @throws IOException
    */
   public static void assertMapreduceOutput(FileSystem fs, String directory, String... lines) throws IOException {
-    Set<String> setLines = new HashSet<String>(Arrays.asList(lines));
+    Multiset<String> setLines = HashMultiset.create(Arrays.asList(lines));
     List<String> notFound = new LinkedList<String>();
 
     Path[] files = HdfsUtils.getOutputMapreduceFiles(fs, directory);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a63da71a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
new file mode 100644
index 0000000..1ba3bd4
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/AppendModeTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.testng.annotations.Test;
+
+/**
+ */
+public class AppendModeTest extends ConnectorTestCase {
+
+  @Test
+  public void test() throws Exception {
+    createAndLoadTableCities();
+
+    // RDBMS link
+    MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsConnection);
+    saveLink(rdbmsConnection);
+
+    // HDFS link
+    MLink hdfsConnection = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsConnection);
+    saveLink(hdfsConnection);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), hdfsConnection.getPersistenceId());
+
+    // Set rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+
+    // fill the hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    MConfigList toConfig = job.getJobConfig(Direction.TO);
+    toConfig.getBooleanInput("toJobConfig.appendMode").setValue(true);
+
+
+    saveJob(job);
+
+    // First execution
+    executeJob(job);
+    assertTo(
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+
+    // Second execution
+    executeJob(job);
+    assertTo(
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'",
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    );
+
+    dropTable();
+  }
+
+}


Mime
View raw message