sqoop-commits mailing list archives

From: jar...@apache.org
Subject: sqoop git commit: SQOOP-2463: Sqoop2: Add support for schema-less to schema-less transfer for CSV IDF
Date: Wed, 21 Oct 2015 15:07:25 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 77be2a8f9 -> 4e2204504


SQOOP-2463: Sqoop2: Add support for schema-less to schema-less transfer for CSV IDF

(Abraham Fine via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/4e220450
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/4e220450
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/4e220450

Branch: refs/heads/sqoop2
Commit: 4e22045043dfa3d3c3032d4a433c300980728984
Parents: 77be2a8
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Wed Oct 21 08:06:41 2015 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Wed Oct 21 08:06:41 2015 -0700

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsExtractor.java     | 28 ++++----
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 26 ++++---
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  2 +-
 .../org/apache/sqoop/test/utils/HdfsUtils.java  |  4 +-
 .../connector/hdfs/FromHDFSToHDFSTest.java      | 74 ++++++++++++++++++++
 5 files changed, 109 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index b35c957..23bbcc0 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.hdfs;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -38,6 +39,7 @@ import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.schema.Schema;
 
 /**
@@ -112,12 +114,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     boolean hasNext = filereader.next(line);
     while (hasNext) {
       rowsRead++;
-      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
-        Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
-        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
-      } else {
-        dataWriter.writeStringRecord(line.toString());
-      }
+      extractRow(linkConfiguration, fromJobConfiguration, line);
       line = new Text();
       hasNext = filereader.next(line);
       if (filereader.getPosition() >= end && filereader.syncSeen()) {
@@ -180,12 +177,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
         next = fileseeker.getPos();
       }
       rowsRead++;
-      if (HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
-        Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
-        dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
-      } else {
-        dataWriter.writeStringRecord(line.toString());
-      }
+      extractRow(linkConfiguration, fromJobConfiguration, line);
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
     filestream.close();
@@ -213,5 +205,17 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     return true;
   }
 
+  private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
+    if (schema instanceof ByteArraySchema) {
+      dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});
+    } else if (!HdfsUtils.hasCustomFormat(linkConfiguration,
+      fromJobConfiguration)) {
+      dataWriter.writeStringRecord(line.toString());
+    } else {
+      Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
+      dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
+    }
+  }
+
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 948b1b6..798e552 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.schema.ByteArraySchema;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
@@ -86,21 +87,24 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
       filewriter.initialize(filepath, conf, codec);
 
-      if (HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig)) {
-        Object[] record;
-
-        while ((record = reader.readArrayRecord()) != null) {
-          filewriter.write(
-              SqoopIDFUtils.toCSV(
-                  HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
-                  context.getSchema()));
+      if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
+        String record;
+        while ((record = reader.readTextRecord()) != null) {
+          if (context.getSchema() instanceof ByteArraySchema) {
+            filewriter.write(SqoopIDFUtils.toText(record));
+          } else {
+            filewriter.write(record);
+          }
           rowsWritten++;
         }
       } else {
-        String record;
+        Object[] record;
 
-        while ((record = reader.readTextRecord()) != null) {
-          filewriter.write(record);
+        while ((record = reader.readArrayRecord()) != null) {
+          filewriter.write(
+            SqoopIDFUtils.toCSV(
+              HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
+              context.getSchema()));
           rowsWritten++;
         }
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 937ef5a..c93813b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -85,7 +85,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
-    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), fromSchema);
+    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), matcher.getFromSchema());
 
     try {
       LOG.info("Starting progress service");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
index 610156e..0369994 100644
--- a/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
+++ b/test/src/main/java/org/apache/sqoop/test/utils/HdfsUtils.java
@@ -64,7 +64,9 @@ public class HdfsUtils {
     LinkedList<Path> files = new LinkedList<Path>();
     for (FileStatus fileStatus : fs.listStatus(new Path(directory), filterHiddenFiles)) {
       LOG.debug("Found mapreduce output file: " + fileStatus.getPath() + " with size " +
fileStatus.getLen());
-      files.add(fileStatus.getPath());
+      if (fileStatus.isFile()) {
+        files.add(fileStatus.getPath());
+      }
     }
     return files.toArray(new Path[files.size()]);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4e220450/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
new file mode 100644
index 0000000..4b2fa06
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/FromHDFSToHDFSTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.testng.annotations.Test;
+
+/**
+ * Test schemaless to schemaless transfer by using two hdfs connectors
+ */
+public class FromHDFSToHDFSTest extends ConnectorTestCase {
+
+  @Test
+  public void test() throws Exception {
+    String[] sampleData = new String[]{
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
+    };
+
+    createFromFile("input-0001", sampleData);
+
+    MLink hdfsLinkFrom = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLinkFrom);
+    saveLink(hdfsLinkFrom);
+
+    MLink hdfsLinkTo = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLinkTo);
+    saveLink(hdfsLinkTo);
+
+    MJob job = getClient().createJob(hdfsLinkFrom.getPersistenceId(), hdfsLinkTo.getPersistenceId());
+
+    fillHdfsFromConfig(job);
+
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    hdfsClient.mkdirs(new Path(HdfsUtils.joinPathFragments
+      (getMapreduceDirectory(), "TO")));
+
+    job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
+      .setValue(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+
+
+    MDriverConfig driverConfig = job.getDriverConfig();
+    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+    saveJob(job);
+
+    executeJob(job);
+
+    HdfsAsserts.assertMapreduceOutput(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), sampleData);
+  }
+}

