sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kathl...@apache.org
Subject sqoop git commit: SQOOP-2938: Mainframe import module extension to support data sets on tape (Chris Teoh via Kate Ting)
Date Tue, 07 Jun 2016 00:32:53 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk 28bbe4d46 -> b302d89fa


SQOOP-2938: Mainframe import module extension to support data sets on tape (Chris Teoh via Kate Ting)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b302d89f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b302d89f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b302d89f

Branch: refs/heads/trunk
Commit: b302d89faebd3f21cdbddcf1e1d74bf8ebd8035d
Parents: 28bbe4d
Author: Kate Ting <kathleen@apache.org>
Authored: Mon Jun 6 17:31:17 2016 -0700
Committer: Kate Ting <kathleen@apache.org>
Committed: Mon Jun 6 17:31:17 2016 -0700

----------------------------------------------------------------------
 src/docs/man/mainframe-connection-args.txt      |   6 +
 src/java/org/apache/sqoop/SqoopOptions.java     |  35 +++++
 .../mainframe/MainframeConfiguration.java       |  11 ++
 .../MainframeDatasetFTPRecordReader.java        |  18 ++-
 .../mainframe/MainframeDatasetInputFormat.java  |   6 +-
 .../mainframe/MainframeDatasetPath.java         | 117 ++++++++++++++
 .../mainframe/MainframeDatasetType.java         |  19 +++
 .../mainframe/MainframeFTPFileEntryParser.java  |  93 +++++++++++
 .../mapreduce/mainframe/MainframeImportJob.java |   6 +
 .../apache/sqoop/tool/MainframeImportTool.java  |  38 +++++
 .../sqoop/util/MainframeFTPClientUtils.java     |  79 ++++++++--
 .../mainframe/TestMainframeDatasetPath.java     |  86 ++++++++++
 .../TestMainframeFTPFileEntryParser.java        |  64 ++++++++
 .../mainframe/TestMainframeImportJob.java       |   2 +
 .../sqoop/tool/TestMainframeImportTool.java     |  77 +++++++++
 .../sqoop/util/TestMainframeFTPClientUtils.java | 157 +++++++++++++++++++
 16 files changed, 799 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/docs/man/mainframe-connection-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/man/mainframe-connection-args.txt b/src/docs/man/mainframe-connection-args.txt
index 04e33c7..4dd1f71 100644
--- a/src/docs/man/mainframe-connection-args.txt
+++ b/src/docs/man/mainframe-connection-args.txt
@@ -23,3 +23,9 @@
 
 --dataset (partitioned dataset name)::
   Specify a partitioned dataset name
+
+--datasettype (data set type)::
+  Specify a dataset type. "s" for sequential, "p" for partitioned dataset (default), "g" for generational data group
+  
+--tape (dataset on tape)::
+  Specify whether a dataset is on tape or not (true|false)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index ff96280..e14a0b7 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.accumulo.AccumuloConstants;
+import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.SqoopJsonUtil;
@@ -282,6 +283,14 @@ public class SqoopOptions implements Cloneable {
   @StoredAsProperty("mainframe.input.dataset.name")
   private String mainframeInputDatasetName;
 
+  // Dataset type for mainframe import tool
+  @StoredAsProperty("mainframe.input.dataset.type")
+  private String mainframeInputDatasetType;
+
+  // Indicates if the data set is on tape to use different FTP parser
+  @StoredAsProperty("mainframe.input.dataset.tape")
+  private String mainframeInputDatasetTape;
+
   // Accumulo home directory
   private String accumuloHome; // not serialized to metastore.
   // Zookeeper home directory
@@ -1023,6 +1032,9 @@ public class SqoopOptions implements Cloneable {
     // Relaxed isolation will not enabled by default which is the behavior
     // of sqoop until now.
     this.relaxedIsolation = false;
+    
+    // set default mainframe data set type to partitioned data set
+    this.mainframeInputDatasetType = MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED;
   }
 
   /**
@@ -2276,6 +2288,11 @@ public class SqoopOptions implements Cloneable {
   public void setMainframeInputDatasetName(String name) {
     mainframeInputDatasetName = name;
     tableName = name;
+    // may need to set this in the conf variable otherwise it gets lost.
+  }
+
+  public void setMainframeInputDatasetType(String name) {
+    mainframeInputDatasetType = name;
   }
 
   /**
@@ -2285,6 +2302,24 @@ public class SqoopOptions implements Cloneable {
     return mainframeInputDatasetName;
   }
 
+  /*
+   * Return the mainframe dataset type
+   */
+  public String getMainframeInputDatasetType() {
+    return mainframeInputDatasetType;
+  }
+
+  // return whether the dataset is on tape
+  public Boolean getMainframeInputDatasetTape() {
+	  if (mainframeInputDatasetTape == null) { return false; }
+	  return Boolean.parseBoolean(mainframeInputDatasetTape);
+  }
+
+  // sets whether the dataset is on tape
+  public void setMainframeInputDatasetTape(String txtIsFromTape) {
+	  mainframeInputDatasetTape = Boolean.valueOf(Boolean.parseBoolean(txtIsFromTape)).toString();
+  }
+
   public static String getAccumuloHomeDefault() {
     // Set this with $ACCUMULO_HOME, but -Daccumulo.home can override.
     String accumuloHome = System.getenv("ACCUMULO_HOME");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
index f889435..ea54b07 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeConfiguration.java
@@ -22,4 +22,15 @@ public class MainframeConfiguration
   public static final String MAINFRAME_INPUT_DATASET_NAME
       = "mapreduce.mainframe.input.dataset.name";
 
+  public static final String MAINFRAME_INPUT_DATASET_TYPE
+      = "mapreduce.mainframe.input.dataset.type";
+  public static final String MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL
+		= "s";
+  public static final String MAINFRAME_INPUT_DATASET_TYPE_GDG
+	= "g";
+  public static final String MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED
+	= "p";
+  public static final String MAINFRAME_INPUT_DATASET_TAPE = "mainframe.input.dataset.tape";
+
+  public static final String MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME = "org.apache.sqoop.mapreduce.mainframe.MainframeFTPFileEntryParser";
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
index 7c36842..1f78384 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetFTPRecordReader.java
@@ -28,7 +28,6 @@ import org.apache.commons.net.ftp.FTPClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.sqoop.util.MainframeFTPClientUtils;
 
@@ -52,9 +51,20 @@ public class MainframeDatasetFTPRecordReader <T extends SqoopRecord>
     Configuration conf = getConfiguration();
     ftp = MainframeFTPClientUtils.getFTPConnection(conf);
     if (ftp != null) {
-      String dsName
-          = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
-      ftp.changeWorkingDirectory("'" + dsName + "'");
+		String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+		String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+		MainframeDatasetPath p = null;
+		try {
+			p = new MainframeDatasetPath(dsName,conf);
+		} catch (Exception e) {
+			LOG.error(e.getMessage());
+			LOG.error("MainframeDatasetPath helper class incorrectly initialised");
+			e.printStackTrace();
+		}
+		if (dsType != null && p != null) {
+			dsName = p.getMainframeDatasetFolder();
+		}
+		ftp.changeWorkingDirectory("'" + dsName + "'");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java
index 045bbd2..55c0fdc 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetInputFormat.java
@@ -60,9 +60,13 @@ public class MainframeDatasetInputFormat<T extends SqoopRecord>
     String dsName
         = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
     LOG.info("Datasets to transfer from: " + dsName);
+    String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+    LOG.info("Dataset type: " + dsType);
+    String dsTape = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE);
+    LOG.info("Dataset on tape?: " + dsTape);
     List<String> datasets = retrieveDatasets(dsName, conf);
     if (datasets.isEmpty()) {
-      throw new IOException ("No sequential datasets retrieved from " + dsName);
+      throw new IOException ("No datasets retrieved from " + dsName);
     } else {
       int count = datasets.size();
       int chunks = Math.min(count, ConfigurationHelper.getJobNumMaps(job));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java
new file mode 100644
index 0000000..56c70d6
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetPath.java
@@ -0,0 +1,117 @@
+/**
+ * 
+ */
+package org.apache.sqoop.mapreduce.mainframe;
+
+import java.text.ParseException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class determines resolves FTP folder paths
+ * given the data set type and the data set name
+ */
+
+public class MainframeDatasetPath {
+
+	private static final Log LOG =
+		      LogFactory.getLog(MainframeDatasetPath.class);	
+	private String datasetName;
+	private MainframeDatasetType datasetType;
+	private String dsFolderName = null;
+	private String dsFileName = null;
+	
+	// default constructor
+	public MainframeDatasetPath(){}
+	
+	// constructor that takes dataset name job configuration
+	public MainframeDatasetPath(String dsName, Configuration conf) throws Exception {
+		String inputName
+        	= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+		// this should always be true
+		assert(inputName.equals(dsName));
+		LOG.info("Datasets to transfer from: " + dsName);
+		this.datasetName = dsName;
+		// initialise dataset type
+		String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+		this.setMainframeDatasetType(dsType);
+		initialisePaths();
+	}
+	
+	public MainframeDatasetPath(String dsName, MainframeDatasetType dsType) {
+		this.setMainframeDatasetName(dsName);
+		this.setMainframeDatasetType(dsType);
+		initialisePaths();
+	}
+	
+	public MainframeDatasetPath(String dsName, String dsType) throws ParseException {
+		this.setMainframeDatasetName(dsName);
+		this.setMainframeDatasetType(dsType);
+		initialisePaths();
+	}
+	
+	public void initialisePaths() throws IllegalStateException {
+		if (this.datasetName == null || this.datasetType == null) {
+			throw new IllegalStateException("Please set data set name and type first.");
+		}
+		boolean isSequentialDs = this.datasetType.equals(MainframeDatasetType.SEQUENTIAL);
+	    boolean isGDG = this.datasetType.equals(MainframeDatasetType.GDG);
+	    LOG.info(String.format("dsName: %s", this.datasetName));
+	    LOG.info(String.format("isSequentialDs: %s isGDG: %s", isSequentialDs, isGDG));
+	    if (isSequentialDs)
+	    {
+	      // truncate the tailing string until the dot
+	      // usually dataset qualifiers are dots. eg blah1.blah2.blah3
+	      // so in this case, we should return blah1.blah2
+	      int lastDotIndex = this.datasetName.lastIndexOf(".");
+	      // if not found, it is probably in the root
+	      if (lastDotIndex == -1) { this.datasetName = ""; } else {
+	        // if found, return the truncated name
+	        dsFolderName = this.datasetName.substring(0, lastDotIndex);
+	        if (lastDotIndex + 1 < this.datasetName.length()) {
+	          dsFileName = this.datasetName.substring(lastDotIndex + 1);
+	        }
+	      }
+	    } else {
+	    	// GDG or PDS
+	    	dsFolderName = this.datasetName;
+	    	dsFileName = null; // handle parentheses parsing later
+	    }
+	}
+	
+	// getters and setters
+	public MainframeDatasetType getMainframeDatasetType() {
+		return this.datasetType;
+	}
+	
+	public void setMainframeDatasetType(MainframeDatasetType dsType) {
+		this.datasetType = dsType;
+	}
+	
+	// overloaded setter to parse string
+	public void setMainframeDatasetType(String dsType) throws ParseException {
+		if (dsType.equals("s")) { this.datasetType = MainframeDatasetType.SEQUENTIAL; }
+		else if (dsType.equals("p")) { this.datasetType = MainframeDatasetType.PARTITIONED; }
+		else if (dsType.equals("g")) { this.datasetType = MainframeDatasetType.GDG; }
+		else { throw new ParseException(String.format("Invalid data set type specified: %s",dsType), 0); }
+	}
+	
+	public String getMainframeDatasetName() {
+		return this.datasetName;
+	}
+	
+	public void setMainframeDatasetName(String dsName) {
+		this.datasetName = dsName;
+	}
+	
+	public String getMainframeDatasetFolder() {
+		return this.dsFolderName;
+	}
+
+	public String getMainframeDatasetFileName() {
+		// returns filename in the folder and null if it is a GDG as this requires a file listing
+		return dsFileName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java
new file mode 100644
index 0000000..bcaa56d
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeDatasetType.java
@@ -0,0 +1,19 @@
+package org.apache.sqoop.mapreduce.mainframe;
+
+/********
+ * Basic enumeration for Mainframe data set types
+ ********/
+
+public enum MainframeDatasetType {
+	GDG, PARTITIONED, SEQUENTIAL;
+	
+	@Override
+	  public String toString() {
+	    switch(this) {
+	      case GDG: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG;
+	      case PARTITIONED: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED;
+	      case SEQUENTIAL: return MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL;
+	      default: throw new IllegalArgumentException();
+	    }
+	  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java
new file mode 100644
index 0000000..661b1cc
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeFTPFileEntryParser.java
@@ -0,0 +1,93 @@
+package org.apache.sqoop.mapreduce.mainframe;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.ftp.FTPClientConfig;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.parser.ConfigurableFTPFileEntryParserImpl;
+
+public class MainframeFTPFileEntryParser extends ConfigurableFTPFileEntryParserImpl {
+
+	/* Volume Unit    Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
+	 * xxx300 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOADED
+	 * x31167 Tape                                             UNLOAD.EDH.UNLOADT
+	 * xxx305 3390   2016/05/23  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOAD1
+	 */
+
+	// match Unit and Dsname
+	private static String REGEX = "^\\S+\\s+(\\S+)\\s+.*?\\s+(\\S+)$";
+	// match Unit, BlkSz, Dsorg, DsName
+	private static String NON_TAPE_REGEX = "^\\S+\\s+(\\S+)\\s+.*?(\\d+)\\s+(\\S+)\\s+(\\S+)$";
+	static final String DEFAULT_DATE_FORMAT = "yyyy/MM/dd HH:mm";
+	//= "MMM d yyyy"; //Nov 9 2001
+
+	static final String DEFAULT_RECENT_DATE_FORMAT = "MMM d HH:mm"; //Nov 9 20:06
+
+	private static String tapeUnitString = "Tape";
+	private static String unitHeaderString = "Unit";
+	private static String dsNameHeaderString = "Dsname";
+	private static String dsOrgPDSString = "PO";
+	private static String dsOrgPDSExtendedString = "PO-E";
+	private static String dsOrgSeqString = "PS";
+	private static Pattern nonTapePattern = Pattern.compile(NON_TAPE_REGEX);
+
+	private static final Log LOG = LogFactory.getLog(MainframeFTPFileEntryParser.class.getName());
+
+	public MainframeFTPFileEntryParser() {
+		super(REGEX);
+		LOG.info("MainframeFTPFileEntryParser constructor");
+	}
+
+	public MainframeFTPFileEntryParser(String regex) {
+		super(REGEX);
+	}
+
+	public FTPFile parseFTPEntry(String entry) {
+		LOG.info("parseFTPEntry: "+entry);
+
+        if (matches(entry)) {
+        	String unit = group(1);
+        	String dsName = group(2);
+        	LOG.info("parseFTPEntry match: "+group(1)+" "+group(2));
+        	if (unit.equals(unitHeaderString) && dsName.equals(dsNameHeaderString)) {
+        		return null;
+        	}
+        	FTPFile file = new FTPFile();
+            file.setRawListing(entry);
+            file.setName(dsName);
+        	// match non tape values
+        	if (!unit.equals("Tape")) {
+	        	Matcher m = nonTapePattern.matcher(entry);
+	        	// get sizes
+	        	if (m.matches()) {
+	        		// PO/PO-E = PDS = directory
+	        		// PS = Sequential data set = file
+	        		String size = m.group(2);
+	        		String dsOrg = m.group(3);
+	        		file.setSize(Long.parseLong(size));
+	        		LOG.info(String.format("Non tape match: %s, %s, %s", file.getName(), file.getSize(), dsOrg));
+	        		if (dsOrg.equals(dsOrgPDSString) || dsOrg.equals(dsOrgPDSExtendedString)) {
+	        			file.setType(FTPFile.DIRECTORY_TYPE);
+	        		}
+	        		if (dsOrg.equals(dsOrgSeqString)) {
+	        			file.setType(FTPFile.FILE_TYPE);
+	        		}
+	        	}
+        	} else {
+        		LOG.info(String.format("Tape match: %s, %s", file.getName(), unit));
+        		file.setType(FTPFile.FILE_TYPE);
+        	}
+        	return file;
+        }
+		return null;
+	}
+	
+	@Override
+	protected FTPClientConfig getDefaultConfiguration() {
+		return new FTPClientConfig(FTPClientConfig.SYST_MVS,
+				DEFAULT_DATE_FORMAT, null, null, null, null);
+	} 
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
index 16c2f75..f222dc8 100644
--- a/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/mainframe/MainframeImportJob.java
@@ -59,6 +59,12 @@ public class MainframeImportJob extends DataDrivenImportJob {
     job.getConfiguration().set(
         MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,
         options.getMainframeInputDatasetName());
+    job.getConfiguration().set(
+            MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,
+            options.getMainframeInputDatasetType());
+    job.getConfiguration().set(
+            MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE,
+            options.getMainframeInputDatasetTape().toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/tool/MainframeImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/MainframeImportTool.java b/src/java/org/apache/sqoop/tool/MainframeImportTool.java
index bc4ae6c..0cb91db 100644
--- a/src/java/org/apache/sqoop/tool/MainframeImportTool.java
+++ b/src/java/org/apache/sqoop/tool/MainframeImportTool.java
@@ -23,6 +23,7 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
@@ -36,6 +37,8 @@ public class MainframeImportTool extends ImportTool {
   private static final Log LOG
       = LogFactory.getLog(MainframeImportTool.class.getName());
   public static final String DS_ARG = "dataset";
+  public static final String DS_TYPE_ARG = "datasettype";
+  public static final String DS_TAPE_ARG = "tape";
 
   public MainframeImportTool() {
     super("import-mainframe", false);
@@ -59,6 +62,14 @@ public class MainframeImportTool extends ImportTool {
         .hasArg().withDescription("HDFS plain file destination")
         .withLongOpt(TARGET_DIR_ARG)
         .create());
+    importOpts.addOption(OptionBuilder.withArgName("Dataset type")
+            .hasArg().withDescription("Dataset type (p=partitioned data set|s=sequential data set|g=GDG)")
+            .withLongOpt(DS_TYPE_ARG)
+            .create());
+    importOpts.addOption(OptionBuilder.withArgName("Dataset is on tape")
+    		.hasArg().withDescription("Dataset is on tape (true|false)")
+    		.withLongOpt(DS_TAPE_ARG)
+    		.create());
 
     addValidationOpts(importOpts);
 
@@ -142,6 +153,20 @@ public class MainframeImportTool extends ImportTool {
     if (in.hasOption(DS_ARG)) {
       out.setMainframeInputDatasetName(in.getOptionValue(DS_ARG));
     }
+
+    if (in.hasOption(DS_TYPE_ARG)) {
+      out.setMainframeInputDatasetType(in.getOptionValue(DS_TYPE_ARG));
+    } else {
+    	// set default data set type to partitioned
+    	out.setMainframeInputDatasetType(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
+    }
+
+    if (in.hasOption(DS_TAPE_ARG)) {
+    	out.setMainframeInputDatasetTape(in.getOptionValue(DS_TAPE_ARG));
+    } else {
+    	// set default tape value to false
+    	out.setMainframeInputDatasetTape("false");
+    }
   }
 
   @Override
@@ -151,6 +176,19 @@ public class MainframeImportTool extends ImportTool {
       throw new InvalidOptionsException(
           "--" + DS_ARG + " is required for mainframe import. " + HELP_STR);
     }
+    String dsType = options.getMainframeInputDatasetType();
+    LOG.info("Dataset type: "+dsType);
+    if (!dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED)
+    		&& !dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL)
+    		&& !dsType.equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG)) {
+      throw new InvalidOptionsException(
+    		  "--" + DS_TYPE_ARG + " specified is invalid. " + HELP_STR);
+    }
+    Boolean dsTape = options.getMainframeInputDatasetTape();
+	if (dsTape == null && dsTape != true && dsTape != false) {
+		throw new InvalidOptionsException(
+				"--" + DS_TAPE_ARG + " specified is invalid. " + HELP_STR);
+	}
     super.validateImportOptions(options);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
index eae7a63..f61b983 100644
--- a/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
+++ b/src/java/org/apache/sqoop/util/MainframeFTPClientUtils.java
@@ -29,16 +29,16 @@ import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPClientConfig;
 import org.apache.commons.net.ftp.FTPConnectionClosedException;
 import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPListParseEngine;
 import org.apache.commons.net.ftp.FTPReply;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.sqoop.mapreduce.JobBase;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
+import org.apache.sqoop.mapreduce.mainframe.MainframeDatasetPath;
 
 /**
  * Utility methods used when accessing a mainframe server through FTP client.
@@ -52,20 +52,78 @@ public final class MainframeFTPClientUtils {
   private MainframeFTPClientUtils() {
   }
 
-  public static List<String> listSequentialDatasets(
-      String pdsName, Configuration conf) throws IOException {
+  public static List<String> listSequentialDatasets(String pdsName, Configuration conf) throws IOException {
     List<String> datasets = new ArrayList<String>();
+    String dsName = pdsName;
+    String fileName = "";
+    MainframeDatasetPath p = null;
+    try {
+    	p = new MainframeDatasetPath(dsName,conf);
+    } catch (Exception e) {
+    	LOG.error(e.getMessage());
+    	LOG.error("MainframeDatasetPath helper class incorrectly initialised");
+    	e.printStackTrace();
+    }
+    String dsType = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE);
+    boolean isTape = Boolean.parseBoolean(conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TAPE));
+    boolean isSequentialDs = false;
+    boolean isGDG = false;
+    if (dsType != null && p != null) {
+    	isSequentialDs = p.getMainframeDatasetType().toString().equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
+        isGDG = p.getMainframeDatasetType().toString().equals(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
+    	pdsName = p.getMainframeDatasetFolder();
+    	fileName = p.getMainframeDatasetFileName();
+    }
     FTPClient ftp = null;
     try {
       ftp = getFTPConnection(conf);
       if (ftp != null) {
         ftp.changeWorkingDirectory("'" + pdsName + "'");
-        FTPFile[] ftpFiles = ftp.listFiles();
-        for (FTPFile f : ftpFiles) {
-          if (f.getType() == FTPFile.FILE_TYPE) {
-            datasets.add(f.getName());
-          }
+        FTPFile[] ftpFiles = null;
+        if (isTape) {
+        	FTPListParseEngine parser = ftp.initiateListParsing(MainframeConfiguration.MAINFRAME_FTP_FILE_ENTRY_PARSER_CLASSNAME, "");
+        	List<FTPFile> listing = new ArrayList<FTPFile>();
+        	while(parser.hasNext()) {
+        		FTPFile[] files = parser.getNext(25);
+        		for (FTPFile file : files) {
+        			if (file != null) {
+        				listing.add(file);
+        				LOG.info(String.format("Name: %s Type: %s", file.getName(), file.getType()));
+        			}
+        			// skip nulls returned from parser
+        		}
+        		ftpFiles = new FTPFile[listing.size()];
+        		for (int i = 0;i < listing.size(); i++) {
+        			ftpFiles[i] = listing.get(i);
+        		}
+        		LOG.info("Files returned from mainframe parser:-");
+        		for (FTPFile f : ftpFiles) {
+        			LOG.info(String.format("Name: %s, Type: %s",f.getName(),f.getType()));
+        		}
+        	}
         }
+		else { ftpFiles = ftp.listFiles(); }
+		if (!isGDG) {
+			for (FTPFile f : ftpFiles) {
+				LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));
+				if (f.getType() == FTPFile.FILE_TYPE) {
+					// only add datasets if default behaviour of partitioned data sets
+					// or if it is a sequential data set, only add if the file name matches exactly
+					if (!isSequentialDs || isSequentialDs && f.getName().equals(fileName) && !fileName.equals("")) {
+						datasets.add(f.getName());
+					}
+				}
+			}
+		} else {
+			LOG.info("GDG branch. File list:-");
+			for (FTPFile f : ftpFiles) {
+				LOG.info(String.format("Name: %s Type: %s",f.getName(), f.getType()));
+			}
+			if (ftpFiles.length > 0 && ftpFiles[ftpFiles.length-1].getType() == FTPFile.FILE_TYPE) {
+				// for GDG - add the last file in the collection
+				datasets.add(ftpFiles[ftpFiles.length-1].getName());
+			}
+		}
       }
     } catch(IOException ioe) {
       throw new IOException ("Could not list datasets from " + pdsName + ":"
@@ -144,6 +202,7 @@ public final class MainframeFTPClientUtils {
       ftp.setFileType(FTP.ASCII_FILE_TYPE);
       // Use passive mode as default.
       ftp.enterLocalPassiveMode();
+      LOG.info("System type detected: " + ftp.getSystemType());
     } catch(IOException ioe) {
       if (ftp != null && ftp.isConnected()) {
         try {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java
new file mode 100644
index 0000000..3492fee
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeDatasetPath.java
@@ -0,0 +1,86 @@
+package org.apache.sqoop.mapreduce.mainframe;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.text.ParseException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestMainframeDatasetPath {
+
+	@Test
+	public void testCanGetFileNameOnSequential() throws Exception {
+		String dsName = "a.b.c.d";
+		String expectedFileName = "d";
+		MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
+		assertEquals(expectedFileName,p.getMainframeDatasetFileName());
+	}
+	
+	@Test
+	public void testCanGetFolderNameOnSequential() throws Exception {
+		String dsName = "a.b.c.d";
+		String expectedFolderName = "a.b.c";
+		MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
+		assertEquals(expectedFolderName,p.getMainframeDatasetFolder());
+	}
+	
+	@Test
+	public void testCanGetFolderNameOnGDG() throws Exception {
+		String dsName = "a.b.c.d";
+		String expectedFolderName = "a.b.c.d";
+		MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
+		assertEquals(expectedFolderName,p.getMainframeDatasetFolder());
+	}
+	
+	@Test
+	public void testFileNameIsNullOnGDG() throws Exception {
+		String dsName = "a.b.c.d";
+		MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
+		assertEquals(null,p.getMainframeDatasetFileName());
+	}
+	
+	@Test
+	public void testConstructor1() throws Exception {
+		String dsName = "a.b.c.d";
+		String expectedFolderName = "a.b.c";
+		String expectedFileName = "d";
+		Configuration conf = new Configuration();
+		conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
+		conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE, MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_SEQUENTIAL);
+		MainframeDatasetPath p = new MainframeDatasetPath(dsName,conf);
+		assertEquals(dsName,p.getMainframeDatasetName());
+		assertEquals(MainframeDatasetType.SEQUENTIAL.toString(),p.getMainframeDatasetType().toString());
+		assertEquals(expectedFolderName, p.getMainframeDatasetFolder());
+		assertEquals(expectedFileName, p.getMainframeDatasetFileName());
+	}
+	
+	@Test
+	public void testConstructor2() throws Exception {
+		String dsName = "a.b.c.d";
+		String expectedFolderName = "a.b.c.d";
+		String dsType = "p";
+		MainframeDatasetPath p = new MainframeDatasetPath(dsName,dsType);
+		assertEquals(expectedFolderName,p.getMainframeDatasetFolder());
+	}
+	
+	@Test
+	public void testConstructor3() {
+		String dsName = "a.b.c.d";
+		String expectedFolderName = "a.b.c.d";
+		MainframeDatasetPath p = new MainframeDatasetPath(dsName,MainframeDatasetType.GDG);
+		assertEquals(expectedFolderName, p.getMainframeDatasetFolder());
+	}
+	
+	@Test
+	public void testUninitialisedThrowsException() {
+		MainframeDatasetPath p = new MainframeDatasetPath();
+		try {
+			p.initialisePaths();
+		} catch (IllegalStateException e) {
+			assertNotNull(e);
+			assertEquals("Please set data set name and type first.",e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java
new file mode 100644
index 0000000..8926bb9
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeFTPFileEntryParser.java
@@ -0,0 +1,64 @@
+package org.apache.sqoop.mapreduce.mainframe;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.net.ftp.FTPFile;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMainframeFTPFileEntryParser {
+	static List<String> listing;
+	static MainframeFTPFileEntryParser parser2;
+	@BeforeClass
+	public static void setUpBeforeClass() throws Exception {
+		/* Volume Unit    Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname
+		 * xxx300 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOADED
+x31167 Tape                                                                               UNLOAD.EDH.UNLOADT
+xxx305 3390   2016/05/23  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOAD1
+xxx305 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOAD2
+xxx305 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOAD3
+		 */
+		listing = new ArrayList<String>();
+		listing.add("Volume Unit    Referred Ext Used Recfm Lrecl BlkSz Dsorg Dsname");
+		listing.add("xxx300 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOADED");
+		listing.add("x31167 Tape                                                                               UNLOAD.EDH.UNLOADT");
+		listing.add("xxx305 3390   2016/05/23  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOAD1");
+		listing.add("xxx305 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOAD2");
+		listing.add("xxx305 3390   2016/05/25  1   45  VB    2349 27998  PS  UNLOAD.EDH.UNLOAD3");
+	}
+
+	@AfterClass
+	public static void tearDownAfterClass() throws Exception {
+	}
+
+	@Before
+	public void setUp() throws Exception {
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testMainframeFTPFileEntryParserString() {
+		MainframeFTPFileEntryParser parser = new MainframeFTPFileEntryParser();
+		assert(parser != null);
+	}
+
+	@Test
+	public void testParseFTPEntry() {
+		parser2 = new MainframeFTPFileEntryParser();
+		int i = 0;
+		for (String j : listing) {
+			FTPFile file = parser2.parseFTPEntry(j);
+			if (file != null) {
+				i++;
+			}
+		}
+		assert(i == listing.size()-1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
index ecaa8d5..4c8f584 100644
--- a/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
+++ b/src/test/org/apache/sqoop/mapreduce/mainframe/TestMainframeImportJob.java
@@ -23,7 +23,9 @@ import static org.junit.Assert.assertEquals;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
index d91bbbe..3e502d0 100644
--- a/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
+++ b/src/test/org/apache/sqoop/tool/TestMainframeImportTool.java
@@ -24,7 +24,9 @@ import java.lang.reflect.Method;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sqoop.Sqoop;
 import org.apache.sqoop.cli.RelatedOptions;
+import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -75,6 +77,7 @@ public class TestMainframeImportTool extends BaseSqoopTestCase {
     assertTrue(rOptions.hasOption(MainframeImportTool.MAPREDUCE_JOB_NAME));
     assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESS_ARG));
     assertTrue(rOptions.hasOption(MainframeImportTool.COMPRESSION_CODEC_ARG));
+    assertTrue(rOptions.hasOption(MainframeImportTool.DS_TYPE_ARG));
   }
 
   @Test
@@ -101,4 +104,78 @@ public class TestMainframeImportTool extends BaseSqoopTestCase {
     assertEquals(sqoopOption.getConnManagerClassName(), "dummy_ClassName");
     assertNull(sqoopOption.getTableName());
   }
+  
+  @Test
+  public void testDataSetTypeOptionIsSet() throws ParseException, InvalidOptionsException {
+	  String[] args = new String[] { "--datasettype", MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG };
+	  ToolOptions toolOptions = new ToolOptions();
+	  SqoopOptions sqoopOption = new SqoopOptions();
+	  mfImportTool.configureOptions(toolOptions);
+	  sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+	  assertEquals(sqoopOption.getMainframeInputDatasetType(), MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG);
+  }
+  
+  @Test
+  public void testDefaultDataSetTypeOptionIsSet() throws ParseException, InvalidOptionsException {
+	  String[] args = new String[] {};
+	  ToolOptions toolOptions = new ToolOptions();
+	  SqoopOptions sqoopOption = new SqoopOptions();
+	  mfImportTool.configureOptions(toolOptions);
+	  sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+	  assertEquals(sqoopOption.getMainframeInputDatasetType(), MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_PARTITIONED);
+  }
+  
+  @Test
+  public void testInvalidDataSetTypeOptionThrowsException() {
+	  // validateImportOptions gets called from Sqoop.run() function
+	  String[] args = new String[] { "--dataset", "mydataset","--datasettype", "fjfdksjjf" };
+	  ToolOptions toolOptions = new ToolOptions();
+	  SqoopOptions sqoopOption = new SqoopOptions();
+	  mfImportTool.configureOptions(toolOptions);
+	  try {
+		  sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+		  mfImportTool.validateImportOptions(sqoopOption);
+		  String dsType = sqoopOption.getMainframeInputDatasetType();
+		  System.out.println("dsType: "+dsType);
+	  } catch (InvalidOptionsException e) {
+		  String errorMessage = "--datasettype specified is invalid.";
+		  e.printStackTrace();
+		  assert(e.getMessage().contains(errorMessage));
+	  } catch (ParseException e) {
+		e.printStackTrace();
+		assertFalse(e != null);
+	}
+  }
+
+  @Test
+  public void testTapeOptionIsSet() throws ParseException, InvalidOptionsException {
+	  String[] args = new String[] { "--tape", "true" };
+	  ToolOptions toolOptions = new ToolOptions();
+	  SqoopOptions sqoopOption = new SqoopOptions();
+	  mfImportTool.configureOptions(toolOptions);
+	  sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+	  Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
+	  assert(isTape != null && isTape.toString().equals("true"));
+  }
+  @Test
+  public void testTapeOptionDefaultIsSet() throws ParseException, InvalidOptionsException {
+	  String[] args = new String[] { "--datasettype", MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE_GDG };
+	  ToolOptions toolOptions = new ToolOptions();
+	  SqoopOptions sqoopOption = new SqoopOptions();
+	  mfImportTool.configureOptions(toolOptions);
+	  sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+	  Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
+	  assert(isTape != null && isTape.toString().equals("false"));
+  }
+  @Test
+  public void testTapeOptionInvalidReturnsFalse() throws ParseException, InvalidOptionsException {
+	  String[] args = new String[] { "--dataset", "mydatasetname", "--tape", "invalidvalue" };
+	  ToolOptions toolOptions = new ToolOptions();
+	  SqoopOptions sqoopOption = new SqoopOptions();
+	  mfImportTool.configureOptions(toolOptions);
+	  sqoopOption = mfImportTool.parseArguments(args, null, sqoopOption, false);
+	  mfImportTool.validateImportOptions(sqoopOption);
+	  Boolean isTape = sqoopOption.getMainframeInputDatasetTape();
+	  assert(isTape != null && isTape.toString().equals("false"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b302d89f/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
index 6b89502..d87c75d 100644
--- a/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
+++ b/src/test/org/apache/sqoop/util/TestMainframeFTPClientUtils.java
@@ -24,11 +24,14 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sqoop.mapreduce.JobBase;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -155,4 +158,158 @@ public class TestMainframeFTPClientUtils {
           ioe.toString());
     }
   }
+
+  @Test
+  public void testListSequentialDatasets() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+      when(mockFTPClient.changeWorkingDirectory("a.b.c")).thenReturn(true);
+        FTPFile file1 = new FTPFile();
+        file1.setName("blah1");
+        file1.setType(FTPFile.FILE_TYPE);
+        FTPFile file2 = new FTPFile();
+        file2.setName("blah2");
+        file2.setType(FTPFile.FILE_TYPE);
+      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[] {file1,file2});
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+            "pssword".getBytes());
+
+    try {
+      List<String> files = MainframeFTPClientUtils.listSequentialDatasets("a.b.c.blah1", conf);
+        Assert.assertEquals(1,files.size());
+    } catch (IOException ioe) {
+		fail("No IOException should be thrown!");
+    }
+  }
+
+  @Test
+  public void testListEmptySequentialDatasets() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+      when(mockFTPClient.changeWorkingDirectory("a.b.c")).thenReturn(true);
+        FTPFile file1 = new FTPFile();
+        file1.setName("blah0");
+        file1.setType(FTPFile.FILE_TYPE);
+        FTPFile file2 = new FTPFile();
+        file2.setName("blah2");
+        file2.setType(FTPFile.FILE_TYPE);
+      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+            "pssword".getBytes());
+
+    try {
+		String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+		List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
+        Assert.assertEquals(0,files.size());
+    } catch (IOException ioe) {
+		fail("No IOException should be thrown!");
+    }
+  }
+
+  @Test
+  public void testTrailingDotSequentialDatasets() {
+    try {
+      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+      when(mockFTPClient.logout()).thenReturn(true);
+      when(mockFTPClient.isConnected()).thenReturn(false);
+      when(mockFTPClient.getReplyCode()).thenReturn(200);
+      when(mockFTPClient.changeWorkingDirectory("'a.b.c.blah1'")).thenThrow(new IOException("Folder not found"));
+        FTPFile file1 = new FTPFile();
+        file1.setName("blah1");
+        file1.setType(FTPFile.FILE_TYPE);
+        FTPFile file2 = new FTPFile();
+        file2.setName("blah2");
+        file2.setType(FTPFile.FILE_TYPE);
+      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+    } catch (IOException e) {
+      fail("No IOException should be thrown!");
+    }
+
+    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"s");
+    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.blah1.");
+    // set the password in the secure credentials object
+    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+            "pssword".getBytes());
+
+    try {
+		String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+		List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
+        Assert.assertEquals(0,files.size());
+    } catch (IOException ioe) {
+		String ioeString = ioe.getMessage();
+		Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
+    }
+  }
+  
+  @Test
+  public void testGdgGetLatest() {
+	  try {
+	      when(mockFTPClient.login("user", "pssword")).thenReturn(true);
+	      when(mockFTPClient.logout()).thenReturn(true);
+	      when(mockFTPClient.isConnected()).thenReturn(false);
+	      when(mockFTPClient.getReplyCode()).thenReturn(200);
+	      when(mockFTPClient.changeWorkingDirectory("'a.b.c'")).thenReturn(true);
+	        FTPFile file1 = new FTPFile();
+	        file1.setName("G0100V00");
+	        file1.setType(FTPFile.FILE_TYPE);
+	        FTPFile file2 = new FTPFile();
+	        file2.setName("G0101V00");
+	        file2.setType(FTPFile.FILE_TYPE);
+	      when(mockFTPClient.listFiles()).thenReturn(new FTPFile[]{file1,file2});
+	    } catch (IOException e) {
+	      fail("No IOException should be thrown!");
+	    }
+
+	    conf.set(DBConfiguration.URL_PROPERTY, "localhost:11111");
+	    conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
+	    conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
+	    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_TYPE,"g");
+	    conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME,"a.b.c.d");
+	    // set the password in the secure credentials object
+	    Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
+	    conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
+	            "pssword".getBytes());
+
+	    try {
+			String dsName = conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
+			List<String> files = MainframeFTPClientUtils.listSequentialDatasets(dsName, conf);
+	        Assert.assertEquals("G0101V00", files.get(0));
+			Assert.assertEquals(1,files.size());
+	    } catch (IOException ioe) {
+			String ioeString = ioe.getMessage();
+			Assert.assertEquals("Could not list datasets from a.b.c.blah1:java.io.IOException: Folder not found",ioeString);
+	    }
+  }
 }


Mime
View raw message