sqoop-commits mailing list archives

From: b...@apache.org
Subject: git commit: SQOOP-768 Compilation on hadoop profile 100 will fail (Jarek Jarcec Cecho)
Date: Fri, 14 Dec 2012 20:05:04 GMT
Updated Branches:
  refs/heads/sqoop2 1c87cb762 -> 66a328aef


SQOOP-768 Compilation on hadoop profile 100 will fail
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/66a328ae
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/66a328ae
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/66a328ae

Branch: refs/heads/sqoop2
Commit: 66a328aef87ebed4d22ca7cc22d655360eedc44f
Parents: 1c87cb7
Author: Bilung Lee <blee@apache.org>
Authored: Fri Dec 14 11:52:14 2012 -0800
Committer: Bilung Lee <blee@apache.org>
Committed: Fri Dec 14 11:52:14 2012 -0800

----------------------------------------------------------------------
 execution/mapreduce/pom.xml                        |    2 -
 .../mapreduce/MapreduceExecutionEngine.java        |   22 ++++++++++-
 .../java/org/apache/sqoop/job/JobConstants.java    |    2 +
 .../sqoop/job/etl/HdfsExportPartitioner.java       |   16 ++++----
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java |    3 +-
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |   30 ++++-----------
 .../java/org/apache/sqoop/job/TestHdfsExtract.java |   25 +++++-------
 pom.xml                                            |    1 -
 8 files changed, 49 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index 9e1d2ec..a6299e1 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -83,8 +83,6 @@ limitations under the License.
         <dependency>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
-          <version>${guava.version}</version>
-          <scope>provided</scope>
         </dependency>
 
         <dependency>

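For context, dropping the provided scope means Guava now ships with the execution module instead of being assumed on the cluster classpath. A minimal sketch (not Sqoop code; names hypothetical) of typical ThreadFactoryBuilder usage, the Guava class whose jar the engine registers below:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ThreadFactoryDemo {
  public static void main(String[] args) {
    // Named daemon worker threads built via Guava's ThreadFactoryBuilder.
    ExecutorService pool = Executors.newFixedThreadPool(2,
        new ThreadFactoryBuilder().setNameFormat("sqoop-demo-%d").setDaemon(true).build());
    pool.submit(new Runnable() {
      public void run() {
        System.out.println(Thread.currentThread().getName());
      }
    });
    pool.shutdown();
  }
}
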
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 06872ca..b201a8d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -17,8 +17,8 @@
  */
 package org.apache.sqoop.execution.mapreduce;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.ExecutionEngine;
@@ -61,6 +61,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
 
+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
 
@@ -103,6 +106,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
 
+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
 
@@ -124,10 +130,22 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
 
     // We should make one extractor that will be able to read all supported file types
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
-    context.setString(FileInputFormat.INPUT_DIR, jobConf.input.inputDirectory);
+    context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
 
     if(request.getExtractors() != null) {
       context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
   }
+
+  /**
+   * Our execution engine has additional dependencies that need to be available
+   * at mapreduce job time. This method registers all dependencies in the request
+   * object.
+   *
+   * @param request Active request object.
+   */
+  protected void addDependencies(MRSubmissionRequest request) {
+    // Guava
+    request.addJarForClass(ThreadFactoryBuilder.class);
+  }
 }

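The addDependencies() hook relies on resolving the jar that ships a given class and adding it to the job classpath. A minimal sketch (not Sqoop code; names hypothetical) of how such a class-to-jar lookup typically works:

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class JarLocator {
  /** Returns the path of the jar (or directory) the given class was loaded from. */
  public static String jarFor(Class<?> klass) {
    return klass.getProtectionDomain().getCodeSource().getLocation().getPath();
  }

  public static void main(String[] args) {
    // Prints the local path of the Guava jar, i.e. what ends up shipped with the job.
    System.out.println(jarFor(ThreadFactoryBuilder.class));
  }
}
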
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index f5123a2..e16a2c4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -83,6 +83,8 @@ public final class JobConstants extends Constants {
   // We're using constants from Hadoop 1. Hadoop 2 has different names, but
   // provides backward compatibility layer for those names as well.
 
+  public static final String HADOOP_INPUTDIR = "mapred.input.dir";
+
   public static final String HADOOP_OUTDIR = "mapred.output.dir";
 
   public static final String HADOOP_COMPRESS = "mapred.output.compress";

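As the comment above notes, the Hadoop 1 property names are used because Hadoop 2 still honors them through its deprecated-key mapping. A minimal sketch, assuming a vanilla Hadoop Configuration and a hypothetical path, of setting the input directory through the old key:

import org.apache.hadoop.conf.Configuration;

public class InputDirDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // "mapred.input.dir" is the Hadoop 1 name; Hadoop 2 maps it to
    // "mapreduce.input.fileinputformat.inputdir" via its compatibility layer.
    conf.set("mapred.input.dir", "/user/sqoop/demo-input"); // hypothetical path
    System.out.println(conf.get("mapred.input.dir"));
  }
}
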
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
index 7ffd97c..71e0060 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
@@ -113,12 +112,12 @@ public class HdfsExportPartitioner extends Partitioner {
       }
 
       // all the files in input set
-      String indir = conf.get(FileInputFormat.INPUT_DIR);
+      String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
       FileSystem fs = FileSystem.get(conf);
 
       List<Path> paths = new LinkedList<Path>();
       for(FileStatus status : fs.listStatus(new Path(indir))) {
-        if(!status.isDirectory()) {
+        if(!status.isDir()) {
           paths.add(status.getPath());
         }
       }
@@ -143,7 +142,7 @@ public class HdfsExportPartitioner extends Partitioner {
   }
 
   private long getInputSize(Configuration conf) throws IOException {
-    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] files = fs.listStatus(new Path(indir));
     long count = 0;
@@ -345,10 +344,11 @@ public class HdfsExportPartitioner extends Partitioner {
   private boolean isSplitable(Configuration conf, Path file) {
     final CompressionCodec codec =
         new CompressionCodecFactory(conf).getCodec(file);
-    if (null == codec) {
-      return true;
-    }
-    return codec instanceof SplittableCompressionCodec;
+
+    // This method might be improved to handle SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
+    return null == codec;
+
   }
 
   /**

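The simplified check above treats a file as splittable only when no compression codec matches it, since Hadoop 1.0 lacks SplittableCompressionCodec. A minimal sketch (not part of the patch) of that check in isolation:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class SplitCheckDemo {
  static boolean isSplittable(Configuration conf, Path file) {
    // Codec lookup is by file extension; null means plain, splittable data.
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    return codec == null;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(isSplittable(conf, new Path("/data/part-00000")));    // true
    System.out.println(isSplittable(conf, new Path("/data/part-00000.gz"))); // false
  }
}
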
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 0693a09..2261a7c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -74,8 +74,7 @@ public class HdfsSequenceExportExtractor extends Extractor {
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
 
-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(file));
+    SequenceFile.Reader filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
 
     if (start > filereader.getPosition()) {
       filereader.sync(start); // sync to start

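The change swaps the Hadoop 2-only SequenceFile.Reader.file(...) option style for the (FileSystem, Path, Configuration) constructor that exists on both Hadoop lines. A minimal sketch, assuming a hypothetical sequence file of Text keys and NullWritable values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqReadDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/demo.seq"); // hypothetical input file
    // Hadoop 1-compatible constructor, as used in the extractor above.
    SequenceFile.Reader reader =
        new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
    Text key = new Text();
    while (reader.next(key, NullWritable.get())) {
      System.out.println(key);
    }
    reader.close();
  }
}
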
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index c412c81..fdc7d67 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.util.LineReader;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
@@ -93,32 +91,20 @@ public class HdfsTextExportExtractor extends Extractor {
       byte[] recordDelimiterBytes = String.valueOf(
           Data.DEFAULT_RECORD_DELIMITER).getBytes(
               Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(filestream, conf,
-          recordDelimiterBytes);
+      // Hadoop 1.0 does not support custom record delimiters and thus we
+      // support only the default one.
+      filereader = new LineReader(filestream, conf);
       fileseeker = filestream;
-
-    } else if (codec instanceof SplittableCompressionCodec) {
-      SplitCompressionInputStream compressionstream =
-          ((SplittableCompressionCodec)codec).createInputStream(
-              filestream, codec.createDecompressor(), start, end,
-              SplittableCompressionCodec.READ_MODE.BYBLOCK);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(compressionstream,
-          conf, recordDelimiterBytes);
-      fileseeker = compressionstream;
-
-      start = compressionstream.getAdjustedStart();
-      end = compressionstream.getAdjustedEnd();
-
+    // We might add another "else if" case for SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
     } else {
       byte[] recordDelimiterBytes = String.valueOf(
           Data.DEFAULT_RECORD_DELIMITER).getBytes(
               Charset.forName(Data.CHARSET_NAME));
+      // Hadoop 1.0 does not support custom record delimiters and thus we
+      // support only the default one.
       filereader = new LineReader(
-          codec.createInputStream(filestream, codec.createDecompressor()),
-          conf, recordDelimiterBytes);
+          codec.createInputStream(filestream, codec.createDecompressor()), conf);
       fileseeker = filestream;
     }
 

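Dropping the record-delimiter argument keeps the extractor on the two-argument LineReader constructor available in Hadoop 1.0, at the cost of supporting only the default newline delimiter. A minimal sketch of that constructor in isolation:

import java.io.ByteArrayInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class LineReaderDemo {
  public static void main(String[] args) throws Exception {
    byte[] data = "first,1\nsecond,2\n".getBytes("UTF-8");
    // Two-argument constructor: default '\n' record delimiter only.
    LineReader reader = new LineReader(new ByteArrayInputStream(data), new Configuration());
    Text line = new Text();
    while (reader.readLine(line) > 0) {
      System.out.println(line);
    }
    reader.close();
  }
}
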
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index ba44de9..484eb20 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -71,7 +71,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -89,7 +89,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
 
     FileUtils.delete(indir);
@@ -102,7 +102,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -120,7 +120,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -138,7 +138,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -198,17 +198,12 @@ public class TestHdfsExtract extends TestCase {
           "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
       SequenceFile.Writer filewriter;
       if (codec != null) {
-        filewriter = SequenceFile.createWriter(conf,
-            SequenceFile.Writer.file(filepath),
-            SequenceFile.Writer.keyClass(Text.class),
-            SequenceFile.Writer.valueClass(NullWritable.class),
-            SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class,
+          CompressionType.BLOCK, codec);
       } else {
-        filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.NONE));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
       }
 
       Text text = new Text();

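The test now uses the createWriter overload taking (FileSystem, Configuration, Path, key class, value class, CompressionType[, codec]), which predates the Hadoop 2 option-based builder. A minimal sketch (hypothetical output path) of the uncompressed variant:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SeqWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path filepath = new Path("/tmp/demo.seq"); // hypothetical output file
    // Hadoop 1-compatible writer creation, mirroring the test code above.
    SequenceFile.Writer writer = SequenceFile.createWriter(
        filepath.getFileSystem(conf), conf, filepath,
        Text.class, NullWritable.class, CompressionType.NONE);
    writer.append(new Text("first,1"), NullWritable.get());
    writer.append(new Text("second,2"), NullWritable.get());
    writer.close();
  }
}
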
http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2311934..be4f1b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,6 @@ limitations under the License.
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>${guava.version}</version>
-            <scope>provided</scope>
           </dependency>
 
           <dependency>

