sqoop-commits mailing list archives

From jar...@apache.org
Subject git commit: SQOOP-690: Fix threading issues in SqoopOutputFormatLoadExecutor
Date Fri, 16 Nov 2012 20:10:31 GMT
Updated Branches:
  refs/heads/sqoop2 adef39bbb -> e5ab9a4f3


SQOOP-690: Fix threading issues in SqoopOutputFormatLoadExecutor

Replaces the wait/notify handshake between SqoopRecordWriter and the loader's
DataReader with two fair semaphores (free and filled), marks the shared state
volatile, and widens the Loader and DataReader signatures to throw checked
exceptions so the consumer thread can record failures for checkException()
instead of throwing out of run().

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e5ab9a4f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e5ab9a4f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e5ab9a4f

Branch: refs/heads/sqoop2
Commit: e5ab9a4f3456f74092c1ac7335a236cbd8103f69
Parents: adef39b
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Fri Nov 16 12:09:28 2012 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Fri Nov 16 12:09:28 2012 -0800

----------------------------------------------------------------------
 .../connector/jdbc/GenericJdbcExportLoader.java    |    2 +-
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |    2 +-
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |    2 +-
 .../main/java/org/apache/sqoop/job/io/Data.java    |    2 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |  164 +++++----------
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    2 +-
 .../main/java/org/apache/sqoop/job/etl/Loader.java |    2 +-
 .../java/org/apache/sqoop/job/io/DataReader.java   |    6 +-
 8 files changed, 65 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
index ff7384c..13574b2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
@@ -29,7 +29,7 @@ public class GenericJdbcExportLoader extends Loader {
   private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) {
+  public void run(ImmutableContext context, DataReader reader) throws Exception {
     String driver = context.getString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
     String url = context.getString(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index 29a73b0..7c0ef08 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -46,7 +46,7 @@ public class HdfsSequenceImportLoader extends Loader {
   }
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) {
+  public void run(ImmutableContext context, DataReader reader) throws Exception {
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index 711df0f..55eb389 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -46,7 +46,7 @@ public class HdfsTextImportLoader extends Loader {
   }
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) {
+  public void run(ImmutableContext context, DataReader reader) throws Exception {
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
index 41fceb8..83c670c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -37,7 +37,7 @@ public class Data implements WritableComparable<Data> {
   // For example, it can be:
   // - Object[] for an array of object record
   // - String for a text of CSV record
-  private Object content = null;
+  private volatile Object content = null;
 
   public static final int EMPTY_DATA = 0;
   public static final int CSV_RECORD = 1;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 0d636ae..5a3a04e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.sqoop.job.mr;
 
+import java.util.concurrent.Semaphore;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,12 +40,14 @@ public class SqoopOutputFormatLoadExecutor {
   public static final Log LOG =
       LogFactory.getLog(SqoopOutputFormatLoadExecutor.class.getName());
 
-  private boolean readerFinished;
-  private boolean writerFinished;
-  private Data data;
+  private volatile boolean readerFinished = false;
+  private volatile boolean writerFinished = false;
+  private volatile Data data;
   private JobContext context;
   private SqoopRecordWriter producer;
   private ConsumerThread consumer;
+  private Semaphore filled = new Semaphore(0, true);
+  private Semaphore free = new Semaphore(1, true);
 
   public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
     data = new Data();
@@ -59,69 +62,37 @@ public class SqoopOutputFormatLoadExecutor {
     return producer;
   }
 
+  /*
+   * This is a producer-consumer problem and can be solved
+   * with two semaphores.
+   */
   public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
-    @Override
-    public void write(Data key, NullWritable value) {
-      synchronized (data) {
-        if (readerFinished) {
-          consumer.checkException();
-          return;
-        }
-
-        try {
-          if (!data.isEmpty()) {
-            // wait for reader to consume data
-            data.wait();
-          }
-
-          int type = key.getType();
-          data.setContent(key.getContent(type), type);
 
-          // notify reader that the data is ready
-          data.notify();
-
-        } catch (InterruptedException e) {
-          // inform reader that writer is finished
-          writerFinished = true;
-
-          // unlock reader so it can continue
-          data.notify();
+    @Override
+    public void write(Data key, NullWritable value) throws InterruptedException {
 
-          // throw exception
-          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0015, e);
-        }
+      if(readerFinished) {
+        consumer.checkException();
       }
+      free.acquire();
+      int type = key.getType();
+      data.setContent(key.getContent(type), type);
+      filled.release();
     }
 
     @Override
-    public void close(TaskAttemptContext context) {
-      synchronized (data) {
-        if (readerFinished) {
-          consumer.checkException();
-          return;
-        }
-
-        try {
-          if (!data.isEmpty()) {
-            // wait for reader to consume data
-            data.wait();
-          }
-
-          writerFinished = true;
-
-          data.notify();
-
-        } catch (InterruptedException e) {
-          // inform reader that writer is finished
-          writerFinished = true;
-
-          // unlock reader so it can continue
-          data.notify();
-
-          // throw exception
-          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0015, e);
-        }
+    public void close(TaskAttemptContext context) throws InterruptedException {
+      if(readerFinished) {
+        // Reader finished before writer - something went wrong?
+        consumer.checkException();
       }
+      free.acquire();
+      writerFinished = true;
+      // This will interrupt only the acquire call in the consumer class,
+      // since we have acquired the free semaphore, and close is called from
+      // the same thread that writes. filled has not been released since then,
+      // so the consumer is definitely blocked on the filled semaphore.
+      consumer.interrupt();
     }
   }
 
@@ -132,52 +103,38 @@ public class SqoopOutputFormatLoadExecutor {
     }
 
     @Override
-    public Object[] readArrayRecord() {
+    public Object[] readArrayRecord() throws InterruptedException {
       return (Object[])readContent(Data.ARRAY_RECORD);
     }
 
     @Override
-    public String readCsvRecord() {
+    public String readCsvRecord() throws InterruptedException {
       return (String)readContent(Data.CSV_RECORD);
     }
 
     @Override
-    public Object readContent(int type) {
-      synchronized (data) {
-        if (writerFinished) {
+    public Object readContent(int type) throws InterruptedException {
+      // Has any more data been produced since I last consumed?
+      // If not, wait for the producer to produce.
+      if (writerFinished && (filled.availablePermits() == 0)) {
+        return null;
+      }
+      try {
+        filled.acquire();
+      } catch (InterruptedException ex) {
+        if(writerFinished) {
           return null;
         }
-
-        try {
-          if (data.isEmpty()) {
-            // wait for writer to produce data
-            data.wait();
-          }
-
-          Object content = data.getContent(type);
-          data.setContent(null, Data.EMPTY_DATA);
-
-          // notify writer that data is consumed
-          data.notify();
-
-          return content;
-
-        } catch (InterruptedException e) {
-          // inform writer that reader is finished
-          readerFinished = true;
-
-          // unlock writer so it can continue
-          data.notify();
-
-          // throw exception
-          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0016, e);
-        }
+        throw ex;
       }
+      Object content = data.getContent(type);
+      free.release();
+      return content;
     }
   }
 
   public class ConsumerThread extends Thread {
-    private SqoopException exception = null;
+    private volatile SqoopException exception = null;
 
     public void checkException() {
       if (exception != null) {
@@ -201,28 +158,19 @@ public class SqoopOutputFormatLoadExecutor {
       try {
         loader.run(frameworkContext, reader);
       } catch (Throwable t) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        LOG.error("Error while loading data out of MR job.", t);
       }
 
-      synchronized (data) {
-        // inform writer that reader is finished
-        readerFinished = true;
-
-        // unlock writer so it can continue
-        data.notify();
-
-        // if no exception happens yet
-        if (exception == null && !writerFinished) {
-          // create exception if data are not all consumed
-          exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
-        }
-
-        // throw deferred exception if exist
-        if (exception != null) {
-          throw exception;
-        }
+      // if no exception has happened yet and the reader finished before the
+      // writer, something went wrong
+      if (exception == null && !writerFinished) {
+        // create exception if data are not all consumed
+        exception = new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
+        LOG.error("Reader terminated, but writer is still running!", exception);
       }
+      // inform writer that reader is finished
+      readerFinished = true;
     }
   }
-
 }
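
The rewrite above is the textbook single-slot producer-consumer handoff its
comment names: two fair semaphores guard one shared slot, with free starting
at 1 permit (slot empty) and filled at 0 (nothing to read). A minimal
standalone sketch of the pattern follows; the class and method names
(SingleSlotExchange, put, take) are illustrative only, not from this commit.

import java.util.concurrent.Semaphore;

// Minimal sketch of a single-slot, two-semaphore handoff (illustrative names).
public class SingleSlotExchange<T> {
  // 1 permit: the slot is empty and the producer may fill it.
  private final Semaphore free = new Semaphore(1, true);
  // 0 permits: the consumer blocks until the producer releases one.
  private final Semaphore filled = new Semaphore(0, true);
  // volatile so the consumer thread sees the producer's write.
  private volatile T slot;

  public void put(T value) throws InterruptedException {
    free.acquire();    // wait for the consumer to empty the slot
    slot = value;
    filled.release();  // hand the slot over to the consumer
  }

  public T take() throws InterruptedException {
    filled.acquire();  // wait for the producer to fill the slot
    T value = slot;
    free.release();    // hand the slot back to the producer
    return value;
  }

  public static void main(String[] args) throws InterruptedException {
    final SingleSlotExchange<Integer> exchange = new SingleSlotExchange<Integer>();
    Thread consumer = new Thread(new Runnable() {
      public void run() {
        try {
          for (int i = 0; i < 3; i++) {
            System.out.println("consumed " + exchange.take());
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    consumer.start();
    for (int i = 0; i < 3; i++) {
      exchange.put(i);  // blocks until the previous value was consumed
    }
    consumer.join();
  }
}

The committed code layers end-of-stream and error handling on top of this
core: close() acquires free, sets writerFinished and interrupts the consumer,
while readContent() treats an interrupt with writerFinished set as a normal
end of data rather than a failure.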

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 6dcf784..3e498ec 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -216,7 +216,7 @@ public class TestMapReduce extends TestCase {
     private Data actual = new Data();
 
     @Override
-    public void run(ImmutableContext context, DataReader reader) {
+    public void run(ImmutableContext context, DataReader reader) throws Exception {
       Object[] array;
       while ((array = reader.readArrayRecord()) != null) {
         actual.setContent(array, Data.ARRAY_RECORD);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index 3a708df..046b939 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -25,6 +25,6 @@ import org.apache.sqoop.job.io.DataReader;
  */
 public abstract class Loader {
 
-  public abstract void run(ImmutableContext context, DataReader reader);
+  public abstract void run(ImmutableContext context, DataReader reader) throws Exception;
 
 }
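
Loader.run() now declares throws Exception, matching the DataReader
signatures in the next hunk, so a loader can let InterruptedException and
other checked exceptions propagate to ConsumerThread, which records them for
checkException(). A minimal sketch of a conforming loader follows;
DiscardingLoader is a hypothetical name, and the ImmutableContext import
path is an assumption:

import org.apache.sqoop.common.ImmutableContext;  // assumed package location
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.DataReader;

// Hypothetical loader that drains the reader and discards every record.
public class DiscardingLoader extends Loader {
  @Override
  public void run(ImmutableContext context, DataReader reader) throws Exception {
    Object[] record;
    // readArrayRecord() returns null once the writer side has finished;
    // any InterruptedException it throws simply propagates to the caller.
    while ((record = reader.readArrayRecord()) != null) {
      // a real loader would write the record to its destination here
    }
  }
}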

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5ab9a4f/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
index 18e2fb7..a50f591 100644
--- a/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/io/DataReader.java
@@ -23,11 +23,11 @@ package org.apache.sqoop.job.io;
  */
 public abstract class DataReader {
 
-  public abstract Object[] readArrayRecord();
+  public abstract Object[] readArrayRecord() throws Exception;
 
-  public abstract String readCsvRecord();
+  public abstract String readCsvRecord() throws Exception;
 
-  public abstract Object readContent(int type);
+  public abstract Object readContent(int type) throws Exception;
 
   public abstract void setFieldDelimiter(char fieldDelimiter);
 

