sqoop-commits mailing list archives

From jar...@apache.org
Subject git commit: SQOOP-783: Sqoop2: Merge HdfsSequenceExportExtractor and HdfsTextExportExtractor to one Extractor
Date Sun, 17 Mar 2013 19:27:54 GMT
Updated Branches:
  refs/heads/sqoop2 3b8e8d15d -> 3d9aaa0d0


SQOOP-783: Sqoop2: Merge HdfsSequenceExportExtractor and HdfsTextExportExtractor to one Extractor

(Vasanth kumar RJ via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3d9aaa0d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3d9aaa0d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3d9aaa0d

Branch: refs/heads/sqoop2
Commit: 3d9aaa0d0d8f559798f667749ebd406d9a20af91
Parents: 3b8e8d1
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Mar 17 12:27:21 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sun Mar 17 12:27:21 2013 -0700

----------------------------------------------------------------------
 .../mapreduce/MapreduceExecutionEngine.java        |    6 +-
 .../apache/sqoop/job/etl/HdfsExportExtractor.java  |  203 +++++++++++++++
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java |  101 -------
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |  131 ----------
 .../java/org/apache/sqoop/job/TestHdfsExtract.java |   13 +-
 5 files changed, 212 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
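
For quick orientation before the full diff: a condensed, self-contained sketch of the dispatch logic the merged extractor introduces. The method names (extractFile, isSequenceFile, extractSequenceFile, extractTextFile) and the probing approach are taken from the new HdfsExportExtractor below; the wrapper class name is only illustrative and not part of the commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Illustrative wrapper class, not part of the commit.
public class HdfsExportDispatchSketch {

  private final Configuration conf = new Configuration();

  // Each file in a partition is probed and routed to sequence-file or text-file handling.
  void extractFile(Path file, long start, long length) throws IOException {
    if (isSequenceFile(file)) {
      extractSequenceFile(file, start, length);   // SequenceFile.Reader path
    } else {
      extractTextFile(file, start, length);       // LineReader path
    }
  }

  // A file counts as a sequence file if SequenceFile.Reader can parse its header.
  boolean isSequenceFile(Path file) {
    try {
      new SequenceFile.Reader(file.getFileSystem(conf), file, conf).close();
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  // Bodies omitted here; see the full HdfsExportExtractor diff below.
  void extractSequenceFile(Path file, long start, long length) throws IOException { }
  void extractTextFile(Path file, long start, long length) throws IOException { }
}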


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index b201a8d..767080c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -29,9 +29,9 @@ import org.apache.sqoop.framework.configuration.OutputFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.HdfsExportExtractor;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
 import org.apache.sqoop.job.etl.HdfsTextImportLoader;
 import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.job.io.Data;
@@ -128,8 +128,8 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName());
     context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName());
 
-    // We should make one extractor that will be able to read all supported file types
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
+    // Extractor that will be able to read all supported file types
+    context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
     context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
 
     if(request.getExtractors() != null) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
new file mode 100644
index 0000000..9281bb4
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.LineReader;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
+import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * Extract from HDFS.
+ * Default field delimiter of a record is comma.
+ */
+public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
+
+  public static final Log LOG = LogFactory.getLog(HdfsExportExtractor.class.getName());
+
+  private Configuration conf;
+  private DataWriter dataWriter;
+  private long rowRead = 0;
+
+  private final char fieldDelimiter;
+
+  public HdfsExportExtractor() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+  }
+
+  @Override
+  public void extract(ExtractorContext context,
+      ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
+
+    conf = ((PrefixContext) context.getContext()).getConfiguration();
+    dataWriter = context.getDataWriter();
+    dataWriter.setFieldDelimiter(fieldDelimiter);
+
+    try {
+      HdfsExportPartition p = partition;
+      LOG.info("Working on partition: " + p);
+      int numFiles = p.getNumberOfFiles();
+      for (int i = 0; i < numFiles; i++) {
+        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+      }
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+    }
+  }
+
+  private void extractFile(Path file, long start, long length)
+      throws IOException {
+    long end = start + length;
+    LOG.info("Extracting file " + file);
+    LOG.info("\t from offset " + start);
+    LOG.info("\t to offset " + end);
+    LOG.info("\t of length " + length);
+    if(isSequenceFile(file)) {
+      extractSequenceFile(file, start, length);
+    } else {
+      extractTextFile(file, start, length);
+    }
+  }
+
+  /**
+   * Extracts Sequence file
+   * @param file
+   * @param start
+   * @param length
+   * @throws IOException
+   */
+  private void extractSequenceFile(Path file, long start, long length)
+      throws IOException {
+    LOG.info("Extracting sequence file");
+    long end = start + length;
+    SequenceFile.Reader filereader = new SequenceFile.Reader(
+        file.getFileSystem(conf), file, conf);
+
+    if (start > filereader.getPosition()) {
+      filereader.sync(start); // sync to start
+    }
+
+    Text line = new Text();
+    boolean hasNext = filereader.next(line);
+    while (hasNext) {
+      rowRead++;
+      dataWriter.writeCsvRecord(line.toString());
+      line = new Text();
+      hasNext = filereader.next(line);
+      if (filereader.getPosition() >= end && filereader.syncSeen()) {
+        break;
+      }
+    }
+    filereader.close();
+  }
+
+  /**
+   * Extracts Text file
+   * @param file
+   * @param start
+   * @param length
+   * @throws IOException
+   */
+  private void extractTextFile(Path file, long start, long length)
+      throws IOException {
+    LOG.info("Extracting text file");
+    long end = start + length;
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataInputStream filestream = fs.open(file);
+    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+    LineReader filereader;
+    Seekable fileseeker = filestream;
+
+    // Hadoop 1.0 does not have support for custom record delimiter and thus
+    // we are supporting only default one.
+    // We might add another "else if" case for SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
+    if (codec == null) {
+      filestream.seek(start);
+      filereader = new LineReader(filestream);
+    } else {
+      filereader = new LineReader(codec.createInputStream(filestream,
+          codec.createDecompressor()), conf);
+      fileseeker = filestream;
+    }
+    if (start != 0) {
+      // always throw away first record because
+      // one extra line is read in previous split
+      start += filereader.readLine(new Text(), 0);
+    }
+    int size;
+    LOG.info("Start position: " + String.valueOf(start));
+    long next = start;
+    while (next <= end) {
+      Text line = new Text();
+      size = filereader.readLine(line, Integer.MAX_VALUE);
+      if (size == 0) {
+        break;
+      }
+      if (codec == null) {
+        next += size;
+      } else {
+        next = fileseeker.getPos();
+      }
+      rowRead++;
+      dataWriter.writeCsvRecord(line.toString());
+    }
+    LOG.info("Extracting ended on position: " + fileseeker.getPos());
+    filestream.close();
+  }
+
+  @Override
+  public long getRowsRead() {
+    return rowRead;
+  }
+
+  /**
+   * Returns true if the given file is a sequence file
+   * @param file
+   * @return boolean
+   */
+  private boolean isSequenceFile(Path file) {
+    SequenceFile.Reader filereader = null;
+    try {
+      filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
+      filereader.close();
+    } catch (IOException e) {
+      return false;
+    }
+    return true;
+  }
+}
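
For reference, a minimal sketch of how the merged extractor is wired into a job configuration. It mirrors the configuration changes to MapreduceExecutionEngine above and TestHdfsExtract below; the class name and the inputDir parameter are illustrative, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.HdfsExportExtractor;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;

public class ExtractorWiringSketch {
  public static Configuration configure(String inputDir) {
    Configuration conf = new Configuration();
    conf.set(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName());
    // A single extractor now covers both text and sequence files on HDFS.
    conf.set(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
    conf.set(JobConstants.HADOOP_INPUTDIR, inputDir);
    return conf;
  }
}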

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
deleted file mode 100644
index 2280828..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataWriter;
-
-public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
-  public static final Log LOG =
-    LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
-
-  private Configuration conf;
-  private DataWriter dataWriter;
-
-  private final char fieldDelimiter;
-
-  public HdfsSequenceExportExtractor() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
-  @Override
-  public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
-      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
-    conf = ((PrefixContext)context.getContext()).getConfiguration();
-    dataWriter = context.getDataWriter();
-    dataWriter.setFieldDelimiter(fieldDelimiter);
-
-    try {
-      LOG.info("Working on partition: " + partition);
-      int numFiles = partition.getNumberOfFiles();
-      for (int i=0; i<numFiles; i++) {
-        extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i));
-      }
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-    }
-  }
-
-  private void extractFile(Path file, long start, long length)
-      throws IOException {
-    long end = start + length;
-    LOG.info("Extracting file " + file);
-    LOG.info("\t from offset " + start);
-    LOG.info("\t to offset " + end);
-    LOG.info("\t of length " + length);
-
-    SequenceFile.Reader filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
-
-    if (start > filereader.getPosition()) {
-      filereader.sync(start); // sync to start
-    }
-
-    Text line = new Text();
-    boolean hasNext = filereader.next(line);
-    while (hasNext) {
-      dataWriter.writeCsvRecord(line.toString());
-      line = new Text();
-      hasNext = filereader.next(line);
-      if(filereader.getPosition() >= end && filereader.syncSeen()) {
-        break;
-      }
-    }
-  }
-
-  @Override
-  public long getRowsRead() {
-    // TODO need to return the rows read
-    return 0;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
deleted file mode 100644
index ae419ff..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.LineReader;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataWriter;
-
-public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
-  public static final Log LOG =
-    LogFactory.getLog(HdfsTextExportExtractor.class.getName());
-
-  private Configuration conf;
-  private DataWriter dataWriter;
-
-  private final char fieldDelimiter;
-
-  public HdfsTextExportExtractor() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
-  @Override
-  public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
-      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
-    conf = ((PrefixContext)context.getContext()).getConfiguration();
-    dataWriter = context.getDataWriter();
-    dataWriter.setFieldDelimiter(fieldDelimiter);
-
-    try {
-      HdfsExportPartition p = partition;
-      LOG.info("Working on partition: " + p);
-      int numFiles = p.getNumberOfFiles();
-      for (int i=0; i<numFiles; i++) {
-        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
-      }
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-    }
-  }
-
-  private void extractFile(Path file, long start, long length)
-      throws IOException {
-    long end = start + length;
-    LOG.info("Extracting file " + file);
-    LOG.info("\t from offset " + start);
-    LOG.info("\t to offset " + end);
-    LOG.info("\t of length " + length);
-
-    FileSystem fs = file.getFileSystem(conf);
-    FSDataInputStream filestream = fs.open(file);
-    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
-    LineReader filereader;
-    Seekable fileseeker = filestream;
-
-    // Hadoop 1.0 does not have support for custom record delimiter and thus we
-    // are supporting only default one.
-    // We might add another "else if" case for SplittableCompressionCodec once
-    // we drop support for Hadoop 1.0.
-    if (codec == null) {
-      filestream.seek(start);
-      filereader = new LineReader(filestream);
-    } else {
-      filereader = new LineReader(
-          codec.createInputStream(filestream, codec.createDecompressor()), conf);
-      fileseeker = filestream;
-    }
-
-    if (start != 0) {
-      // always throw away first record because
-      // one extra line is read in previous split
-      start += filereader.readLine(new Text(), 0);
-    }
-    int size;
-    LOG.info("Start position: " + String.valueOf(start));
-    long next = start;
-    while (next <= end) {
-      Text line = new Text();
-      size = filereader.readLine(line, Integer.MAX_VALUE);
-      if (size == 0) {
-        break;
-      }
-      if (codec == null) {
-        next += size;
-      } else {
-        next = fileseeker.getPos();
-      }
-      dataWriter.writeCsvRecord(line.toString());
-    }
-    LOG.info("Extracting ended on position: " + fileseeker.getPos());
-  }
-
-  @Override
-  public long getRowsRead() {
-    // TODO need to return the rows read
-    return 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index fae6573..62f3a03 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -33,10 +33,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.job.etl.HdfsExportExtractor;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
-import org.apache.sqoop.job.etl.HdfsSequenceExportExtractor;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
@@ -66,7 +65,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsTextExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -84,7 +83,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsTextExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -97,7 +96,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsTextExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -115,7 +114,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsSequenceExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -133,7 +132,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsSequenceExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);

