sqoop-commits mailing list archives

From: jar...@apache.org
Subject: git commit: SQOOP-761: HDFSTextExportExtractor loses lines around partition boundaries
Date: Sat, 15 Dec 2012 20:43:31 GMT
Updated Branches:
  refs/heads/sqoop2 2455f7423 -> c41714888


SQOOP-761: HDFSTextExportExtractor loses lines around partition boundaries

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c4171488
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c4171488
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c4171488

Branch: refs/heads/sqoop2
Commit: c41714888808fd56d14b5af11d85b85392a89ff7
Parents: 2455f74
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sat Dec 15 12:43:05 2012 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sat Dec 15 12:43:05 2012 -0800

----------------------------------------------------------------------
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java |    3 +
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |   33 ++++++--------
 .../java/org/apache/sqoop/job/TestHdfsExtract.java |   10 ++--
 pom.xml                                            |   16 +++++++
 4 files changed, 38 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4171488/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 2261a7c..16afcdb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -85,6 +85,9 @@ public class HdfsSequenceExportExtractor extends Extractor {
     while (hasNext) {
       datawriter.writeCsvRecord(line.toString());
       hasNext = filereader.next(line);
+      if(filereader.getPosition() >= end && filereader.syncSeen()) {
+        break;
+      }
     }
   }
 

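For readers following the fix, the following is a minimal, self-contained sketch of the boundary rule this hunk introduces for sequence files; the class and method names and the start/end offsets are illustrative placeholders, and a println stands in for datawriter.writeCsvRecord(). The idea is to read past the nominal split end only until a sync mark has been crossed, then stop and leave the rest of the file to the next partition.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceSplitReadSketch {
  public static void readSplit(Configuration conf, Path file,
                               long start, long end) throws Exception {
    SequenceFile.Reader filereader =
        new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
    filereader.sync(start);          // seek to the first sync mark after "start"
    Text line = new Text();
    boolean hasNext = filereader.next(line);
    while (hasNext) {
      System.out.println(line);      // stand-in for datawriter.writeCsvRecord(line.toString())
      hasNext = filereader.next(line);
      // Once we are past the split end and have crossed a sync mark, stop;
      // everything from that sync mark on is left to the next partition.
      if (filereader.getPosition() >= end && filereader.syncSeen()) {
        break;
      }
    }
    filereader.close();
  }
}
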
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4171488/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index fdc7d67..8055140 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -84,25 +84,16 @@ public class HdfsTextExportExtractor extends Extractor {
     FSDataInputStream filestream = fs.open(file);
     CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
     LineReader filereader;
-    Seekable fileseeker;
+    Seekable fileseeker = filestream;
 
-    if (codec == null) {
-      filestream.seek(start);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
-      filereader = new LineReader(filestream, conf);
-      fileseeker = filestream;
+    // Hadoop 1.0 does not have support for custom record delimiter and thus we
+    // are supporting only default one.
     // We might add another "else if" case for SplittableCompressionCodec once
     // we drop support for Hadoop 1.0.
+    if (codec == null) {
+      filestream.seek(start);
+      filereader = new LineReader(filestream);
     } else {
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-              Charset.forName(Data.CHARSET_NAME));
-      // Hadoop 1.0 do not have support for custom record delimiter and thus we
-      // are supporting only default one.
       filereader = new LineReader(
           codec.createInputStream(filestream, codec.createDecompressor()), conf);
       fileseeker = filestream;
@@ -113,15 +104,20 @@ public class HdfsTextExportExtractor extends Extractor {
       // one extra line is read in previous split
       start += filereader.readLine(new Text(), 0);
     }
-
     Text line = new Text();
     int size;
-    while (fileseeker.getPos() <= end) {
+    LOG.info("Start position: " + String.valueOf(start));
+    long next = start;
+    while (next <= end) {
       size = filereader.readLine(line, Integer.MAX_VALUE);
       if (size == 0) {
         break;
       }
-
+      if (codec == null) {
+        next += size;
+      } else {
+        next = fileseeker.getPos();
+      }
       datawriter.writeCsvRecord(line.toString());
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
@@ -132,5 +128,4 @@ public class HdfsTextExportExtractor extends Extractor {
     // TODO need to return the rows read
     return 0;
   }
-
 }

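Similarly, here is a minimal, self-contained sketch of the reworked text loop; the class and method names are illustrative and a println again stands in for the data writer. The heart of the change is how the current position is tracked: for an uncompressed file the loop now sums the byte counts returned by LineReader.readLine(), because LineReader reads ahead into an internal buffer, so the underlying stream's getPos() runs ahead of the lines actually returned and the old end-of-split check could exit early and drop lines (the SQOOP-761 symptom); for a compressed stream, readLine() returns uncompressed byte counts that do not correspond to file offsets, so the compressed stream's getPos() is used instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;

public class TextSplitReadSketch {
  public static void readSplit(Configuration conf, Path file,
                               long start, long end) throws Exception {
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream filestream = fs.open(file);
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    Seekable fileseeker = filestream;
    LineReader filereader;
    if (codec == null) {
      filestream.seek(start);
      filereader = new LineReader(filestream);
    } else {
      filereader = new LineReader(
          codec.createInputStream(filestream, codec.createDecompressor()), conf);
    }
    if (start != 0) {
      // The previous split already consumed the line that crosses into this one.
      start += filereader.readLine(new Text(), 0);
    }
    Text line = new Text();
    long next = start;
    while (next <= end) {
      int size = filereader.readLine(line, Integer.MAX_VALUE);
      if (size == 0) {
        break;                       // end of file
      }
      // Uncompressed: advance by the bytes this line consumed.
      // Compressed: ask the (compressed) stream where it is now.
      next = (codec == null) ? next + size : fileseeker.getPos();
      System.out.println(line);      // stand-in for datawriter.writeCsvRecord(line.toString())
    }
    filereader.close();
  }
}
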
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4171488/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 484eb20..95cfe85 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -107,7 +107,7 @@ public class TestHdfsExtract extends TestCase {
   }
 
   @Test
-  public void testUncompressedSequence() throws Exception {
+  public void testCompressedSequence() throws Exception {
     FileUtils.delete(indir);
     FileUtils.mkdirs(indir);
     createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
@@ -125,7 +125,7 @@ public class TestHdfsExtract extends TestCase {
   }
 
   @Test
-  public void testCompressedSequence() throws Exception {
+  public void testUncompressedSequence() throws Exception {
     FileUtils.delete(indir);
     FileUtils.mkdirs(indir);
     createSequenceInput(null);
@@ -241,9 +241,9 @@ public class TestHdfsExtract extends TestCase {
       int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE;
 // This test is not currently working due to bug in HdfsExtractor.
 // Check SQOOP-761 for more details.
-//      assertEquals((1+numbers)*numbers/2, sum);
-//
-//      assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
+      assertEquals((1+numbers)*numbers/2, sum);
+
+      assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1);
     }
   }
 

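Re-enabling these assertions makes the SQOOP-761 symptom visible in the test suite: the expected sum (1 + numbers) * numbers / 2 is the arithmetic-series total of the consecutive values 1 through numbers (purely as an illustration, numbers = 1,000 would give 500,500), so any line lost around a partition boundary leaves the observed sum, and the row count checked by the second assertion, short of the expected value instead of passing silently.
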
http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4171488/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index be4f1b6..2e66f7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -353,8 +353,24 @@ limitations under the License.
           <artifactId>maven-jar-plugin</artifactId>
           <version>2.3.2</version>
         </plugin>
+
+
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <version>2.12</version>
+          <configuration>
+            <forkMode>always</forkMode>
+            <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
+            <redirectTestOutputToFile>true</redirectTestOutputToFile>
+            <argLine>-Xms256m -Xmx1g</argLine>
+          </configuration>
+        </plugin>
+
       </plugins>
     </pluginManagement>
+
+
   </build>
 
   <!-- All reports might be generated using mvn site command -->


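A note on the build change: with maven-surefire-plugin 2.12, forkMode=always runs each test class in its own JVM, forkedProcessTimeoutInSeconds=900 kills a forked run that hangs for more than 15 minutes, redirectTestOutputToFile=true sends each class's console output to files under target/surefire-reports rather than the build log, and argLine gives every forked JVM a 256 MB initial / 1 GB maximum heap. Presumably the extra isolation and heap headroom help keep the re-enabled extraction tests stable; to run just that suite one can use, for example, mvn test -Dtest=TestHdfsExtract.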