sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject [1/2] SQOOP-777: Sqoop2: Implement intermediate data format representation policy
Date Sat, 26 Jul 2014 18:39:24 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 17c7219b9 -> 3c93930bf


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index b7079dd..8061c78 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -30,10 +30,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.HdfsExportExtractor;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@@ -45,6 +47,9 @@ import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
 import org.junit.Test;
 
 public class TestHdfsExtract extends TestCase {
@@ -53,12 +58,22 @@ public class TestHdfsExtract extends TestCase {
   private static final int NUMBER_OF_FILES = 5;
   private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
 
-  private String indir;
+  private final String indir;
 
   public TestHdfsExtract() {
     indir = INPUT_ROOT + getClass().getSimpleName();
   }
 
+  @Override
+  public void setUp() throws IOException {
+    FileUtils.mkdirs(indir);
+  }
+
+  @Override
+  public void tearDown() throws IOException {
+    FileUtils.delete(indir);
+  }
+
   /**
    * Test case for validating the number of partitions creation
    * based on input.
@@ -68,12 +83,12 @@ public class TestHdfsExtract extends TestCase {
    */
   @Test
   public void testHdfsExportPartitioner() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
     createTextInput(null);
     Configuration conf = new Configuration();
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
 
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
     HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
     PrefixContext prefixContext = new PrefixContext(conf, "");
     int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
@@ -87,87 +102,67 @@ public class TestHdfsExtract extends TestCase {
 
   @Test
   public void testUncompressedText() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
     createTextInput(null);
 
-    Configuration conf = new Configuration();
-    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
   }
 
   @Test
-  public void testCompressedText() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
+  public void testDefaultCompressedText() throws Exception {
     createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
 
-    Configuration conf = new Configuration();
-    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
+  }
 
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
+  @Test
+  public void testBZip2CompressedText() throws Exception {
     createTextInput(BZip2Codec.class);
 
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
   }
 
   @Test
-  public void testCompressedSequence() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
+  public void testDefaultCompressedSequence() throws Exception {
     createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
 
-    Configuration conf = new Configuration();
-    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
-        HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
   }
 
   @Test
   public void testUncompressedSequence() throws Exception {
-    FileUtils.delete(indir);
-    FileUtils.mkdirs(indir);
     createSequenceInput(null);
 
+    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
+  }
+
+  private Schema createSchema() {
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+    .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+    return schema;
+  }
+
+  private Configuration createConf() {
     Configuration conf = new Configuration();
     ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
-    conf.set(JobConstants.JOB_ETL_PARTITIONER,
+    conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+    conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR,
         HdfsExportExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
-    JobUtils.runJob(conf);
+    conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+    conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir);
+    return conf;
+  }
+
+  private Job createJob(Configuration conf, Schema schema) throws Exception {
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+        CSVIntermediateDataFormat.class.getName());
+    return job;
   }
 
   private void createTextInput(Class<? extends CompressionCodec> clz)
@@ -227,11 +222,11 @@ public class TestHdfsExtract extends TestCase {
       SequenceFile.Writer filewriter;
       if (codec != null) {
         filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-          conf, filepath, Text.class, NullWritable.class,
-          CompressionType.BLOCK, codec);
+            conf, filepath, Text.class, NullWritable.class,
+            CompressionType.BLOCK, codec);
       } else {
         filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-          conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
+            conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
       }
 
       Text text = new Text();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index f849aae..721bba6 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -26,6 +26,7 @@ import java.io.InputStreamReader;
 import java.util.LinkedList;
 import java.util.List;
 
+import com.google.common.base.Charsets;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@@ -45,6 +48,9 @@ import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
 
 public class TestHdfsLoad extends TestCase {
 
@@ -68,13 +74,21 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-    JobUtils.runJob(conf);
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
 
     String fileName = outdir + "/" +  OUTPUT_FILE;
     InputStream filestream = FileUtils.open(fileName);
     BufferedReader filereader = new BufferedReader(new InputStreamReader(
-        filestream, Data.CHARSET_NAME));
+        filestream, Charsets.UTF_8));
     verifyOutputText(filereader);
   }
 
@@ -86,9 +100,18 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
     conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-    JobUtils.runJob(conf);
+
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
 
     Class<? extends CompressionCodec> codecClass = conf.getClass(
         JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
@@ -97,7 +120,7 @@ public class TestHdfsLoad extends TestCase {
     String fileName = outdir + "/" +  OUTPUT_FILE + codec.getDefaultExtension();
     InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
     BufferedReader filereader = new BufferedReader(new InputStreamReader(
-        filestream, Data.CHARSET_NAME));
+        filestream, Charsets.UTF_8));
     verifyOutputText(filereader);
   }
 
@@ -108,7 +131,7 @@ public class TestHdfsLoad extends TestCase {
     int index = START_ID*NUMBER_OF_ROWS_PER_ID;
     while ((actual = reader.readLine()) != null){
       data.setContent(new Object[] {
-        index, (double) index, String.valueOf(index) },
+        index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1)
},
           Data.ARRAY_RECORD);
       expected = data.toString();
       index++;
@@ -129,8 +152,17 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
-    JobUtils.runJob(conf);
+
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
 
     Path filepath = new Path(outdir,
         OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
@@ -147,10 +179,18 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
     conf.set(JobConstants.HADOOP_OUTDIR, outdir);
     conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-    JobUtils.runJob(conf);
 
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration());
     Path filepath = new Path(outdir,
         OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
     SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf),
filepath, conf);
@@ -164,7 +204,7 @@ public class TestHdfsLoad extends TestCase {
     Data data = new Data();
     while (reader.next(actual)){
       data.setContent(new Object[] {
-          index, (double) index, String.valueOf(index) },
+          index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1)
},
           Data.ARRAY_RECORD);
       expected.set(data.toString());
       index++;
@@ -225,7 +265,7 @@ public class TestHdfsLoad extends TestCase {
         Object[] array = new Object[] {
           id * NUMBER_OF_ROWS_PER_ID + row,
           (double) (id * NUMBER_OF_ROWS_PER_ID + row),
-          String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
+          new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1)
         };
         context.getDataWriter().writeArrayRecord(array);
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 7b264c6..ba16b3c 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Loader;
@@ -42,12 +43,17 @@ import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 import org.apache.sqoop.job.mr.SqoopSplit;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
 
 public class TestMapReduce extends TestCase {
 
@@ -59,6 +65,8 @@ public class TestMapReduce extends TestCase {
     Configuration conf = new Configuration();
     ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
     Job job = new Job(conf);
 
     SqoopInputFormat inputformat = new SqoopInputFormat();
@@ -77,8 +85,15 @@ public class TestMapReduce extends TestCase {
     ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
 
-    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
   }
 
@@ -88,8 +103,15 @@ public class TestMapReduce extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      CSVIntermediateDataFormat.class.getName());
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+      .addColumn(new Text("3"));
 
-    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+    Job job = new Job(conf);
+    ConfigurationUtils.setConnectorSchema(job, schema);
+    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         SqoopNullOutputFormat.class);
   }
 
@@ -152,14 +174,14 @@ public class TestMapReduce extends TestCase {
   }
 
   public static class DummyOutputFormat
-      extends OutputFormat<Data, NullWritable> {
+      extends OutputFormat<SqoopWritable, NullWritable> {
     @Override
     public void checkOutputSpecs(JobContext context) {
       // do nothing
     }
 
     @Override
-    public RecordWriter<Data, NullWritable> getRecordWriter(
+    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
         TaskAttemptContext context) {
       return new DummyRecordWriter();
     }
@@ -170,12 +192,13 @@ public class TestMapReduce extends TestCase {
     }
 
     public static class DummyRecordWriter
-        extends RecordWriter<Data, NullWritable> {
+        extends RecordWriter<SqoopWritable, NullWritable> {
       private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
       private Data data = new Data();
 
       @Override
-      public void write(Data key, NullWritable value) {
+      public void write(SqoopWritable key, NullWritable value) {
+
         data.setContent(new Object[] {
           index,
           (double) index,
@@ -215,22 +238,22 @@ public class TestMapReduce extends TestCase {
   public static class DummyLoader extends Loader {
     private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
     private Data expected = new Data();
-    private Data actual = new Data();
+    private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat();
 
     @Override
     public void load(LoaderContext context, Object oc, Object oj) throws Exception{
-      Object[] array;
-      while ((array = context.getDataReader().readArrayRecord()) != null) {
-        actual.setContent(array, Data.ARRAY_RECORD);
+      String data;
+      while ((data = context.getDataReader().readTextRecord()) != null) {
 
+//        actual.setSchema(context.getSchema());
+//        actual.setObjectData(array, false);
         expected.setContent(new Object[] {
           index,
           (double) index,
           String.valueOf(index)},
           Data.ARRAY_RECORD);
         index++;
-
-        assertEquals(expected.toString(), actual.toString());
+        assertEquals(expected.toString(), data);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
new file mode 100644
index 0000000..b78b140
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import com.google.common.base.Charsets;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.JobConstants;
+
+public class SqoopWritableTest extends TestCase {
+
+  private final SqoopWritable writable = new SqoopWritable();
+
+  public void testStringInStringOut() {
+    String testData = "Live Long and prosper";
+    writable.setString(testData);
+    Assert.assertEquals(testData,writable.getString());
+  }
+
+  public void testDataWritten() throws IOException {
+    String testData = "One ring to rule them all";
+    byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
+    writable.setString(testData);
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    writable.write(out);
+    byte[] written = ostream.toByteArray();
+    InputStream instream = new ByteArrayInputStream(written);
+    DataInput in = new DataInputStream(instream);
+    String readData = in.readUTF();
+    Assert.assertEquals(testData, readData);
+  }
+
+  public void testDataRead() throws IOException {
+    String testData = "Brandywine Bridge - 20 miles!";
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    out.writeUTF(testData);
+    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
+    DataInput in = new DataInputStream(instream);
+    writable.readFields(in);
+    Assert.assertEquals(testData, writable.getString());
+  }
+
+  public void testWriteReadUsingStream() throws IOException {
+    String testData = "You shall not pass";
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    writable.setString(testData);
+    writable.write(out);
+    byte[] written = ostream.toByteArray();
+
+    //Don't test what the data is, test that SqoopWritable can read it.
+    InputStream instream = new ByteArrayInputStream(written);
+    SqoopWritable newWritable = new SqoopWritable();
+    DataInput in = new DataInputStream(instream);
+    newWritable.readFields(in);
+    Assert.assertEquals(testData, newWritable.getString());
+    ostream.close();
+    instream.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index bee8ab7..1f55f1b 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -47,7 +49,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-      context.getDataReader().readContent(Data.CSV_RECORD);
+      context.getDataReader().readTextRecord();
       throw new BrokenBarrierException();
     }
   }
@@ -62,7 +64,7 @@ public class TestSqoopOutputFormatLoadExecutor {
       int runCount = 0;
       Object o;
       String[] arr;
-      while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
+      while ((o = context.getDataReader().readTextRecord()) != null) {
         arr = o.toString().split(",");
         Assert.assertEquals(100, arr.length);
         for (int i = 0; i < arr.length; i++) {
@@ -84,7 +86,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
     @Override
     public void load(LoaderContext context, Object cc, Object jc) throws Exception {
-      String[] arr = context.getDataReader().readContent(Data.CSV_RECORD).toString().split(",");
+      String[] arr = context.getDataReader().readTextRecord().toString().split(",");
       Assert.assertEquals(100, arr.length);
       for (int i = 0; i < arr.length; i++) {
         Assert.assertEquals(i, Integer.parseInt(arr[i]));
@@ -103,7 +105,7 @@ public class TestSqoopOutputFormatLoadExecutor {
       int runCount = 0;
       Object o;
       String[] arr;
-      while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
+      while ((o = context.getDataReader().readTextRecord()) != null) {
         arr = o.toString().split(",");
         Assert.assertEquals(100, arr.length);
         for (int i = 0; i < arr.length; i++) {
@@ -119,6 +121,7 @@ public class TestSqoopOutputFormatLoadExecutor {
   @Before
   public void setUp() {
     conf = new Configuration();
+    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
 
   }
 
@@ -128,12 +131,14 @@ public class TestSqoopOutputFormatLoadExecutor {
     conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     try {
       for (int count = 0; count < 100; count++) {
-        data.setContent(String.valueOf(count), Data.CSV_RECORD);
-        writer.write(data, null);
+        data.setTextData(String.valueOf(count));
+        writable.setString(data.getTextData());
+        writer.write(writable, null);
       }
     } catch (SqoopException ex) {
       throw ex.getCause();
@@ -146,8 +151,9 @@ public class TestSqoopOutputFormatLoadExecutor {
     conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     for (int i = 0; i < 10; i++) {
       StringBuilder builder = new StringBuilder();
       for (int count = 0; count < 100; count++) {
@@ -156,8 +162,9 @@ public class TestSqoopOutputFormatLoadExecutor {
           builder.append(",");
         }
       }
-      data.setContent(builder.toString(), Data.CSV_RECORD);
-      writer.write(data, null);
+      data.setTextData(builder.toString());
+      writable.setString(data.getTextData());
+      writer.write(writable, null);
     }
     writer.close(null);
   }
@@ -166,8 +173,9 @@ public class TestSqoopOutputFormatLoadExecutor {
   public void testSuccessfulLoader() throws Throwable {
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     StringBuilder builder = new StringBuilder();
     for (int count = 0; count < 100; count++) {
       builder.append(String.valueOf(count));
@@ -175,8 +183,10 @@ public class TestSqoopOutputFormatLoadExecutor {
         builder.append(",");
       }
     }
-    data.setContent(builder.toString(), Data.CSV_RECORD);
-    writer.write(data, null);
+    data.setTextData(builder.toString());
+    writable.setString(data.getTextData());
+    writer.write(writable, null);
+
     //Allow writer to complete.
     TimeUnit.SECONDS.sleep(5);
     writer.close(null);
@@ -189,8 +199,9 @@ public class TestSqoopOutputFormatLoadExecutor {
     conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
-    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
-    Data data = new Data();
+    RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    SqoopWritable writable = new SqoopWritable();
     try {
       for (int i = 0; i < 10; i++) {
         StringBuilder builder = new StringBuilder();
@@ -200,8 +211,9 @@ public class TestSqoopOutputFormatLoadExecutor {
             builder.append(",");
           }
         }
-        data.setContent(builder.toString(), Data.CSV_RECORD);
-        writer.write(data, null);
+        data.setTextData(builder.toString());
+        writable.setString(data.getTextData());
+        writer.write(writable, null);
       }
       writer.close(null);
     } catch (SqoopException ex) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1e2f005..a722c74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,12 +143,6 @@ limitations under the License.
           </dependency>
 
           <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-          </dependency>
-
-          <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-core</artifactId>
             <version>${hadoop.1.version}</version>
@@ -345,6 +339,11 @@ limitations under the License.
         <version>${commons-lang.version}</version>
       </dependency>
       <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+        <version>${guava.version}</version>
+      </dependency>
+      <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>servlet-api</artifactId>
         <version>${servlet.version}</version>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/spi/pom.xml
----------------------------------------------------------------------
diff --git a/spi/pom.xml b/spi/pom.xml
index 0b240e8..43f17d4 100644
--- a/spi/pom.xml
+++ b/spi/pom.xml
@@ -36,5 +36,10 @@ limitations under the License.
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 2becc56..50eb940 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -20,6 +20,8 @@ package org.apache.sqoop.connector.spi;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.etl.Exporter;
 import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.model.MJob;
@@ -79,4 +81,14 @@ public abstract class SqoopConnector {
    */
   public abstract MetadataUpgrader getMetadataUpgrader();
 
+  /**
+   * Returns the {@linkplain IntermediateDataFormat} this connector
+   * can return natively in. This will support retrieving the data as text
+   * and an array of objects. This should never return null.
+   *
+   * @return {@linkplain IntermediateDataFormat} object
+   */
+  public Class<? extends IntermediateDataFormat<?>> getIntermediateDataFormat()
{
+    return CSVIntermediateDataFormat.class;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index bfc28ef..a05274a 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -198,6 +198,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
       ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
       ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
+      ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());


Mime
View raw message