hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject svn commit: r1507384 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/...
Date Fri, 26 Jul 2013 17:58:07 GMT
Author: jlowe
Date: Fri Jul 26 17:58:07 2013
New Revision: 1507384

URL: http://svn.apache.org/r1507384
Log:
MAPREDUCE-5251. Reducer should not implicate map attempt if it has insufficient space to fetch
map output. Contributed by Ashwin Shankar

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1507384&r1=1507383&r2=1507384&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jul 26 17:58:07 2013
@@ -21,6 +21,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
     jlowe)
 
+    MAPREDUCE-5251. Reducer should not implicate map attempt if it has
+    insufficient space to fetch map output (Ashwin Shankar via jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1507384&r1=1507383&r2=1507384&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Jul 26 17:58:07 2013
@@ -338,7 +338,14 @@ class Fetcher<K,V> extends Thread {
       }
       
       // Get the location for the map output - either in-memory or on-disk
-      mapOutput = merger.reserve(mapId, decompressedLength, id);
+      try {
+        mapOutput = merger.reserve(mapId, decompressedLength, id);
+      } catch (IOException ioe) {
+        // kill this reduce attempt
+        ioErrs.increment(1);
+        scheduler.reportLocalError(ioe);
+        return EMPTY_ATTEMPT_ID_ARRAY;
+      }
       
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1507384&r1=1507383&r2=1507384&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Fri Jul 26 17:58:07 2013
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -301,6 +303,16 @@ class ShuffleScheduler<K,V> {
     host.addKnownMap(mapId);
   }
 
+  public void reportLocalError(IOException ioe) {
+    try {
+      LOG.error("Shuffle failed : local error on this node: "
+          + InetAddress.getLocalHost());
+    } catch (UnknownHostException e) {
+      LOG.error("Shuffle failed : local error on this node");
+    }
+    reporter.reportException(ioe);
+  }
+
   public synchronized MapHost getHost() throws InterruptedException {
       while(pendingHosts.isEmpty()) {
         wait();

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1507384&r1=1507383&r2=1507384&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Fri Jul 26 17:58:07 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.junit.Test;
 
 /**
@@ -73,6 +74,56 @@ public class TestFetcher {
   }
   
   @SuppressWarnings("unchecked")
+  @Test
+  public void testReduceOutOfDiskSpace() throws Throwable {
+    LOG.info("testReduceOutOfDiskSpace");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[] { 0, 0, 0,
+        0 });
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString())).thenReturn(allErrs);
+
+    Fetcher<Text, Text> underTest = new FakeFetcher<Text, Text>(job, id, ss,
+        mm, r, metrics, except, key, connection);
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    when(connection.getResponseCode()).thenReturn(200);
+    when(
+        connection
+            .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    when(connection.getInputStream()).thenReturn(in);
+
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())).thenThrow(
+        new DiskErrorException("No disk space available"));
+
+    underTest.copyFromHost(host);
+    verify(ss).reportLocalError(any(IOException.class));
+  }
+  
+  @SuppressWarnings("unchecked")
   @Test(timeout=30000)
   public void testCopyFromHostConnectionTimeout() throws Exception {
     LOG.info("testCopyFromHostConnectionTimeout");



Mime
View raw message