hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject svn commit: r1136595 - in /hadoop/common/branches/MR-279/mapreduce: CHANGES.txt mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
Date Thu, 16 Jun 2011 18:43:45 GMT
Author: llu
Date: Thu Jun 16 18:43:45 2011
New Revision: 1136595

URL: http://svn.apache.org/viewvc?rev=1136595&view=rev
Log:
Fix NPE when requesting attempts for completed jobs. (Siddharth Seth via llu)

Modified:
    hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1136595&r1=1136594&r2=1136595&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Thu Jun 16 18:43:45 2011
@@ -5,6 +5,8 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    Fix NPE when requesting attempts for completed jobs. (Siddharth Seth via llu)
+
     Changes a couple of usages of FileContext to FileSystem in TaskAttemptImpl 
     to handle distributed cache path resolutions on non-default filesystems.
     (ddas)

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1136595&r1=1136594&r2=1136595&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Jun 16 18:43:45 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.security.PrivilegedAction;
 import java.util.List;
 
@@ -40,11 +41,11 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -98,6 +99,9 @@ public class ClientServiceDelegate {
   }
 
   private void refreshProxy() throws YarnRemoteException {
+    //TODO RM NPEs for unknown jobs. History may still be aware.
+    // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
+    // and redirect to the history server.
     ApplicationMaster appMaster = rm.getApplicationMaster(currentAppId);
     while (!ApplicationState.COMPLETED.equals(appMaster.getState()) &&
         !ApplicationState.FAILED.equals(appMaster.getState()) && 
@@ -160,8 +164,12 @@ public class ClientServiceDelegate {
           JHConfig.DEFAULT_HS_BIND_ADDRESS);
       LOG.info("Application state is completed. " +
           "Redirecting to job history server " + serviceAddr);
-      //TODO:
-      serviceHttpAddr = "";
+      try {
+        serviceHttpAddr = JobHistoryUtils.getHistoryUrl(conf, currentAppId);
+      } catch (UnknownHostException e) {
+        LOG.warn("Unable to get history url", e);
+        serviceHttpAddr = "UNKNOWN";
+      }
       try {
         instantiateHistoryProxy(serviceAddr);
         return;
@@ -222,7 +230,12 @@ public class ClientServiceDelegate {
       try {
         GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
         request.setJobId(jobID);
-        return TypeConverter.fromYarn(getRefreshedProxy(arg0).getCounters(request).getCounters());
+        MRClientProtocol protocol = getRefreshedProxy(arg0);
+        if (protocol == null) {
+          /* no History to connect to, fake counters */
+          return new org.apache.hadoop.mapreduce.Counters();
+        }
+        return TypeConverter.fromYarn(protocol.getCounters(request).getCounters());
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -231,8 +244,7 @@ public class ClientServiceDelegate {
   }
 
   public String getJobHistoryDir() throws IOException, InterruptedException {
-    //TODO fix this
-    return "";
+    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
   }
 
   public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
@@ -266,7 +278,7 @@ public class ClientServiceDelegate {
         if (protocol == null) {
           return new TaskCompletionEvent[0];
         }
-        list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
+        list = protocol.getTaskAttemptCompletionEvents(request).getCompletionEventList();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -284,8 +296,13 @@ public class ClientServiceDelegate {
     List<String> list = null;
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(arg0);
     GetDiagnosticsRequest request = recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
+    MRClientProtocol protocol;
     try {
       request.setTaskAttemptId(attemptID);
+      protocol = getProxy(arg0.getJobID());
+      if (protocol == null) {
+        return new String[0];
+      }
       list = getProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
@@ -293,8 +310,11 @@ public class ClientServiceDelegate {
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
       try {
-        request.setTaskAttemptId(attemptID);
-        list = getRefreshedProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
+        protocol = getRefreshedProxy(arg0.getJobID());
+        if (protocol == null) {
+          return new String[0];
+        }
+        list = protocol.getDiagnostics(request).getDiagnosticsList();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -361,7 +381,7 @@ public class ClientServiceDelegate {
         if (protocol == null)  {
           return createFakeJobReport(currentAppState, jobId, jobFile);
         }
-        report = getRefreshedProxy(oldJobID).getJobReport(request).getJobReport();
+        report = protocol.getJobReport(request).getJobReport();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -375,9 +395,14 @@ public class ClientServiceDelegate {
     List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports = null;
     org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
     GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+    MRClientProtocol protocol = null;
     try {
       request.setJobId(nJobID);
       request.setTaskType(TypeConverter.toYarn(taskType));
+      protocol = getProxy(jobID);
+      if (protocol == null) {
+        return new org.apache.hadoop.mapreduce.TaskReport[0];
+      }
       taskReports = getProxy(jobID).getTaskReports(request).getTaskReportList();
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
@@ -387,7 +412,11 @@ public class ClientServiceDelegate {
       try {
         request.setJobId(nJobID);
         request.setTaskType(TypeConverter.toYarn(taskType));
-        taskReports = getRefreshedProxy(jobID).getTaskReports(request).getTaskReportList();
+        protocol = getRefreshedProxy(jobID);
+        if (protocol == null) {
+          return new org.apache.hadoop.mapreduce.TaskReport[0];
+        }
+        taskReports = protocol.getTaskReports(request).getTaskReportList();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -403,6 +432,10 @@ public class ClientServiceDelegate {
     = TypeConverter.toYarn(taskAttemptID);
     KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
     FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
+    MRClientProtocol protocol = getProxy(taskAttemptID.getJobID());
+    if (protocol == null) {
+      return false;
+    }
     try {
       if (fail) {
         failRequest.setTaskAttemptId(attemptID);



Mime
View raw message