sqoop-commits mailing list archives

From b...@apache.org
Subject [1/2] sqoop git commit: SQOOP-3319: Extract code using Kite into separate classes
Date Tue, 29 May 2018 08:20:23 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk ad7d046ef -> 3233db8e1


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
new file mode 100644
index 0000000..e68bba9
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/parquet/kite/KiteParquetUtils.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.parquet.kite;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
+import org.apache.sqoop.hive.HiveConfig;
+import org.kitesdk.data.CompressionType;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.Datasets;
+import org.kitesdk.data.Formats;
+import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
+import org.kitesdk.data.spi.SchemaValidationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_AVRO_SCHEMA_KEY;
+import static org.apache.sqoop.mapreduce.parquet.ParquetConstants.SQOOP_PARQUET_OUTPUT_CODEC_KEY;
+
+/**
+ * Helper class for setting up a Parquet MapReduce job.
+ */
+public final class KiteParquetUtils {
+
+  public static final Log LOG = LogFactory.getLog(KiteParquetUtils.class.getName());
+
+  public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+
+  public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
+  // Purposefully choosing the same token alias as the one Oozie chooses.
+  // Make sure we don't generate a new delegation token if oozie
+  // has already generated one.
+  public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
+
+  public static final String INCOMPATIBLE_AVRO_SCHEMA_MSG = "Target dataset was created with an incompatible Avro schema. ";
+
+  public static final String HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG = "You tried to import to an already existing Hive table in " +
+      "Parquet format. Sqoop maps date/timestamp SQL types to int/bigint Hive types during Hive Parquet import" +
+      " but it is possible that date/timestamp types were mapped to strings during table" +
+      " creation. Consider using Sqoop option --map-column-java to resolve the mismatch" +
+      " (e.g. --map-column-java date_field1=String,timestamp_field1=String).";
+
+  private static final String HIVE_URI_PREFIX = "dataset:hive";
+
+  private KiteParquetUtils() {
+  }
+
+  public enum WriteMode {
+    DEFAULT, APPEND, OVERWRITE
+  };
+
+  public static CompressionType getCompressionType(Configuration conf) {
+    CompressionType defaults = Formats.PARQUET.getDefaultCompressionType();
+    String codec = conf.get(SQOOP_PARQUET_OUTPUT_CODEC_KEY, defaults.getName());
+    try {
+      return CompressionType.forName(codec);
+    } catch (IllegalArgumentException ex) {
+      LOG.warn(String.format(
+          "Unsupported compression type '%s'. Fallback to '%s'.",
+          codec, defaults));
+    }
+    return defaults;
+  }
+
+  /**
+   * Configure the import job. The import process will use a Kite dataset to
+   * write data records into Parquet format internally. The input key class is
+   * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
+   * {@link org.apache.avro.generic.GenericRecord}.
+   */
+  public static void configureImportJob(JobConf conf, Schema schema,
+      String uri, WriteMode writeMode) throws IOException {
+    Dataset dataset;
+
+    // Add hive delegation token only if we don't already have one.
+    if (isHiveImport(uri)) {
+      Configuration hiveConf = HiveConfig.getHiveConf(conf);
+      if (isSecureMetastore(hiveConf)) {
+        // Copy hive configs to job config
+        HiveConfig.addHiveConfigs(hiveConf, conf);
+
+        if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
+          addHiveDelegationToken(conf);
+        }
+      }
+    }
+
+    if (Datasets.exists(uri)) {
+      if (WriteMode.DEFAULT.equals(writeMode)) {
+        throw new IOException("Destination exists! " + uri);
+      }
+
+      dataset = Datasets.load(uri);
+      Schema writtenWith = dataset.getDescriptor().getSchema();
+      if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
+        String exceptionMessage = buildAvroSchemaMismatchMessage(isHiveImport(uri));
+        throw new AvroSchemaMismatchException(exceptionMessage, writtenWith, schema);
+      }
+    } else {
+      dataset = createDataset(schema, getCompressionType(conf), uri);
+    }
+    conf.set(SQOOP_PARQUET_AVRO_SCHEMA_KEY, schema.toString());
+
+    DatasetKeyOutputFormat.ConfigBuilder builder =
+        DatasetKeyOutputFormat.configure(conf);
+    if (WriteMode.OVERWRITE.equals(writeMode)) {
+      builder.overwrite(dataset);
+    } else if (WriteMode.APPEND.equals(writeMode)) {
+      builder.appendTo(dataset);
+    } else {
+      builder.writeTo(dataset);
+    }
+  }
+
+  private static boolean isHiveImport(String importUri) {
+    return importUri.startsWith(HIVE_URI_PREFIX);
+  }
+
+  public static Dataset createDataset(Schema schema,
+      CompressionType compressionType, String uri) {
+    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+        .schema(schema)
+        .format(Formats.PARQUET)
+        .compressionType(compressionType)
+        .build();
+    return Datasets.create(uri, descriptor, GenericRecord.class);
+  }
+
+  private static boolean isSecureMetastore(Configuration conf) {
+    return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
+  }
+
+  /**
+   * Add a Hive delegation token to the job's credentials store.
+   * @param conf job configuration whose credentials store receives the token
+   */
+  private static void addHiveDelegationToken(JobConf conf) {
+    // Need to use reflection since there's no compile time dependency on the client libs.
+    Class<?> HiveConfClass;
+    Class<?> HiveMetaStoreClientClass;
+
+    try {
+      HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
+          + " when adding hive delegation token. "
+          + "Make sure HIVE_CONF_DIR is set correctly.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+
+    try {
+      HiveConfClass = Class.forName(HiveConfig.HIVE_CONF_CLASS);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HiveConfig.HIVE_CONF_CLASS
+          + " when adding hive delegation token."
+          + " Make sure HIVE_CONF_DIR is set correctly.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+
+    try {
+      Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
+          HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class)
+      );
+      // getDelegationToken(String kerberosPrincipal)
+      Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod("getDelegationToken", String.class);
+      Object tokenStringForm = getDelegationTokenMethod.invoke(client, UserGroupInformation.getLoginUser().getShortUserName());
+
+      // Load token
+      Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
+      metastoreToken.decodeFromUrlString(tokenStringForm.toString());
+      conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
+
+      LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
+    } catch (Exception ex) {
+      LOG.error("Couldn't fetch delegation token.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+  }
+
+  private static String buildAvroSchemaMismatchMessage(boolean hiveImport) {
+    String exceptionMessage = INCOMPATIBLE_AVRO_SCHEMA_MSG;
+
+    if (hiveImport) {
+      exceptionMessage += HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG;
+    }
+
+    return exceptionMessage;
+  }
+
+}
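
A minimal usage sketch of the API added above (not part of the commit): it only calls configureImportJob, WriteMode and the getCompressionType/createDataset helpers shown in this file; the class name, Avro schema and dataset URI are made-up examples, and a "dataset:hive:/..." URI would additionally exercise the secure-metastore delegation token path.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;

public class KiteParquetUtilsUsageSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical Avro schema standing in for the one Sqoop derives from the source table.
    Schema schema = SchemaBuilder.record("example_record").fields()
        .requiredInt("id")
        .optionalString("name")
        .endRecord();

    JobConf conf = new JobConf();
    // Hypothetical HDFS-backed Kite dataset URI; a URI starting with "dataset:hive"
    // would trigger the Hive config copy and delegation token handling above.
    String uri = "dataset:hdfs:/tmp/example_parquet_dataset";

    // WriteMode.DEFAULT fails with IOException if the dataset already exists;
    // APPEND and OVERWRITE reuse the existing dataset instead.
    KiteParquetUtils.configureImportJob(conf, schema, uri, KiteParquetUtils.WriteMode.DEFAULT);
  }
}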

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
index e4b1350..ea2b064 100644
--- a/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PostgreSQLCopyExportJob.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.mapreduce.JdbcExportJob;
-
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 
 
 /**
@@ -42,15 +42,16 @@ public class PostgreSQLCopyExportJob extends JdbcExportJob {
   public static final Log LOG =
     LogFactory.getLog(PostgreSQLCopyExportJob.class.getName());
 
-  public PostgreSQLCopyExportJob(final ExportJobContext context) {
-    super(context);
+  public PostgreSQLCopyExportJob(final ExportJobContext context, final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(context, parquetExportJobConfigurator);
   }
 
   public PostgreSQLCopyExportJob(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
-      final Class<? extends OutputFormat> outputFormatClass) {
-    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+      final Class<? extends OutputFormat> outputFormatClass,
+      final ParquetExportJobConfigurator parquetExportJobConfigurator) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass, parquetExportJobConfigurator);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 783651a..c62ee98 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -34,9 +34,12 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.sqoop.manager.SupportedManagers;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactory;
+import org.apache.sqoop.mapreduce.parquet.ParquetJobConfiguratorFactoryProvider;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.password.CredentialProviderHelper;
@@ -1904,4 +1907,8 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool {
     }
 
   }
+
+  public ParquetJobConfiguratorFactory getParquetJobConfigurator(Configuration configuration) {
+    return ParquetJobConfiguratorFactoryProvider.createParquetJobConfiguratorFactory(configuration);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index ee79d8b..2c474b7 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -46,6 +46,7 @@ import org.apache.sqoop.hive.HiveClient;
 import org.apache.sqoop.hive.HiveClientFactory;
 import org.apache.sqoop.manager.ImportJobContext;
 import org.apache.sqoop.mapreduce.MergeJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.metastore.JobData;
 import org.apache.sqoop.metastore.JobStorage;
 import org.apache.sqoop.metastore.JobStorageFactory;
@@ -472,7 +473,8 @@ public class ImportTool extends BaseSqoopTool {
           loadJars(options.getConf(), context.getJarFile(), context.getTableName());
         }
 
-        MergeJob mergeJob = new MergeJob(options);
+        ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
+        MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
         if (mergeJob.runMergeJob()) {
           // Rename destination directory to proper location.
           Path tmpDir = getOutputPath(options, context.getTableName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/java/org/apache/sqoop/tool/MergeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/MergeTool.java b/src/java/org/apache/sqoop/tool/MergeTool.java
index 311fee8..4c20f7d 100644
--- a/src/java/org/apache/sqoop/tool/MergeTool.java
+++ b/src/java/org/apache/sqoop/tool/MergeTool.java
@@ -30,6 +30,7 @@ import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
 import org.apache.sqoop.cli.RelatedOptions;
 import org.apache.sqoop.cli.ToolOptions;
 import org.apache.sqoop.mapreduce.MergeJob;
+import org.apache.sqoop.mapreduce.parquet.ParquetMergeJobConfigurator;
 import org.apache.sqoop.util.LoggingUtils;
 
 /**
@@ -52,7 +53,8 @@ public class MergeTool extends BaseSqoopTool {
   public int run(SqoopOptions options) {
     try {
       // Configure and execute a MapReduce job to merge these datasets.
-      MergeJob mergeJob = new MergeJob(options);
+      ParquetMergeJobConfigurator parquetMergeJobConfigurator = getParquetJobConfigurator(options.getConf()).createParquetMergeJobConfigurator();
+      MergeJob mergeJob = new MergeJob(options, parquetMergeJobConfigurator);
       if (!mergeJob.runMergeJob()) {
         LOG.error("MapReduce job failed!");
         return 1;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/test/org/apache/sqoop/TestParquetImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestParquetImport.java b/src/test/org/apache/sqoop/TestParquetImport.java
index 0f9c7f3..27d407a 100644
--- a/src/test/org/apache/sqoop/TestParquetImport.java
+++ b/src/test/org/apache/sqoop/TestParquetImport.java
@@ -32,10 +32,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sqoop.util.ParquetReader;
 import org.junit.Test;
+import parquet.avro.AvroSchemaConverter;
 import parquet.format.CompressionCodec;
 import parquet.hadoop.Footer;
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -311,8 +313,9 @@ public class TestParquetImport extends ImportJobTestCase {
   }
 
   private Schema getSchema() {
-    String schemaString = getOutputMetadata().getFileMetaData().getKeyValueMetaData().get("parquet.avro.schema");
-    return new Schema.Parser().parse(schemaString);
+    MessageType parquetSchema = getOutputMetadata().getFileMetaData().getSchema();
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter();
+    return avroSchemaConverter.convert(parquetSchema);
   }
 
   private void checkField(Field field, String name, Type type) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/test/org/apache/sqoop/hive/TestHiveImport.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hive/TestHiveImport.java b/src/test/org/apache/sqoop/hive/TestHiveImport.java
index 77674db..436f0e5 100644
--- a/src/test/org/apache/sqoop/hive/TestHiveImport.java
+++ b/src/test/org/apache/sqoop/hive/TestHiveImport.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.avro.AvroSchemaMismatchException;
-import org.apache.sqoop.mapreduce.ParquetJob;
+import org.apache.sqoop.mapreduce.parquet.kite.KiteParquetUtils;
 import org.apache.sqoop.util.ParquetReader;
 import org.junit.After;
 import org.junit.Before;
@@ -404,7 +404,7 @@ public class TestHiveImport extends ImportJobTestCase {
     createTableWithColTypes(types, vals);
 
     thrown.expect(AvroSchemaMismatchException.class);
-    thrown.expectMessage(ParquetJob.INCOMPATIBLE_AVRO_SCHEMA_MSG + ParquetJob.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
+    thrown.expectMessage(KiteParquetUtils.INCOMPATIBLE_AVRO_SCHEMA_MSG + KiteParquetUtils.HIVE_INCOMPATIBLE_AVRO_SCHEMA_MSG);
 
     SqoopOptions sqoopOptions = getSqoopOptions(getConf());
     sqoopOptions.setThrowOnError(true);
@@ -422,7 +422,7 @@ public class TestHiveImport extends ImportJobTestCase {
             .name(getColName(2)).type().nullable().stringType().noDefault()
             .endRecord();
     String dataSetUri = "dataset:hive:/default/" + tableName;
-    ParquetJob.createDataset(dataSetSchema, ParquetJob.getCompressionType(new Configuration()), dataSetUri);
+    KiteParquetUtils.createDataset(dataSetSchema, KiteParquetUtils.getCompressionType(new Configuration()), dataSetUri);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java b/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java
index a900b1c..81ab677 100644
--- a/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java
+++ b/src/test/org/apache/sqoop/mapreduce/TestJdbcExportJob.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.sqoop.mapreduce.ExportJobBase.FileType;
+import org.apache.sqoop.mapreduce.parquet.ParquetExportJobConfigurator;
 import org.junit.Test;
 
 import org.apache.sqoop.SqoopOptions;
@@ -117,7 +118,7 @@ public class TestJdbcExportJob {
     when(mockConnManager.getColumnTypes(anyString(), anyString())).thenReturn(columnTypeInts);
     when(mockConnManager.toJavaType(anyString(), anyString(), anyInt())).thenReturn("String");
     when(mockContext.getConnManager()).thenReturn(mockConnManager);
-    JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext) {
-    JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext) {
+    JdbcExportJob jdbcExportJob = new JdbcExportJob(mockContext, mock(ParquetExportJobConfigurator.class)) {
       @Override
       protected FileType getInputFileType() {
         return inputFileType;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3233db8e/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
index a133e58..be62efd 100644
--- a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
@@ -19,6 +19,7 @@
 package org.apache.sqoop.mapreduce.mainframe;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -56,7 +58,7 @@ public class TestMainframeImportJob {
     Path path = new Path("dummyPath");
     ImportJobContext context = new ImportJobContext(tableName, jarFile,
         options, path);
-    mfImportJob = new MainframeImportJob(options, context);
+    mfImportJob = new MainframeImportJob(options, context, mock(ParquetImportJobConfigurator.class));
 
     // To access protected method by means of reflection
     Class[] types = {};
@@ -79,7 +81,7 @@ public class TestMainframeImportJob {
     options.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
     ImportJobContext context = new ImportJobContext(tableName, jarFile,
         options, path);
-    avroImportJob = new MainframeImportJob(options, context);
+    avroImportJob = new MainframeImportJob(options, context, mock(ParquetImportJobConfigurator.class));
 
     // To access protected method by means of reflection
     Class[] types = {};

