From: acmurthy@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r1188467 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Date: Tue, 25 Oct 2011 00:41:52 -0000
Message-Id: <20111025004153.039CF238889B@eris.apache.org>

Author: acmurthy
Date: Tue Oct 25 00:41:52 2011
New Revision: 1188467

URL: http://svn.apache.org/viewvc?rev=1188467&view=rev
Log:
Merge -c 1188388 from trunk to branch-0.23 to complete fix for MAPREDUCE-3252.
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Oct 25 00:41:52 2011
@@ -1691,6 +1691,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3028. Added job-end notification support. (Ravi Prakash via
     acmurthy)
 
+    MAPREDUCE-3249. Ensure shuffle-port is correctly used during MR AM recovery.
+    (vinodkv via acmurthy)
+
     MAPREDUCE-3252. Fix map tasks to not rewrite data an extra time when
     map output fits in spill buffer. (todd)

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Oct 25 00:41:52 2011
@@ -568,7 +568,8 @@ public abstract class TaskImpl implement
       //raise the completion event only if the container is assigned
       // to nextAttemptNumber
       if (attempt.getNodeHttpAddress() != null) {
-        TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
+        TaskAttemptCompletionEvent tce = recordFactory
+            .newRecordInstance(TaskAttemptCompletionEvent.class);
         tce.setEventId(-1);
         tce.setMapOutputServerAddress("http://" + attempt.getNodeHttpAddress().split(":")[0] + ":"
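For illustration only: the completion event in the hunk above advertises the address reducers use to fetch map output, built from the host part of the node's "host:port" HTTP address. The statement shown is cut off, so appending the attempt's shuffle port is an assumption here; the class, method, host name and port below are hypothetical stand-ins, not the real TaskImpl code.

public class MapOutputAddressSketch {

  // Mirrors the visible pattern: "http://" + nodeHttpAddress.split(":")[0] + ":" + <shuffle port>.
  static String mapOutputServerAddress(String nodeHttpAddress, int shufflePort) {
    String host = nodeHttpAddress.split(":")[0];
    return "http://" + host + ":" + shufflePort;
  }

  public static void main(String[] args) {
    // Example values only: node HTTP address "node1.example.com:9999", shuffle port 5467.
    System.out.println(mapOutputServerAddress("node1.example.com:9999", 5467));
    // Prints: http://node1.example.com:5467
  }
}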
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Oct 25 00:41:52 2011
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 /*
  * Recovers the completed tasks from the previous life of Application Master.
@@ -313,8 +314,8 @@ public class RecoveryService extends Com
         TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
             .getTaskAttemptID();
         TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-        //TODO need to get the real port number MAPREDUCE-2666
-        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
+        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
+            attInfo.getShufflePort()));
 
         // send the status update event
         sendStatusUpdateEvent(aId, attInfo);
@@ -392,16 +393,15 @@ public class RecoveryService extends Com
       TaskAttemptInfo attemptInfo) {
     LOG.info("Sending assigned event to " + yarnAttemptID);
     ContainerId cId = attemptInfo.getContainerId();
-    Container container = recordFactory
-        .newRecordInstance(Container.class);
-    container.setId(cId);
-    container.setNodeId(recordFactory
-        .newRecordInstance(NodeId.class));
-    // NodeId can be obtained from TaskAttemptInfo.hostname - but this will
-    // eventually contain rack info.
-    container.setContainerToken(null);
-    container.setNodeHttpAddress(attemptInfo.getTrackerName() + ":" +
-        attemptInfo.getHttpPort());
+    String[] splits = attemptInfo.getHostname().split(":");
+    NodeId nodeId = BuilderUtils.newNodeId(splits[0], Integer
+        .parseInt(splits[1]));
+    // Resource/Priority/ApplicationACLs are only needed while launching the
+    // container on an NM, these are already completed tasks, so setting them
+    // to null
+    Container container = BuilderUtils.newContainer(cId, nodeId,
+        attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+        null, null, null);
     actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
         container, null));
   }
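The hunks above are the heart of MAPREDUCE-3249: when the restarted Application Master replays a completed map attempt from job history, it now forwards the shuffle port recorded for that attempt (attInfo.getShufflePort()) instead of the hard-coded -1, and rebuilds the attempt's Container from the recovered host:port via BuilderUtils. Below is a minimal, self-contained sketch of that replay idea using stand-in types; RecoveryReplaySketch, AttemptInfo and the example attempt id are hypothetical, not the real YARN/MapReduce classes.

import java.util.HashMap;
import java.util.Map;

public class RecoveryReplaySketch {

  // Stand-in for the per-attempt info recovered from job history.
  static class AttemptInfo {
    final String id;
    final int shufflePort;
    AttemptInfo(String id, int shufflePort) {
      this.id = id;
      this.shufflePort = shufflePort;
    }
  }

  private final Map<String, AttemptInfo> history = new HashMap<String, AttemptInfo>();

  void record(AttemptInfo info) {
    history.put(info.id, info);
  }

  // Replay path: before the fix this always reported -1, now the recorded port.
  int shufflePortForReplay(String attemptId) {
    return history.get(attemptId).shufflePort;
  }

  public static void main(String[] args) {
    RecoveryReplaySketch recovery = new RecoveryReplaySketch();
    recovery.record(new AttemptInfo("attempt_m_000000_0", 5467));
    System.out.println(recovery.shufflePortForReplay("attempt_m_000000_0")); // 5467
  }
}

The same recorded value (5467) is what the TestRecovery changes further below assert after recovery.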
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Oct 25 00:41:52 2011
@@ -315,15 +315,17 @@ public class MRApp extends MRAppMaster {
   }
 
   class MockContainerLauncher implements ContainerLauncher {
+
+    //We are running locally so set the shuffle port to -1
+    int shufflePort = -1;
+
     @Override
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        //We are running locally so set the shuffle port to -1
         getContext().getEventHandler().handle(
             new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                -1)
-            );
+                shufflePort));
         attemptLaunched(event.getTaskAttemptID());
         break;
 
@@ -355,13 +357,9 @@ public class MRApp extends MRAppMaster {
       ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
       cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
       cId.setId(containerCount++);
-      Container container = recordFactory.newRecordInstance(Container.class);
-      container.setId(cId);
-      container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
-      container.getNodeId().setHost("dummy");
-      container.getNodeId().setPort(1234);
-      container.setContainerToken(null);
-      container.setNodeHttpAddress("localhost:9999");
+      NodeId nodeId = BuilderUtils.newNodeId("localhost", 1234);
+      Container container = BuilderUtils.newContainer(cId, nodeId,
+          "localhost:9999", null, null, null);
       getContext().getEventHandler().handle(
           new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
               container, null));
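The MRApp change above turns the mock launcher's shuffle port into a field so a test can inject a known value; the TestRecovery changes in the next section set it to 5467 and assert that the recovered attempt reports the same port. A tiny sketch of that test hook follows, with hypothetical names (MockLauncherSketch and MockLauncher are stand-ins, not the real MRApp classes).

public class MockLauncherSketch {

  // Stand-in for MRApp's MockContainerLauncher: the port is a field, not a literal.
  static class MockLauncher {
    // Default -1 means "running locally, no real shuffle server".
    int shufflePort = -1;

    // Stand-in for handling CONTAINER_REMOTE_LAUNCH: report the configured port.
    int launch() {
      return shufflePort;
    }
  }

  public static void main(String[] args) {
    MockLauncher launcher = new MockLauncher();
    launcher.shufflePort = 5467;  // injected by the test, as TestRecovery does
    System.out.println("shuffle port reported: " + launcher.launch());  // 5467
  }
}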
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1188467&r1=1188466&r2=1188467&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Oct 25 00:41:52 2011
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
@@ -269,6 +270,9 @@ public class TestRecovery {
 
     //wait for map task to complete
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
 
     app.waitForState(reduceTask1, TaskState.RUNNING);
     TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
@@ -290,7 +294,8 @@ public class TestRecovery {
 
     //rerun
     //in rerun the map will be recovered from previous run
-    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false, ++runCount);
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
     conf = new Configuration();
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean("mapred.mapper.new-api", true);
@@ -308,6 +313,10 @@ public class TestRecovery {
 
     // map will be recovered, no need to send done
     app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
 
     // first reduce will be recovered, no need to send done
     app.waitForState(reduceTask1, TaskState.SUCCEEDED);
@@ -398,6 +407,13 @@ public class TestRecovery {
     }
 
     @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      MockContainerLauncher launcher = new MockContainerLauncher();
+      launcher.shufflePort = 5467;
+      return launcher;
+    }
+
+    @Override
     protected EventHandler createJobHistoryHandler(
         AppContext context) {
       JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,