hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1464815 - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src...
Date Fri, 05 Apr 2013 02:43:35 GMT
Author: szetszwo
Date: Fri Apr  5 02:43:29 2013
New Revision: 1464815

URL: http://svn.apache.org/r1464815
Log:
Merge r1462698 through r1464807 from trunk.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project:r1463804
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1462698-1464807

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Fri Apr  5 02:43:29 2013
@@ -158,6 +158,26 @@ Trunk (Unreleased)
     MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994.
     (Sandy Ryza via tomwhite)
 
+Release 2.0.5-alpha - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    MAPREDUCE-5113. Streaming input/output types are ignored with java 
+    mapper/reducer. (sandyr via tucu)
+
+    MAPREDUCE-5098. Fix findbugs warnings in gridmix. (kkambatl via tucu)
+
+    MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
+    command from the RM. (Jian He via jlowe)
+
 Release 2.0.4-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -195,6 +215,13 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
     (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-5117. Changed MRClientProtocolPBClientImpl to be closeable and thus
+    fix failures in renewal of HistoryServer's delegation tokens. (Siddharth
+    Seth via vinodkv)
+
+    MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is
+    submitting a job (Daryn Sharp via cos)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES
@@ -744,6 +771,7 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear
     the value from the Task commitAttempt member (Robert Parker via jeagles)
 
+    MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
 
 Release 0.23.6 - UNRELEASED
 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt:r1463804
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1462698-1464807

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1462698-1464807

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Apr  5 02:43:29 2013
@@ -549,8 +549,14 @@ public class MRAppMaster extends Composi
     }
 
     try {
-      //We are finishing cleanly so this is the last retry
-      isLastAMRetry = true;
+      //if isLastAMRetry comes as true, should never set it to false
+      if ( !isLastAMRetry){
+        if (((JobImpl)job).getInternalState() != JobStateInternal.REBOOT) {
+          LOG.info("We are finishing cleanly so this is the last retry");
+          isLastAMRetry = true;
+        }
+      }
+      notifyIsLastAMRetry(isLastAMRetry);
       // Stop all services
       // This will also send the final report to the ResourceManager
       LOG.info("Calling stop for all the services");
@@ -1272,19 +1278,25 @@ public class MRAppMaster extends Composi
       // that they don't take too long in shutting down
       if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
         ((ContainerAllocatorRouter) appMaster.containerAllocator)
-        .setSignalled(true);
-        ((ContainerAllocatorRouter) appMaster.containerAllocator)
-        .setShouldUnregister(appMaster.isLastAMRetry);
-      }
-      
-      if(appMaster.jobHistoryEventHandler != null) {
-        appMaster.jobHistoryEventHandler
-          .setForcejobCompletion(appMaster.isLastAMRetry);
+          .setSignalled(true);
       }
+      appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry);
       appMaster.stop();
     }
   }
 
+  public void notifyIsLastAMRetry(boolean isLastAMRetry){
+    if(containerAllocator instanceof ContainerAllocatorRouter) {
+      LOG.info("Notify RMCommunicator isAMLastRetry: " + isLastAMRetry);
+      ((ContainerAllocatorRouter) containerAllocator)
+        .setShouldUnregister(isLastAMRetry);
+    }
+    if(jobHistoryEventHandler != null) {
+      LOG.info("Notify JHEH isAMLastRetry: " + isLastAMRetry);
+      jobHistoryEventHandler.setForcejobCompletion(isLastAMRetry);
+    }
+  }
+
   protected static void initAndStartAppMaster(final MRAppMaster appMaster,
       final YarnConfiguration conf, String jobUserName) throws IOException,
       InterruptedException {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Fri Apr  5 02:43:29 2013
@@ -30,5 +30,6 @@ public enum JobStateInternal {
   KILL_WAIT,
   KILL_ABORT,
   KILLED,
-  ERROR
+  ERROR,
+  REBOOT
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Fri Apr  5 02:43:29 2013
@@ -54,6 +54,6 @@ public enum JobEventType {
   JOB_TASK_ATTEMPT_FETCH_FAILURE,
   
   //Producer:RMContainerAllocator
-  JOB_UPDATED_NODES
-  
+  JOB_UPDATED_NODES,
+  JOB_AM_REBOOT
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Apr  5 02:43:29 2013
@@ -215,6 +215,8 @@ public class JobImpl implements org.apac
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition
       INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final InternalRebootTransition
+      INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition();
   private static final TaskAttemptCompletedEventTransition
       TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
           new TaskAttemptCompletedEventTransition();
@@ -246,6 +248,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
           // Ignore-able events
           .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
               JobEventType.JOB_UPDATED_NODES)
@@ -265,6 +270,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
           // Ignore-able events
           .addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
               JobEventType.JOB_UPDATED_NODES)
@@ -287,6 +295,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
           // Ignore-able events
           .addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
               JobEventType.JOB_UPDATED_NODES)
@@ -327,6 +338,9 @@ public class JobImpl implements org.apac
               JobStateInternal.RUNNING,
               JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          .addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
 
           // Transitions from KILL_WAIT state.
           .addTransition
@@ -352,7 +366,8 @@ public class JobImpl implements org.apac
               EnumSet.of(JobEventType.JOB_KILL,
                   JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from COMMITTING state
           .addTransition(JobStateInternal.COMMITTING,
@@ -377,7 +392,10 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.COMMITTING,
               JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-          // Ignore-able events
+          .addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT,
+              JobEventType.JOB_AM_REBOOT,
+              INTERNAL_REBOOT_TRANSITION)
+              // Ignore-able events
           .addTransition(JobStateInternal.COMMITTING,
               JobStateInternal.COMMITTING,
               EnumSet.of(JobEventType.JOB_UPDATED_NODES,
@@ -397,7 +415,8 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from FAIL_ABORT state
           .addTransition(JobStateInternal.FAIL_ABORT,
@@ -425,7 +444,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.JOB_COMMIT_COMPLETED,
-                  JobEventType.JOB_COMMIT_FAILED))
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from KILL_ABORT state
           .addTransition(JobStateInternal.KILL_ABORT,
@@ -452,7 +472,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_SETUP_COMPLETED,
                   JobEventType.JOB_SETUP_FAILED,
                   JobEventType.JOB_COMMIT_COMPLETED,
-                  JobEventType.JOB_COMMIT_FAILED))
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from FAILED state
           .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
@@ -476,7 +497,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_SETUP_FAILED,
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
-                  JobEventType.JOB_ABORT_COMPLETED))
+                  JobEventType.JOB_ABORT_COMPLETED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from KILLED state
           .addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
@@ -498,7 +520,8 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_SETUP_FAILED,
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
-                  JobEventType.JOB_ABORT_COMPLETED))
+                  JobEventType.JOB_ABORT_COMPLETED,
+                  JobEventType.JOB_AM_REBOOT))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
           .addTransition(
@@ -517,9 +540,33 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
                   JobEventType.JOB_ABORT_COMPLETED,
-                  JobEventType.INTERNAL_ERROR))
+                  JobEventType.INTERNAL_ERROR,
+                  JobEventType.JOB_AM_REBOOT))
           .addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
               JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+
+          // No transitions from REBOOT state. Ignore all.
+          .addTransition(
+              JobStateInternal.REBOOT,
+              JobStateInternal.REBOOT,
+              EnumSet.of(JobEventType.JOB_INIT,
+                  JobEventType.JOB_KILL,
+                  JobEventType.JOB_TASK_COMPLETED,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
+                  JobEventType.JOB_DIAGNOSTIC_UPDATE,
+                  JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+                  JobEventType.JOB_SETUP_COMPLETED,
+                  JobEventType.JOB_SETUP_FAILED,
+                  JobEventType.JOB_COMMIT_COMPLETED,
+                  JobEventType.JOB_COMMIT_FAILED,
+                  JobEventType.JOB_ABORT_COMPLETED,
+                  JobEventType.INTERNAL_ERROR,
+                  JobEventType.JOB_AM_REBOOT))
+          .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
+              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+
           // create the topology tables
           .installTopology();
  
@@ -904,6 +951,8 @@ public class JobImpl implements org.apac
       return JobState.RUNNING;
     case FAIL_ABORT:
       return JobState.FAILED;
+    case REBOOT:
+      return JobState.ERROR;
     default:
       return JobState.valueOf(smState.name());
     }
@@ -972,6 +1021,7 @@ public class JobImpl implements org.apac
       case KILLED:
         metrics.killedJob(this);
         break;
+      case REBOOT:
       case ERROR:
       case FAILED:
         metrics.failedJob(this);
@@ -1898,8 +1948,17 @@ public class JobImpl implements org.apac
     }
   }
   
-  private static class InternalErrorTransition implements
+  private static class InternalTerminationTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
+    JobStateInternal terminationState = null;
+    String jobHistoryString = null;
+    public InternalTerminationTransition(JobStateInternal stateInternal,
+        String jobHistoryString) {
+      this.terminationState = stateInternal;
+      //mostly a hack for jobhistoryserver
+      this.jobHistoryString = jobHistoryString;
+    }
+
     @Override
     public void transition(JobImpl job, JobEvent event) {
       //TODO Is this JH event required.
@@ -1907,9 +1966,21 @@ public class JobImpl implements org.apac
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              JobStateInternal.ERROR.toString());
+              jobHistoryString);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-      job.finished(JobStateInternal.ERROR);
+      job.finished(terminationState);
+    }
+  }
+
+  private static class InternalErrorTransition extends InternalTerminationTransition {
+    public InternalErrorTransition(){
+      super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString());
+    }
+  }
+
+  private static class InternalRebootTransition extends InternalTerminationTransition  {
+    public InternalRebootTransition(){
+      super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString());
     }
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Apr  5 02:43:29 2013
@@ -123,7 +123,7 @@ public class LocalContainerAllocator ext
       // This can happen if the RM has been restarted. If it is in that state,
       // this application must clean itself up.
       eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                       JobEventType.INTERNAL_ERROR));
+                                       JobEventType.JOB_AM_REBOOT));
       throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
                                this.getContext().getApplicationID());
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Apr  5 02:43:29 2013
@@ -574,7 +574,7 @@ public class RMContainerAllocator extend
       // This can happen if the RM has been restarted. If it is in that state,
       // this application must clean itself up.
       eventHandler.handle(new JobEvent(this.getJob().getID(),
-                                       JobEventType.INTERNAL_ERROR));
+                                       JobEventType.JOB_AM_REBOOT));
       throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
                                this.getContext().getApplicationID());
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Fri Apr  5 02:43:29 2013
@@ -33,7 +33,9 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -86,9 +89,68 @@ import org.junit.Test;
      attemptId.setApplicationId(appId);
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
-     MRAppMaster appMaster = new TestMRApp(attemptId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     appMaster.init(conf);
+     appMaster.start();
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+     verify(fs).delete(stagingJobPath, true);
+   }
+
+   @Test (timeout = 30000)
+   public void testNoDeletionofStagingOnReboot() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(0);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.REBOOT, 4);
+     appMaster.init(conf);
+     appMaster.start();
+     //shutdown the job, not the lastRetry
+     appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+     verify(fs, times(0)).delete(stagingJobPath, true);
+   }
+
+   @Test (timeout = 30000)
+   public void testDeletionofStagingOnReboot() throws IOException {
+     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+     fs = mock(FileSystem.class);
+     when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
+     ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+         ApplicationAttemptId.class);
+     attemptId.setAttemptId(1);
+     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+     appId.setClusterTimestamp(System.currentTimeMillis());
+     appId.setId(0);
+     attemptId.setApplicationId(appId);
+     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
      appMaster.init(conf);
+     appMaster.start();
+     //shutdown the job, is lastRetry
      appMaster.shutDownJob();
+     //test whether notifyIsLastAMRetry called
+     Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
      verify(fs).delete(stagingJobPath, true);
    }
    
@@ -151,6 +213,8 @@ import org.junit.Test;
 
    private class TestMRApp extends MRAppMaster {
      ContainerAllocator allocator;
+     boolean testIsLastAMRetry = false;
+     JobStateInternal jobStateInternal;
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
          ContainerAllocator allocator, int maxAppAttempts) {
@@ -160,9 +224,11 @@ import org.junit.Test;
        this.allocator = allocator;
      }
 
-     public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-       this(applicationAttemptId, null,
-           MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+     public TestMRApp(ApplicationAttemptId applicationAttemptId,
+         ContainerAllocator allocator, JobStateInternal jobStateInternal,
+             int maxAppAttempts) {
+       this(applicationAttemptId, allocator, maxAppAttempts);
+       this.jobStateInternal = jobStateInternal;
      }
 
      @Override
@@ -180,6 +246,31 @@ import org.junit.Test;
      }
 
      @Override
+     protected Job createJob(Configuration conf, JobStateInternal forcedState,
+         String diagnostic) {
+       JobImpl jobImpl = mock(JobImpl.class);
+       when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal);
+       JobID jobID = JobID.forName("job_1234567890000_0001");
+       JobId jobId = TypeConverter.toYarn(jobID);
+       when(jobImpl.getID()).thenReturn(jobId);
+       ((AppContext) getContext())
+           .getAllJobs().put(jobImpl.getID(), jobImpl);
+       return jobImpl;
+     }
+
+     @Override
+     public void start() {
+       super.start();
+       DefaultMetricsSystem.shutdown();
+     }
+
+     @Override
+     public void notifyIsLastAMRetry(boolean isLastAMRetry){
+       testIsLastAMRetry = isLastAMRetry;
+       super.notifyIsLastAMRetry(isLastAMRetry);
+     }
+
+     @Override
      public RMHeartbeatHandler getRMHeartbeatHandler() {
        return getStubbedHeartbeatHandler(getContext());
      }
@@ -197,6 +288,9 @@ import org.junit.Test;
      protected void downloadTokensAndSetupUGI(Configuration conf) {
      }
 
+     public boolean getTestIsLastAMRetry(){
+       return testIsLastAMRetry;
+     }
    }
 
   private final class MRAppTestCleanup extends MRApp {
@@ -288,7 +382,7 @@ import org.junit.Test;
     };
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testStagingCleanupOrder() throws Exception {
     MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
         this.getClass().getName(), true);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Apr  5 02:43:29 2013
@@ -193,6 +193,68 @@ public class TestJobImpl {
   }
 
   @Test(timeout=20000)
+  public void testRebootedDuringSetup() throws Exception{
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void setupJob(JobContext jobContext)
+          throws IOException {
+        while(!Thread.interrupted()){
+          try{
+            wait();
+          }catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+    assertJobState(job, JobStateInternal.REBOOT);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
+  public void testRebootedDuringCommit() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    syncBarrier.await();
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+    assertJobState(job, JobStateInternal.REBOOT);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
   public void testKilledDuringSetup() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java Fri Apr  5 02:43:29 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.api.impl.pb.client;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
@@ -101,7 +102,8 @@ import org.apache.hadoop.yarn.exceptions
 
 import com.google.protobuf.ServiceException;
 
-public class MRClientProtocolPBClientImpl implements MRClientProtocol {
+public class MRClientProtocolPBClientImpl implements MRClientProtocol,
+    Closeable {
 
   protected MRClientProtocolPB proxy;
   
@@ -118,6 +120,13 @@ public class MRClientProtocolPBClientImp
   }
 
   @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
+  @Override
   public GetJobReportResponse getJobReport(GetJobReportRequest request)
       throws YarnRemoteException {
     GetJobReportRequestProto requestProto = ((GetJobReportRequestPBImpl)request).getProto();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Fri Apr  5 02:43:29 2013
@@ -138,15 +138,6 @@ import org.apache.hadoop.util.ToolRunner
 public class JobClient extends CLI {
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
-  /* notes that get delegation token was called. Again this is hack for oozie 
-   * to make sure we add history server delegation tokens to the credentials
-   *  for the job. Since the api only allows one delegation token to be returned, 
-   *  we have to add this hack.
-   */
-  private boolean getDelegationTokenCalled = false;
-  /* do we need a HS delegation token for this client */
-  static final String HS_DELEGATION_TOKEN_REQUIRED 
-      = "mapreduce.history.server.delegationtoken.required";
   
   static{
     ConfigUtil.loadResources();
@@ -569,10 +560,6 @@ public class JobClient extends CLI {
     try {
       conf.setBooleanIfUnset("mapred.mapper.new-api", false);
       conf.setBooleanIfUnset("mapred.reducer.new-api", false);
-      if (getDelegationTokenCalled) {
-        conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
-        getDelegationTokenCalled = false;
-      }
       Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
         @Override
         public Job run() throws IOException, ClassNotFoundException, 
@@ -1173,7 +1160,6 @@ public class JobClient extends CLI {
    */
   public Token<DelegationTokenIdentifier> 
     getDelegationToken(final Text renewer) throws IOException, InterruptedException {
-    getDelegationTokenCalled = true;
     return clientUgi.doAs(new 
         PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
       public Token<DelegationTokenIdentifier> run() throws IOException, 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1462698-1464807

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Fri Apr  5 02:43:29 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -87,6 +88,10 @@ public class ResourceMgrDelegate extends
     return oldMetrics;
   }
 
+  InetSocketAddress getConnectAddress() {
+    return rmAddress;
+  }
+  
   @SuppressWarnings("rawtypes")
   public Token getDelegationToken(Text renewer) throws IOException,
       InterruptedException {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Fri Apr  5 02:43:29 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
@@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMTokenSelector;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
@@ -90,7 +92,7 @@ import com.google.common.annotations.Vis
 /**
  * This class enables the current JobClient (0.22 hadoop) to run on YARN.
  */
-@SuppressWarnings({ "rawtypes", "unchecked" })
+@SuppressWarnings("unchecked")
 public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
@@ -101,14 +103,6 @@ public class YARNRunner implements Clien
   private Configuration conf;
   private final FileContext defaultFileContext;
   
-  /* usually is false unless the jobclient get delegation token is 
-   *  called. This is a hack wherein we do return a token from RM 
-   *  on getDelegationtoken but due to the restricted api on jobclient
-   *  we just add a job history DT token when submitting a job.
-   */
-  private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED = 
-      false;
-  
   /**
    * Yarn runner incapsulates the client interface of
    * yarn
@@ -186,6 +180,28 @@ public class YARNRunner implements Clien
   }
 
   @VisibleForTesting
+  void addHistoyToken(Credentials ts) throws IOException, InterruptedException {
+    /* check if we have a hsproxy, if not, no need */
+    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
+    if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
+      /*
+       * note that get delegation token was called. Again, this is a hack for
+       * Oozie to make sure we add history server delegation tokens to the credentials
+       */
+      RMTokenSelector tokenSelector = new RMTokenSelector();
+      Text service = SecurityUtil.buildTokenService(resMgrDelegate
+          .getConnectAddress());
+      if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
+        Text hsService = SecurityUtil.buildTokenService(hsProxy
+            .getConnectAddress());
+        if (ts.getToken(hsService) == null) {
+          ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
+        }
+      }
+    }
+  }
+  
+  @VisibleForTesting
   Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
       throws IOException, InterruptedException {
     GetDelegationTokenRequest request = recordFactory
@@ -263,18 +279,8 @@ public class YARNRunner implements Clien
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
   throws IOException, InterruptedException {
     
-    /* check if we have a hsproxy, if not, no need */
-    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
-    if (hsProxy != null) {
-      // JobClient will set this flag if getDelegationToken is called, if so, get
-      // the delegation tokens for the HistoryServer also.
-      if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, 
-          DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
-        Token hsDT = getDelegationTokenFromHS(hsProxy);
-        ts.addToken(hsDT.getService(), hsDT);
-      }
-    }
-
+    addHistoyToken(ts);
+    
     // Upload only in security mode: TODO
     Path applicationTokensFile =
         new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1464815&r1=1464814&r2=1464815&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Fri Apr  5 02:43:29 2013
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -30,6 +32,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
@@ -39,28 +42,24 @@ import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.ClientCache;
-import org.apache.hadoop.mapred.ClientServiceDelegate;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Master;
-import org.apache.hadoop.mapred.ResourceMgrDelegate;
-import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobPriority;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Logger;
@@ -146,7 +151,7 @@ public class TestYARNRunner extends Test
    }
 
 
-  @Test
+  @Test(timeout=20000)
   public void testJobKill() throws Exception {
     clientDelegate = mock(ClientServiceDelegate.class);
     when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
@@ -171,7 +176,7 @@ public class TestYARNRunner extends Test
     verify(clientDelegate).killJob(jobId);
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testJobSubmissionFailure() throws Exception {
     when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
     thenReturn(appId);
@@ -193,7 +198,7 @@ public class TestYARNRunner extends Test
     }
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testResourceMgrDelegate() throws Exception {
     /* we not want a mock of resource mgr delegate */
     final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
@@ -259,8 +264,88 @@ public class TestYARNRunner extends Test
     delegate.getQueueAclsForCurrentUser();
     verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
   }
-  
-  @Test
+
+  @Test(timeout=20000)
+  public void testGetHSDelegationToken() throws Exception {
+    try {
+      Configuration conf = new Configuration();
+
+      // Setup mock service
+      InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
+      Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress);
+
+      InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
+      Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress);
+
+      // Setup mock rm token
+      RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
+          new Text("owner"), new Text("renewer"), new Text("real"));
+      Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
+          new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice);
+      token.setKind(RMDelegationTokenIdentifier.KIND_NAME);
+
+      // Setup mock history token
+      DelegationToken historyToken = BuilderUtils.newDelegationToken(
+          new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(),
+          new byte[0], hsTokenSevice.toString());
+      GetDelegationTokenResponse getDtResponse = Records
+          .newRecord(GetDelegationTokenResponse.class);
+      getDtResponse.setDelegationToken(historyToken);
+
+      // mock services
+      MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
+      doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
+      doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+      doReturn(mockRmAddress).when(rmDelegate).getConnectAddress();
+
+      ClientCache clientCache = mock(ClientCache.class);
+      doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();
+
+      Credentials creds = new Credentials();
+
+      YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
+
+      // No HS token if no RM token
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(0)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      // No HS token if RM token, but security disabled.
+      creds.addToken(new Text("rmdt"), token);
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(0)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+          "kerberos");
+      UserGroupInformation.setConfiguration(conf);
+      creds = new Credentials();
+
+      // No HS token if no RM token, security enabled
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(0)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      // HS token if RM token present, security enabled
+      creds.addToken(new Text("rmdt"), token);
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(1)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+
+      // No additional call to get HS token if RM and HS token present
+      yarnRunner.addHistoyToken(creds);
+      verify(mockHsProxy, times(1)).getDelegationToken(
+          any(GetDelegationTokenRequest.class));
+    } finally {
+      // Back to defaults.
+      UserGroupInformation.setConfiguration(new Configuration());
+    }
+  }
+
+  @Test(timeout=20000)
   public void testHistoryServerToken() throws Exception {
     //Set the master principal in the config
     conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
@@ -303,7 +388,7 @@ public class TestYARNRunner extends Test
         });
   }
 
-  @Test
+  @Test(timeout=20000)
   public void testAMAdminCommandOpts() throws Exception {
     JobConf jobConf = new JobConf();
     
@@ -366,7 +451,7 @@ public class TestYARNRunner extends Test
       assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
     }
   }
-  @Test
+  @Test(timeout=20000)
   public void testWarnCommandOpts() throws Exception {
     Logger logger = Logger.getLogger(YARNRunner.class);
     



Mime
View raw message