hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1186530 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Date Wed, 19 Oct 2011 22:02:58 GMT
Author: acmurthy
Date: Wed Oct 19 22:02:57 2011
New Revision: 1186530

URL: http://svn.apache.org/viewvc?rev=1186530&view=rev
Log:
Merge -c 1186529 from trunk to branch-0.23 to complete fix for MAPREDUCE-2693.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1186530&r1=1186529&r2=1186530&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Oct 19 22:02:57 2011
@@ -1641,6 +1641,8 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2788. Normalize resource requests in FifoScheduler
     appropriately. (Ahmed Radwan via acmurthy) 
 
+    MAPREDUCE-2693. Fix NPE in job-blacklisting. (Hitesh Shah via acmurthy) 
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1186530&r1=1186529&r2=1186530&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Wed Oct 19 22:02:57 2011
@@ -509,18 +509,6 @@ public class RMContainerAllocator extend
         request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
       } else {
         for (String host : event.getHosts()) {
-          //host comes from data splitLocations which are hostnames. Containers
-          // use IP addresses.
-          //TODO Temporary fix for locality. Use resolvers from h-common. 
-          // Cache to make this more efficient ?
-          InetAddress addr = null;
-          try {
-            addr = InetAddress.getByName(host);
-          } catch (UnknownHostException e) {
-            LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
-          }
-          if (addr != null) //Fallback to host if resolve fails.
-            host = addr.getHostAddress();
           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
           if (list == null) {
             list = new LinkedList<TaskAttemptId>();
@@ -557,26 +545,101 @@ public class RMContainerAllocator extend
       while (it.hasNext()) {
         Container allocated = it.next();
         LOG.info("Assigning container " + allocated);
-        ContainerRequest assigned = assign(allocated);
-          
-        if (assigned != null) {
-          // Update resource requests
-          decContainerReq(assigned);
+        
+        // check if allocated container meets memory requirements 
+        // and whether we have any scheduled tasks that need 
+        // a container to be assigned
+        boolean isAssignable = true;
+        Priority priority = allocated.getPriority();
+        if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
+            || PRIORITY_MAP.equals(priority)) {
+          if (allocated.getResource().getMemory() < mapResourceReqt
+              || maps.isEmpty()) {
+            LOG.info("Cannot assign container " + allocated 
+                + " for a map as either "
+                + " container memory less than required " + mapResourceReqt
+                + " or no pending map tasks - maps.isEmpty=" 
+                + maps.isEmpty()); 
+            isAssignable = false; 
+          }
+        } 
+        else if (PRIORITY_REDUCE.equals(priority)) {
+          if (allocated.getResource().getMemory() < reduceResourceReqt
+              || reduces.isEmpty()) {
+            LOG.info("Cannot assign container " + allocated 
+                + " for a reduce as either "
+                + " container memory less than required " + reduceResourceReqt
+                + " or no pending reduce tasks - reduces.isEmpty=" 
+                + reduces.isEmpty()); 
+            isAssignable = false;
+          }
+        }          
+        
+        boolean blackListed = false;         
+        ContainerRequest assigned = null;
+        
+        if (isAssignable) {
+          // do not assign if allocated container is on a  
+          // blacklisted host
+          blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
+          if (blackListed) {
+            // we need to request for a new container 
+            // and release the current one
+            LOG.info("Got allocated container on a blacklisted "
+                + " host. Releasing container " + allocated);
 
-          // send the container-assigned event to task attempt
-          eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-              assigned.attemptID, allocated));
+            // find the request matching this allocated container 
+            // and replace it with a new one 
+            ContainerRequest toBeReplacedReq = 
+                getContainerReqToReplace(allocated);
+            if (toBeReplacedReq != null) {
+              LOG.info("Placing a new container request for task attempt " 
+                  + toBeReplacedReq.attemptID);
+              ContainerRequest newReq = 
+                  getFilteredContainerRequest(toBeReplacedReq);
+              decContainerReq(toBeReplacedReq);
+              if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+                  TaskType.MAP) {
+                maps.put(newReq.attemptID, newReq);
+              }
+              else {
+                reduces.put(newReq.attemptID, newReq);
+              }
+              addContainerReq(newReq);
+            }
+            else {
+              LOG.info("Could not map allocated container to a valid request."
+                  + " Releasing allocated container " + allocated);
+            }
+          }
+          else {
+            assigned = assign(allocated);
+            if (assigned != null) {
+              // Update resource requests
+              decContainerReq(assigned);
+
+              // send the container-assigned event to task attempt
+              eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+                  assigned.attemptID, allocated));
 
-          assignedRequests.add(allocated.getId(), assigned.attemptID);
-          
-          LOG.info("Assigned container (" + allocated + ") " +
-              " to task " + assigned.attemptID +
-              " on node " + allocated.getNodeId().toString());
-        } else {
-          //not assigned to any request, release the container
-          LOG.info("Releasing unassigned and invalid container " + allocated
-              + ". RM has gone crazy, someone go look!"
-              + " Hey RM, if you are so rich, go donate to non-profits!");
+              assignedRequests.add(allocated.getId(), assigned.attemptID);
+
+              LOG.info("Assigned container (" + allocated + ") " +
+                  " to task " + assigned.attemptID +
+                  " on node " + allocated.getNodeId().toString());
+            }
+            else {
+              //not assigned to any request, release the container
+              LOG.info("Releasing unassigned and invalid container " 
+                  + allocated + ". RM has gone crazy, someone go look!"
+                  + " Hey RM, if you are so rich, go donate to non-profits!");
+            }
+          }
+        }
+        
+        // release container if it was blacklisted 
+        // or if we could not assign it 
+        if (blackListed || assigned == null) {
           containersReleased++;
           release(allocated.getId());
         }
@@ -604,12 +667,37 @@ public class RMContainerAllocator extend
       return assigned;
     }
     
+    private ContainerRequest getContainerReqToReplace(Container allocated) {
+      Priority priority = allocated.getPriority();
+      ContainerRequest toBeReplaced = null;
+      if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
+          || PRIORITY_MAP.equals(priority)) {
+        // allocated container was for a map
+        String host = allocated.getNodeId().getHost();
+        LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+        if (list != null && list.size() > 0) {
+          TaskAttemptId tId = list.removeLast();
+          if (maps.containsKey(tId)) {
+            toBeReplaced = maps.remove(tId);
+          }
+        }
+        else {
+          TaskAttemptId tId = maps.keySet().iterator().next();
+          toBeReplaced = maps.remove(tId);          
+        }        
+      }
+      else if (PRIORITY_REDUCE.equals(priority)) {
+        TaskAttemptId tId = reduces.keySet().iterator().next();
+        toBeReplaced = reduces.remove(tId);    
+      }
+      return toBeReplaced;
+    }
+    
     
     private ContainerRequest assignToFailedMap(Container allocated) {
       //try to assign to earlierFailedMaps if present
       ContainerRequest assigned = null;
-      while (assigned == null && earlierFailedMaps.size() > 0 && 
-          allocated.getResource().getMemory() >= mapResourceReqt) {
+      while (assigned == null && earlierFailedMaps.size() > 0) {
         TaskAttemptId tId = earlierFailedMaps.removeFirst();
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
@@ -627,8 +715,7 @@ public class RMContainerAllocator extend
     private ContainerRequest assignToReduce(Container allocated) {
       ContainerRequest assigned = null;
       //try to assign to reduces if present
-      if (assigned == null && reduces.size() > 0
-          && allocated.getResource().getMemory() >= reduceResourceReqt) {
+      if (assigned == null && reduces.size() > 0) {
         TaskAttemptId tId = reduces.keySet().iterator().next();
         assigned = reduces.remove(tId);
         LOG.info("Assigned to reduce");
@@ -640,9 +727,8 @@ public class RMContainerAllocator extend
     //try to assign to maps if present 
       //first by host, then by rack, followed by *
       ContainerRequest assigned = null;
-      while (assigned == null && maps.size() > 0
-          && allocated.getResource().getMemory() >= mapResourceReqt) {
-        String host = getHost(allocated.getNodeId().toString());
+      while (assigned == null && maps.size() > 0) {
+        String host = allocated.getNodeId().getHost();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         while (list != null && list.size() > 0) {
           LOG.info("Host matched to the request list " + host);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1186530&r1=1186529&r2=1186530&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Wed Oct 19 22:02:57 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,7 +65,7 @@ public abstract class RMContainerRequest
   //Key->ResourceName (e.g., hostname, rackname, *)
   //Value->Map
   //Key->Resource Capability
-  //Value->ResourceReqeust
+  //Value->ResourceRequest
   private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
   remoteRequestsTable =
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
@@ -87,14 +89,22 @@ public abstract class RMContainerRequest
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
+    
     public ContainerRequest(ContainerRequestEvent event, Priority priority) {
-      this.attemptID = event.getAttemptID();
-      this.capability = event.getCapability();
-      this.hosts = event.getHosts();
-      this.racks = event.getRacks();
-      //this.earlierAttemptFailed = event.getEarlierAttemptFailed();
+      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+          event.getRacks(), priority);
+    }
+    
+    public ContainerRequest(TaskAttemptId attemptID,
+        Resource capability, String[] hosts, String[] racks, 
+        Priority priority) {
+      this.attemptID = attemptID;
+      this.capability = capability;
+      this.hosts = hosts;
+      this.racks = racks;
       this.priority = priority;
     }
+    
   }
 
   @Override
@@ -149,14 +159,37 @@ public abstract class RMContainerRequest
       //remove all the requests corresponding to this hostname
       for (Map<String, Map<Resource, ResourceRequest>> remoteRequests 
           : remoteRequestsTable.values()){
-        //remove from host
-        Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName);
+        //remove from host if no pending allocations
+        boolean foundAll = true;
+        Map<Resource, ResourceRequest> reqMap = remoteRequests.get(hostName);
         if (reqMap != null) {
           for (ResourceRequest req : reqMap.values()) {
-            ask.remove(req);
+            if (!ask.remove(req)) {
+              foundAll = false;
+            }
+            else {
+              // if ask already sent to RM, we can try and overwrite it if possible.
+              // send a new ask to RM with numContainers
+              // specified for the blacklisted host to be 0.
+              ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
+              zeroedRequest.setNumContainers(0);
+              // to be sent to RM on next heartbeat
+              ask.add(zeroedRequest);
+            }
           }
+          // if all requests were still in ask queue
+          // we can remove this request
+          if (foundAll) {
+            remoteRequests.remove(hostName);
+          }     
         }
-        //TODO: remove from rack
+        // TODO handling of rack blacklisting
+        // Removing from rack should be dependent on no. of failures within the rack 
+        // Blacklisting a rack on the basis of a single node's blacklisting 
+        // may be overly aggressive. 
+        // Node failures could be co-related with other failures on the same rack 
+        // but we probably need a better approach at trying to decide how and when 
+        // to blacklist a rack
       }
     } else {
       nodeFailures.put(hostName, failures);
@@ -171,7 +204,9 @@ public abstract class RMContainerRequest
     // Create resource requests
     for (String host : req.hosts) {
       // Data-local
-      addResourceRequest(req.priority, host, req.capability);
+      if (!isNodeBlacklisted(host)) {
+        addResourceRequest(req.priority, host, req.capability);
+      }      
     }
 
     // Nothing Rack-local for now
@@ -234,6 +269,14 @@ public abstract class RMContainerRequest
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      // as we modify the resource requests by filtering out blacklisted hosts 
+      // when they are added, this value may be null when being 
+      // decremented
+      LOG.debug("Not decrementing resource as " + resourceName
+          + " is not present in request table");
+      return;
+    }
     ResourceRequest remoteRequest = reqMap.get(capability);
 
     LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
@@ -267,4 +310,23 @@ public abstract class RMContainerRequest
     release.add(containerId);
   }
   
+  protected boolean isNodeBlacklisted(String hostname) {
+    if (!nodeBlacklistingEnabled) {
+      return false;
+    }
+    return blacklistedNodes.contains(hostname);
+  }
+  
+  protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
+    ArrayList<String> newHosts = new ArrayList<String>();
+    for (String host : orig.hosts) {
+      if (!isNodeBlacklisted(host)) {
+        newHosts.add(host);      
+      }
+    }
+    String[] hosts = newHosts.toArray(new String[newHosts.size()]);
+    ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
+        hosts, orig.racks, orig.priority); 
+    return newReq;
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1186530&r1=1186529&r2=1186530&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Wed Oct 19 22:02:57 2011
@@ -34,6 +34,7 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -478,6 +480,105 @@ public class TestRMContainerAllocator {
     Assert.assertEquals(100.0f, app.getProgress(), 0.0);
   }
 
+  @Test
+  public void testBlackListedNodes() throws Exception {
+    
+    LOG.info("Running testBlackListedNodes");
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+    
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+    
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0, "jobfile"));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    // send 1 more request with different resource req
+    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    allocator.sendRequest(event2);
+
+    // send another request with different resource and priority
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h3" });
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // Send events to blacklist nodes h1 and h2
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);            
+    allocator.sendFailure(f1);
+    ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);            
+    allocator.sendFailure(f2);
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+
+    // mark h1/h2 as bad nodes
+    nodeManager1.nodeHeartbeat(false);
+    nodeManager2.nodeHeartbeat(false);
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());    
+
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    assigned = allocator.schedule();    
+    dispatcher.await();
+        
+    Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
+    
+    // validate that all containers are assigned to h3
+    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+      Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
+          .getContainer().getNodeId().getHost()));
+    }
+  }
+  
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {
@@ -534,6 +635,19 @@ public class TestRMContainerAllocator {
         new String[] { NetworkTopology.DEFAULT_RACK });
   }
 
+  private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
+      String host, boolean reduce) {
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+        taskAttemptId);
+    return new ContainerFailedEvent(attemptId, host);    
+  }
+  
   private void checkAssignments(ContainerRequestEvent[] requests,
       List<TaskAttemptContainerAssignedEvent> assignments,
       boolean checkHostMatch) {
@@ -653,6 +767,10 @@ public class TestRMContainerAllocator {
       }
     }
 
+    public void sendFailure(ContainerFailedEvent f) {
+      super.handle(f);
+    }
+    
     // API to be used by tests
     public List<TaskAttemptContainerAssignedEvent> schedule() {
       // run the scheduler
@@ -672,6 +790,7 @@ public class TestRMContainerAllocator {
     protected void startAllocatorThread() {
       // override to NOT start thread
     }
+        
   }
 
   public static void main(String[] args) throws Exception {
@@ -681,5 +800,7 @@ public class TestRMContainerAllocator {
     t.testMapReduceScheduling();
     t.testReportedAppProgress();
     t.testReportedAppProgressWithOnlyMaps();
+    t.testBlackListedNodes();
   }
+
 }



Mime
View raw message