sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: SQOOP-702: Refactor OutputLoadExecutor
Date Sat, 17 Nov 2012 00:00:40 GMT
Updated Branches:
  refs/heads/sqoop2 e5ab9a4f3 -> 73b5b38c5


SQOOP-702: Refactor OutputLoadExecutor

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/73b5b38c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/73b5b38c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/73b5b38c

Branch: refs/heads/sqoop2
Commit: 73b5b38c53d7dbc52d35fcf71fd9e9c1e1fb876f
Parents: e5ab9a4
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Fri Nov 16 15:59:37 2012 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Fri Nov 16 15:59:37 2012 -0800

----------------------------------------------------------------------
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |   74 +++++++++------
 1 files changed, 46 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/73b5b38c/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 5a3a04e..3bd1e1b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -18,6 +18,10 @@
 
 package org.apache.sqoop.job.mr;
 
+import com.google.common.base.Throwables;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +49,7 @@ public class SqoopOutputFormatLoadExecutor {
   private volatile Data data;
   private JobContext context;
   private SqoopRecordWriter producer;
-  private ConsumerThread consumer;
+  private Future<?> consumerFuture;
   private Semaphore filled = new Semaphore(0, true);
   private Semaphore free = new Semaphore(1, true);
 
@@ -53,12 +57,11 @@ public class SqoopOutputFormatLoadExecutor {
     data = new Data();
     context = jobctx;
     producer = new SqoopRecordWriter();
-    consumer = new ConsumerThread();
   }
 
   public RecordWriter<Data, NullWritable> getRecordWriter() {
-    consumer.setDaemon(true);
-    consumer.start();
+    consumerFuture = Executors.newSingleThreadExecutor().submit(
+            new ConsumerThread()); // NOTE(review): this executor is never stored or shut down, and its default thread is non-daemon (the removed code used setDaemon(true)) - confirm it cannot keep the task JVM alive
     return producer;
   }
 
@@ -66,14 +69,11 @@ public class SqoopOutputFormatLoadExecutor {
    * This is a producer-consumer problem and can be solved
    * with two semaphores.
    */
-  public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
+  private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
 
     @Override
     public void write(Data key, NullWritable value) throws InterruptedException {
-
-      if(readerFinished) {
-        consumer.checkException();
-      }
+      checkConsumerCompletion();
       free.acquire();
       int type = key.getType();
       data.setContent(key.getContent(type), type);
@@ -82,21 +82,45 @@ public class SqoopOutputFormatLoadExecutor {
 
     @Override
     public void close(TaskAttemptContext context) throws InterruptedException {
-      if(readerFinished) {
-        // Reader finished before writer - something went wrong?
-        consumer.checkException();
-      }
+      checkConsumerCompletion();
       free.acquire();
       writerFinished = true;
       // This will interrupt only the acquire call in the consumer class,
       // since we have acquired the free semaphore, and close is called from
       // the same thread that writes - so filled has not been released since then
       // so the consumer is definitely blocked on the filled semaphore.
-      consumer.interrupt();
+      consumerFuture.cancel(true);
+    }
+  }
+
+  /**
+   * Once the consumer has set readerFinished, waits on its Future and re-throws any failure it hit.
+   * NOTE(review): run() throws before it sets readerFinished, so a failed consumer is likely never seen here - confirm.
+   *
+   * @throws SqoopException if the consumer threw one, or MAPRED_EXEC_0019 if waiting on the consumer failed.
+   * @throws RuntimeException if the consumer failed with anything else (Errors are re-thrown as-is).
+   */
+  private void checkConsumerCompletion() {
+    if (readerFinished) {
+      try {
+        consumerFuture.get();
+      } catch (ExecutionException ex) {
+        // Unwrap the consumer's failure. run() wraps loader failures in
+        // MAPRED_EXEC_0018 and reports unconsumed input as MAPRED_EXEC_0019,
+        // so a SqoopException is the expected cause.
+        Throwable t = ex.getCause();
+        if(t instanceof SqoopException) {
+          throw (SqoopException)t;
+        }
+        // Any other cause: Throwables.propagate always throws (unchecked as-is, checked wrapped).
+        Throwables.propagate(t);
+      } catch (Exception ex) { // e.g. InterruptedException from get(); NOTE(review): the interrupt flag is not restored
+        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
+      }
     }
   }
 
-  public class OutputFormatDataReader extends DataReader {
+  private class OutputFormatDataReader extends DataReader {
     @Override
     public void setFieldDelimiter(char fieldDelimiter) {
       data.setFieldDelimiter(fieldDelimiter);
@@ -133,14 +157,7 @@ public class SqoopOutputFormatLoadExecutor {
     }
   }
 
-  public class ConsumerThread extends Thread {
-    private volatile SqoopException exception = null;
-
-    public void checkException() {
-      if (exception != null) {
-        throw exception;
-      }
-    }
+  private class ConsumerThread implements Runnable {
 
     @Override
     public void run() {
@@ -158,16 +175,17 @@ public class SqoopOutputFormatLoadExecutor {
       try {
         loader.run(frameworkContext, reader);
       } catch (Throwable t) {
-        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
         LOG.error("Error while loading data out of MR job.", t);
+        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
       }
 
       // if no exception happens yet and reader finished before writer,
       // something went wrong
-      if (exception == null && !writerFinished) {
-        // create exception if data are not all consumed
-        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
-        LOG.error("Reader terminated, but writer is still running!", exception);
+      if (!writerFinished) {
+        // throw exception if data are not all consumed
+        LOG.error("Reader terminated, but writer is still running!");
+        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
+
       }
       // inform writer that reader is finished
       readerFinished = true;


Mime
View raw message