sqoop-commits mailing list archives

From: b...@apache.org
Subject: [3/5] SQOOP-666 Introduce execution engine (Jarek Jarcec Cecho)
Date: Tue, 06 Nov 2012 20:53:35 GMT
http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/JobUtils.java b/core/src/test/java/org/apache/sqoop/job/JobUtils.java
deleted file mode 100644
index e6ead3f..0000000
--- a/core/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.IOException;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
-import org.apache.sqoop.job.mr.SqoopInputFormat;
-import org.apache.sqoop.job.mr.SqoopMapper;
-import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
-import org.apache.sqoop.job.mr.SqoopSplit;
-
-public class JobUtils {
-
-  public static void runJob(Configuration conf)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
-        (conf.get(FileOutputFormat.OUTDIR) != null) ?
-        SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
-  }
-
-  public static void runJob(Configuration conf,
-      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
-      Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper,
-      Class<? extends OutputFormat<Data, NullWritable>> output)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = Job.getInstance(conf);
-    job.setInputFormatClass(input);
-    job.setMapperClass(mapper);
-    job.setMapOutputKeyClass(Data.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputFormatClass(output);
-    job.setOutputKeyClass(Data.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    boolean success = job.waitForCompletion(true);
-    Assert.assertEquals("Job failed!", true, success);
-  }
-
-  private JobUtils() {
-    // Disable explicit object creation
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
deleted file mode 100644
index c74faa2..0000000
--- a/core/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.BufferedReader;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.LinkedList;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.sqoop.job.etl.Extractor;
-import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.job.etl.Partitioner;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
-import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
-import org.junit.Test;
-
-public class TestHdfsLoad extends TestCase {
-
-  private static final String OUTPUT_ROOT = "/tmp/sqoop/warehouse/";
-  private static final String OUTPUT_FILE = "part-r-00000";
-  private static final int START_ID = 1;
-  private static final int NUMBER_OF_IDS = 9;
-  private static final int NUMBER_OF_ROWS_PER_ID = 10;
-
-  private String outdir;
-
-  public TestHdfsLoad() {
-    outdir = OUTPUT_ROOT + "/" + getClass().getSimpleName();
-  }
-
-  public void testVoid() {}
-  /*
-  @Test
-  public void testUncompressedText() throws Exception {
-    FileUtils.delete(outdir);
-
-    Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
-    JobUtils.runJob(conf);
-
-    String fileName = outdir + "/" +  OUTPUT_FILE;
-    InputStream filestream = FileUtils.open(fileName);
-    BufferedReader filereader = new BufferedReader(new InputStreamReader(
-        filestream, Data.CHARSET_NAME));
-    verifyOutputText(filereader);
-  }
-
-  @Test
-  public void testCompressedText() throws Exception {
-    FileUtils.delete(outdir);
-
-    Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
-    conf.setBoolean(FileOutputFormat.COMPRESS, true);
-    JobUtils.runJob(conf);
-
-    Class<? extends CompressionCodec> codecClass = conf.getClass(
-        FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
-        .asSubclass(CompressionCodec.class);
-    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
-    String fileName = outdir + "/" +  OUTPUT_FILE + codec.getDefaultExtension();
-    InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
-    BufferedReader filereader = new BufferedReader(new InputStreamReader(
-        filestream, Data.CHARSET_NAME));
-    verifyOutputText(filereader);
-  }
-
-  private void verifyOutputText(BufferedReader reader) throws IOException {
-    String actual = null;
-    String expected;
-    Data data = new Data();
-    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
-    while ((actual = reader.readLine()) != null){
-      data.setContent(new Object[] {
-          new Integer(index), new Double(index), String.valueOf(index) },
-          Data.ARRAY_RECORD);
-      expected = data.toString();
-      index++;
-
-      assertEquals(expected, actual);
-    }
-    reader.close();
-
-    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
-        index-START_ID*NUMBER_OF_ROWS_PER_ID);
-  }
-
-  @Test
-  public void testUncompressedSequence() throws Exception {
-    FileUtils.delete(outdir);
-
-    Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
-    JobUtils.runJob(conf);
-
-    Path filepath = new Path(outdir,
-        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(filepath));
-    verifyOutputSequence(filereader);
-  }
-
-  @Test
-  public void testCompressedSequence() throws Exception {
-    FileUtils.delete(outdir);
-
-    Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
-    conf.setBoolean(FileOutputFormat.COMPRESS, true);
-    JobUtils.runJob(conf);
-
-    Path filepath = new Path(outdir,
-        OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(filepath));
-    verifyOutputSequence(filereader);
-  }
-
-  private void verifyOutputSequence(SequenceFile.Reader reader) throws IOException {
-    int index = START_ID*NUMBER_OF_ROWS_PER_ID;
-    Text actual = new Text();
-    Text expected = new Text();
-    Data data = new Data();
-    while (reader.next(actual)){
-      data.setContent(new Object[] {
-          new Integer(index), new Double(index), String.valueOf(index) },
-          Data.ARRAY_RECORD);
-      expected.set(data.toString());
-      index++;
-
-      assertEquals(expected.toString(), actual.toString());
-    }
-    reader.close();
-
-    assertEquals(NUMBER_OF_IDS*NUMBER_OF_ROWS_PER_ID,
-        index-START_ID*NUMBER_OF_ROWS_PER_ID);
-  }
-
-  public static class DummyPartition extends Partition {
-    private int id;
-
-    public void setId(int id) {
-      this.id = id;
-    }
-
-    public int getId() {
-      return id;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      id = in.readInt();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(id);
-    }
-  }
-
-  public static class DummyPartitioner extends Partitioner {
-    @Override
-    public List<Partition> initialize(Context context) {
-      List<Partition> partitions = new LinkedList<Partition>();
-      for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
-        DummyPartition partition = new DummyPartition();
-        partition.setId(id);
-        partitions.add(partition);
-      }
-      return partitions;
-    }
-  }
-
-  public static class DummyExtractor extends Extractor {
-    @Override
-    public void initialize(Context context, Partition partition, DataWriter writer) {
-      int id = ((DummyPartition)partition).getId();
-      for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
-        Object[] array = new Object[] {
-          new Integer(id*NUMBER_OF_ROWS_PER_ID+row),
-          new Double(id*NUMBER_OF_ROWS_PER_ID+row),
-          String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
-        };
-        writer.writeArrayRecord(array);
-      }
-    }
-  }
-  */
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java b/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
deleted file mode 100644
index 51dddb4..0000000
--- a/core/src/test/java/org/apache/sqoop/job/TestJobEngine.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.BufferedReader;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.ResourceBundle;
-
-import junit.framework.TestCase;
-
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.job.etl.Exporter;
-import org.apache.sqoop.job.etl.Extractor;
-import org.apache.sqoop.job.etl.Importer;
-import org.apache.sqoop.job.etl.Initializer;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.job.etl.Partitioner;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
-import org.apache.sqoop.model.MConnectionForms;
-import org.apache.sqoop.model.MJob.Type;
-import org.apache.sqoop.model.MJobForms;
-import org.apache.sqoop.validation.Validator;
-import org.junit.Test;
-
-public class TestJobEngine extends TestCase {
-
-  private static final String DATA_DIR = TestJobEngine.class.getSimpleName();
-  private static final String WAREHOUSE_ROOT = "/tmp/sqoop/warehouse/";
-
-  private static final String OUTPUT_DIR = WAREHOUSE_ROOT + DATA_DIR;
-  private static final String OUTPUT_FILE = "part-r-00000";
-  private static final int START_PARTITION = 1;
-  private static final int NUMBER_OF_PARTITIONS = 9;
-  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
-
-  public void testVoid() { }
-/*
-  @Test
-  public void testImport() throws Exception {
-    FileUtils.delete(OUTPUT_DIR);
-
-    DummyConnector connector = new DummyConnector();
-    EtlOptions options = new EtlOptions(connector);
-
-    JobEngine engine = new JobEngine();
-    engine.initialize(options);
-
-    String fileName = OUTPUT_DIR + "/" + OUTPUT_FILE;
-    InputStream filestream = FileUtils.open(fileName);
-    BufferedReader filereader = new BufferedReader(new InputStreamReader(
-        filestream, Data.CHARSET_NAME));
-    verifyOutput(filereader);
-  }
-
-  private void verifyOutput(BufferedReader reader)
-      throws IOException {
-    String line = null;
-    int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-    Data expected = new Data();
-    while ((line = reader.readLine()) != null){
-      expected.setContent(new Object[] {
-          new Integer(index),
-          new Double(index),
-          String.valueOf(index) },
-          Data.ARRAY_RECORD);
-      index++;
-
-      assertEquals(expected.toString(), line);
-    }
-    reader.close();
-
-    assertEquals(NUMBER_OF_PARTITIONS*NUMBER_OF_ROWS_PER_PARTITION,
-        index-START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION);
-  }
-
-  public class DummyConnector implements SqoopConnector {
-
-    @Override
-    public Importer getImporter() {
-      return new Importer(
-          DummyImportInitializer.class,
-          DummyImportPartitioner.class,
-          DummyImportExtractor.class,
-          null);
-    }
-
-    @Override
-    public Exporter getExporter() {
-      fail("This method should not be invoked.");
-      return null;
-    }
-
-    @Override
-    public ResourceBundle getBundle(Locale locale) {
-      fail("This method should not be invoked.");
-      return null;
-    }
-
-    @Override
-    public Validator getValidator() {
-      fail("This method should not be invoked.");
-      return null;
-    }
-
-    @Override
-    public Class getConnectionConfigurationClass() {
-      fail("This method should not be invoked.");
-      return null;
-    }
-
-    @Override
-    public Class getJobConfigurationClass(Type jobType) {
-      fail("This method should not be invoked.");
-      return null;
-    }
-  }
-
-  public static class DummyImportInitializer extends Initializer {
-    @Override
-    public void initialize(MutableContext context, Options options) {
-      context.setString(Constants.JOB_ETL_OUTPUT_DIRECTORY, OUTPUT_DIR);
-    }
-  }
-
-  public static class DummyImportPartition extends Partition {
-    private int id;
-
-    public void setId(int id) {
-      this.id = id;
-    }
-
-    public int getId() {
-      return id;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      id = in.readInt();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(id);
-    }
-  }
-
-  public static class DummyImportPartitioner extends Partitioner {
-    @Override
-    public List<Partition> initialize(Context context) {
-      List<Partition> partitions = new LinkedList<Partition>();
-      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
-        DummyImportPartition partition = new DummyImportPartition();
-        partition.setId(id);
-        partitions.add(partition);
-      }
-      return partitions;
-    }
-  }
-
-  public static class DummyImportExtractor extends Extractor {
-    @Override
-    public void initialize(Context context, Partition partition, DataWriter writer) {
-      int id = ((DummyImportPartition)partition).getId();
-      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
-        writer.writeArrayRecord(new Object[] {
-            new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
-            new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
-            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
-      }
-    }
-  }
-*/
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
deleted file mode 100644
index 94ab560..0000000
--- a/core/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.job.etl.Extractor;
-import org.apache.sqoop.job.etl.Loader;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.job.etl.Partitioner;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.job.io.DataWriter;
-import org.apache.sqoop.job.mr.SqoopInputFormat;
-import org.apache.sqoop.job.mr.SqoopMapper;
-import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
-import org.apache.sqoop.job.mr.SqoopSplit;
-import org.junit.Test;
-
-public class TestMapReduce extends TestCase {
-
-  private static final int START_PARTITION = 1;
-  private static final int NUMBER_OF_PARTITIONS = 9;
-  private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
-
-  public void testVoid() {}
-
-  /*
-  @Test
-  public void testInputFormat() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    Job job = Job.getInstance(conf);
-
-    SqoopInputFormat inputformat = new SqoopInputFormat();
-    List<InputSplit> splits = inputformat.getSplits(job);
-    assertEquals(9, splits.size());
-
-    for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
-      SqoopSplit split = (SqoopSplit)splits.get(id-1);
-      DummyPartition partition = (DummyPartition)split.getPartition();
-      assertEquals(id, partition.getId());
-    }
-  }
-
-  @Test
-  public void testMapper() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-
-    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
-        DummyOutputFormat.class);
-  }
-
-  @Test
-  public void testOutputFormat() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-
-    JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
-        SqoopNullOutputFormat.class);
-  }
-
-  public static class DummyPartition extends Partition {
-    private int id;
-
-    public void setId(int id) {
-      this.id = id;
-    }
-
-    public int getId() {
-      return id;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      id = in.readInt();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(id);
-    }
-  }
-
-  public static class DummyPartitioner extends Partitioner {
-    @Override
-    public List<Partition> initialize(Context context) {
-      List<Partition> partitions = new LinkedList<Partition>();
-      for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
-        DummyPartition partition = new DummyPartition();
-        partition.setId(id);
-        partitions.add(partition);
-      }
-      return partitions;
-    }
-  }
-
-  public static class DummyExtractor extends Extractor {
-    @Override
-    public void initialize(Context context, Partition partition, DataWriter writer) {
-      int id = ((DummyPartition)partition).getId();
-      for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
-        writer.writeArrayRecord(new Object[] {
-            new Integer(id*NUMBER_OF_ROWS_PER_PARTITION+row),
-            new Double(id*NUMBER_OF_ROWS_PER_PARTITION+row),
-            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
-      }
-    }
-  }
-
-  public static class DummyOutputFormat
-      extends OutputFormat<Data, NullWritable> {
-    @Override
-    public void checkOutputSpecs(JobContext context) {
-      // do nothing
-    }
-
-    @Override
-    public RecordWriter<Data, NullWritable> getRecordWriter(
-        TaskAttemptContext context) {
-      return new DummyRecordWriter();
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
-      return new DummyOutputCommitter();
-    }
-
-    public static class DummyRecordWriter
-        extends RecordWriter<Data, NullWritable> {
-      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-      private Data data = new Data();
-
-      @Override
-      public void write(Data key, NullWritable value) {
-        data.setContent(new Object[] {
-          new Integer(index),
-          new Double(index),
-          String.valueOf(index)},
-          Data.ARRAY_RECORD);
-        index++;
-
-        assertEquals(data.toString(), key.toString());
-      }
-
-      @Override
-      public void close(TaskAttemptContext context) {
-        // do nothing
-      }
-    }
-
-    public static class DummyOutputCommitter extends OutputCommitter {
-      @Override
-      public void setupJob(JobContext jobContext) { }
-
-      @Override
-      public void setupTask(TaskAttemptContext taskContext) { }
-
-      @Override
-      public void commitTask(TaskAttemptContext taskContext) { }
-
-      @Override
-      public void abortTask(TaskAttemptContext taskContext) { }
-
-      @Override
-      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-        return false;
-      }
-    }
-  }
-
-  public static class DummyLoader extends Loader {
-    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-    private Data expected = new Data();
-    private Data actual = new Data();
-
-    @Override
-    public void initialize(Context context, DataReader reader) {
-      Object[] array;
-      while ((array = reader.readArrayRecord()) != null) {
-        actual.setContent(array, Data.ARRAY_RECORD);
-
-        expected.setContent(new Object[] {
-          new Integer(index),
-          new Double(index),
-          String.valueOf(index)},
-          Data.ARRAY_RECORD);
-        index++;
-
-        assertEquals(expected.toString(), actual.toString());
-      };
-    }
-  }
-  */
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/dist/src/main/server/conf/sqoop.properties
----------------------------------------------------------------------
diff --git a/dist/src/main/server/conf/sqoop.properties b/dist/src/main/server/conf/sqoop.properties
index d429c3a..5131aad 100755
--- a/dist/src/main/server/conf/sqoop.properties
+++ b/dist/src/main/server/conf/sqoop.properties
@@ -108,3 +108,8 @@ org.apache.sqoop.submission.engine=org.apache.sqoop.submission.mapreduce.Mapredu
 
 # Hadoop configuration directory
 org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/etc/hadoop/conf/
+
+#
+# Execution engine configuration
+#
+org.apache.sqoop.execution.engine=org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine
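
The new org.apache.sqoop.execution.engine property tells the server which execution engine implementation to load. A minimal sketch of how such a class name can be resolved to an instance, assuming the framework instantiates the engine reflectively (the helper class and method names below are illustrative, not part of this patch):

  import org.apache.sqoop.framework.ExecutionEngine;

  // Illustrative helper only; Sqoop's actual bootstrap code lives in the framework module.
  public final class ExecutionEngineLoaderSketch {

    public static ExecutionEngine load(String engineClassName) {
      try {
        Class<?> klass = Class.forName(engineClassName);
        return (ExecutionEngine) klass.newInstance();
      } catch (Exception e) {
        throw new RuntimeException("Unable to instantiate engine " + engineClassName, e);
      }
    }

    public static void main(String[] args) {
      ExecutionEngine engine =
          load("org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine");
      System.out.println("Loaded: " + engine.getClass().getName());
    }
  }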

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
new file mode 100644
index 0000000..e529f55
--- /dev/null
+++ b/execution/mapreduce/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>execution</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.execution</groupId>
+  <artifactId>sqoop-execution-mapreduce</artifactId>
+  <version>2.0.0-SNAPSHOT</version>
+  <name>Sqoop Mapreduce Execution Engine</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
new file mode 100644
index 0000000..3f37222
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.execution.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * Map-reduce specific submission request containing all extra information
+ * needed for bootstrapping map-reduce job.
+ */
+public class MRSubmissionRequest extends SubmissionRequest {
+
+  /**
+   * Map-reduce specific options.
+   */
+  Class<? extends InputFormat> inputFormatClass;
+  Class<? extends Mapper> mapperClass;
+  Class<? extends Writable> mapOutputKeyClass;
+  Class<? extends Writable> mapOutputValueClass;
+  Class<? extends OutputFormat> outputFormatClass;
+  Class<? extends Writable> outputKeyClass;
+  Class<? extends Writable> outputValueClass;
+
+  public MRSubmissionRequest(MSubmission submission,
+                             SqoopConnector connector,
+                             Object configConnectorConnection,
+                             Object configConnectorJob,
+                             Object configFrameworkConnection,
+                             Object configFrameworkJob) {
+    super(submission, connector, configConnectorConnection, configConnectorJob,
+      configFrameworkConnection, configFrameworkJob);
+  }
+
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  public Class<? extends Mapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  public void setMapperClass(Class<? extends Mapper> mapperClass) {
+    this.mapperClass = mapperClass;
+  }
+
+  public Class<? extends Writable> getMapOutputKeyClass() {
+    return mapOutputKeyClass;
+  }
+
+  public void setMapOutputKeyClass(Class<? extends Writable> mapOutputKeyClass) {
+    this.mapOutputKeyClass = mapOutputKeyClass;
+  }
+
+  public Class<? extends Writable> getMapOutputValueClass() {
+    return mapOutputValueClass;
+  }
+
+  public void setMapOutputValueClass(Class<? extends Writable> mapOutputValueClass) {
+    this.mapOutputValueClass = mapOutputValueClass;
+  }
+
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return outputFormatClass;
+  }
+
+  public void setOutputFormatClass(Class<? extends OutputFormat> outputFormatClass) {
+    this.outputFormatClass = outputFormatClass;
+  }
+
+  public Class<? extends Writable> getOutputKeyClass() {
+    return outputKeyClass;
+  }
+
+  public void setOutputKeyClass(Class<? extends Writable> outputKeyClass) {
+    this.outputKeyClass = outputKeyClass;
+  }
+
+  public Class<? extends Writable> getOutputValueClass() {
+    return outputValueClass;
+  }
+
+  public void setOutputValueClass(Class<? extends Writable> outputValueClass) {
+    this.outputValueClass = outputValueClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
new file mode 100644
index 0000000..77ca59b
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.execution.mapreduce;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.framework.ExecutionEngine;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.HdfsTextImportLoader;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * Map-reduce based execution engine.
+ */
+public class MapreduceExecutionEngine extends ExecutionEngine {
+
+  @Override
+  public SubmissionRequest createSubmissionRequest(MSubmission summary,
+                                                   SqoopConnector connector,
+                                                   Object connectorConnection,
+                                                   Object connectorJob,
+                                                   Object frameworkConnection,
+                                                   Object frameworkJob) {
+    return new MRSubmissionRequest(summary, connector, connectorConnection,
+      connectorJob, frameworkConnection, frameworkJob);
+  }
+
+  @Override
+  public void prepareImportSubmission(SubmissionRequest gRequest) {
+    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
+
+    // Configure map-reduce classes for import
+    request.setInputFormatClass(SqoopInputFormat.class);
+
+    request.setMapperClass(SqoopMapper.class);
+    request.setMapOutputKeyClass(Data.class);
+    request.setMapOutputValueClass(NullWritable.class);
+
+    request.setOutputFormatClass(SqoopFileOutputFormat.class);
+    request.setOutputKeyClass(Data.class);
+    request.setOutputValueClass(NullWritable.class);
+
+    Importer importer = (Importer)request.getConnectorCallbacks();
+
+    // Set up framework context
+    MutableMapContext context = request.getFrameworkContext();
+    context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
+    context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
+    context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
+    context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+  }
+}
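
prepareImportSubmission above only records the map-reduce classes on the request; a submission engine still has to turn them into a Hadoop job. A hypothetical sketch of that translation, mirroring the pattern used by the removed JobUtils test helper, with names that are illustrative rather than taken from the patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;

  public final class MapreduceJobSketch {

    // Copies the class choices from the request onto a Hadoop Job.
    public static Job toHadoopJob(MRSubmissionRequest request, Configuration conf)
        throws Exception {
      Job job = Job.getInstance(conf);
      job.setInputFormatClass(request.getInputFormatClass());
      job.setMapperClass(request.getMapperClass());
      job.setMapOutputKeyClass(request.getMapOutputKeyClass());
      job.setMapOutputValueClass(request.getMapOutputValueClass());
      job.setOutputFormatClass(request.getOutputFormatClass());
      job.setOutputKeyClass(request.getOutputKeyClass());
      job.setOutputValueClass(request.getOutputValueClass());
      return job;
    }
  }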

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
new file mode 100644
index 0000000..19ac91e
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.core.ConfigurationConstants;
+
+public final class JobConstants extends Constants {
+  /**
+   * All job related configuration is prefixed with this:
+   * <tt>org.apache.sqoop.job.</tt>
+   */
+  public static final String PREFIX_JOB_CONFIG =
+      ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
+
+
+  public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
+      + "etl.partitioner";
+
+  public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
+      + "etl.extractor";
+
+  public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+      + "etl.loader";
+
+  public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.destroyer";
+
+
+  public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
+      + "mr.output.file";
+
+  public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
+      + "mr.output.codec";
+
+
+  public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.class.connector.connection";
+
+  public static final String JOB_CONFIG_CLASS_CONNECTOR_JOB =
+    PREFIX_JOB_CONFIG + "config.class.connector.job";
+
+  public static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.class.framework.connection";
+
+  public static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB =
+    PREFIX_JOB_CONFIG + "config.class.framework.job";
+
+  public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.connector.connection";
+
+  public static final String JOB_CONFIG_CONNECTOR_JOB =
+    PREFIX_JOB_CONFIG + "config.connector.job";
+
+  public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.framework.connection";
+
+  public static final String JOB_CONFIG_FRAMEWORK_JOB =
+    PREFIX_JOB_CONFIG + "config.framework.job";
+
+  public static final String PREFIX_CONNECTOR_CONTEXT =
+    PREFIX_JOB_CONFIG + "connector.context.";
+
+
+  private JobConstants() {
+    // Disable explicit object creation
+  }
+}
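
All keys above are concatenations with PREFIX_JOB_CONFIG, so the fully expanded property names live in the same org.apache.sqoop. namespace as the server configuration shown earlier. A quick check under that assumption (it presumes ConfigurationConstants.PREFIX_GLOBAL_CONFIG resolves to "org.apache.sqoop."):

  import org.apache.sqoop.job.JobConstants;

  public final class JobConstantsCheck {
    public static void main(String[] args) {
      // Expected (under the stated assumption): org.apache.sqoop.job.etl.partitioner
      System.out.println(JobConstants.JOB_ETL_PARTITIONER);
      // Expected: org.apache.sqoop.job.connector.context.
      System.out.println(JobConstants.PREFIX_CONNECTOR_CONTEXT);
    }
  }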

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
new file mode 100644
index 0000000..5488b46
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.ImmutableContext;
+
+/**
+ * Implementation of immutable context that is based on Hadoop configuration
+ * object. Each context property is prefixed with special prefix and loaded
+ * directly.
+ */
+public class PrefixContext implements ImmutableContext {
+
+  Configuration configuration;
+  String prefix;
+
+  public PrefixContext(Configuration configuration, String prefix) {
+    this.configuration = configuration;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public String getString(String key) {
+    return configuration.get(prefix + key);
+  }
+
+  @Override
+  public String getString(String key, String defaultValue) {
+    return configuration.get(prefix + key, defaultValue);
+  }
+
+  @Override
+  public long getLong(String key, long defaultValue) {
+    return configuration.getLong(prefix + key, defaultValue);
+  }
+
+  @Override
+  public int getInt(String key, int defaultValue) {
+    return  configuration.getInt(prefix + key, defaultValue);
+  }
+
+  @Override
+  public boolean getBoolean(String key, boolean defaultValue) {
+    return configuration.getBoolean(prefix + key, defaultValue);
+  }
+}
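
A short usage sketch of PrefixContext: the framework writes connector properties into the Hadoop Configuration under a prefix, and the connector reads them back without ever seeing that prefix. The key "table.name" is hypothetical and only illustrates the lookup:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.sqoop.job.JobConstants;
  import org.apache.sqoop.job.PrefixContext;

  public final class PrefixContextExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();

      // Framework side: store a connector property under the connector prefix.
      conf.set(JobConstants.PREFIX_CONNECTOR_CONTEXT + "table.name", "employees");

      // Connector side: the prefix is prepended transparently on every lookup.
      PrefixContext context =
          new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
      System.out.println(context.getString("table.name"));          // employees
      System.out.println(context.getString("missing", "fallback")); // fallback
    }
  }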

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
new file mode 100644
index 0000000..1235d1d
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class HdfsSequenceImportLoader extends Loader {
+
+  public static final String EXTENSION = ".seq";
+
+  private final char fieldDelimiter;
+
+  public HdfsSequenceImportLoader() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+  }
+
+  @Override
+  public void run(ImmutableContext context, DataReader reader) {
+    reader.setFieldDelimiter(fieldDelimiter);
+
+    Configuration conf = new Configuration();
+//    Configuration conf = ((EtlContext)context).getConfiguration();
+    String filename =
+        context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
+    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
+
+    CompressionCodec codec = null;
+    if (codecname != null) {
+      Class<?> clz = ClassUtils.loadClass(codecname);
+      if (clz == null) {
+        throw new SqoopException(CoreError.CORE_0009, codecname);
+      }
+
+      try {
+        codec = (CompressionCodec) clz.newInstance();
+        if (codec instanceof Configurable) {
+          ((Configurable) codec).setConf(conf);
+        }
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, codecname, e);
+      }
+    }
+
+    filename += EXTENSION;
+
+    try {
+      Path filepath = new Path(filename);
+      SequenceFile.Writer filewriter;
+      if (codec != null) {
+        filewriter = SequenceFile.createWriter(conf,
+            SequenceFile.Writer.file(filepath),
+            SequenceFile.Writer.keyClass(Text.class),
+            SequenceFile.Writer.valueClass(NullWritable.class),
+            SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+      } else {
+        filewriter = SequenceFile.createWriter(conf,
+          SequenceFile.Writer.file(filepath),
+          SequenceFile.Writer.keyClass(Text.class),
+          SequenceFile.Writer.valueClass(NullWritable.class),
+          SequenceFile.Writer.compression(CompressionType.NONE));
+      }
+
+      String csv;
+      Text text = new Text();
+      while ((csv = reader.readCsvRecord()) != null) {
+        text.set(csv);
+        filewriter.append(text, NullWritable.get());
+      }
+      filewriter.close();
+
+    } catch (IOException e) {
+      throw new SqoopException(CoreError.CORE_0018, e);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
new file mode 100644
index 0000000..36aa11f
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class HdfsTextImportLoader extends Loader {
+
+  private final char fieldDelimiter;
+  private final char recordDelimiter;
+
+  public HdfsTextImportLoader() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
+  }
+
+  @Override
+  public void run(ImmutableContext context, DataReader reader) {
+    reader.setFieldDelimiter(fieldDelimiter);
+
+    Configuration conf = new Configuration();
+//    Configuration conf = ((EtlContext)context).getConfiguration();
+    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
+    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
+
+    CompressionCodec codec = null;
+    if (codecname != null) {
+      Class<?> clz = ClassUtils.loadClass(codecname);
+      if (clz == null) {
+        throw new SqoopException(CoreError.CORE_0009, codecname);
+      }
+
+      try {
+        codec = (CompressionCodec) clz.newInstance();
+        if (codec instanceof Configurable) {
+          ((Configurable) codec).setConf(conf);
+        }
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, codecname, e);
+      }
+
+      filename += codec.getDefaultExtension();
+    }
+
+    try {
+      Path filepath = new Path(filename);
+      FileSystem fs = filepath.getFileSystem(conf);
+
+      BufferedWriter filewriter;
+      DataOutputStream filestream = fs.create(filepath, false);
+      if (codec != null) {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            codec.createOutputStream(filestream, codec.createCompressor()),
+            Data.CHARSET_NAME));
+      } else {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            filestream, Data.CHARSET_NAME));
+      }
+
+      String csv;
+      while ((csv = reader.readCsvRecord()) != null) {
+        filewriter.write(csv + recordDelimiter);
+      }
+      filewriter.close();
+
+    } catch (IOException e) {
+      throw new SqoopException(CoreError.CORE_0018, e);
+    }
+
+  }
+
+}
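
Both HDFS loaders follow the same contract: run(ImmutableContext, DataReader) pulls CSV records from the reader until it returns null and writes them to the target. A minimal custom loader sketch illustrating that contract (the class is illustrative, not part of the patch):

  import org.apache.sqoop.common.ImmutableContext;
  import org.apache.sqoop.job.etl.Loader;
  import org.apache.sqoop.job.io.Data;
  import org.apache.sqoop.job.io.DataReader;

  public class StdoutLoader extends Loader {

    @Override
    public void run(ImmutableContext context, DataReader reader) {
      reader.setFieldDelimiter(Data.DEFAULT_FIELD_DELIMITER);
      try {
        String csv;
        while ((csv = reader.readCsvRecord()) != null) {
          System.out.println(csv);  // each record arrives as one CSV line
        }
      } catch (Exception e) {
        // Defensive: surface any read failure as an unchecked error.
        throw new RuntimeException(e);
      }
    }
  }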

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
new file mode 100644
index 0000000..4ddd132
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+
+public class Data implements WritableComparable<Data> {
+
+  // The content is an Object to accommodate different kinds of data.
+  // For example, it can be:
+  // - Object[] for an array of object record
+  // - String for a text of CSV record
+  private Object content = null;
+
+  public static final int EMPTY_DATA = 0;
+  public static final int CSV_RECORD = 1;
+  public static final int ARRAY_RECORD = 2;
+  private int type = EMPTY_DATA;
+
+  public static final String CHARSET_NAME = "UTF-8";
+
+  public static final char DEFAULT_RECORD_DELIMITER = '\n';
+  public static final char DEFAULT_FIELD_DELIMITER = ',';
+  public static final char DEFAULT_STRING_DELIMITER = '\'';
+  public static final char DEFAULT_STRING_ESCAPE = '\\';
+  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
+  private char stringEscape = DEFAULT_STRING_ESCAPE;
+  private String escapedStringDelimiter = String.valueOf(new char[] {
+      stringEscape, stringDelimiter
+  });
+
+  public void setFieldDelimiter(char fieldDelimiter) {
+    this.fieldDelimiter = fieldDelimiter;
+  }
+
+  public void setContent(Object content, int type) {
+    switch (type) {
+    case EMPTY_DATA:
+    case CSV_RECORD:
+    case ARRAY_RECORD:
+      this.type = type;
+      this.content = content;
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  public Object getContent(int targetType) {
+    switch (targetType) {
+    case CSV_RECORD:
+      return format();
+    case ARRAY_RECORD:
+      return parse();
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(targetType));
+    }
+  }
+
+  public int getType() {
+    return type;
+  }
+
+  public boolean isEmpty() {
+    return (type == EMPTY_DATA);
+  }
+
+  @Override
+  public String toString() {
+    return (String)getContent(CSV_RECORD);
+  }
+
+  @Override
+  public int compareTo(Data other) {
+    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
+    byte[] otherBytes = other.toString().getBytes(
+        Charset.forName(CHARSET_NAME));
+    return WritableComparator.compareBytes(
+        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Data)) {
+      return false;
+    }
+
+    Data data = (Data)other;
+    if (type != data.getType()) {
+      return false;
+    }
+
+    return toString().equals(data.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    switch (type) {
+    case CSV_RECORD:
+      result += 31 * content.hashCode();
+      return result;
+    case ARRAY_RECORD:
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        result += 31 * array[i].hashCode();
+      }
+      return result;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    type = readType(in);
+    switch (type) {
+    case CSV_RECORD:
+      readCsv(in);
+      break;
+    case ARRAY_RECORD:
+      readArray(in);
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeType(out, type);
+    switch (type) {
+    case CSV_RECORD:
+      writeCsv(out);
+      break;
+    case ARRAY_RECORD:
+      writeArray(out);
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private int readType(DataInput in) throws IOException {
+    return WritableUtils.readVInt(in);
+  }
+
+  private void writeType(DataOutput out, int type) throws IOException {
+    WritableUtils.writeVInt(out, type);
+  }
+
+  private void readCsv(DataInput in) throws IOException {
+    content = in.readUTF();
+  }
+
+  private void writeCsv(DataOutput out) throws IOException {
+    out.writeUTF((String)content);
+  }
+
+  private void readArray(DataInput in) throws IOException {
+    // read number of columns
+    int columns = in.readInt();
+    content = new Object[columns];
+    Object[] array = (Object[])content;
+    // read each column
+    for (int i = 0; i < array.length; i++) {
+      int type = readType(in);
+      switch (type) {
+      case FieldTypes.UTF:
+        array[i] = in.readUTF();
+        break;
+
+      case FieldTypes.BIN:
+        int length = in.readInt();
+        byte[] bytes = new byte[length];
+        in.readFully(bytes);
+        array[i] = bytes;
+        break;
+
+      case FieldTypes.DOUBLE:
+        array[i] = in.readDouble();
+        break;
+
+      case FieldTypes.FLOAT:
+        array[i] = in.readFloat();
+        break;
+
+      case FieldTypes.LONG:
+        array[i] = in.readLong();
+        break;
+
+      case FieldTypes.INT:
+        array[i] = in.readInt();
+        break;
+
+      case FieldTypes.SHORT:
+        array[i] = in.readShort();
+        break;
+
+      case FieldTypes.CHAR:
+        array[i] = in.readChar();
+        break;
+
+      case FieldTypes.BYTE:
+        array[i] = in.readByte();
+        break;
+
+      case FieldTypes.BOOLEAN:
+        array[i] = in.readBoolean();
+        break;
+
+      case FieldTypes.NULL:
+        array[i] = null;
+        break;
+
+      default:
+        throw new IOException(
+          new SqoopException(CoreError.CORE_0012, Integer.toString(type))
+        );
+      }
+    }
+  }
+
+  private void writeArray(DataOutput out) throws IOException {
+    Object[] array = (Object[])content;
+    // write number of columns
+    out.writeInt(array.length);
+    // write each column
+    for (int i = 0; i < array.length; i++) {
+      if (array[i] instanceof String) {
+        writeType(out, FieldTypes.UTF);
+        out.writeUTF((String)array[i]);
+
+      } else if (array[i] instanceof byte[]) {
+        writeType(out, FieldTypes.BIN);
+        out.writeInt(((byte[])array[i]).length);
+        out.write((byte[])array[i]);
+
+      } else if (array[i] instanceof Double) {
+        writeType(out, FieldTypes.DOUBLE);
+        out.writeDouble((Double)array[i]);
+
+      } else if (array[i] instanceof Float) {
+        writeType(out, FieldTypes.FLOAT);
+        out.writeFloat((Float)array[i]);
+
+      } else if (array[i] instanceof Long) {
+        writeType(out, FieldTypes.LONG);
+        out.writeLong((Long)array[i]);
+
+      } else if (array[i] instanceof Integer) {
+        writeType(out, FieldTypes.INT);
+        out.writeInt((Integer)array[i]);
+
+      } else if (array[i] instanceof Short) {
+        writeType(out, FieldTypes.SHORT);
+        out.writeShort((Short)array[i]);
+
+      } else if (array[i] instanceof Character) {
+        writeType(out, FieldTypes.CHAR);
+        out.writeChar((Character)array[i]);
+
+      } else if (array[i] instanceof Byte) {
+        writeType(out, FieldTypes.BYTE);
+        out.writeByte((Byte)array[i]);
+
+      } else if (array[i] instanceof Boolean) {
+        writeType(out, FieldTypes.BOOLEAN);
+        out.writeBoolean((Boolean)array[i]);
+
+      } else if (array[i] == null) {
+        writeType(out, FieldTypes.NULL);
+
+      } else {
+        throw new IOException(
+          new SqoopException(
+              CoreError.CORE_0012, array[i].getClass().getName()
+          )
+        );
+      }
+    }
+  }
+
+  private String format() {
+    switch (type) {
+    case EMPTY_DATA:
+      return null;
+
+    case CSV_RECORD:
+      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
+        return (String)content;
+      } else {
+        // TODO: exclude commas that appear inside quoted strings.
+        return ((String)content).replaceAll(
+            String.valueOf(DEFAULT_FIELD_DELIMITER),
+            String.valueOf(fieldDelimiter));
+      }
+
+    case ARRAY_RECORD:
+      StringBuilder sb = new StringBuilder();
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        if (i != 0) {
+          sb.append(fieldDelimiter);
+        }
+
+        if (array[i] instanceof String) {
+          sb.append(stringDelimiter);
+          sb.append(escape((String)array[i]));
+          sb.append(stringDelimiter);
+        } else if (array[i] instanceof byte[]) {
+          sb.append(Arrays.toString((byte[])array[i]));
+        } else {
+          sb.append(String.valueOf(array[i]));
+        }
+      }
+      return sb.toString();
+
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private Object[] parse() {
+    switch (type) {
+    case EMPTY_DATA:
+      return null;
+
+    case CSV_RECORD:
+      ArrayList<Object> list = new ArrayList<Object>();
+      // TODO: parse the CSV text into an array of column values.
+      return list.toArray();
+
+    case ARRAY_RECORD:
+      return (Object[])content;
+
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private String escape(String string) {
+    // TODO: Also need to escape those special characters as documented in:
+    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
+    String regex = String.valueOf(stringDelimiter);
+    String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
+    return string.replaceAll(regex, replacement);
+  }
+
+}
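A short, self-contained sketch of how Data round-trips between the array and CSV representations through the Writable methods defined above; the field values are purely illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.sqoop.job.io.Data;

public class DataRoundTrip {
  public static void main(String[] args) throws IOException {
    // Build an array record; on write() each element is tagged with a
    // FieldTypes code (LONG, UTF, BOOLEAN, NULL, ...).
    Data original = new Data();
    original.setContent(new Object[] {1L, "O'Hara", true, null}, Data.ARRAY_RECORD);

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    Data copy = new Data();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray())));

    // format() wraps strings in the string delimiter and escapes embedded
    // delimiters, so this prints: 1,'O\'Hara',true,null
    System.out.println(copy.getContent(Data.CSV_RECORD));
  }
}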

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
new file mode 100644
index 0000000..e96dc6e
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+public final class FieldTypes {
+
+  public static final int NULL    = 0;
+
+  public static final int BOOLEAN = 1;
+
+  public static final int BYTE    = 10;
+  public static final int CHAR    = 11;
+
+  public static final int SHORT   = 20;
+  public static final int INT     = 21;
+  public static final int LONG    = 22;
+
+  public static final int FLOAT   = 50;
+  public static final int DOUBLE  = 51;
+
+  public static final int BIN     = 100;
+  public static final int UTF     = 101;
+
+  private FieldTypes() {
+    // Disable explicit object creation
+  }
+}
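FieldTypes supplies the per-column type tags that Data.writeArray()/readArray() put on the wire: each column is written as a vint tag followed by the value. A tiny sketch of that convention for a single column, using only the constants above and standard Hadoop/JDK streams:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.WritableUtils;
import org.apache.sqoop.job.io.FieldTypes;

public class TaggedColumnSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buffer);

    // Write: type tag first, then the value (as in Data.writeArray()).
    WritableUtils.writeVInt(out, FieldTypes.UTF);
    out.writeUTF("hello");

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray()));

    // Read: dispatch on the tag (as in Data.readArray()).
    int tag = WritableUtils.readVInt(in);
    if (tag == FieldTypes.UTF) {
      System.out.println(in.readUTF());   // prints: hello
    }
  }
}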

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
new file mode 100644
index 0000000..59baaf6
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * Helper class for loading connector and framework configuration objects from
+ * the job configuration.
+ */
+public final class ConfigurationUtils {
+
+  public static Object getConnectorConnection(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
+      JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION);
+  }
+
+  public static Object getConnectorJob(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
+      JobConstants.JOB_CONFIG_CONNECTOR_JOB);
+  }
+
+  public static Object getFrameworkConnection(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
+      JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION);
+  }
+
+  public static Object getFrameworkJob(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
+      JobConstants.JOB_CONFIG_FRAMEWORK_JOB);
+  }
+
+  private static Object loadConfiguration(Configuration configuration,
+                                          String classProperty,
+                                          String valueProperty) {
+    Object object = ClassUtils.instantiate(configuration.get(classProperty));
+    FormUtils.fillValues(configuration.get(valueProperty), object);
+    return object;
+  }
+
+  private ConfigurationUtils() {
+    // Instantiation is prohibited
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
new file mode 100644
index 0000000..c465f10
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * An output format that records the output file path and compression codec in
+ * the job configuration and delegates record writing to
+ * SqoopOutputFormatLoadExecutor.
+ */
+public class SqoopFileOutputFormat
+    extends FileOutputFormat<Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopFileOutputFormat.class.getName());
+
+  public static final Class<? extends CompressionCodec> DEFAULT_CODEC =
+      DefaultCodec.class;
+
+  @Override
+  public RecordWriter<Data, NullWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    Path filepath = getDefaultWorkFile(context, "");
+    String filename = filepath.toString();
+    conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
+
+    boolean isCompressed = getCompressOutput(context);
+    if (isCompressed) {
+      String codecname =
+          conf.get(FileOutputFormat.COMPRESS_CODEC, DEFAULT_CODEC.getName());
+      conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
+    }
+
+    SqoopOutputFormatLoadExecutor executor =
+        new SqoopOutputFormatLoadExecutor(context);
+    return executor.getRecordWriter();
+  }
+
+}
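The path and codec recorded in getRecordWriter() are picked up later by the loading side. A minimal sketch of wiring this output format into a job with compressed output enabled; only standard Hadoop job-setup calls are used, and the output path is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;

public class FileOutputSetup {
  public static Job configure(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf);
    job.setOutputFormatClass(SqoopFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/sqoop-output")); // illustrative path

    // With compression enabled, getRecordWriter() above copies the codec
    // class name into JobConstants.JOB_MR_OUTPUT_CODEC for the loader.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    return job;
  }
}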

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
new file mode 100644
index 0000000..8fcdc99
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * An InputFormat that obtains its splits from the connector's Partitioner;
+ * each split wraps a single Partition, delivered to the mapper exactly once.
+ */
+public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopInputFormat.class.getName());
+
+  @Override
+  public RecordReader<SqoopSplit, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) {
+    return new SqoopRecordReader();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
+    Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
+
+    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
+    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
+
+    List<Partition> partitions = partitioner.getPartitions(connectorContext, connectorConnection, connectorJob);
+    List<InputSplit> splits = new LinkedList<InputSplit>();
+    for (Partition partition : partitions) {
+      LOG.debug("Partition: " + partition);
+      SqoopSplit split = new SqoopSplit();
+      split.setPartition(partition);
+      splits.add(split);
+    }
+
+    return splits;
+  }
+
+  public static class SqoopRecordReader
+      extends RecordReader<SqoopSplit, NullWritable> {
+
+    private boolean delivered = false;
+    private SqoopSplit split = null;
+
+    @Override
+    public boolean nextKeyValue() {
+      if (delivered) {
+        return false;
+      } else {
+        delivered = true;
+        return true;
+      }
+    }
+
+    @Override
+    public SqoopSplit getCurrentKey() {
+      return split;
+    }
+
+    @Override
+    public NullWritable getCurrentValue() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public float getProgress() {
+      return delivered ? 1.0f : 0.0f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+      this.split = (SqoopSplit)split;
+    }
+  }
+
+}
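Each split carries exactly one Partition, and SqoopRecordReader hands that split to the mapper exactly once. A small sketch of that contract, using only the classes above; passing null for the TaskAttemptContext is acceptable here because initialize() ignores it:

import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopSplit;

public class RecordReaderContract {
  public static void main(String[] args) throws Exception {
    SqoopSplit split = new SqoopSplit();

    SqoopInputFormat.SqoopRecordReader reader =
        new SqoopInputFormat.SqoopRecordReader();
    reader.initialize(split, null);                       // context is unused

    System.out.println(reader.nextKeyValue());            // true  -> the single "record"
    System.out.println(reader.getCurrentKey() == split);  // true
    System.out.println(reader.getProgress());             // 1.0
    System.out.println(reader.nextKeyValue());            // false -> nothing more to read
    reader.close();
  }
}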

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
new file mode 100644
index 0000000..6892b4b
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * A mapper that runs the connector's Extractor and forwards every extracted
+ * record to the output format as a Data key.
+ */
+public class SqoopMapper
+    extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopMapper.class.getName());
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
+    Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
+
+    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
+    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
+
+    SqoopSplit split = context.getCurrentKey();
+
+    try {
+      extractor.run(connectorContext, connectorConnection, connectorJob, split.getPartition(),
+        new MapDataWriter(context));
+
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0017, e);
+    }
+  }
+
+  public class MapDataWriter extends DataWriter {
+    private Context context;
+    private Data data;
+
+    public MapDataWriter(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public void setFieldDelimiter(char fieldDelimiter) {
+      if (data == null) {
+        data = new Data();
+      }
+
+      data.setFieldDelimiter(fieldDelimiter);
+    }
+
+    @Override
+    public void writeArrayRecord(Object[] array) {
+      writeContent(array, Data.ARRAY_RECORD);
+    }
+
+    @Override
+    public void writeCsvRecord(String csv) {
+      writeContent(csv, Data.CSV_RECORD);
+    }
+
+    @Override
+    public void writeContent(Object content, int type) {
+      if (data == null) {
+        data = new Data();
+      }
+
+      data.setContent(content, type);
+      try {
+        context.write(data, NullWritable.get());
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0013, e);
+      }
+    }
+  }
+
+}
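Records reach this mapper's output through the DataWriter that MapDataWriter implements: writeArrayRecord() and writeCsvRecord() both funnel into writeContent(), which wraps the payload in a Data key and calls context.write(). A hedged sketch of the producing side; this is not the real Extractor API (its signature is defined outside this hunk), just a routine handed a DataWriter, with illustrative field values:

import org.apache.sqoop.job.io.DataWriter;

public class ExtractionSketch {
  // Hypothetical extraction routine; in the job it is the Extractor that
  // receives the DataWriter created by SqoopMapper.run().
  static void extractInto(DataWriter writer) {
    writer.setFieldDelimiter(',');                                  // optional; ',' is the default
    writer.writeArrayRecord(new Object[] {1L, "first row", true});  // becomes an ARRAY_RECORD
    writer.writeCsvRecord("2,'second row',false");                  // becomes a CSV_RECORD
  }
}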

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
new file mode 100644
index 0000000..1242f90
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * An output format that produces no MapReduce output of its own; records are
+ * handed to SqoopOutputFormatLoadExecutor and the output committer is a no-op.
+ */
+public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopNullOutputFormat.class.getName());
+
+  @Override
+  public void checkOutputSpecs(JobContext context) {
+    // do nothing
+  }
+
+  @Override
+  public RecordWriter<Data, NullWritable> getRecordWriter(
+      TaskAttemptContext context) {
+    SqoopOutputFormatLoadExecutor executor =
+        new SqoopOutputFormatLoadExecutor(context);
+    return executor.getRecordWriter();
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    // return an output committer that does nothing
+    return new NullOutputCommitter();
+  }
+
+  class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void setupJob(JobContext jobContext) { }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+  }
+
+}

