From mapreduce-commits-return-3261-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Wed Jan 18 23:03:06 2012 Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A235BB418 for ; Wed, 18 Jan 2012 23:03:06 +0000 (UTC) Received: (qmail 59278 invoked by uid 500); 18 Jan 2012 23:03:05 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 58580 invoked by uid 500); 18 Jan 2012 23:03:04 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 58550 invoked by uid 99); 18 Jan 2012 23:03:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jan 2012 23:03:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_OPPORTUN1 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jan 2012 23:03:01 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 14F5E2388A2C; Wed, 18 Jan 2012 23:02:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1233105 [2/2] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/s... Date: Wed, 18 Jan 2012 23:02:38 -0000 To: mapreduce-commits@hadoop.apache.org From: atm@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120118230241.14F5E2388A2C@eris.apache.org> Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,2 +1,2 @@ -/hadoop/common/trunk/hadoop-mapreduce-project:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred:713112 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/.gitignore ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/.gitignore:1161333-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/.gitignore:1161333-1233103 /hadoop/core/branches/branch-0.19/mapred/.gitignore:713112 /hadoop/core/trunk/.gitignore:784664-785643 Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/CHANGES.txt?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/CHANGES.txt Wed Jan 18 23:02:17 2012 @@ -183,6 +183,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3553. Add support for data returned when exceptions thrown from web service apis to be in either xml or in JSON. (Thomas Graves via mahadev) + MAPREDUCE-3641. Making CapacityScheduler more conservative so as to + assign only one off-switch container in a single scheduling + iteration. (Arun C Murthy via vinodkv) + OPTIMIZATIONS MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar @@ -499,6 +503,18 @@ Release 0.23.1 - Unreleased MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe via mahadev) + MAPREDUCE-2450. Fixed a corner case with interrupted communication threads + leading to a long timeout in Task. (Rajesh Balamohan via acmurthy) + + MAPREDUCE-3669. Allow clients to talk to MR HistoryServer using both + delegation tokens and kerberos. (mahadev via acmurthy) + + MAPREDUCE-3684. LocalDistributedCacheManager does not shut down its thread + pool (tomwhite) + + MAPREDUCE-3582. Move successfully passing MR1 tests to MR2 maven tree. + (ahmed via tucu) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:1161333-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:1161333-1233103 /hadoop/core/branches/branch-0.19/mapred/CHANGES.txt:713112 /hadoop/mapreduce/branches/HDFS-641/CHANGES.txt:817878-835964 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/conf:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/conf:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred/conf:713112 /hadoop/core/trunk/conf:784664-785643 Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Wed Jan 18 23:02:17 2012 @@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionExc import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,6 +58,8 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.FSDownload; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * A helper class for managing the distributed cache for {@link LocalJobRunner}. */ @@ -111,43 +114,52 @@ class LocalDistributedCacheManager { FileContext localFSFileContext = FileContext.getLocalFSFileContext(); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - Map> resourcesToPaths = Maps.newHashMap(); - ExecutorService exec = Executors.newCachedThreadPool(); - Path destPath = localDirAllocator.getLocalPathForWrite(".", conf); - for (LocalResource resource : localResources.values()) { - Callable download = new FSDownload(localFSFileContext, ugi, conf, - destPath, resource, new Random()); - Future future = exec.submit(download); - resourcesToPaths.put(resource, future); - } - for (LocalResource resource : localResources.values()) { - Path path; - try { - path = resourcesToPaths.get(resource).get(); - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e); - } - String pathString = path.toUri().toString(); - if (resource.getType() == LocalResourceType.ARCHIVE) { - localArchives.add(pathString); - } else if (resource.getType() == LocalResourceType.FILE) { - localFiles.add(pathString); - } - Path resourcePath; - try { - resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource()); - } catch (URISyntaxException e) { - throw new IOException(e); - } - LOG.info(String.format("Localized %s as %s", resourcePath, path)); - String cp = resourcePath.toUri().getPath(); - if (classpaths.keySet().contains(cp)) { - localClasspaths.add(path.toUri().getPath().toString()); + ExecutorService exec = null; + try { + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat("LocalDistributedCacheManager Downloader #%d") + .build(); + exec = Executors.newCachedThreadPool(tf); + Path destPath = localDirAllocator.getLocalPathForWrite(".", conf); + Map> resourcesToPaths = Maps.newHashMap(); + for (LocalResource resource : localResources.values()) { + Callable download = new FSDownload(localFSFileContext, ugi, conf, + destPath, resource, new Random()); + Future future = exec.submit(download); + resourcesToPaths.put(resource, future); + } + for (LocalResource resource : localResources.values()) { + Path path; + try { + path = resourcesToPaths.get(resource).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e); + } + String pathString = path.toUri().toString(); + if (resource.getType() == LocalResourceType.ARCHIVE) { + localArchives.add(pathString); + } else if (resource.getType() == LocalResourceType.FILE) { + localFiles.add(pathString); + } + Path resourcePath; + try { + resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource()); + } catch (URISyntaxException e) { + throw new IOException(e); + } + LOG.info(String.format("Localized %s as %s", resourcePath, path)); + String cp = resourcePath.toUri().getPath(); + if (classpaths.keySet().contains(cp)) { + localClasspaths.add(path.toUri().getPath().toString()); + } + } + } finally { + if (exec != null) { + exec.shutdown(); } - } - + } // Update the configuration object with localized data. if (!localArchives.isEmpty()) { conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils @@ -171,7 +183,7 @@ class LocalDistributedCacheManager { } setupCalled = true; } - + /** * Are the resources that should be added to the classpath? * Should be called after setup(). Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jan 18 23:02:17 2012 @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -61,6 +62,8 @@ import org.apache.hadoop.security.author import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** Implements MapReduce locally, in-process, for debugging. */ @InterfaceAudience.Private @InterfaceStability.Unstable @@ -302,7 +305,10 @@ public class LocalJobRunner implements C LOG.debug("Map tasks to process: " + this.numMapTasks); // Create a new executor service to drain the work queue. - ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads); + ThreadFactory tf = new ThreadFactoryBuilder() + .setNameFormat("LocalJobRunner Map Task Executor #%d") + .build(); + ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf); return executor; } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java Wed Jan 18 23:02:17 2012 @@ -27,14 +27,14 @@ import org.apache.hadoop.security.Securi import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenSelector; -import org.apache.hadoop.yarn.proto.MRClientProtocol; +import org.apache.hadoop.yarn.proto.HSClientProtocol; public class ClientHSSecurityInfo extends SecurityInfo { @Override public KerberosInfo getKerberosInfo(Class protocol, Configuration conf) { if (!protocol - .equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) { + .equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) { return null; } return new KerberosInfo() { @@ -59,7 +59,7 @@ public class ClientHSSecurityInfo extend @Override public TokenInfo getTokenInfo(Class protocol, Configuration conf) { if (!protocol - .equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) { + .equals(HSClientProtocol.HSClientProtocolService.BlockingInterface.class)) { return null; } return new TokenInfo() { Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto Wed Jan 18 23:02:17 2012 @@ -22,6 +22,7 @@ option java_generic_services = true; import "mr_service_protos.proto"; +/* If making changes to this, please edit HSClientProtocolService */ service MRClientProtocolService { rpc getJobReport (GetJobReportRequestProto) returns (GetJobReportResponseProto); rpc getTaskReport (GetTaskReportRequestProto) returns (GetTaskReportResponseProto); Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Wed Jan 18 23:02:17 2012 @@ -552,6 +552,8 @@ abstract public class Task implements Wr private InputSplit split = null; private Progress taskProgress; private Thread pingThread = null; + private boolean done = true; + private Object lock = new Object(); /** * flag that indicates whether progress update needs to be sent to parent. @@ -648,6 +650,9 @@ abstract public class Task implements Wr // get current flag value and reset it as well boolean sendProgress = resetProgressFlag(); while (!taskDone.get()) { + synchronized (lock) { + done = false; + } try { boolean taskFound = true; // whether TT knows about this task // sleep for a bit @@ -680,6 +685,7 @@ abstract public class Task implements Wr // came back up), kill ourselves if (!taskFound) { LOG.warn("Parent died. Exiting "+taskId); + resetDoneFlag(); System.exit(66); } @@ -692,10 +698,19 @@ abstract public class Task implements Wr if (remainingRetries == 0) { ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); LOG.warn("Last retry, killing "+taskId); + resetDoneFlag(); System.exit(65); } } } + //Notify that we are done with the work + resetDoneFlag(); + } + void resetDoneFlag() { + synchronized (lock) { + done = true; + lock.notify(); + } } public void startCommunicationThread() { if (pingThread == null) { @@ -706,6 +721,11 @@ abstract public class Task implements Wr } public void stopCommunicationThread() throws InterruptedException { if (pingThread != null) { + synchronized (lock) { + while (!done) { + lock.wait(); + } + } pingThread.interrupt(); pingThread.join(); } Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:1166973-1233103 /hadoop/core/branches/branch-0.19/mapred/src/java/mapred-default.xml:713112 /hadoop/core/trunk/src/mapred/mapred-default.xml:776175-785643 Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Wed Jan 18 23:02:17 2012 @@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledThr import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,6 +50,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobACLsManager; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobSummary; @@ -86,6 +88,9 @@ public class JobHistory extends Abstract private static final Log LOG = LogFactory.getLog(JobHistory.class); private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); + public static final Pattern CONF_FILENAME_REGEX = + Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?"); + public static final String OLD_SUFFIX = ".old"; private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils.doneSubdirsBeforeSerialTail(); Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientCache.java Wed Jan 18 23:02:17 2012 @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.net.NetUtils; @@ -91,7 +92,7 @@ public class ClientCache { return currentUser.doAs(new PrivilegedAction() { @Override public MRClientProtocol run() { - return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, + return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class, NetUtils.createSocketAddr(serviceAddr), conf); } }); Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-examples/pom.xml Wed Jan 18 23:02:17 2012 @@ -39,14 +39,59 @@ org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + test-jar + + + org.apache.hadoop hadoop-common provided org.apache.hadoop + hadoop-common + test + test-jar + + + org.apache.hadoop hadoop-hdfs provided + + org.apache.hadoop + hadoop-hdfs + test + test-jar + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar + + + org.apache.hadoop + hadoop-mapreduce-client-app + provided + + + org.apache.hadoop + hadoop-mapreduce-client-app + test-jar + test + + + org.apache.hadoop + hadoop-mapreduce-client-hs + provided + + + org.apache.hadoop + hadoop-mapreduce-client-hs + test + Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Wed Jan 18 23:02:17 2012 @@ -26,8 +26,26 @@ import org.apache.hadoop.yarn.util.Recor @Private @Evolving public class Resources { + // Java doesn't have const :( - private static final Resource NONE = createResource(0); + private static final Resource NONE = new Resource() { + + @Override + public int getMemory() { + return 0; + } + + @Override + public void setMemory(int memory) { + throw new RuntimeException("NONE cannot be modified!"); + } + + @Override + public int compareTo(Resource o) { + return (0 - o.getMemory()); + } + + }; public static Resource createResource(int memory) { Resource resource = Records.newRecord(Resource.class); @@ -36,7 +54,6 @@ public class Resources { } public static Resource none() { - assert NONE.getMemory() == 0 : "NONE should be empty"; return NONE; } Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Wed Jan 18 23:02:17 2012 @@ -155,9 +155,10 @@ extends org.apache.hadoop.yarn.server.re * Assign containers to applications in the queue or it's children (if any). * @param clusterResource the resource of the cluster. * @param node node on which resources are available - * @return the resource that is being assigned. + * @return the assignment */ - public Resource assignContainers(Resource clusterResource, SchedulerNode node); + public CSAssignment assignContainers( + Resource clusterResource, SchedulerNode node); /** * A container assigned to the queue has completed. Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Wed Jan 18 23:02:17 2012 @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -35,7 +34,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -703,8 +701,11 @@ public class LeafQueue implements CSQueu return applicationsMap.get(applicationAttemptId); } + private static final CSAssignment NULL_ASSIGNMENT = + new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); + @Override - public synchronized Resource + public synchronized CSAssignment assignContainers(Resource clusterResource, SchedulerNode node) { if(LOG.isDebugEnabled()) { @@ -717,8 +718,11 @@ public class LeafQueue implements CSQueu if (reservedContainer != null) { SchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); - return assignReservedContainer(application, node, reservedContainer, - clusterResource); + return new CSAssignment( + assignReservedContainer(application, node, reservedContainer, + clusterResource), + NodeType.NODE_LOCAL); // Don't care about locality constraints + // for reserved containers } // Try to assign containers to applications in order @@ -746,7 +750,7 @@ public class LeafQueue implements CSQueu // Are we going over limits by allocating to this application? // Maximum Capacity of the queue if (!assignToQueue(clusterResource, required)) { - return Resources.none(); + return NULL_ASSIGNMENT; } // User limits @@ -760,24 +764,23 @@ public class LeafQueue implements CSQueu application.addSchedulingOpportunity(priority); // Try to schedule - Resource assigned = + CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, null); - + + Resource assigned = assignment.getResource(); + // Did we schedule or reserve a container? if (Resources.greaterThan(assigned, Resources.none())) { - Resource assignedResource = - application.getResourceRequest(priority, RMNode.ANY).getCapability(); // Book-keeping - allocateResource(clusterResource, - application, assignedResource); + allocateResource(clusterResource, application, assigned); // Reset scheduling opportunities application.resetSchedulingOpportunities(priority); // Done - return assignedResource; + return assignment; } else { // Do not assign out of order w.r.t priorities break; @@ -792,7 +795,7 @@ public class LeafQueue implements CSQueu application.showRequests(); } - return Resources.none(); + return NULL_ASSIGNMENT; } @@ -809,11 +812,12 @@ public class LeafQueue implements CSQueu container.getId(), SchedulerUtils.UNRESERVED_CONTAINER), RMContainerEventType.RELEASED); - return container.getResource(); + return container.getResource(); // Ugh, return resource to force re-sort } // Try to assign if we have sufficient resources - assignContainersOnNode(clusterResource, node, application, priority, rmContainer); + assignContainersOnNode(clusterResource, node, application, priority, + rmContainer); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* @@ -966,7 +970,7 @@ public class LeafQueue implements CSQueu return (((starvation + requiredContainers) - reservedContainers) > 0); } - private Resource assignContainersOnNode(Resource clusterResource, + private CSAssignment assignContainersOnNode(Resource clusterResource, SchedulerNode node, SchedulerApp application, Priority priority, RMContainer reservedContainer) { @@ -977,7 +981,7 @@ public class LeafQueue implements CSQueu assignNodeLocalContainers(clusterResource, node, application, priority, reservedContainer); if (Resources.greaterThan(assigned, Resources.none())) { - return assigned; + return new CSAssignment(assigned, NodeType.NODE_LOCAL); } // Rack-local @@ -985,12 +989,14 @@ public class LeafQueue implements CSQueu assignRackLocalContainers(clusterResource, node, application, priority, reservedContainer); if (Resources.greaterThan(assigned, Resources.none())) { - return assigned; + return new CSAssignment(assigned, NodeType.RACK_LOCAL); } // Off-switch - return assignOffSwitchContainers(clusterResource, node, application, - priority, reservedContainer); + return new CSAssignment( + assignOffSwitchContainers(clusterResource, node, application, + priority, reservedContainer), + NodeType.OFF_SWITCH); } private Resource assignNodeLocalContainers(Resource clusterResource, @@ -1272,7 +1278,7 @@ public class LeafQueue implements CSQueu metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); LOG.info(getQueueName() + " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " resources=" + user.getConsumedResources()); + " user=" + userName + " user-resources=" + user.getConsumedResources()); } synchronized void releaseResource(Resource clusterResource, @@ -1290,7 +1296,7 @@ public class LeafQueue implements CSQueu LOG.info(getQueueName() + " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " resources=" + user.getConsumedResources()); + " user=" + userName + " user-resources=" + user.getConsumedResources()); } @Override Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java Wed Jan 18 23:02:17 2012 @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.factory.pr import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -500,10 +501,12 @@ public class ParentQueue implements CSQu } @Override - public synchronized Resource assignContainers( + public synchronized CSAssignment assignContainers( Resource clusterResource, SchedulerNode node) { - Resource assigned = Resources.createResource(0); - + CSAssignment assignment = + new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); + boolean assignedOffSwitch = false; + while (canAssign(node)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " @@ -516,16 +519,18 @@ public class ParentQueue implements CSQu } // Schedule - Resource assignedToChild = + CSAssignment assignedToChild = assignContainersToChildQueues(clusterResource, node); + assignedOffSwitch = (assignedToChild.getType() == NodeType.OFF_SWITCH); // Done if no child-queue assigned anything - if (Resources.greaterThan(assignedToChild, Resources.none())) { + if (Resources.greaterThan(assignedToChild.getResource(), + Resources.none())) { // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild); + allocateResource(clusterResource, assignedToChild.getResource()); // Track resource utilization in this pass of the scheduler - Resources.addTo(assigned, assignedToChild); + Resources.addTo(assignment.getResource(), assignedToChild.getResource()); LOG.info("assignedContainer" + " queue=" + getQueueName() + @@ -539,17 +544,26 @@ public class ParentQueue implements CSQu if (LOG.isDebugEnabled()) { LOG.debug("ParentQ=" + getQueueName() - + " assignedSoFarInThisIteration=" + assigned + + " assignedSoFarInThisIteration=" + assignment.getResource() + " utilization=" + getUtilization()); } // Do not assign more than one container if this isn't the root queue - if (!rootQueue) { + // or if we've already assigned an off-switch container + if (rootQueue) { + if (assignedOffSwitch) { + if (LOG.isDebugEnabled()) { + LOG.debug("Not assigning more than one off-switch container," + + " assignments so far: " + assignment); + } + break; + } + } else { break; } } - return assigned; + return assignment; } private synchronized boolean assignToQueue(Resource clusterResource) { @@ -573,9 +587,10 @@ public class ParentQueue implements CSQu minimumAllocation); } - synchronized Resource assignContainersToChildQueues(Resource cluster, + synchronized CSAssignment assignContainersToChildQueues(Resource cluster, SchedulerNode node) { - Resource assigned = Resources.createResource(0); + CSAssignment assignment = + new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL); printChildQueues(); @@ -586,25 +601,28 @@ public class ParentQueue implements CSQu LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() + " stats: " + childQueue); } - assigned = childQueue.assignContainers(cluster, node); + assignment = childQueue.assignContainers(cluster, node); if(LOG.isDebugEnabled()) { - LOG.debug("Assignedto queue: " + childQueue.getQueuePath() - + " stats: " + childQueue + " --> " + assigned.getMemory()); + LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + + " stats: " + childQueue + " --> " + + assignment.getResource().getMemory() + ", " + assignment.getType()); } // If we do assign, remove the queue and re-insert in-order to re-sort - if (Resources.greaterThan(assigned, Resources.none())) { + if (Resources.greaterThan(assignment.getResource(), Resources.none())) { // Remove and re-insert to sort iter.remove(); LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + " stats: " + childQueue); childQueues.add(childQueue); - printChildQueues(); + if (LOG.isDebugEnabled()) { + printChildQueues(); + } break; } } - return assigned; + return assignment; } String getChildQueuesToPrint() { Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Wed Jan 18 23:02:17 2012 @@ -811,49 +811,56 @@ public class TestLeafQueue { app_0.updateResourceRequests(app_0_requests_0); // Start testing... + CSAssignment assignment = null; // Start with off switch, shouldn't allocate due to delay scheduling - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, shouldn't allocate due to delay scheduling - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority)); assertEquals(3, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Another off switch, now we should allocate // since missedOpportunities=3 and reqdContainers=3 - a.assignContainers(clusterResource, node_2); + assignment = a.assignContainers(clusterResource, node_2); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(2, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.OFF_SWITCH, assignment.getType()); // NODE_LOCAL - node_0 - a.assignContainers(clusterResource, node_0); + assignment = a.assignContainers(clusterResource, node_0); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(1, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // NODE_LOCAL - node_1 - a.assignContainers(clusterResource, node_1); + assignment = a.assignContainers(clusterResource, node_1); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // Add 1 more request to check for RACK_LOCAL app_0_requests_0.clear(); @@ -872,11 +879,12 @@ public class TestLeafQueue { String host_3 = "host_3"; // on rack_1 SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); - a.assignContainers(clusterResource, node_3); + assignment = a.assignContainers(clusterResource, node_3); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset assertEquals(0, app_0.getTotalRequiredResources(priority)); + assertEquals(NodeType.RACK_LOCAL, assignment.getType()); } @Test Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1233105&r1=1233104&r2=1233105&view=diff ============================================================================== --- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original) +++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Wed Jan 18 23:02:17 2012 @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; @@ -92,11 +93,18 @@ public class TestParentQueue { private void stubQueueAllocation(final CSQueue queue, final Resource clusterResource, final SchedulerNode node, final int allocation) { + stubQueueAllocation(queue, clusterResource, node, allocation, + NodeType.NODE_LOCAL); + } + + private void stubQueueAllocation(final CSQueue queue, + final Resource clusterResource, final SchedulerNode node, + final int allocation, final NodeType type) { // Simulate the queue allocation - doAnswer(new Answer() { + doAnswer(new Answer() { @Override - public Resource answer(InvocationOnMock invocation) throws Throwable { + public CSAssignment answer(InvocationOnMock invocation) throws Throwable { try { throw new Exception(); } catch (Exception e) { @@ -115,8 +123,8 @@ public class TestParentQueue { // Next call - nothing if (allocation > 0) { - doReturn(Resources.none()).when(queue).assignContainers( - eq(clusterResource), eq(node)); + doReturn(new CSAssignment(Resources.none(), type)). + when(queue).assignContainers(eq(clusterResource), eq(node)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -124,7 +132,7 @@ public class TestParentQueue { when(node).getAvailableResource(); } - return allocatedResource; + return new CSAssignment(allocatedResource, type); } }). when(queue).assignContainers(eq(clusterResource), eq(node)); @@ -401,6 +409,78 @@ public class TestParentQueue { } + @Test + public void testOffSwitchScheduling() throws Exception { + // Setup queue configs + setupSingleLevelQueues(csConf); + + Map queues = new HashMap(); + CSQueue root = + CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, + CapacityScheduler.queueComparator, + CapacityScheduler.applicationComparator, + TestUtils.spyHook); + + // Setup some nodes + final int memoryPerNode = 10; + final int numNodes = 2; + + SchedulerNode node_0 = + TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); + SchedulerNode node_1 = + TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); + + final Resource clusterResource = + Resources.createResource(numNodes * (memoryPerNode*GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Start testing + LeafQueue a = (LeafQueue)queues.get(A); + LeafQueue b = (LeafQueue)queues.get(B); + final float delta = 0.0001f; + + // Simulate B returning a container on node_0 + stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); + stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_0); + assertEquals(0.0f, a.getUtilization(), delta); + assertEquals(computeQueueUtilization(b, 1*GB, clusterResource), + b.getUtilization(), delta); + + // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G + // also, B gets a scheduling opportunity since A allocates RACK_LOCAL + stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); + stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_1); + InOrder allocationOrder = inOrder(a, b); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), + a.getUtilization(), delta); + assertEquals(computeQueueUtilization(b, 2*GB, clusterResource), + b.getUtilization(), delta); + + // Now, B should get the scheduling opportunity + // since A has 2/6G while B has 2/14G, + // However, since B returns off-switch, A won't get an opportunity + stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); + stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_0); + allocationOrder = inOrder(b, a); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(SchedulerNode.class)); + assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), + a.getUtilization(), delta); + assertEquals(computeQueueUtilization(b, 4*GB, clusterResource), + b.getUtilization(), delta); + + } + @After public void tearDown() throws Exception { } Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/c++/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/c++:1159757-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/c++:1159757-1233103 /hadoop/core/branches/branch-0.19/mapred/src/c++:713112 /hadoop/core/trunk/src/c++:776175-784663 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred/src/contrib:713112 /hadoop/core/trunk/src/contrib:784664-785643 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/block_forensics/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,4 +1,4 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/block_forensics:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/block_forensics:1152502-1233103 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/block_forensics:713112 /hadoop/core/branches/branch-0.19/mapred/src/contrib/block_forensics:713112 /hadoop/core/trunk/src/contrib/block_forensics:784664-785643 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/build-contrib.xml ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build-contrib.xml:1161333-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build-contrib.xml:1161333-1233103 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112 /hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/build.xml ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build.xml:1161333-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/build.xml:1161333-1233103 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112 /hadoop/core/trunk/src/contrib/build.xml:776175-786373 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/data_join/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/data_join:1159757-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/data_join:1159757-1233103 /hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112 /hadoop/core/trunk/src/contrib/data_join:776175-786373 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/eclipse-plugin/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,4 +1,4 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/eclipse-plugin:1159757-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/eclipse-plugin:1159757-1233103 /hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112 /hadoop/core/branches/branch-0.19/mapred/src/contrib/eclipse-plugin:713112 /hadoop/core/trunk/src/contrib/eclipse-plugin:776175-785643 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/index/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index:1159757-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index:1159757-1233103 /hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112 /hadoop/core/trunk/src/contrib/index:776175-786373 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/contrib/vaidya/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/vaidya:1159757-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/vaidya:1159757-1233103 /hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112 /hadoop/core/trunk/src/contrib/vaidya:776175-786373 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/examples/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/examples:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/examples:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred/src/examples:713112 /hadoop/core/trunk/src/examples:776175-784663 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/java/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/java:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/java:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred/src/java:713112 /hadoop/core/trunk/src/mapred:776175-785643 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112 /hadoop/core/trunk/src/test/mapred:776175-785643 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,4 +1,4 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs:1159757-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs:1159757-1233103 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/fs:713112 /hadoop/core/trunk/src/test/mapred/org/apache/hadoop/fs:776175-785643 /hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs:817878-835934 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,4 +1,4 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/hdfs:713112 /hadoop/core/trunk/src/test/mapred/org/apache/hadoop/hdfs:776175-785643 /hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/hdfs:817878-835934 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,4 +1,4 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:1159757-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:1159757-1233103 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs-with-mr/org/apache/hadoop/ipc:713112 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/ipc:713112 /hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/ipc:776175-784663 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,4 +1,4 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:1161333-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:1161333-1233103 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:713112 /hadoop/core/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:776175-785643 /hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:817878-835934 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,4 +1,4 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:1161333-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:1161333-1233103 /hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:713112 /hadoop/core/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:776175-785643 /hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java:817878-835934 Propchange: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/src/webapps/job/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Jan 18 23:02:17 2012 @@ -1,3 +1,3 @@ -/hadoop/common/trunk/hadoop-mapreduce-project/src/webapps/job:1152502-1232180 +/hadoop/common/trunk/hadoop-mapreduce-project/src/webapps/job:1152502-1233103 /hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112 /hadoop/core/trunk/src/webapps/job:776175-785643