sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkatran...@apache.org
Subject sqoop git commit: SQOOP-2936: Provide Apache Atlas integration for hcatalog based exports
Date Sun, 21 Aug 2016 06:41:08 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk 3bd2952a9 -> b9794f98e


SQOOP-2936: Provide Apache Atlas integration for hcatalog based exports

(Balu Vellanki via Venkat Ranganathan)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b9794f98
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b9794f98
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b9794f98

Branch: refs/heads/trunk
Commit: b9794f98e397065771fb99a30e5c85ac7b84bdd0
Parents: 3bd2952
Author: Venkat Ranganathan <venkat@hortonworks.com>
Authored: Sat Aug 20 23:40:28 2016 -0700
Committer: Venkat Ranganathan <venkat@hortonworks.com>
Committed: Sat Aug 20 23:40:28 2016 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/SqoopJobDataPublisher.java | 49 +++++++++++-----
 .../apache/sqoop/mapreduce/ExportJobBase.java   | 42 ++++++++-----
 .../apache/sqoop/mapreduce/ImportJobBase.java   | 32 +++-------
 .../apache/sqoop/mapreduce/PublishJobData.java  | 62 ++++++++++++++++++++
 .../sqoop/testutil/BaseSqoopTestCase.java       | 46 ++++++++++-----
 .../apache/sqoop/TestSqoopJobDataPublisher.java | 22 ++-----
 .../apache/sqoop/hcat/HCatalogExportTest.java   | 21 +++++++
 .../apache/sqoop/hcat/HCatalogImportTest.java   | 46 +++++++++++++++
 8 files changed, 231 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopJobDataPublisher.java b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
index d77125f..93be02f 100644
--- a/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
+++ b/src/java/org/apache/sqoop/SqoopJobDataPublisher.java
@@ -19,9 +19,10 @@
 package org.apache.sqoop;
 
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.sqoop.mapreduce.ExportJobBase;
+import org.apache.sqoop.mapreduce.ImportJobBase;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 
-import java.io.IOException;
 import java.util.Properties;
 
 /**
@@ -94,40 +95,58 @@ public class SqoopJobDataPublisher {
             this.url = url;
             this.user = user;
             this.storeType = storeType;
-            this.storeTable = storeTable;
+            this.storeTable = (storeTable == null) ? hiveTable : storeTable;
             this.storeQuery = storeQuery;
-            this.hiveDB = hiveDB;
-            if (this.hiveDB == null) {
-                this.hiveDB =   SqoopHCatUtilities.DEFHCATDB;
-            }
+            this.hiveDB = (hiveDB == null) ? SqoopHCatUtilities.DEFHCATDB : hiveDB;
             this.hiveTable = hiveTable;
             this.commandLineOpts = commandLineOpts;
             this.startTime = startTime;
             this.endTime = endTime;
         }
 
-        public Data(String operation, String url, String user, String storeType, String storeTable,
-                              String storeQuery, String hiveDB, String hiveTable, Properties commandLineOpts,
-                              long startTime, long endTime) {
+        public Data(String operation, String url, String user, String storeType,
+                    String storeTable, String storeQuery, String hiveDB, String hiveTable,
+                    Properties commandLineOpts, long startTime, long endTime) throws Exception{
             init(operation, url, user, storeType, storeTable, storeQuery,
                     hiveDB, hiveTable, commandLineOpts, startTime, endTime);
         }
 
-        public Data(SqoopOptions options, String tableName, long startTime, long endTime) throws IOException {
-            String hiveTableName = options.doHiveImport() ?
-                    options.getHiveTableName() : options.getHCatTableName();
-            String hiveDatabase = options.doHiveImport() ?
-                    options.getHiveDatabaseName() : options.getHCatDatabaseName();
+        public Data(String operation, SqoopOptions options, String tableName,
+                    long startTime, long endTime) throws Exception {
+            String hiveTableName = null;
+            String hiveDatabase = null;
+            if (ExportJobBase.OPERATION.equals(operation)) {
+                // export job data
+                hiveTableName = options.getHCatTableName();
+                hiveDatabase = options.getHCatDatabaseName();
+            } else if (ImportJobBase.OPERATION.equals(operation)){
+                // import job data
+                hiveTableName = options.doHiveImport() ?
+                        options.getHiveTableName() : options.getHCatTableName();
+                hiveDatabase = options.doHiveImport() ?
+                        options.getHiveDatabaseName() : options.getHCatDatabaseName();
+            } else {
+                throw new Exception("Data published for unsupported Operation "
+                        + operation + " in SqoopJobDataPublisher");
+            }
+
             String dataStoreType = JDBC_STORE;
             String[] storeTypeFields = options.getConnectString().split(":");
             if (storeTypeFields.length > 2) {
                 dataStoreType = storeTypeFields[1];
             }
 
-            init("import", options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(),
+            init(operation, options.getConnectString(), UserGroupInformation.getCurrentUser().getShortUserName(),
                     dataStoreType, tableName, options.getSqlQuery(), hiveDatabase, hiveTableName,
                     options.writeProperties(), startTime, endTime);
         }
+
+        public String toString() {
+            return  "Operation=" + operation + ", Url=" + url + ", User=" + user + ", StoreType=" + storeType
+                    + ", StoreTable=" + storeTable + ", StoreQuery=" + storeQuery + ", HiveDB=" + hiveDB
+                    + ", HiveTable=" + hiveTable + ", StartTime=" + startTime + ", EndTime=" + endTime
+                    + ", CmdLineArgs=" + commandLineOpts;
+        }
     }
 
     public void publish(Data data) throws Exception{

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 32de92a..27f84da 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -18,10 +18,14 @@
 
 package org.apache.sqoop.mapreduce;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.sql.SQLException;
-
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.JobBase;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.util.ExportException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,18 +43,13 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.PerfCounters;
+import org.apache.sqoop.validation.ValidationContext;
+import org.apache.sqoop.validation.ValidationException;
 
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.orm.TableClassName;
-import com.cloudera.sqoop.mapreduce.JobBase;
-import com.cloudera.sqoop.util.ExportException;
-
-import org.apache.sqoop.validation.*;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Date;
 
 /**
  * Base class for running an export MapReduce job.
@@ -94,6 +93,10 @@ public class ExportJobBase extends JobBase {
   private static final String HADOOP_MAP_TASK_MAX_ATTEMTPS =
     "mapred.map.max.attempts";
 
+  /** Start and endtime captured for export job. */
+  private long startTime;
+  public static final String OPERATION = "export";
+
   protected ExportJobContext context;
 
 
@@ -107,6 +110,7 @@ public class ExportJobBase extends JobBase {
       final Class<? extends OutputFormat> outputFormatClass) {
     super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass);
     this.context = ctxt;
+    this.startTime = new Date().getTime();
   }
 
   /**
@@ -439,12 +443,20 @@ public class ExportJobBase extends JobBase {
       setJob(job);
       boolean success = runJob(job);
       if (!success) {
+        LOG.error("Export job failed!");
         throw new ExportException("Export job failed!");
       }
 
       if (options.isValidationEnabled()) {
         validateExport(tableName, conf, job);
       }
+
+      if (isHCatJob) {
+        // Publish export job data for hcat export operation
+        LOG.info("Publishing HCatalog export job data to Listeners");
+        PublishJobData.publishJobData(conf, options, OPERATION, tableName, startTime);
+      }
+
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     } catch (ClassNotFoundException cnfe) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
index 9b6e1a0..105917c 100644
--- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
@@ -27,7 +27,6 @@ import com.cloudera.sqoop.orm.TableClassName;
 import com.cloudera.sqoop.util.ImportException;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.mapred.AvroJob;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,8 +41,6 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.sqoop.SqoopJobDataPublisher;
-import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.validation.ValidationContext;
@@ -61,7 +58,7 @@ public class ImportJobBase extends JobBase {
 
   private ImportJobContext context;
   private long startTime;
-  private long endTime;
+  public static final String OPERATION = "import";
   public static final Log LOG = LogFactory.getLog(
       ImportJobBase.class.getName());
 
@@ -280,28 +277,13 @@ public class ImportJobBase extends JobBase {
       if (options.isValidationEnabled()) {
         validateImport(tableName, conf, job);
       }
-      this.endTime = new Date().getTime();
-
-      String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS);
-      if (!StringUtils.isEmpty(publishClassName)) {
-        try {
-          Class publishClass =  Class.forName(publishClassName);
-          Object obj = publishClass.newInstance();
-          if (obj instanceof SqoopJobDataPublisher) {
-            SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj;
-            if (options.doHiveImport() || options.getHCatTableName() != null) {
-              // We need to publish the details
-              SqoopJobDataPublisher.Data data =
-                      new SqoopJobDataPublisher.Data(options, tableName, startTime, endTime);
-              publisher.publish(data);
-            }
-          } else {
-            LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring...");
-          }
-        } catch (Exception ex) {
-          LOG.warn("Unable to publish data to publisher " + ex.getMessage(), ex);
-        }
+
+      if (options.doHiveImport() || isHCatJob) {
+        // Publish data for import job, only hive/hcat import jobs are supported now.
+        LOG.info("Publishing Hive/Hcat import job data to Listeners for table " + tableName);
+        PublishJobData.publishJobData(conf, options, OPERATION, tableName, startTime);
       }
+
     } catch (InterruptedException ie) {
       throw new IOException(ie);
     } catch (ClassNotFoundException cnfe) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/java/org/apache/sqoop/mapreduce/PublishJobData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/PublishJobData.java b/src/java/org/apache/sqoop/mapreduce/PublishJobData.java
new file mode 100644
index 0000000..fc18188
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/PublishJobData.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.SqoopJobDataPublisher;
+import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.config.ConfigurationConstants;
+
+import java.util.Date;
+
+/**
+ * Util class to publish job data to listeners.
+ */
+public final class PublishJobData {
+
+    public static final Log LOG = LogFactory.getLog(PublishJobData.class.getName());
+
+    private PublishJobData() {}
+
+    public static void publishJobData(Configuration conf, SqoopOptions options,
+                                      String operation, String tableName, long startTime) {
+        // Publish metadata about export job to listeners (if they are registered with sqoop)
+        long endTime = new Date().getTime();
+        String publishClassName = conf.get(ConfigurationConstants.DATA_PUBLISH_CLASS);
+        if (!StringUtils.isEmpty(publishClassName)) {
+            try {
+                Class publishClass =  Class.forName(publishClassName);
+                Object obj = publishClass.newInstance();
+                if (obj instanceof SqoopJobDataPublisher) {
+                    SqoopJobDataPublisher publisher = (SqoopJobDataPublisher) obj;
+                    SqoopJobDataPublisher.Data data =
+                            new SqoopJobDataPublisher.Data(operation, options, tableName, startTime, endTime);
+                    LOG.info("Published data is " + data.toString());
+                    publisher.publish(data);
+                } else {
+                    LOG.warn("Publisher class not an instance of SqoopJobDataPublisher. Ignoring...");
+                }
+            } catch (Exception ex) {
+                LOG.warn("Unable to publish " + operation + " data to publisher " + ex.getMessage(), ex);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index 99e4293..8dddd02 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -18,14 +18,13 @@
 
 package com.cloudera.sqoop.testutil;
 
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
-
+import com.cloudera.sqoop.ConnFactory;
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.metastore.JobData;
+import com.cloudera.sqoop.tool.ImportTool;
+import com.google.common.collect.ObjectArrays;
+import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,23 +32,38 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.BasicConfigurator;
+import org.apache.sqoop.SqoopJobDataPublisher;
 import org.junit.After;
 import org.junit.Before;
 
-import com.cloudera.sqoop.ConnFactory;
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.metastore.JobData;
-import com.cloudera.sqoop.tool.ImportTool;
-import com.google.common.collect.ObjectArrays;
-
-import junit.framework.TestCase;
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
 
 /**
  * Class that implements common methods required for tests.
  */
 public abstract class BaseSqoopTestCase extends TestCase {
 
+  public static class DummyDataPublisher extends SqoopJobDataPublisher {
+    public static String hiveTable;
+    public static String storeTable;
+    public static String storeType;
+    public static String operation;
+
+    @Override
+    public void publish(Data data) {
+      hiveTable = data.getHiveTable();
+      storeTable = data.getStoreTable();
+      storeType = data.getStoreType();
+      operation = data.getOperation();
+    }
+  }
+
   public static final Log LOG = LogFactory.getLog(
       BaseSqoopTestCase.class.getName());
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
index 99bcae0..e9698be 100644
--- a/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
+++ b/src/test/org/apache/sqoop/TestSqoopJobDataPublisher.java
@@ -19,7 +19,6 @@
 package org.apache.sqoop;
 
 import com.cloudera.sqoop.hive.HiveImport;
-import com.cloudera.sqoop.hive.TestHiveImport;
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.testutil.ImportJobTestCase;
 import com.cloudera.sqoop.tool.ImportTool;
@@ -36,22 +35,7 @@ import java.util.ArrayList;
 
 public class TestSqoopJobDataPublisher extends ImportJobTestCase {
 
-    public static class DummyDataPublisher extends SqoopJobDataPublisher {
-        private static String hiveTable;
-        private static String storeTable;
-        private static String storeType;
-
-        @Override
-        public void publish(SqoopJobDataPublisher.Data data) {
-            hiveTable = data.getHiveTable();
-            storeTable = data.getStoreTable();
-            storeType = data.getStoreType();
-            assert (data.getOperation().equals("import"));
-        }
-    }
-
-    public static final Log LOG = LogFactory.getLog(
-            TestHiveImport.class.getName());
+    public static final Log LOG = LogFactory.getLog(TestSqoopJobDataPublisher.class.getName());
 
     public void setUp() {
         super.setUp();
@@ -106,6 +90,7 @@ public class TestSqoopJobDataPublisher extends ImportJobTestCase {
 
         return args.toArray(new String[0]);
     }
+
     private void runImportTest(String tableName, String [] types,
                                String [] values, String verificationScript, String [] args,
                                SqoopTool tool) throws IOException {
@@ -137,6 +122,7 @@ public class TestSqoopJobDataPublisher extends ImportJobTestCase {
 
         return opts;
     }
+
     protected void setNumCols(int numCols) {
         String [] cols = new String[numCols];
         for (int i = 0; i < numCols; i++) {
@@ -159,7 +145,7 @@ public class TestSqoopJobDataPublisher extends ImportJobTestCase {
         assert (DummyDataPublisher.hiveTable.equals("NORMAL_HIVE_IMPORT"));
         assert (DummyDataPublisher.storeTable.equals("NORMAL_HIVE_IMPORT"));
         assert (DummyDataPublisher.storeType.equals("hsqldb"));
-
+        assert (DummyDataPublisher.operation.equals("import"));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
index 8aa0725..6f87a18 100644
--- a/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
+++ b/src/test/org/apache/sqoop/hcat/HCatalogExportTest.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
 import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
 import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
@@ -441,4 +442,24 @@ public class HCatalogExportTest extends ExportJobTestCase {
     utils.setStorageInfo(HCatalogTestUtils.STORED_AS_TEXT);
     runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
   }
+
+  public void testPublishExportJobData() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+            HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+                    "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
+                    "1", "1", KeyType.STATIC_KEY),
+            HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+                    "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
+                    "2", "2", KeyType.DYNAMIC_KEY), };
+
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("-D");
+    addlArgsArray.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
+    runHCatExport(addlArgsArray, TOTAL_RECORDS, table, cols);
+    assert (DummyDataPublisher.storeTable.equals(getTableName()));
+    assert (DummyDataPublisher.storeType.equals("hsqldb"));
+    assert (DummyDataPublisher.operation.equals("export"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b9794f98/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
index 67b7a78..fe5295a 100644
--- a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
+++ b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.hcat.HCatalogTestUtils.ColumnGenerator;
 import org.apache.sqoop.hcat.HCatalogTestUtils.CreateMode;
 import org.apache.sqoop.hcat.HCatalogTestUtils.KeyType;
@@ -918,4 +919,49 @@ public class HCatalogImportTest extends ImportJobTestCase {
     runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols,
       null, true, false);
   }
+
+  public void testPublishQueryImportData() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols =  new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
+        new HiveVarchar("1", 20), "1", KeyType.STATIC_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
+        new HiveVarchar("2", 20), "2", KeyType.DYNAMIC_KEY),
+    };
+    List<String> cfgParams = new ArrayList<String>();
+    cfgParams.add("-D");
+    cfgParams.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
+    setConfigParams(cfgParams);
+    runHCatQueryImport(new ArrayList<String>(), TOTAL_RECORDS, table, cols, null);
+    assert (DummyDataPublisher.storeType.equals("hsqldb"));
+    assert (DummyDataPublisher.operation.equals("import"));
+    assert (DummyDataPublisher.storeTable.equals(getTableName()));
+  }
+
+  public void testPublishTableImportData() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols =  new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(0),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
+        new HiveVarchar("1", 20), "1", KeyType.STATIC_KEY),
+      HCatalogTestUtils.colGenerator(HCatalogTestUtils.forIdx(1),
+        "varchar(20)", Types.VARCHAR, HCatFieldSchema.Type.STRING, 0, 0,
+        new HiveVarchar("2", 20), "2", KeyType.DYNAMIC_KEY),
+    };
+    List<String> cfgParams = new ArrayList<String>();
+    cfgParams.add("-D");
+    cfgParams.add(ConfigurationConstants.DATA_PUBLISH_CLASS + "=" + DummyDataPublisher.class.getName());
+    setConfigParams(cfgParams);
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--create-hcatalog-table");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null, true, false);
+    assert (DummyDataPublisher.storeType.equals("hsqldb"));
+    assert (DummyDataPublisher.operation.equals("import"));
+    assert (DummyDataPublisher.storeTable.equals(getTableName()));
+  }
 }


Mime
View raw message