From mapreduce-commits-return-2603-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Tue Oct 25 06:27:49 2011 Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EF8E8974C for ; Tue, 25 Oct 2011 06:27:49 +0000 (UTC) Received: (qmail 16492 invoked by uid 500); 25 Oct 2011 06:27:49 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 16465 invoked by uid 500); 25 Oct 2011 06:27:49 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 16457 invoked by uid 99); 25 Oct 2011 06:27:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Oct 2011 06:27:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Oct 2011 06:27:45 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id B13812388978; Tue, 25 Oct 2011 06:27:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1188529 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src... Date: Tue, 25 Oct 2011 06:27:25 -0000 To: mapreduce-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111025062725.B13812388978@eris.apache.org> Author: acmurthy Date: Tue Oct 25 06:27:24 2011 New Revision: 1188529 URL: http://svn.apache.org/viewvc?rev=1188529&view=rev Log: Merge -c 1188528 from trunk to branch-0.23 to complete fix for MAPREDUCE-2821. Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java - copied unchanged from r1188528, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/NormalizedResourceEvent.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Oct 25 06:27:24 2011 @@ -1703,6 +1703,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2746. Yarn servers can't communicate with each other with hadoop.security.authorization set to true (acmurthy via mahadev) + MAPREDUCE-2821. Added missing fields (resourcePerMap & resourcePerReduce) + to JobSummary logs. (mahadev via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Oct 25 06:27:24 2011 @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.Counter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; @@ -91,7 +92,8 @@ public class JobHistoryEventHandler exte } /* (non-Javadoc) - * @see org.apache.hadoop.yarn.service.AbstractService#init(org.apache.hadoop.conf.Configuration) + * @see org.apache.hadoop.yarn.service.AbstractService#init(org. + * apache.hadoop.conf.Configuration) * Initializes the FileSystem and Path objects for the log and done directories. * Creates these directories if they do not already exist. */ @@ -155,14 +157,15 @@ public class JobHistoryEventHandler exte + doneDirPath + "] based on conf: " + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR - + ". Either set to true or pre-create this directory with appropriate permissions"; + + ". Either set to true or pre-create this directory with" + + " appropriate permissions"; LOG.error(message); throw new YarnException(message); } } } catch (IOException e) { - LOG.error("Failed checking for the existance of history intermediate done directory: [" - + doneDirPath + "]"); + LOG.error("Failed checking for the existance of history intermediate " + + "done directory: [" + doneDirPath + "]"); throw new YarnException(e); } @@ -380,8 +383,11 @@ public class JobHistoryEventHandler exte MetaInfo mi = fileMap.get(event.getJobID()); try { HistoryEvent historyEvent = event.getHistoryEvent(); - mi.writeEvent(historyEvent); - processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); + if (! (historyEvent instanceof NormalizedResourceEvent)) { + mi.writeEvent(historyEvent); + } + processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), + event.getJobID()); LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType()); } catch (IOException e) { @@ -395,7 +401,7 @@ public class JobHistoryEventHandler exte (JobSubmittedEvent) event.getHistoryEvent(); mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime()); } - + // If this is JobFinishedEvent, close the writer and setup the job-index if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) { try { @@ -415,7 +421,8 @@ public class JobHistoryEventHandler exte if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) { try { - JobUnsuccessfulCompletionEvent jucEvent = (JobUnsuccessfulCompletionEvent) event + JobUnsuccessfulCompletionEvent jucEvent = + (JobUnsuccessfulCompletionEvent) event .getHistoryEvent(); mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime()); mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps()); @@ -429,7 +436,8 @@ public class JobHistoryEventHandler exte } } - private void processEventForJobSummary(HistoryEvent event, JobSummary summary, JobId jobId) { + public void processEventForJobSummary(HistoryEvent event, JobSummary summary, + JobId jobId) { // context.getJob could be used for some of this info as well. switch (event.getEventType()) { case JOB_SUBMITTED: @@ -438,6 +446,15 @@ public class JobHistoryEventHandler exte summary.setQueue(jse.getJobQueueName()); summary.setJobSubmitTime(jse.getSubmitTime()); break; + case NORMALIZED_RESOURCE: + NormalizedResourceEvent normalizedResourceEvent = + (NormalizedResourceEvent) event; + if (normalizedResourceEvent.getTaskType() == TaskType.MAP) { + summary.setResourcesPerMap(normalizedResourceEvent.getMemory()); + } else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) { + summary.setResourcesPerReduce(normalizedResourceEvent.getMemory()); + } + break; case JOB_INITED: JobInitedEvent jie = (JobInitedEvent) event; summary.setJobLaunchTime(jie.getLaunchTime()); @@ -503,7 +520,8 @@ public class JobHistoryEventHandler exte if (!mi.isWriterActive()) { throw new IOException( - "Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events for JobId: [" + "Inactive Writer: Likely received multiple JobFinished / " + + "JobUnsuccessful events for JobId: [" + jobId + "]"); } @@ -594,7 +612,8 @@ public class JobHistoryEventHandler exte this.historyFile = historyFile; this.confFile = conf; this.writer = writer; - this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null); + this.jobIndexInfo = new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, + null); this.jobSummary = new JobSummary(); } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java Tue Oct 25 06:27:24 2011 @@ -34,7 +34,8 @@ public class JobSummary { private int numFailedMaps; private int numFinishedReduces; private int numFailedReduces; - // private int numSlotsPerMap; | Doesn't make sense with potentially different + private int resourcesPerMap; // resources used per map/min resource + private int resourcesPerReduce; // resources used per reduce/min resource // resource models // private int numSlotsPerReduce; | Doesn't make sense with potentially // different resource models @@ -112,14 +113,14 @@ public class JobSummary { this.numFailedMaps = numFailedMaps; } - // public int getNumSlotsPerMap() { - // return numSlotsPerMap; - // } - // - // public void setNumSlotsPerMap(int numSlotsPerMap) { - // this.numSlotsPerMap = numSlotsPerMap; - // } - + public int getResourcesPerMap() { + return resourcesPerMap; + } + + public void setResourcesPerMap(int resourcesPerMap) { + this.resourcesPerMap = resourcesPerMap; + } + public int getNumFinishedReduces() { return numFinishedReduces; } @@ -136,14 +137,14 @@ public class JobSummary { this.numFailedReduces = numFailedReduces; } - // public int getNumSlotsPerReduce() { - // return numSlotsPerReduce; - // } - // - // public void setNumSlotsPerReduce(int numSlotsPerReduce) { - // this.numSlotsPerReduce = numSlotsPerReduce; - // } - + public int getResourcesPerReduce() { + return this.resourcesPerReduce; + } + + public void setResourcesPerReduce(int resourcesPerReduce) { + this.resourcesPerReduce = resourcesPerReduce; + } + public String getUser() { return user; } @@ -184,14 +185,6 @@ public class JobSummary { this.reduceSlotSeconds = reduceSlotSeconds; } - // public int getClusterSlotCapacity() { - // return clusterSlotCapacity; - // } - // - // public void setClusterSlotCapacity(int clusterSlotCapacity) { - // this.clusterSlotCapacity = clusterSlotCapacity; - // } - public String getJobSummaryString() { SummaryBuilder summary = new SummaryBuilder() .add("jobId", jobId) @@ -200,6 +193,8 @@ public class JobSummary { .add("firstMapTaskLaunchTime", firstMapTaskLaunchTime) .add("firstReduceTaskLaunchTime", firstReduceTaskLaunchTime) .add("finishTime", jobFinishTime) + .add("resourcesPerMap", resourcesPerMap) + .add("resourcesPerReduce", resourcesPerReduce) .add("numMaps", numFinishedMaps + numFailedMaps) .add("numReduces", numFinishedReduces + numFailedReduces) .add("user", user) Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Oct 25 06:27:24 2011 @@ -91,12 +91,12 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent; import org.apache.hadoop.mapreduce.v2.util.MRApps; -import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -115,10 +115,10 @@ import org.apache.hadoop.yarn.state.Inva import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.hadoop.util.StringUtils; /** @@ -856,7 +856,7 @@ public abstract class TaskAttemptImpl im private static long computeSlotMillis(TaskAttemptImpl taskAttempt) { TaskType taskType = taskAttempt.getID().getTaskId().getTaskType(); int slotMemoryReq = - taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); + taskAttempt.getMemoryRequired(taskAttempt.conf, taskType); int simSlotsRequired = slotMemoryReq / (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT @@ -994,7 +994,7 @@ public abstract class TaskAttemptImpl im private static class ContainerAssignedTransition implements SingleArcTransition { - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Override public void transition(final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { @@ -1164,6 +1164,7 @@ public abstract class TaskAttemptImpl im @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + @SuppressWarnings("deprecation") TaskAttemptContext taskContext = new TaskAttemptContextImpl(new JobConf(taskAttempt.conf), TypeConverter.fromYarn(taskAttempt.attemptId)); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Oct 25 06:27:24 2011 @@ -18,8 +18,6 @@ package org.apache.hadoop.mapreduce.v2.app.rm; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -37,7 +35,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -125,7 +128,7 @@ public class RMContainerAllocator extend private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; private float reduceSlowStart = 0; - + public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); } @@ -169,6 +172,7 @@ public class RMContainerAllocator extend LOG.info("Final Stats: " + getStat()); } + @SuppressWarnings("unchecked") @Override public synchronized void handle(ContainerAllocatorEvent event) { LOG.info("Processing the event " + event.toString()); @@ -179,7 +183,13 @@ public class RMContainerAllocator extend if (mapResourceReqt == 0) { mapResourceReqt = reqEvent.getCapability().getMemory(); int minSlotMemSize = getMinContainerCapability().getMemory(); - mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) * minSlotMemSize; + mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) + * minSlotMemSize; + JobID id = TypeConverter.fromYarn(applicationId); + JobId jobId = TypeConverter.toYarn(id); + eventHandler.handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, + mapResourceReqt))); LOG.info("mapResourceReqt:"+mapResourceReqt); if (mapResourceReqt > getMaxContainerCapability().getMemory()) { String diagMsg = "MAP capability required is more than the supported " + @@ -199,12 +209,20 @@ public class RMContainerAllocator extend reduceResourceReqt = reqEvent.getCapability().getMemory(); int minSlotMemSize = getMinContainerCapability().getMemory(); //round off on slotsize - reduceResourceReqt = (int) Math.ceil((float) reduceResourceReqt/minSlotMemSize) * minSlotMemSize; + reduceResourceReqt = (int) Math.ceil((float) + reduceResourceReqt/minSlotMemSize) * minSlotMemSize; + JobID id = TypeConverter.fromYarn(applicationId); + JobId jobId = TypeConverter.toYarn(id); + eventHandler.handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.REDUCE, + reduceResourceReqt))); LOG.info("reduceResourceReqt:"+reduceResourceReqt); if (reduceResourceReqt > getMaxContainerCapability().getMemory()) { - String diagMsg = "REDUCE capability required is more than the supported " + - "max container capability in the cluster. Killing the Job. reduceResourceReqt: " + - reduceResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory(); + String diagMsg = "REDUCE capability required is more than the " + + "supported max container capability in the cluster. Killing the " + + "Job. reduceResourceReqt: " + reduceResourceReqt + + " maxContainerCapability:" + getMaxContainerCapability().getMemory(); LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( getJob().getID(), diagMsg)); @@ -217,7 +235,8 @@ public class RMContainerAllocator extend //add to the front of queue for fail fast pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); } else { - pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));//reduces are added to pending and are slowly ramped up + pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); + //reduces are added to pending and are slowly ramped up } } @@ -411,6 +430,7 @@ public class RMContainerAllocator extend " availableResources(headroom):" + getAvailableResources(); } + @SuppressWarnings("unchecked") private List getResources() throws Exception { int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null AMResponse response = makeRemoteRequest(); @@ -538,6 +558,7 @@ public class RMContainerAllocator extend addContainerReq(req); } + @SuppressWarnings("unchecked") private void assign(List allocatedContainers) { Iterator it = allocatedContainers.iterator(); LOG.info("Got allocated containers " + allocatedContainers.size()); @@ -694,6 +715,7 @@ public class RMContainerAllocator extend } + @SuppressWarnings("unchecked") private ContainerRequest assignToFailedMap(Container allocated) { //try to assign to earlierFailedMaps if present ContainerRequest assigned = null; @@ -723,6 +745,7 @@ public class RMContainerAllocator extend return assigned; } + @SuppressWarnings("unchecked") private ContainerRequest assignToMap(Container allocated) { //try to assign to maps if present //first by host, then by rack, followed by * @@ -798,7 +821,8 @@ public class RMContainerAllocator extend } void preemptReduce(int toPreempt) { - List reduceList = new ArrayList(reduces.keySet()); + List reduceList = new ArrayList + (reduces.keySet()); //sort reduces on progress Collections.sort(reduceList, new Comparator() { Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Oct 25 06:27:24 2011 @@ -31,9 +31,12 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.WrappedJvmID; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -360,6 +363,16 @@ public class MRApp extends MRAppMaster { NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234); Container container = BuilderUtils.newContainer(cId, nodeId, "localhost:9999", null, null, null); + JobID id = TypeConverter.fromYarn(applicationId); + JobId jobId = TypeConverter.toYarn(id); + getContext().getEventHandler().handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.REDUCE, + 100))); + getContext().getEventHandler().handle(new JobHistoryEvent(jobId, + new NormalizedResourceEvent( + org.apache.hadoop.mapreduce.TaskType.MAP, + 100))); getContext().getEventHandler().handle( new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container, null)); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Oct 25 06:27:24 2011 @@ -225,7 +225,7 @@ {"name": "counters", "type": "JhCounters"} ] }, - + {"type": "record", "name": "TaskStarted", "fields": [ {"name": "taskid", "type": "string"}, @@ -256,6 +256,7 @@ "TASK_FINISHED", "TASK_FAILED", "TASK_UPDATED", + "NORMALIZED_RESOURCE", "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "MAP_ATTEMPT_FAILED", Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java Tue Oct 25 06:27:24 2011 @@ -18,8 +18,6 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java Tue Oct 25 06:27:24 2011 @@ -18,14 +18,11 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.JobID; -import org.apache.avro.util.Utf8; - /** * Event to record Failed and Killed completion of jobs * Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Tue Oct 25 06:27:24 2011 @@ -18,19 +18,15 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapred.ProgressSplitsBlock; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapred.ProgressSplitsBlock; - -import org.apache.avro.util.Utf8; - /** * Event to record successful completion of a reduce attempt * Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java Tue Oct 25 06:27:24 2011 @@ -18,15 +18,12 @@ package org.apache.hadoop.mapreduce.jobhistory; -import java.io.IOException; - +import org.apache.avro.util.Utf8; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.avro.util.Utf8; - /** * Event to record the start of a task * Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue Oct 25 06:27:24 2011 @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.mapreduce.v2.hs; @@ -54,27 +54,32 @@ import org.junit.Test; public class TestJobHistoryParsing { private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class); + @Test public void testHistoryParsing() throws Exception { Configuration conf = new Configuration(); long amStartTimeEst = System.currentTimeMillis(); - MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true); + MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), + true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); JobId jobId = job.getID(); LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString()); app.waitForState(job, JobState.SUCCEEDED); - - //make sure all events are flushed + + // make sure all events are flushed app.waitForState(Service.STATE.STOPPED); - - String jobhistoryDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf); + + String jobhistoryDir = JobHistoryUtils + .getHistoryIntermediateDoneDirForUser(conf); JobHistory jobHistory = new JobHistory(); jobHistory.init(conf); - - JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo(); - String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo); - + + JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId) + .getJobIndexInfo(); + String jobhistoryFileName = FileNameIndexUtils + .getDoneFileName(jobIndexInfo); + Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); FSDataInputStream in = null; LOG.info("JobHistoryFile is: " + historyFilePath); @@ -86,27 +91,24 @@ public class TestJobHistoryParsing { LOG.info("Can not open history file: " + historyFilePath, ioe); throw (new Exception("Can not open History File")); } - + JobHistoryParser parser = new JobHistoryParser(in); JobInfo jobInfo = parser.parse(); - - Assert.assertEquals ("Incorrect username ", - "mapred", jobInfo.getUsername()); - Assert.assertEquals("Incorrect jobName ", - "test", jobInfo.getJobname()); - Assert.assertEquals("Incorrect queuename ", - "default", jobInfo.getJobQueueName()); - Assert.assertEquals("incorrect conf path", - "test", jobInfo.getJobConfPath()); - Assert.assertEquals("incorrect finishedMap ", - 2, jobInfo.getFinishedMaps()); - Assert.assertEquals("incorrect finishedReduces ", - 1, jobInfo.getFinishedReduces()); - Assert.assertEquals("incorrect uberized ", - job.isUber(), jobInfo.getUberized()); + + Assert.assertEquals("Incorrect username ", "mapred", jobInfo.getUsername()); + Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname()); + Assert.assertEquals("Incorrect queuename ", "default", + jobInfo.getJobQueueName()); + Assert + .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath()); + Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps()); + Assert.assertEquals("incorrect finishedReduces ", 1, + jobInfo.getFinishedReduces()); + Assert.assertEquals("incorrect uberized ", job.isUber(), + jobInfo.getUberized()); int totalTasks = jobInfo.getAllTasks().size(); Assert.assertEquals("total number of tasks is incorrect ", 3, totalTasks); - + // Verify aminfo Assert.assertEquals(1, jobInfo.getAMInfos().size()); Assert.assertEquals("testhost", jobInfo.getAMInfos().get(0) @@ -120,15 +122,15 @@ public class TestJobHistoryParsing { && amInfo.getStartTime() >= amStartTimeEst); ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1); - //Assert at taskAttempt level + // Assert at taskAttempt level for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) { int taskAttemptCount = taskInfo.getAllTaskAttempts().size(); - Assert.assertEquals("total number of task attempts ", - 1, taskAttemptCount); - TaskAttemptInfo taInfo = - taskInfo.getAllTaskAttempts().values().iterator().next(); + Assert + .assertEquals("total number of task attempts ", 1, taskAttemptCount); + TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values() + .iterator().next(); Assert.assertNotNull(taInfo.getContainerId()); - //Verify the wrong ctor is not being used. Remove after mrv1 is removed. + // Verify the wrong ctor is not being used. Remove after mrv1 is removed. Assert.assertFalse(taInfo.getContainerId().equals(fakeCid)); } @@ -138,9 +140,8 @@ public class TestJobHistoryParsing { TypeConverter.fromYarn(task.getID())); Assert.assertNotNull("TaskInfo not found", taskInfo); for (TaskAttempt taskAttempt : task.getAttempts().values()) { - TaskAttemptInfo taskAttemptInfo = - taskInfo.getAllTaskAttempts().get( - TypeConverter.fromYarn((taskAttempt.getID()))); + TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get( + TypeConverter.fromYarn((taskAttempt.getID()))); Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo); Assert.assertEquals("Incorrect shuffle port for task attempt", taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort()); @@ -151,6 +152,8 @@ public class TestJobHistoryParsing { .getIntermediateSummaryFileName(jobId); Path summaryFile = new Path(jobhistoryDir, summaryFileName); String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile); + Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); + Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); Assert.assertNotNull(jobSummaryString); Map jobSummaryElements = new HashMap(); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java?rev=1188529&r1=1188528&r2=1188529&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEvents.java Tue Oct 25 06:27:24 2011 @@ -17,16 +17,13 @@ */ package org.apache.hadoop.mapreduce.jobhistory; -import java.util.List; -import java.util.ArrayList; +import junit.framework.TestCase; import org.apache.hadoop.mapred.TaskStatus; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; -import junit.framework.TestCase; - /** * Test various jobhistory events */