sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: SQOOP-674: Sqoop2: Exceptions in special map reduce threads can cause mapreduce job to freeze
Date Tue, 18 Dec 2012 02:43:53 GMT
Updated Branches:
  refs/heads/sqoop2 7becb12de -> ac3623778


SQOOP-674: Sqoop2: Exceptions in special map reduce threads can cause mapreduce job to freeze

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ac362377
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ac362377
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ac362377

Branch: refs/heads/sqoop2
Commit: ac362377853581eb3c5347da3c1d4404915c98d8
Parents: 7becb12
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Mon Dec 17 18:43:07 2012 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Mon Dec 17 18:43:07 2012 -0800

----------------------------------------------------------------------
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |   11 +-
 .../job/mr/TestSqoopOutputFormatLoadExecutor.java  |  440 +++++++++++++++
 2 files changed, 450 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ac362377/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 38e2292..9714167 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -75,8 +75,8 @@ public class SqoopOutputFormatLoadExecutor {
 
     @Override
     public void write(Data key, NullWritable value) throws InterruptedException {
-      checkIfConsumerThrew();
       free.acquire();
+      checkIfConsumerThrew();
       int type = key.getType();
       data.setContent(key.getContent(type), type);
       filled.release();
@@ -192,6 +192,9 @@ public class SqoopOutputFormatLoadExecutor {
           break;
         default:
           readerFinished = true;
+          // Release so that the writer can tell the framework something went
+          // wrong.
+          free.release();
           throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
       }
 
@@ -202,6 +205,9 @@ public class SqoopOutputFormatLoadExecutor {
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Error while loading data out of MR job.", t);
+        // Release so that the writer can tell the framework something went
+        // wrong.
+        free.release();
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
       }
 
@@ -211,6 +217,9 @@ public class SqoopOutputFormatLoadExecutor {
         // throw exception if data are not all consumed
         readerFinished = true;
         LOG.error("Reader terminated, but writer is still running!");
+        // Release so that the writer can tell the framework something went
+        // wrong.
+        free.release();
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
 
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ac362377/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
new file mode 100644
index 0000000..4234adf
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.Credentials;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.BrokenBarrierException;
+
+public class TestSqoopOutputFormatLoadExecutor {
+
+  private Configuration conf;
+
+  public static class ThrowingLoader extends Loader {
+
+    public ThrowingLoader() {
+
+    }
+
+    @Override
+    public void load(ImmutableContext context, Object connectionConfiguration,
+                     Object jobConfiguration, DataReader reader) throws Exception {
+      reader.readContent(Data.CSV_RECORD);
+      throw new BrokenBarrierException();
+    }
+  }
+
+  public static class ThrowingContinuousLoader extends Loader {
+
+    public ThrowingContinuousLoader() {
+    }
+
+    @Override
+    public void load(ImmutableContext context, Object connectionConfiguration,
+                     Object jobConfiguration, DataReader reader) throws Exception {
+      int runCount = 0;
+      Object o;
+      String[] arr;
+      while ((o = reader.readContent(Data.CSV_RECORD)) != null) {
+        arr = o.toString().split(",");
+        Assert.assertEquals(100, arr.length);
+        for (int i = 0; i < arr.length; i++) {
+          Assert.assertEquals(i, Integer.parseInt(arr[i]));
+        }
+        runCount++;
+        if (runCount == 5) {
+          throw new ConcurrentModificationException();
+        }
+      }
+    }
+  }
+
+  public static class GoodLoader extends Loader {
+
+    public GoodLoader() {
+
+    }
+
+    @Override
+    public void load(ImmutableContext context, Object connectionConfiguration,
+                     Object jobConfiguration, DataReader reader) throws Exception {
+      String[] arr = reader.readContent(Data.CSV_RECORD).toString().split(",");
+      Assert.assertEquals(100, arr.length);
+      for (int i = 0; i < arr.length; i++) {
+        Assert.assertEquals(i, Integer.parseInt(arr[i]));
+      }
+    }
+  }
+
+  public static class GoodContinuousLoader extends Loader {
+
+    public GoodContinuousLoader() {
+
+    }
+
+    @Override
+    public void load(ImmutableContext context, Object connectionConfiguration,
+                     Object jobConfiguration, DataReader reader) throws Exception {
+      int runCount = 0;
+      Object o;
+      String[] arr;
+      while ((o = reader.readContent(Data.CSV_RECORD)) != null) {
+        arr = o.toString().split(",");
+        Assert.assertEquals(100, arr.length);
+        for (int i = 0; i < arr.length; i++) {
+          Assert.assertEquals(i, Integer.parseInt(arr[i]));
+        }
+        runCount++;
+      }
+      Assert.assertEquals(10, runCount);
+    }
+  }
+
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+
+  }
+
+  @Test(expected = BrokenBarrierException.class)
+  public void testWhenLoaderThrows() throws Throwable {
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(getJobContext());
+    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
+    Data data = new Data();
+    try {
+      for (int count = 0; count < 100; count++) {
+        data.setContent(String.valueOf(count), Data.CSV_RECORD);
+        writer.write(data, null);
+      }
+    } catch (SqoopException ex) {
+      throw ex.getCause();
+    }
+  }
+
+  @Test
+  public void testSuccessfulContinuousLoader() throws Throwable {
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(getJobContext());
+    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
+    Data data = new Data();
+    for (int i = 0; i < 10; i++) {
+      StringBuilder builder = new StringBuilder();
+      for (int count = 0; count < 100; count++) {
+        builder.append(String.valueOf(count));
+        if (count != 99) {
+          builder.append(",");
+        }
+      }
+      data.setContent(builder.toString(), Data.CSV_RECORD);
+      writer.write(data, null);
+    }
+    writer.close(getJobContext());
+  }
+
+  @Test
+  public void testSuccessfulLoader() throws Throwable {
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_LOADER, GoodLoader.class.getName());
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(getJobContext());
+    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
+    Data data = new Data();
+    StringBuilder builder = new StringBuilder();
+    for (int count = 0; count < 100; count++) {
+      builder.append(String.valueOf(count));
+      if (count != 99) {
+        builder.append(",");
+      }
+    }
+    data.setContent(builder.toString(), Data.CSV_RECORD);
+    writer.write(data, null);
+    writer.close(getJobContext());
+  }
+
+
+  @Test(expected = ConcurrentModificationException.class)
+  public void testThrowingContinuousLoader() throws Throwable {
+    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
+    SqoopOutputFormatLoadExecutor executor = new
+        SqoopOutputFormatLoadExecutor(getJobContext());
+    RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
+    Data data = new Data();
+    try {
+      for (int i = 0; i < 10; i++) {
+        StringBuilder builder = new StringBuilder();
+        for (int count = 0; count < 100; count++) {
+          builder.append(String.valueOf(count));
+          if (count != 99) {
+            builder.append(",");
+          }
+        }
+        data.setContent(builder.toString(), Data.CSV_RECORD);
+        writer.write(data, null);
+      }
+      writer.close(getJobContext());
+    } catch (SqoopException ex) {
+      throw ex.getCause();
+    }
+  }
+
+
+  private TaskAttemptContext getJobContext() {
+    TaskAttemptContext context = new TaskAttemptContext() {
+      @Override
+      public Configuration getConfiguration() {
+        return conf;
+      }
+
+      @Override
+      public Credentials getCredentials() {
+        return null;
+      }
+
+      @Override
+      public JobID getJobID() {
+        return null;
+      }
+
+      @Override
+      public int getNumReduceTasks() {
+        return 0;
+      }
+
+      @Override
+      public Path getWorkingDirectory() throws IOException {
+        return null;
+      }
+
+      @Override
+      public Class<?> getOutputKeyClass() {
+        return null;
+      }
+
+      @Override
+      public Class<?> getOutputValueClass() {
+        return null;
+      }
+
+      @Override
+      public Class<?> getMapOutputKeyClass() {
+        return null;
+      }
+
+      @Override
+      public Class<?> getMapOutputValueClass() {
+        return null;
+      }
+
+      @Override
+      public String getJobName() {
+        return null;
+      }
+
+      @Override
+      public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
+        return null;
+      }
+
+      @Override
+      public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
+        return null;
+      }
+
+      @Override
+      public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
+        return null;
+      }
+
+      @Override
+      public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
+        return null;
+      }
+
+      @Override
+      public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
+        return null;
+      }
+
+      @Override
+      public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
+        return null;
+      }
+
+      @Override
+      public RawComparator<?> getSortComparator() {
+        return null;
+      }
+
+      @Override
+      public String getJar() {
+        return null;
+      }
+
+      @Override
+      public RawComparator<?> getGroupingComparator() {
+        return null;
+      }
+
+      @Override
+      public boolean getJobSetupCleanupNeeded() {
+        return false;
+      }
+
+      @Override
+      public boolean getTaskCleanupNeeded() {
+        return false;
+      }
+
+      @Override
+      public boolean getProfileEnabled() {
+        return false;
+      }
+
+      @Override
+      public String getProfileParams() {
+        return null;
+      }
+
+      @Override
+      public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) {
+        return null;
+      }
+
+      @Override
+      public String getUser() {
+        return null;
+      }
+
+      @Override
+      public boolean getSymlink() {
+        return false;
+      }
+
+      @Override
+      public Path[] getArchiveClassPaths() {
+        return new Path[0];
+      }
+
+      @Override
+      public URI[] getCacheArchives() throws IOException {
+        return new URI[0];
+      }
+
+      @Override
+      public URI[] getCacheFiles() throws IOException {
+        return new URI[0];
+      }
+
+      @Override
+      public Path[] getLocalCacheArchives() throws IOException {
+        return new Path[0];
+      }
+
+      @Override
+      public Path[] getLocalCacheFiles() throws IOException {
+        return new Path[0];
+      }
+
+      @Override
+      public Path[] getFileClassPaths() {
+        return new Path[0];
+      }
+
+      @Override
+      public String[] getArchiveTimestamps() {
+        return new String[0];
+      }
+
+      @Override
+      public String[] getFileTimestamps() {
+        return new String[0];
+      }
+
+      @Override
+      public int getMaxMapAttempts() {
+        return 0;
+      }
+
+      @Override
+      public int getMaxReduceAttempts() {
+        return 0;
+      }
+
+      @Override
+      public TaskAttemptID getTaskAttemptID() {
+        return null;
+      }
+
+      @Override
+      public void setStatus(String msg) {
+
+      }
+
+      @Override
+      public String getStatus() {
+        return null;
+      }
+
+      @Override
+      public float getProgress() {
+        return 0;
+      }
+
+      @Override
+      public Counter getCounter(Enum<?> counterName) {
+        return null;
+      }
+
+      @Override
+      public Counter getCounter(String groupName, String counterName) {
+        return null;
+      }
+
+      @Override
+      public void progress() {
+
+      }
+    };
+    return context;
+  }
+}


Mime
View raw message