sqoop-commits mailing list archives

From: b...@apache.org
Subject: [2/2] sqoop git commit: SQOOP-3319: Extract code using Kite into separate classes
Date: Tue, 29 May 2018 08:20:24 GMT
SQOOP-3319: Extract code using Kite into separate classes

(Szabolcs Vasas via Boglarka Egyed)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3233db8e
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3233db8e
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3233db8e

Branch: refs/heads/trunk
Commit: 3233db8e1c481e38c538f4caaf55bcbc0c11f208
Parents: ad7d046
Author: Boglarka Egyed <bogi@apache.org>
Authored: Tue May 29 10:17:25 2018 +0200
Committer: Boglarka Egyed <bogi@apache.org>
Committed: Tue May 29 10:17:25 2018 +0200

----------------------------------------------------------------------
 src/java/org/apache/sqoop/avro/AvroUtil.java    |   4 +
 .../org/apache/sqoop/manager/ConnManager.java   |   5 +
 .../org/apache/sqoop/manager/CubridManager.java |   4 +-
 .../org/apache/sqoop/manager/Db2Manager.java    |   2 +-
 .../sqoop/manager/DirectPostgresqlManager.java  |   3 +-
 .../apache/sqoop/manager/MainframeManager.java  |   2 +-
 .../org/apache/sqoop/manager/MySQLManager.java  |   2 +-
 .../org/apache/sqoop/manager/OracleManager.java |   4 +-
 .../apache/sqoop/manager/SQLServerManager.java  |   8 +-
 .../org/apache/sqoop/manager/SqlManager.java    |  10 +-
 .../sqoop/manager/oracle/OraOopConnManager.java |   4 +-
 .../sqoop/mapreduce/DataDrivenImportJob.java    |  66 ++----
 .../apache/sqoop/mapreduce/ImportJobBase.java   |   4 +-
 .../sqoop/mapreduce/JdbcCallExportJob.java      |  10 +-
 .../apache/sqoop/mapreduce/JdbcExportJob.java   |  19 +-
 .../sqoop/mapreduce/JdbcUpdateExportJob.java    |  20 +-
 .../sqoop/mapreduce/JdbcUpsertExportJob.java    |   6 +-
 .../org/apache/sqoop/mapreduce/MergeJob.java    |  69 +-----
 .../sqoop/mapreduce/MergeParquetReducer.java    |  15 +-
 .../sqoop/mapreduce/ParquetExportMapper.java    |  43 ----
 .../sqoop/mapreduce/ParquetImportMapper.java    |  20 +-
 .../org/apache/sqoop/mapreduce/ParquetJob.java  | 220 -------------------
 .../mapreduce/mainframe/MainframeImportJob.java |   5 +-
 .../mapreduce/parquet/ParquetConstants.java     |  31 +++
 .../parquet/ParquetExportJobConfigurator.java   |  35 +++
 .../parquet/ParquetImportJobConfigurator.java   |  38 ++++
 .../parquet/ParquetJobConfiguratorFactory.java  |  29 +++
 .../ParquetJobConfiguratorFactoryProvider.java  |  34 +++
 .../parquet/ParquetMergeJobConfigurator.java    |  31 +++
 .../parquet/kite/KiteMergeParquetReducer.java   |  33 +++
 .../kite/KiteParquetExportJobConfigurator.java  |  48 ++++
 .../parquet/kite/KiteParquetExportMapper.java   |  37 ++++
 .../kite/KiteParquetImportJobConfigurator.java  |  90 ++++++++
 .../parquet/kite/KiteParquetImportMapper.java   |  52 +++++
 .../kite/KiteParquetJobConfiguratorFactory.java |  42 ++++
 .../kite/KiteParquetMergeJobConfigurator.java   | 100 +++++++++
 .../parquet/kite/KiteParquetUtils.java          | 217 ++++++++++++++++++
 .../postgresql/PostgreSQLCopyExportJob.java     |  11 +-
 .../org/apache/sqoop/tool/BaseSqoopTool.java    |   7 +
 src/java/org/apache/sqoop/tool/ImportTool.java  |   4 +-
 src/java/org/apache/sqoop/tool/MergeTool.java   |   4 +-
 .../org/apache/sqoop/TestParquetImport.java     |   7 +-
 .../org/apache/sqoop/hive/TestHiveImport.java   |   6 +-
 .../sqoop/mapreduce/TestJdbcExportJob.java      |   3 +-
 .../mainframe/TestMainframeImportJob.java       |   6 +-
 45 files changed, 961 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
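
In short, this change removes the direct Kite SDK dependencies from the import, export and merge job classes and hides them behind a new org.apache.sqoop.mapreduce.parquet configurator layer, with the Kite-specific code moved into the parquet/kite subpackage. Inferred from the call sites in the diffs below, the factory entry point has roughly this shape (a sketch reconstructed from usage, not the actual file contents):

    public interface ParquetJobConfiguratorFactory {
      ParquetImportJobConfigurator createParquetImportJobConfigurator();
      ParquetExportJobConfigurator createParquetExportJobConfigurator();
      // Assumed by symmetry with the new ParquetMergeJobConfigurator class
      // in the diffstat; the MergeTool changes are not shown in this excerpt.
      ParquetMergeJobConfigurator createParquetMergeJobConfigurator();
    }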


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 603cc63..57c2062 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -340,4 +340,8 @@ public final class AvroUtil {
 
     return LogicalTypes.decimal(precision, scale);
   }
+
+  public static Schema parseAvroSchema(String schemaString) {
+    return new Schema.Parser().parse(schemaString);
+  }
 }
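
The new helper simply centralizes the schema-parsing idiom that previously appeared inline in several classes:

    // before: new Schema.Parser().parse(schemaString)
    // after:
    Schema schema = AvroUtil.parseAvroSchema(schemaString);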

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index d7d6279..c80dd5d 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -45,6 +45,8 @@ import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.hive.HiveTypes;
 import org.apache.sqoop.lib.BlobRef;
 import org.apache.sqoop.lib.ClobRef;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
 import org.apache.sqoop.util.ExportException;
 import org.apache.sqoop.util.ImportException;
 
@@ -866,5 +868,8 @@ public abstract class ConnManager {
     return false;
   }
 
+  public ParquetJobConfiguratorFactory getParquetJobConfigurator() {
+    return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(options.getConf());
+  }
 }
 
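Every ConnManager subclass now obtains Parquet configurators through this single factory method. The provider's body is not shown in this message; since the Kite implementation is the only one added by this commit, a plausible minimal sketch is:

    public final class ParquetJobConfiguratorFactoryProvider {

      private ParquetJobConfiguratorFactoryProvider() {}

      public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) {
        // Hypothetical body: return the Kite-based factory, the only
        // implementation introduced by this commit.
        return new KiteParquetJobConfiguratorFactory();
      }
    }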

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/CubridManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/CubridManager.java b/src/java/org/apache/sqoop/manager/CubridManager.java
index e27f616..a75268f 100644
--- a/src/java/org/apache/sqoop/manager/CubridManager.java
+++ b/src/java/org/apache/sqoop/manager/CubridManager.java
@@ -65,7 +65,7 @@ public class CubridManager extends
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-        ExportBatchOutputFormat.class);
+        ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
 
     exportJob.runExport();
   }
@@ -80,7 +80,7 @@ public class CubridManager extends
     context.setConnManager(this);
 
     JdbcUpsertExportJob exportJob = new JdbcUpsertExportJob(context,
-        CubridUpsertOutputFormat.class);
+        CubridUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/Db2Manager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/Db2Manager.java b/src/java/org/apache/sqoop/manager/Db2Manager.java
index 7ff68ce..c78946e 100644
--- a/src/java/org/apache/sqoop/manager/Db2Manager.java
+++ b/src/java/org/apache/sqoop/manager/Db2Manager.java
@@ -111,7 +111,7 @@ public class Db2Manager
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-      ExportBatchOutputFormat.class);
+      ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
index c05e1c1..70b9b43 100644
--- a/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectPostgresqlManager.java
@@ -585,7 +585,8 @@ public class DirectPostgresqlManager
       new PostgreSQLCopyExportJob(context,
                                   null,
                                   ExportInputFormat.class,
-                                  NullOutputFormat.class);
+                                  NullOutputFormat.class,
+                                  getParquetJobConfigurator().createParquetExportJobConfigurator());
     job.runExport();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/MainframeManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MainframeManager.java b/src/java/org/apache/sqoop/manager/MainframeManager.java
index a6002ef..4e8be15 100644
--- a/src/java/org/apache/sqoop/manager/MainframeManager.java
+++ b/src/java/org/apache/sqoop/manager/MainframeManager.java
@@ -90,7 +90,7 @@ public class MainframeManager extends org.apache.sqoop.manager.ConnManager {
       importer = new AccumuloImportJob(opts, context);
     } else {
       // Import to HDFS.
-      importer = new MainframeImportJob(opts, context);
+      importer = new MainframeImportJob(opts, context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     importer.setInputFormatClass(MainframeDatasetInputFormat.class);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/MySQLManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MySQLManager.java b/src/java/org/apache/sqoop/manager/MySQLManager.java
index 2d17707..992c461 100644
--- a/src/java/org/apache/sqoop/manager/MySQLManager.java
+++ b/src/java/org/apache/sqoop/manager/MySQLManager.java
@@ -138,7 +138,7 @@ public class MySQLManager
     LOG.warn("documentation for additional limitations.");
 
     JdbcUpsertExportJob exportJob =
-      new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class);
+      new JdbcUpsertExportJob(context, MySQLUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/OracleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/OracleManager.java b/src/java/org/apache/sqoop/manager/OracleManager.java
index b7005d4..cdc6c54 100644
--- a/src/java/org/apache/sqoop/manager/OracleManager.java
+++ b/src/java/org/apache/sqoop/manager/OracleManager.java
@@ -462,7 +462,7 @@ public class OracleManager
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context,
-            null, null, ExportBatchOutputFormat.class);
+            null, null, ExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -474,7 +474,7 @@ public class OracleManager
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcUpsertExportJob exportJob =
-      new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
+      new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/SQLServerManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SQLServerManager.java b/src/java/org/apache/sqoop/manager/SQLServerManager.java
index d57a493..b136087 100644
--- a/src/java/org/apache/sqoop/manager/SQLServerManager.java
+++ b/src/java/org/apache/sqoop/manager/SQLServerManager.java
@@ -181,10 +181,10 @@ public class SQLServerManager
     JdbcExportJob exportJob;
     if (isNonResilientOperation()) {
       exportJob = new JdbcExportJob(context, null, null,
-      SqlServerExportBatchOutputFormat.class);
+      SqlServerExportBatchOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     } else {
       exportJob = new JdbcExportJob(context, null, null,
-        SQLServerResilientExportOutputFormat.class);
+        SQLServerResilientExportOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
       configureConnectionRecoveryForExport(context);
     }
     exportJob.runExport();
@@ -202,7 +202,7 @@ public class SQLServerManager
     } else {
       context.setConnManager(this);
       JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, null,
-        null, SQLServerResilientUpdateOutputFormat.class);
+        null, SQLServerResilientUpdateOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
       configureConnectionRecoveryForUpdate(context);
       exportJob.runExport();
     }
@@ -223,7 +223,7 @@ public class SQLServerManager
     }
 
     JdbcUpsertExportJob exportJob =
-        new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class);
+        new JdbcUpsertExportJob(context, SqlServerUpsertOutputFormat.class, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/SqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 4572098..d82332a 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -682,7 +682,7 @@ public abstract class SqlManager
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-              context);
+              context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     checkTableImportOptions(context);
@@ -725,7 +725,7 @@ public abstract class SqlManager
     } else {
       // Import to HDFS.
       importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-          context);
+          context, getParquetJobConfigurator().createParquetImportJobConfigurator());
     }
 
     String splitCol = getSplitColumn(opts, null);
@@ -926,7 +926,7 @@ public abstract class SqlManager
   public void exportTable(org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
-    JdbcExportJob exportJob = new JdbcExportJob(context);
+    JdbcExportJob exportJob = new JdbcExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -935,7 +935,7 @@ public abstract class SqlManager
       throws IOException,
       ExportException {
     context.setConnManager(this);
-    JdbcCallExportJob exportJob = new JdbcCallExportJob(context);
+    JdbcCallExportJob exportJob = new JdbcCallExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -960,7 +960,7 @@ public abstract class SqlManager
       org.apache.sqoop.manager.ExportJobContext context)
       throws IOException, ExportException {
     context.setConnManager(this);
-    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
+    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
index 10524e3..95eaacf 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopConnManager.java
@@ -321,7 +321,7 @@ public class OraOopConnManager extends GenericJdbcManager {
       throw ex;
     }
     JdbcExportJob exportJob =
-        new JdbcExportJob(context, null, null, oraOopOutputFormatClass);
+        new JdbcExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 
@@ -343,7 +343,7 @@ public class OraOopConnManager extends GenericJdbcManager {
     }
 
     JdbcUpdateExportJob exportJob =
-        new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass);
+        new JdbcUpdateExportJob(context, null, null, oraOopOutputFormatClass, getParquetJobConfigurator().createParquetExportJobConfigurator());
     exportJob.runExport();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index a5962ba..3b54210 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -26,8 +26,6 @@ import org.apache.avro.Schema;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -48,10 +46,8 @@ import org.apache.sqoop.manager.ImportJobContext;
 import org.apache.sqoop.mapreduce.ImportJobBase;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.apache.sqoop.orm.AvroSchemaGenerator;
-import org.apache.sqoop.util.FileSystemUtil;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
 /**
  * Actually runs a jdbc import job using the ORM files generated by the
@@ -62,15 +58,24 @@ public class DataDrivenImportJob extends ImportJobBase {
   public static final Log LOG = LogFactory.getLog(
       DataDrivenImportJob.class.getName());
 
-  @SuppressWarnings("unchecked")
-  public DataDrivenImportJob(final SqoopOptions opts) {
-    super(opts, null, DataDrivenDBInputFormat.class, null, null);
+  private final ParquetImportJobConfigurator parquetImportJobConfigurator;
+
+  public DataDrivenImportJob(final SqoopOptions opts,
+      final Class<? extends InputFormat> inputFormatClass,
+      ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
+    super(opts, null, inputFormatClass, null, context);
+    this.parquetImportJobConfigurator = parquetImportJobConfigurator;
   }
 
   public DataDrivenImportJob(final SqoopOptions opts,
       final Class<? extends InputFormat> inputFormatClass,
       ImportJobContext context) {
-    super(opts, null, inputFormatClass, null, context);
+    this(opts, inputFormatClass, context, null);
+  }
+
+  @SuppressWarnings("unchecked")
+  public DataDrivenImportJob(final SqoopOptions opts) {
+    this(opts, DataDrivenDBInputFormat.class, null);
   }
 
   @Override
@@ -101,53 +106,20 @@ public class DataDrivenImportJob extends ImportJobBase {
       AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      JobConf conf = (JobConf)job.getConfiguration();
       // Kite SDK requires an Avro schema to represent the data structure of
       // target dataset. If the schema name equals to generated java class name,
       // the import will fail. So we use table name as schema name and add a
       // prefix "codegen_" to generated java class to avoid the conflict.
       final String schemaNameOverride = tableName;
       Schema schema = generateAvroSchema(tableName, schemaNameOverride);
-      String uri = getKiteUri(conf, tableName);
-      ParquetJob.WriteMode writeMode;
-
-      if (options.doHiveImport()) {
-        if (options.doOverwriteHiveTable()) {
-          writeMode = ParquetJob.WriteMode.OVERWRITE;
-        } else {
-          writeMode = ParquetJob.WriteMode.APPEND;
-          if (Datasets.exists(uri)) {
-            LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
-                "append data into the existing Hive table. Consider using " +
-                "--hive-overwrite, if you do NOT intend to do appending.");
-          }
-        }
-      } else {
-        // Note that there is no such an import argument for overwriting HDFS
-        // dataset, so overwrite mode is not supported yet.
-        // Sqoop's append mode means to merge two independent datasets. We
-        // choose DEFAULT as write mode.
-        writeMode = ParquetJob.WriteMode.DEFAULT;
-      }
-      ParquetJob.configureImportJob(conf, schema, uri, writeMode);
+      Path destination = getContext().getDestination();
+
+      parquetImportJobConfigurator.configureMapper(job, schema, options, tableName, destination);
     }
 
     job.setMapperClass(getMapperClass());
   }
 
-  private String getKiteUri(Configuration conf, String tableName) throws IOException {
-    if (options.doHiveImport()) {
-      String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
-          options.getHiveDatabaseName();
-      String hiveTable = options.getHiveTableName() == null ? tableName :
-          options.getHiveTableName();
-      return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
-    } else {
-      Path destination = getContext().getDestination();
-      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
-    }
-  }
-
   private Schema generateAvroSchema(String tableName,
       String schemaNameOverride) throws IOException {
     ConnManager connManager = getContext().getConnManager();
@@ -187,7 +159,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       return AvroImportMapper.class;
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      return ParquetImportMapper.class;
+      return parquetImportJobConfigurator.getMapperClass();
     }
 
     return null;
@@ -210,7 +182,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       return AvroOutputFormat.class;
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      return DatasetKeyOutputFormat.class;
+      return parquetImportJobConfigurator.getOutputFormatClass();
     }
 
     return null;
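
The three Parquet-specific hooks used above (configureMapper, getMapperClass, getOutputFormatClass) define the shape of the new ParquetImportJobConfigurator interface. Reconstructed from these call sites (the actual file is listed in the diffstat but not shown in full here):

    public interface ParquetImportJobConfigurator {

      void configureMapper(Job job, Schema schema, SqoopOptions options,
          String tableName, Path destination) throws IOException;

      Class<? extends Mapper> getMapperClass();

      Class<? extends OutputFormat> getOutputFormatClass();
    }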

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index fb5d054..17c9ed3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -49,6 +49,8 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Date;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
+
 /**
  * Base class for running an import MapReduce job.
  * Allows dependency injection, etc, for easy customization of import job types.
@@ -149,7 +151,7 @@ public class ImportJobBase extends JobBase {
           Configuration conf = job.getConfiguration();
           String shortName = CodecMap.getCodecShortNameByName(codecName, conf);
           if (!shortName.equalsIgnoreCase("default")) {
-            conf.set(ParquetJob.CONF_OUTPUT_CODEC, shortName);
+            conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, shortName);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
index b7eea93..be82aed 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
@@ -32,6 +32,7 @@ import org.apache.sqoop.mapreduce.db.DBOutputFormat;
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import com.google.common.base.Strings;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an export using JDBC (JDBC-based ExportCallOutputFormat) to
@@ -43,15 +44,16 @@ public class JdbcCallExportJob extends JdbcExportJob {
   public static final Log LOG = LogFactory.getLog(
       JdbcCallExportJob.class.getName());
 
-  public JdbcCallExportJob(final ExportJobContext context) {
-    super(context, null, null, ExportCallOutputFormat.class);
+  public JdbcCallExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(context, null, null, ExportCallOutputFormat.class, parquetExportJobConfigurator);
   }
 
   public JdbcCallExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
-    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 3719836..e283548 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -32,11 +32,10 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 import java.io.IOException;
 import java.util.Map;
-import org.apache.sqoop.util.FileSystemUtil;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -45,18 +44,23 @@ public class JdbcExportJob extends ExportJobBase {
 
   private FileType fileType;
 
+  private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
   public static final Log LOG = LogFactory.getLog(
       JdbcExportJob.class.getName());
 
-  public JdbcExportJob(final ExportJobContext context) {
+  public JdbcExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(context);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   public JdbcExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   @Override
@@ -78,8 +82,7 @@ public class JdbcExportJob extends ExportJobBase {
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
-      DatasetKeyInputFormat.configure(job).readFrom(uri);
+      parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
     }
   }
 
@@ -120,7 +123,7 @@ public class JdbcExportJob extends ExportJobBase {
       case AVRO_DATA_FILE:
         return AvroInputFormat.class;
       case PARQUET_FILE:
-        return DatasetKeyInputFormat.class;
+        return parquetExportJobConfigurator.getInputFormatClass();
       default:
         return super.getInputFormatClass();
     }
@@ -137,7 +140,7 @@ public class JdbcExportJob extends ExportJobBase {
       case AVRO_DATA_FILE:
         return AvroExportMapper.class;
       case PARQUET_FILE:
-        return ParquetExportMapper.class;
+        return parquetExportJobConfigurator.getMapperClass();
       case UNKNOWN:
       default:
         return TextExportMapper.class;
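
Analogously, the export-side call sites above imply roughly this shape for ParquetExportJobConfigurator (a sketch reconstructed from usage, not the file itself):

    public interface ParquetExportJobConfigurator {

      void configureInputFormat(Job job, Path inputPath) throws IOException;

      Class<? extends Mapper> getMapperClass();

      Class<? extends InputFormat> getInputFormatClass();
    }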

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
index 86069c4..f901d37 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
@@ -33,15 +33,13 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
 
 import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DBOutputFormat;
-import org.apache.sqoop.util.FileSystemUtil;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -53,6 +51,8 @@ public class JdbcUpdateExportJob extends ExportJobBase {
   public static final Log LOG = LogFactory.getLog(
       JdbcUpdateExportJob.class.getName());
 
+  private ParquetExportJobConfigurator parquetExportJobConfigurator;
+
   /**
    * Return an instance of the UpdateOutputFormat class object loaded
    * from the shim jar.
@@ -62,16 +62,19 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     return UpdateOutputFormat.class;
   }
 
-  public JdbcUpdateExportJob(final ExportJobContext context)
+  public JdbcUpdateExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator)
       throws IOException {
     super(context, null, null, getUpdateOutputFormat());
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   public JdbcUpdateExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+    this.parquetExportJobConfigurator = parquetExportJobConfigurator;
   }
 
   // Fix For Issue [SQOOP-2846]
@@ -86,7 +89,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     case AVRO_DATA_FILE:
       return AvroExportMapper.class;
     case PARQUET_FILE:
-      return ParquetExportMapper.class;
+      return parquetExportJobConfigurator.getMapperClass();
     case UNKNOWN:
     default:
       return TextExportMapper.class;
@@ -186,8 +189,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     } else if (fileType == FileType.PARQUET_FILE) {
       LOG.debug("Configuring for Parquet export");
       configureGenericRecordExportInputFormat(job, tableName);
-      String uri = "dataset:" + FileSystemUtil.makeQualified(getInputPath(), job.getConfiguration());
-      DatasetKeyInputFormat.configure(job).readFrom(uri);
+      parquetExportJobConfigurator.configureInputFormat(job, getInputPath());
     }
   }
 
@@ -222,7 +224,7 @@ public class JdbcUpdateExportJob extends ExportJobBase {
     case AVRO_DATA_FILE:
       return AvroInputFormat.class;
     case PARQUET_FILE:
-      return DatasetKeyInputFormat.class;
+      return parquetExportJobConfigurator.getInputFormatClass();
     default:
       return super.getInputFormatClass();
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
index 9a8c17a..4db86da 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
@@ -30,6 +30,7 @@ import org.apache.sqoop.manager.ConnManager;
 import org.apache.sqoop.manager.ExportJobContext;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
 import org.apache.sqoop.mapreduce.db.DBOutputFormat;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 /**
  * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
@@ -40,9 +41,10 @@ public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
       JdbcUpsertExportJob.class.getName());
 
   public JdbcUpsertExportJob(final ExportJobContext context,
-      final Class<? extends OutputFormat> outputFormatClass)
+                             final Class<? extends OutputFormat> outputFormatClass,
+                             final ParquetExportJobConfigurator parquetExportJobConfigurator)
       throws IOException {
-    super(context, null, null, outputFormatClass);
+    super(context, null, null, outputFormatClass, parquetExportJobConfigurator);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/MergeJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeJob.java b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
index bb21b64..c26a090 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeJob.java
@@ -19,18 +19,12 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -44,17 +38,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.sqoop.avro.AvroUtil;
 import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.util.Jars;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import parquet.avro.AvroParquetInputFormat;
-import parquet.avro.AvroSchemaConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.schema.MessageType;
 
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.mapreduce.JobBase;
@@ -79,10 +64,11 @@ public class MergeJob extends JobBase {
    */
   public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
 
-  public static final String PARQUET_AVRO_SCHEMA = "parquetjob.avro.schema";
+  private final ParquetMergeJobConfigurator parquetMergeJobConfigurator;
 
-  public MergeJob(final SqoopOptions opts) {
+  public MergeJob(final SqoopOptions opts, final ParquetMergeJobConfigurator parquetMergeJobConfigurator) {
     super(opts, null, null, null);
+    this.parquetMergeJobConfigurator = parquetMergeJobConfigurator;
   }
 
   public boolean runMergeJob() throws IOException {
@@ -147,7 +133,7 @@ public class MergeJob extends JobBase {
         case PARQUET_FILE:
           Path finalPath = new Path(options.getTargetDir());
           finalPath = FileSystemUtil.makeQualified(finalPath, jobConf);
-          configueParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
+          parquetMergeJobConfigurator.configureParquetMergeJob(jobConf, job, oldPath, newPath, finalPath);
           break;
         case AVRO_DATA_FILE:
           configueAvroMergeJob(conf, job, oldPath, newPath);
@@ -198,51 +184,6 @@ public class MergeJob extends JobBase {
     job.setReducerClass(MergeAvroReducer.class);
     AvroJob.setOutputSchema(job.getConfiguration(), oldPathSchema);
   }
-
-  private void configueParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
-      Path finalPath) throws IOException {
-    try {
-      FileSystem fileSystem = finalPath.getFileSystem(conf);
-      LOG.info("Trying to merge parquet files");
-      job.setOutputKeyClass(org.apache.avro.generic.GenericRecord.class);
-      job.setMapperClass(MergeParquetMapper.class);
-      job.setReducerClass(MergeParquetReducer.class);
-      job.setOutputValueClass(NullWritable.class);
-
-      List<Footer> footers = new ArrayList<Footer>();
-      FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
-      FileStatus newPathfileStatus = fileSystem.getFileStatus(oldPath);
-      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
-      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
-
-      MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
-      AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
-      Schema avroSchema = avroSchemaConverter.convert(schema);
-
-      if (!fileSystem.exists(finalPath)) {
-        Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
-        DatasetKeyOutputFormat.configure(job).overwrite(dataset);
-      } else {
-        DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
-      }
-
-      job.setInputFormatClass(AvroParquetInputFormat.class);
-      AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
-
-      conf.set(PARQUET_AVRO_SCHEMA, avroSchema.toString());
-      Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
-
-      job.setOutputFormatClass(outClass);
-    } catch (Exception cnfe) {
-      throw new IOException(cnfe);
-    }
-  }
-
-  public static Dataset createDataset(Schema schema, String uri) {
-    DatasetDescriptor descriptor =
-        new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
-    return Datasets.create(uri, descriptor, GenericRecord.class);
-  }
 }
 
 
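The single call site above suggests the merge-side interface is roughly (again a sketch reconstructed from usage):

    public interface ParquetMergeJobConfigurator {

      void configureParquetMergeJob(Configuration conf, Job job, Path oldPath,
          Path newPath, Path finalPath) throws IOException;
    }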

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
index caa4f5f..5939b01 100644
--- a/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
+++ b/src/java/org/apache/sqoop/mapreduce/MergeParquetReducer.java
@@ -27,16 +27,16 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.sqoop.avro.AvroUtil;
 
 import org.apache.sqoop.lib.SqoopRecord;
 
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
 
-public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord,NullWritable> {
+
+public abstract class MergeParquetReducer<KEYOUT, VALUEOUT> extends Reducer<Text, MergeRecord, KEYOUT, VALUEOUT> {
 
   private Schema schema = null;
   private boolean bigDecimalFormatString = true;
@@ -44,7 +44,7 @@ public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
-      schema = new Schema.Parser().parse(context.getConfiguration().get("parquetjob.avro.schema"));
+      schema = new Schema.Parser().parse(context.getConfiguration().get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
       bigDecimalFormatString = context.getConfiguration().getBoolean(
           ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT, ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
     }
@@ -67,9 +67,12 @@ public class MergeParquetReducer extends Reducer<Text, MergeRecord,GenericRecord
       }
 
       if (null != bestRecord) {
-        GenericRecord outKey = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
+        GenericRecord record = AvroUtil.toGenericRecord(bestRecord.getFieldMap(), schema,
             bigDecimalFormatString);
-        context.write(outKey, null);
+        write(context, record);
       }
     }
+
+  protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
+
 }
\ No newline at end of file
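
With the reducer now abstract and generic in its output types, a concrete subclass only has to supply the write step. A hypothetical Kite-flavoured subclass (the real KiteMergeParquetReducer added by this commit is not shown in this excerpt) could look like:

    public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {

      @Override
      protected void write(Context context, GenericRecord record)
          throws IOException, InterruptedException {
        // Mirrors the code removed from the old reducer: the record is the
        // output key, the value slot stays null.
        context.write(record, null);
      }
    }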

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
deleted file mode 100644
index 2bc0cba..0000000
--- a/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-/**
- * Exports Parquet records from a data source.
- */
-public class ParquetExportMapper
-    extends GenericRecordExportMapper<GenericRecord, NullWritable> {
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    super.setup(context);
-  }
-
-  @Override
-  protected void map(GenericRecord key, NullWritable val,
-      Context context) throws IOException, InterruptedException {
-    context.write(toSqoopRecord(key), NullWritable.get());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
index 35ab495..62334f8 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
@@ -23,10 +23,7 @@ import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.sqoop.avro.AvroUtil;
 
 import java.io.IOException;
@@ -35,9 +32,9 @@ import java.sql.SQLException;
 /**
  * Imports records by writing them to a Parquet File.
  */
-public class ParquetImportMapper
+public abstract class ParquetImportMapper<KEYOUT, VALOUT>
     extends AutoProgressMapper<LongWritable, SqoopRecord,
-        GenericRecord, NullWritable> {
+    KEYOUT, VALOUT> {
 
   private Schema schema = null;
   private boolean bigDecimalFormatString = true;
@@ -47,11 +44,11 @@ public class ParquetImportMapper
   protected void setup(Context context)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
-    schema = ParquetJob.getAvroSchema(conf);
+    schema = getAvroSchema(conf);
     bigDecimalFormatString = conf.getBoolean(
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
-    lobLoader = new LargeObjectLoader(conf, new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID())));
+    lobLoader = createLobLoader(context);
   }
 
   @Override
@@ -64,9 +61,9 @@ public class ParquetImportMapper
       throw new IOException(sqlE);
     }
 
-    GenericRecord outKey = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
+    GenericRecord record = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
         bigDecimalFormatString);
-    context.write(outKey, null);
+    write(context, record);
   }
 
   @Override
@@ -76,4 +73,9 @@ public class ParquetImportMapper
     }
   }
 
+  protected abstract LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException;
+
+  protected abstract Schema getAvroSchema(Configuration configuration);
+
+  protected abstract void write(Context context, GenericRecord record) throws IOException, InterruptedException;
 }
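
A concrete subclass supplies the three new hooks. Reconstructed from the inline code this commit removes (the real KiteParquetImportMapper is listed in the diffstat but not shown here), a Kite-flavoured implementation could be:

    public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, NullWritable> {

      @Override
      protected LargeObjectLoader createLobLoader(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        // Same default working directory as the removed inline code.
        Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir",
            "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
        return new LargeObjectLoader(conf, workPath);
      }

      @Override
      protected Schema getAvroSchema(Configuration conf) {
        return AvroUtil.parseAvroSchema(conf.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
      }

      @Override
      protected void write(Context context, GenericRecord record)
          throws IOException, InterruptedException {
        context.write(record, null);
      }
    }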

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
deleted file mode 100644
index 4604773..0000000
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.mapreduce;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.hive.HiveConfig;
-import org.kitesdk.data.CompressionType;
-import org.kitesdk.data.Dataset;
-import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.Datasets;
-import org.kitesdk.data.Formats;
-import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
-import org.kitesdk.data.spi.SchemaValidationUtil;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-
-/**
- * Helper class for setting up a Parquet MapReduce job.
- */
-public final class ParquetJob {
-
-  public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
-
-  public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
-
-  public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
-  // Purposefully choosing the same token alias as the one Oozie chooses.
-  // Make sure we don't generate a new delegation token if oozie
-  // has already generated one.
-  public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
-
-  public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
-
-  public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
-      "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
-      " but it is possible that date/timestamp types were mapped to strings during table" +
-      " creation. Consider using Sqoop option --map-column-java resolve the mismatch" +
-      " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
-
-  private static final String HIVE_URI_PREFIX = "dataset:hive";
-
-  private ParquetJob() {
-  }
-
-  private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
-  static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
-  enum WriteMode {
-    DEFAULT, APPEND, OVERWRITE
-  };
-
-  public static Schema getAvroSchema(Configuration conf) {
-    return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
-  }
-
-  public static CompressionType getCompressionType(Configuration conf) {
-    CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
-    String codec = conf.get(CONF_OUTPUT_CODEC, defaults.getName());
-    try {
-      return CompressionType.forName(codec);
-    } catch (IllegalArgumentException ex) {
-      LOG.warn(String.format(
-          "Unsupported compression type '%s'. Fallback to '%s'.",
-          codec, defaults));
-    }
-    return defaults;
-  }
-
-  /**
-   * Configure the import job. The import process will use a Kite dataset to
-   * write data records into Parquet format internally. The input key class is
-   * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
-   * {@link org.apache.avro.generic.GenericRecord}.
-   */
-  public static void configureImportJob(JobConf conf, Schema schema,
-      String uri, WriteMode writeMode) throws IOException {
-    Dataset dataset;
-
-    // Add hive delegation token only if we don't already have one.
-    if (isHiveImport(uri)) {
-      Configuration hiveConf = HiveConfig.getHiveConf(conf);
-      if (isSecureMetastore(hiveConf)) {
-        // Copy hive configs to job config
-        HiveConfig.addHiveConfigs(hiveConf, conf);
-
-        if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
-          addHiveDelegationToken(conf);
-        }
-      }
-    }
-
-    if (Datasets.exists(uri)) {
-      if (WriteMode.DEFAULT.equals(writeMode)) {
-        throw new IOException("Destination exists! " + uri);
-      }
-
-      dataset = Datasets.load(uri);
-      Schema writtenWith = dataset.getDescriptor().getSchema();
-      if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
-        String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
-        throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
-      }
-    } else {
-      dataset = createDataset(schema, getCompressionType(conf), uri);
-    }
-    conf.set(CONF_AVRO_SCHEMA, schema.toString());
-
-    DatasetKeyOutputFormat.ConfigBuilder builder =
-        DatasetKeyOutputFormat.configure(conf);
-    if (WriteMode.OVERWRITE.equals(writeMode)) {
-      builder.overwrite(dataset);
-    } else if (WriteMode.APPEND.equals(writeMode)) {
-      builder.appendTo(dataset);
-    } else {
-      builder.writeTo(dataset);
-    }
-  }
-
-  private static boolean isHiveImport(String importUri) {
-    return importUri.startsWith(HIVE_URI_PREFIX);
-  }
-
-  public static Dataset createDataset(Schema schema,
-      CompressionType compressionType, String uri) {
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schema(schema)
-        .format(Formats.PARQUET)
-        .compressionType(compressionType)
-        .build();
-    return Datasets.create(uri, descriptor, GenericRecord.class);
-  }
-
-  private static boolean isSecureMetastore(Configuration conf) {
-    return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
-  }
-
-  /**
-   * Add hive delegation token to credentials store.
-   * @param conf
-   */
-  private static void addHiveDelegationToken(JobConf conf) {
-    // Need to use reflection since there's no compile time dependency on the client libs.
-    Class<?> HiveConfClass;
-    Class<?> HiveMetaStoreClientClass;
-
-    try {
-      HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
-          + " when adding hive delegation token. "
-          + "Make sure HIVE_CONF_DIR is set correctly.", ex);
-      throw new RuntimeException("Couldn't fetch delegation token.", ex);
-    }
-
-    try {
-      HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS
-          + " when adding hive delegation token."
-          + " Make sure HIVE_CONF_DIR is set correctly.", ex);
-      throw new RuntimeException("Couldn't fetch delegation token.", ex);
-    }
-
-    try {
-      Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
-          HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
-      );
-      // getDelegationToken(String kerberosPrincial)
-      Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
-      Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName());
-
-      // Load token
-      Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
-      metastoreToken.decodeFromUrlString(tokenStringForm.toString());
-      conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
-
-      LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
-    } catch (Exception ex) {
-      LOG.error("Couldn't fetch delegation token.", ex);
-      throw new RuntimeException("Couldn't fetch delegation token.", ex);
-    }
-  }
-
-  private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
-    String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
-
-    if (hiveImport) {
-      exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
-    }
-
-    return exceptionMessage;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
index 7e975c7..8ef30d3 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
@@ -30,6 +30,7 @@ import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.manager.ImportJobContext;
 
 import org.apache.sqoop.mapreduce.DataDrivenImportJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 
 /**
  * Import data from a mainframe dataset, using MainframeDatasetInputFormat.
@@ -39,8 +40,8 @@ public class MainframeImportJob extends DataDrivenImportJob {
   private static final Log LOG = LogFactory.getLog(
       MainframeImportJob.class.getName());
 
-  public MainframeImportJob(final SqoopOptions opts, ImportJobContext context) {
-    super(opts, MainframeDatasetInputFormat.class, context);
+  public MainframeImportJob(final SqoopOptions opts, ImportJobContext context, ParquetImportJobConfigurator parquetImportJobConfigurator) {
+    super(opts, MainframeDatasetInputFormat.class, context, parquetImportJobConfigurator);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
new file mode 100644
index 0000000..ae53a96
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetConstants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.sqoop.mapreduce.parquet;
+
+public final class ParquetConstants {
+
+  public static final String SQOOP_PARQUET_AVRO_SCHEMA_KEY = "parquetjob.avro.schema";
+
+  public static final String SQOOP_PARQUET_OUTPUT_CODEC_KEY = "parquetjob.output.codec";
+
+  private ParquetConstants() {
+    throw new AssertionError("This class is meant for static use only.");
+  }
+}
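
For reference, these two keys take over the configuration entries previously set by the removed ParquetJob class: they carry the serialized Avro schema and the output compression codec through the job Configuration from setup to the tasks. A minimal round-trip sketch (the record shape and the "snappy" value are illustrative, not part of this commit):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;

import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;

public class ParquetConstantsRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Setup side: serialize the Avro schema and codec name into the job configuration.
    Schema schema = SchemaBuilder.record("example").fields()
        .requiredInt("id").optionalString("name").endRecord();
    conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
    conf.set(SQOOP_PARQUET_OUTPUT_CODEC_KEY, "snappy");

    // Task side: recover the schema, as KiteParquetImportMapper.getAvroSchema() does.
    Schema recovered = new Schema.Parser().parse(conf.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY));
    System.out.println(recovered.toString(true));
  }
}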

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
new file mode 100644
index 0000000..8d7b87f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetExportJobConfigurator.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
+public interface ParquetExportJobConfigurator {
+
+  void configureInputFormat(Job job, Path inputPath) throws IOException;
+
+  Class<? extends Mapper> getMapperClass();
+
+  Class<? extends InputFormat> getInputFormatClass();
+}
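
A sketch of how a caller consumes this export seam; the real wiring lives in JdbcExportJob, changed earlier in this diff, so treat the method below as illustrative only:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetExportJobConfigurator;

public class ExportConfiguratorUsage {
  // Illustrative caller: pick an implementation once, then wire the job through the interface.
  public static Job configureExport(Path inputPath) throws Exception {
    ParquetExportJobConfigurator configurator = new KiteParquetExportJobConfigurator();
    Job job = Job.getInstance();
    job.setInputFormatClass(configurator.getInputFormatClass());
    job.setMapperClass(configurator.getMapperClass());
    configurator.configureInputFormat(job, inputPath); // registers the "dataset:" URI to read from
    return job;
  }
}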

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
new file mode 100644
index 0000000..fa1bc7d
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetImportJobConfigurator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.SqoopOptions;
+
+import java.io.IOException;
+
+public interface ParquetImportJobConfigurator {
+
+  void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException;
+
+  Class<? extends Mapper> getMapperClass();
+
+  Class<? extends OutputFormat> getOutputFormatClass();
+
+}
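
The import-side counterpart works the same way; DataDrivenImportJob (changed earlier in this diff) is the real call site, so the following only sketches the shape of a caller:

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetImportJobConfigurator;

public class ImportConfiguratorUsage {
  public static void configureImport(Job job, Schema schema, SqoopOptions options,
      String tableName, Path destination) throws Exception {
    ParquetImportJobConfigurator configurator = new KiteParquetImportJobConfigurator();
    // Let the implementation set up its output (Kite derives a dataset URI and write mode)...
    configurator.configureMapper(job, schema, options, tableName, destination);
    // ...then plug its mapper and output format into the MapReduce job.
    job.setMapperClass(configurator.getMapperClass());
    job.setOutputFormatClass(configurator.getOutputFormatClass());
  }
}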

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
new file mode 100644
index 0000000..ed5103f
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+public interface ParquetJobConfiguratorFactory {
+
+  ParquetImportJobConfigurator createParquetImportJobConfigurator();
+
+  ParquetExportJobConfigurator createParquetExportJobConfigurator();
+
+  ParquetMergeJobConfigurator createParquetMergeJobConfigurator();
+
+}
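
This is an abstract factory: it bundles the three configurators so the tools select a Parquet implementation in one place. A hypothetical second implementation (the class name and messages below are invented for illustration) only has to fill these three seams:

import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;

public class StubParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {

  @Override
  public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
    throw new UnsupportedOperationException("Parquet import not implemented here.");
  }

  @Override
  public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
    throw new UnsupportedOperationException("Parquet export not implemented here.");
  }

  @Override
  public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
    throw new UnsupportedOperationException("Parquet merge not implemented here.");
  }
}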

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
new file mode 100644
index 0000000..2286a52
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetJobConfiguratorFactoryProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetJobConfiguratorFactory;
+
+public final class ParquetJobConfiguratorFactoryProvider {
+
+  private ParquetJobConfiguratorFactoryProvider() {
+    throw new AssertionError("This class is meant for static use only.");
+  }
+
+  public static ParquetJobConfiguratorFactory createParquetJobConfiguratorFactory(Configuration configuration) {
+    return new KiteParquetJobConfiguratorFactory();
+  }
+
+}
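
Usage is a single static call; note that the Configuration argument is not consulted yet, which leaves room for picking a factory from job settings once more than one implementation exists:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;

public class ProviderUsage {
  public static void main(String[] args) {
    ParquetJobConfiguratorFactory factory =
        ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(new Configuration());
    System.out.println(factory.getClass().getSimpleName()); // KiteParquetJobConfiguratorFactory
  }
}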

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
new file mode 100644
index 0000000..67fdf66
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/ParquetMergeJobConfigurator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+
+public interface ParquetMergeJobConfigurator {
+
+  void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath, Path finalPath) throws IOException;
+
+}
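
A sketch of a caller for the merge seam; MergeJob (changed earlier in this diff) holds the real wiring, so the helper below is illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetMergeJobConfigurator;

public class MergeConfiguratorUsage {
  public static Job buildMergeJob(Path oldPath, Path newPath, Path finalPath) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "sqoop-parquet-merge");
    ParquetMergeJobConfigurator configurator = new KiteParquetMergeJobConfigurator();
    configurator.configureParquetMergeJob(conf, job, oldPath, newPath, finalPath);
    return job;
  }
}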

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
new file mode 100644
index 0000000..7f21205
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteMergeParquetReducer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.mapreduce.MergeParquetReducer;
+
+import java.io.IOException;
+
+public class KiteMergeParquetReducer extends MergeParquetReducer<GenericRecord, NullWritable> {
+
+  @Override
+  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+    context.write(record, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
new file mode 100644
index 0000000..ca02c7b
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportJobConfigurator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import org.apache.sqoop.util.FileSystemUtil;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
+import java.io.IOException;
+
+public class KiteParquetExportJobConfigurator implements ParquetExportJobConfigurator {
+
+  @Override
+  public void configureInputFormat(Job job, Path inputPath) throws IOException {
+    String uri = "dataset:" + FileSystemUtil.makeQualified(inputPath, job.getConfiguration());
+    DatasetKeyInputFormat.configure(job).readFrom(uri);
+  }
+
+  @Override
+  public Class<? extends Mapper> getMapperClass() {
+    return KiteParquetExportMapper.class;
+  }
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return DatasetKeyInputFormat.class;
+  }
+}
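
configureInputFormat() turns the export input directory into a Kite "dataset:" URI. Roughly, assuming FileSystemUtil.makeQualified qualifies the path against its filesystem the way standard Path qualification does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class DatasetUriSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("/user/sqoop/parquet-export");
    // Qualify the path with the filesystem's scheme and authority.
    Path qualified = input.getFileSystem(conf).makeQualified(input);
    // e.g. dataset:hdfs://namenode:8020/user/sqoop/parquet-export
    System.out.println("dataset:" + qualified);
  }
}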

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
new file mode 100644
index 0000000..25555d8
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetExportMapper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.mapreduce.GenericRecordExportMapper;
+
+import java.io.IOException;
+
+/**
+ * Maps Parquet records, read as Avro GenericRecords, into SqoopRecords during export.
+ */
+public class KiteParquetExportMapper extends GenericRecordExportMapper<GenericRecord, NullWritable> {
+
+  @Override
+  protected void map(GenericRecord key, NullWritable val, Context context) throws IOException, InterruptedException {
+    context.write(toSqoopRecord(key), NullWritable.get());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
new file mode 100644
index 0000000..87828d1
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportJobConfigurator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import org.apache.sqoop.util.FileSystemUtil;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+
+import java.io.IOException;
+
+public class KiteParquetImportJobConfigurator implements ParquetImportJobConfigurator {
+
+  public static final Log LOG = LogFactory.getLog(KiteParquetImportJobConfigurator.class.getName());
+
+  @Override
+  public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
+    JobConf conf = (JobConf) job.getConfiguration();
+    String uri = getKiteUri(conf, options, tableName, destination);
+    KiteParquetUtils.WriteMode writeMode;
+
+    if (options.doHiveImport()) {
+      if (options.doOverwriteHiveTable()) {
+        writeMode = KiteParquetUtils.WriteMode.OVERWRITE;
+      } else {
+        writeMode = KiteParquetUtils.WriteMode.APPEND;
+        if (Datasets.exists(uri)) {
+          LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will " +
+              "append data into the existing Hive table. Consider using " +
+              "--hive-overwrite, if you do NOT intend to do appending.");
+        }
+      }
+    } else {
+      // Note that there is no import argument for overwriting an HDFS
+      // dataset, so overwrite mode is not supported yet. Sqoop's append
+      // mode would mean merging two independent datasets, so we choose
+      // DEFAULT as the write mode.
+      writeMode = KiteParquetUtils.WriteMode.DEFAULT;
+    }
+    KiteParquetUtils.configureImportJob(conf, schema, uri, writeMode);
+  }
+
+  @Override
+  public Class<? extends Mapper> getMapperClass() {
+    return KiteParquetImportMapper.class;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return DatasetKeyOutputFormat.class;
+  }
+
+  private String getKiteUri(Configuration conf, SqoopOptions options, String tableName, Path destination) throws IOException {
+    if (options.doHiveImport()) {
+      String hiveDatabase = options.getHiveDatabaseName() == null ? "default" :
+          options.getHiveDatabaseName();
+      String hiveTable = options.getHiveTableName() == null ? tableName :
+          options.getHiveTableName();
+      return String.format("dataset:hive:/%s/%s", hiveDatabase, hiveTable);
+    } else {
+      return "dataset:" + FileSystemUtil.makeQualified(destination, conf);
+    }
+  }
+}
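
getKiteUri() produces two URI shapes: Hive imports address a metastore table, everything else addresses a qualified filesystem path. A standalone restatement of that branch, for illustration only:

public class KiteUriForms {
  static String kiteUri(boolean hiveImport, String hiveDatabase, String hiveTable,
      String qualifiedPath) {
    if (hiveImport) {
      String db = (hiveDatabase == null) ? "default" : hiveDatabase;
      return String.format("dataset:hive:/%s/%s", db, hiveTable);
    }
    return "dataset:" + qualifiedPath;
  }

  public static void main(String[] args) {
    System.out.println(kiteUri(true, null, "customers", null));
    // -> dataset:hive:/default/customers
    System.out.println(kiteUri(false, null, null, "hdfs://namenode:8020/user/sqoop/customers"));
    // -> dataset:hdfs://namenode:8020/user/sqoop/customers
  }
}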

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
new file mode 100644
index 0000000..20adf6e
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetImportMapper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.avro.AvroUtil;
+import org.apache.sqoop.lib.LargeObjectLoader;
+import org.apache.sqoop.mapreduce.ParquetImportMapper;
+
+import java.io.IOException;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
+public class KiteParquetImportMapper extends ParquetImportMapper<GenericRecord, Void> {
+
+  @Override
+  protected LargeObjectLoader createLobLoader(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    Path workPath = new Path(conf.get("sqoop.kite.lob.extern.dir", "/tmp/sqoop-parquet-" + context.getTaskAttemptID()));
+    return new LargeObjectLoader(conf, workPath);
+  }
+
+  @Override
+  protected Schema getAvroSchema(Configuration configuration) {
+    String schemaString = configuration.get(SQOOP_PARQUET_AVRO_SCHEMA_KEY);
+    return AvroUtil.parseAvroSchema(schemaString);
+  }
+
+  @Override
+  protected void write(Context context, GenericRecord record) throws IOException, InterruptedException {
+    context.write(record, null);
+  }
+}
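
The large-object working directory comes from "sqoop.kite.lob.extern.dir" when set, otherwise from a per-task-attempt directory under /tmp. A small sketch of that resolution (the attempt id string is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class LobDirResolution {
  // Mirrors createLobLoader() above: an explicit setting wins over the /tmp default.
  static Path lobDir(Configuration conf, String taskAttemptId) {
    return new Path(conf.get("sqoop.kite.lob.extern.dir",
        "/tmp/sqoop-parquet-" + taskAttemptId));
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(lobDir(conf, "attempt_1_0001_m_000000_0")); // default /tmp location
    conf.set("sqoop.kite.lob.extern.dir", "/user/sqoop/lobs");
    System.out.println(lobDir(conf, "attempt_1_0001_m_000000_0")); // configured location
  }
}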

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
new file mode 100644
index 0000000..055e116
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetJobConfiguratorFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+
+public class KiteParquetJobConfiguratorFactory implements ParquetJobConfiguratorFactory {
+
+  @Override
+  public ParquetImportJobConfigurator createParquetImportJobConfigurator() {
+    return new KiteParquetImportJobConfigurator();
+  }
+
+  @Override
+  public ParquetExportJobConfigurator createParquetExportJobConfigurator() {
+    return new KiteParquetExportJobConfigurator();
+  }
+
+  @Override
+  public ParquetMergeJobConfigurator createParquetMergeJobConfigurator() {
+    return new KiteParquetMergeJobConfigurator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
new file mode 100644
index 0000000..9fecf28
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetMergeJobConfigurator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.mapreduce.MergeParquetMapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import parquet.avro.AvroParquetInputFormat;
+import parquet.avro.AvroSchemaConverter;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+
+public class KiteParquetMergeJobConfigurator implements ParquetMergeJobConfigurator {
+
+  public static final Log LOG = LogFactory.getLog(KiteParquetMergeJobConfigurator.class.getName());
+
+  @Override
+  public void configureParquetMergeJob(Configuration conf, Job job, Path oldPath, Path newPath,
+                                       Path finalPath) throws IOException {
+    try {
+      FileSystem fileSystem = finalPath.getFileSystem(conf);
+      LOG.info("Trying to merge parquet files");
+      job.setOutputKeyClass(GenericRecord.class);
+      job.setMapperClass(MergeParquetMapper.class);
+      job.setReducerClass(KiteMergeParquetReducer.class);
+      job.setOutputValueClass(NullWritable.class);
+
+      List<Footer> footers = new ArrayList<Footer>();
+      FileStatus oldPathfileStatus = fileSystem.getFileStatus(oldPath);
+      FileStatus newPathfileStatus = fileSystem.getFileStatus(newPath);
+      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), oldPathfileStatus, true));
+      footers.addAll(ParquetFileReader.readFooters(job.getConfiguration(), newPathfileStatus, true));
+
+      MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+      AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+      Schema avroSchema = avroSchemaConverter.convert(schema);
+
+      if (!fileSystem.exists(finalPath)) {
+        Dataset dataset = createDataset(avroSchema, "dataset:" + finalPath);
+        DatasetKeyOutputFormat.configure(job).overwrite(dataset);
+      } else {
+        DatasetKeyOutputFormat.configure(job).overwrite(new URI("dataset:" + finalPath));
+      }
+
+      job.setInputFormatClass(AvroParquetInputFormat.class);
+      AvroParquetInputFormat.setAvroReadSchema(job, avroSchema);
+
+      conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, avroSchema.toString());
+      Class<DatasetKeyOutputFormat> outClass = DatasetKeyOutputFormat.class;
+
+      job.setOutputFormatClass(outClass);
+    } catch (Exception cnfe) {
+      throw new IOException(cnfe);
+    }
+  }
+
+  public static Dataset createDataset(Schema schema, String uri) {
+    DatasetDescriptor descriptor =
+        new DatasetDescriptor.Builder().schema(schema).format(Formats.PARQUET).build();
+    return Datasets.create(uri, descriptor, GenericRecord.class);
+  }
+}
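
The schema-recovery step above deserves a note: the merge job has no schema of its own, so it reads the Parquet footers and converts the file's message type back to an Avro schema. A minimal extraction of that step, using only calls already present in this class:

import java.util.List;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import parquet.avro.AvroSchemaConverter;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.schema.MessageType;

public class FooterSchemaSketch {
  public static Schema avroSchemaOf(Configuration conf, Path parquetPath) throws Exception {
    FileStatus status = parquetPath.getFileSystem(conf).getFileStatus(parquetPath);
    // Read the footers (skipping row-group detail) and take the first file's schema.
    List<Footer> footers = ParquetFileReader.readFooters(conf, status, true);
    MessageType messageType = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
    return new AvroSchemaConverter().convert(messageType);
  }
}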

