hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1480824 - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Date Thu, 09 May 2013 22:46:43 GMT
Author: szetszwo
Date: Thu May  9 22:46:39 2013
New Revision: 1480824

URL: http://svn.apache.org/r1480824
Log:
Merge r1480440 through r1480820 from trunk.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1480440-1480820

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Thu May  9 22:46:39 2013
@@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
     binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
 
+    MAPREDUCE-5157. Bring back old sampler related code so that we can support
+    binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 
@@ -383,7 +386,19 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5204. Handling YarnRemoteException separately from IOException in
     MR app after YARN-629. (Xuan Gong via vinodkv)
 
-Release 2.0.4-alpha - UNRELEASED
+    MAPREDUCE-5209. Fix units in a ShuffleScheduler log message.
+    (Tsuyoshi OZAWA via cdouglas)
+
+    MAPREDUCE-5212. Handling YarnRemoteException separately from IOException in
+    MR App's use of ClientRMProtocol after YARN-631. (Xuan Gong via vinodkv)
+
+    MAPREDUCE-5226. Handling YarnRemoteException separately from IOException in
+    MR App's use of AMRMProtocol after YARN-630. (Xuan Gong via vinodkv)
+
+    MAPREDUCE-4942. mapreduce.Job has a bunch of methods that throw 
+    InterruptedException so its incompatible with MR1. (rkanter via tucu)
+
+Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES
 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1480440-1480820

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1480440-1480820

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu May  9 22:46:39 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -144,7 +145,8 @@ public abstract class RMContainerRequest
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
   }
 
-  protected AllocateResponse makeRemoteRequest() throws YarnRemoteException {
+  protected AllocateResponse makeRemoteRequest() throws YarnRemoteException,
+      IOException {
     AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
         applicationAttemptId, lastResponseID, super.getApplicationProgress(),
         new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Thu May  9 22:46:39 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -202,7 +203,7 @@ public class MRAppBenchmark {
               public RegisterApplicationMasterResponse
                   registerApplicationMaster(
                       RegisterApplicationMasterRequest request)
-                      throws YarnRemoteException {
+                      throws YarnRemoteException, IOException {
                 RegisterApplicationMasterResponse response =
                     Records.newRecord(RegisterApplicationMasterResponse.class);
                 response.setMinimumResourceCapability(BuilderUtils
@@ -215,7 +216,7 @@ public class MRAppBenchmark {
               @Override
               public FinishApplicationMasterResponse finishApplicationMaster(
                   FinishApplicationMasterRequest request)
-                  throws YarnRemoteException {
+                  throws YarnRemoteException, IOException {
                 FinishApplicationMasterResponse response =
                     Records.newRecord(FinishApplicationMasterResponse.class);
                 return response;
@@ -223,7 +224,7 @@ public class MRAppBenchmark {
 
               @Override
               public AllocateResponse allocate(AllocateRequest request)
-                  throws YarnRemoteException {
+                  throws YarnRemoteException, IOException {
 
                 AllocateResponse response =
                     Records.newRecord(AllocateResponse.class);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java Thu May  9 22:46:39 2013
@@ -100,6 +100,7 @@ public class TestLocalContainerAllocator
         when(scheduler.allocate(isA(AllocateRequest.class)))
           .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
       } catch (YarnRemoteException e) {
+      } catch (IOException e) {
       }
       return scheduler;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Thu May  9 22:46:39 2013
@@ -209,11 +209,7 @@ public class JobClient extends CLI {
      * completed.
      */
     public float mapProgress() throws IOException {
-      try {
-        return job.mapProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.mapProgress();
     }
 
     /**
@@ -221,11 +217,7 @@ public class JobClient extends CLI {
      * completed.
      */
     public float reduceProgress() throws IOException {
-      try {
-        return job.reduceProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.reduceProgress();
     }
 
     /**
@@ -245,33 +237,21 @@ public class JobClient extends CLI {
      * completed.
      */
     public float setupProgress() throws IOException {
-      try {
-        return job.setupProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.setupProgress();
     }
 
     /**
      * Returns immediately whether the whole job is done yet or not.
      */
     public synchronized boolean isComplete() throws IOException {
-      try {
-        return job.isComplete();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.isComplete();
     }
 
     /**
      * True iff job completed successfully.
      */
     public synchronized boolean isSuccessful() throws IOException {
-      try {
-        return job.isSuccessful();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      return job.isSuccessful();
     }
 
     /**
@@ -302,11 +282,7 @@ public class JobClient extends CLI {
      * Tells the service to terminate the current job.
      */
     public synchronized void killJob() throws IOException {
-      try {
-        job.killJob();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
+      job.killJob();
     }
    
     
@@ -331,14 +307,10 @@ public class JobClient extends CLI {
      */
     public synchronized void killTask(TaskAttemptID taskId,
         boolean shouldFail) throws IOException {
-      try {
-        if (shouldFail) {
-          job.failTask(taskId);
-        } else {
-          job.killTask(taskId);
-        }
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
+      if (shouldFail) {
+        job.failTask(taskId);
+      } else {
+        job.killTask(taskId);
       }
     }
 
@@ -378,16 +350,12 @@ public class JobClient extends CLI {
      * Returns the counters for this job
      */
     public Counters getCounters() throws IOException {
-      try { 
-        Counters result = null;
-        org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
-        if(temp != null) {
-          result = Counters.downgrade(temp);
-        }
-        return result;
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
+      Counters result = null;
+      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
+      if(temp != null) {
+        result = Counters.downgrade(temp);
       }
+      return result;
     }
     
     @Override

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java Thu May  9 22:46:39 2013
@@ -19,10 +19,18 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 
 @InterfaceAudience.Public
@@ -30,6 +38,8 @@ import org.apache.hadoop.mapreduce.Job;
 public class InputSampler<K,V> extends 
   org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
 
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
   public InputSampler(JobConf conf) {
     super(conf);
   }
@@ -38,4 +48,219 @@ public class InputSampler<K,V> extends 
       throws IOException, ClassNotFoundException, InterruptedException {
     writePartitionFile(new Job(job), sampler);
   }
+  /**
+   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+   */
+  public interface Sampler<K,V> extends
+    org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler<K, V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.SplitSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a SplitSampler sampling <em>all</em> splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      super(numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          samples.add(key);
+          key = reader.createKey();
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a new RandomSampler sampling <em>all</em> splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      super(freq, numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.length; ++i) {
+        InputSplit tmp = splits[i];
+        int j = r.nextInt(splits.length);
+        splits[i] = splits[j];
+        splits[j] = tmp;
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.length && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+            Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(key);
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, key);
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.IntervalSampler<K, V>
+          implements Sampler<K,V> {
+
+    /**
+     * Create a new IntervalSampler sampling <em>all</em> splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      super(freq, maxSplitsSampled);
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(key);
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Thu May  9 22:46:39 2013
@@ -296,7 +296,7 @@ public class Job extends JobContextImpl 
    * it, if necessary
    */
   synchronized void ensureFreshStatus() 
-      throws IOException, InterruptedException {
+      throws IOException {
     if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
       updateStatus();
     }
@@ -306,13 +306,18 @@ public class Job extends JobContextImpl 
    * immediately
    * @throws IOException
    */
-  synchronized void updateStatus() throws IOException, InterruptedException {
-    this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
-      @Override
-      public JobStatus run() throws IOException, InterruptedException {
-        return cluster.getClient().getJobStatus(status.getJobID());
-      }
-    });
+  synchronized void updateStatus() throws IOException {
+    try {
+      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
+        @Override
+        public JobStatus run() throws IOException, InterruptedException {
+          return cluster.getClient().getJobStatus(status.getJobID());
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
     if (this.status == null) {
       throw new IOException("Job status not available ");
     }
@@ -537,7 +542,7 @@ public class Job extends JobContextImpl 
    * @return the progress of the job's map-tasks.
    * @throws IOException
    */
-  public float mapProgress() throws IOException, InterruptedException {
+  public float mapProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getMapProgress();
@@ -550,7 +555,7 @@ public class Job extends JobContextImpl 
    * @return the progress of the job's reduce-tasks.
    * @throws IOException
    */
-  public float reduceProgress() throws IOException, InterruptedException {
+  public float reduceProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getReduceProgress();
@@ -576,7 +581,7 @@ public class Job extends JobContextImpl 
    * @return the progress of the job's setup-tasks.
    * @throws IOException
    */
-  public float setupProgress() throws IOException, InterruptedException {
+  public float setupProgress() throws IOException {
     ensureState(JobState.RUNNING);
     ensureFreshStatus();
     return status.getSetupProgress();
@@ -589,7 +594,7 @@ public class Job extends JobContextImpl 
    * @return <code>true</code> if the job is complete, else <code>false</code>.
    * @throws IOException
    */
-  public boolean isComplete() throws IOException, InterruptedException {
+  public boolean isComplete() throws IOException {
     ensureState(JobState.RUNNING);
     updateStatus();
     return status.isJobComplete();
@@ -601,7 +606,7 @@ public class Job extends JobContextImpl 
    * @return <code>true</code> if the job succeeded, else <code>false</code>.
    * @throws IOException
    */
-  public boolean isSuccessful() throws IOException, InterruptedException {
+  public boolean isSuccessful() throws IOException {
     ensureState(JobState.RUNNING);
     updateStatus();
     return status.getState() == JobStatus.State.SUCCEEDED;
@@ -613,9 +618,14 @@ public class Job extends JobContextImpl 
    * 
    * @throws IOException
    */
-  public void killJob() throws IOException, InterruptedException {
+  public void killJob() throws IOException {
     ensureState(JobState.RUNNING);
-    cluster.getClient().killJob(getJobID());
+    try {
+      cluster.getClient().killJob(getJobID());
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -673,7 +683,7 @@ public class Job extends JobContextImpl 
     try {
       return getTaskCompletionEvents(startFrom, 10);
     } catch (InterruptedException ie) {
-      throw new RuntimeException(ie);
+      throw new IOException(ie);
     }
   }
 
@@ -684,13 +694,18 @@ public class Job extends JobContextImpl 
    * @throws IOException
    */
   public boolean killTask(final TaskAttemptID taskId) 
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-      public Boolean run() throws IOException, InterruptedException {
-        return cluster.getClient().killTask(taskId, false);
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        public Boolean run() throws IOException, InterruptedException {
+          return cluster.getClient().killTask(taskId, false);
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -700,14 +715,19 @@ public class Job extends JobContextImpl 
    * @throws IOException
    */
   public boolean failTask(final TaskAttemptID taskId) 
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-      @Override
-      public Boolean run() throws IOException, InterruptedException {
-        return cluster.getClient().killTask(taskId, true);
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+        @Override
+        public Boolean run() throws IOException, InterruptedException {
+          return cluster.getClient().killTask(taskId, true);
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**
@@ -718,14 +738,19 @@ public class Job extends JobContextImpl 
    * @throws IOException
    */
   public Counters getCounters() 
-      throws IOException, InterruptedException {
+      throws IOException {
     ensureState(JobState.RUNNING);
-    return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
-      @Override
-      public Counters run() throws IOException, InterruptedException {
-        return cluster.getClient().getJobCounters(getJobID());
-      }
-    });
+    try {
+      return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
+        @Override
+        public Counters run() throws IOException, InterruptedException {
+          return cluster.getClient().getJobCounters(getJobID());
+        }
+      });
+    }
+    catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Thu May  9 22:46:39 2013
@@ -96,8 +96,8 @@ public class InputSampler<K,V> extends C
    */
   public static class SplitSampler<K,V> implements Sampler<K,V> {
 
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a SplitSampler sampling <em>all</em> splits.
@@ -157,9 +157,9 @@ public class InputSampler<K,V> extends C
    * each split.
    */
   public static class RandomSampler<K,V> implements Sampler<K,V> {
-    private double freq;
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected double freq;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new RandomSampler sampling <em>all</em> splits.
@@ -249,8 +249,8 @@ public class InputSampler<K,V> extends C
    * Useful for sorted data.
    */
   public static class IntervalSampler<K,V> implements Sampler<K,V> {
-    private final double freq;
-    private final int maxSplitsSampled;
+    protected final double freq;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new IntervalSampler sampling <em>all</em> splits.

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Thu May  9 22:46:39 2013
@@ -359,7 +359,7 @@ class ShuffleScheduler<K,V> {
       }
     }
     LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " + 
-             (System.currentTimeMillis()-shuffleStart.get()) + "s");
+             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
   }
     
   public synchronized void resetKnownMaps() {

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1480440-1480820

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu May  9 22:46:39 2013
@@ -137,7 +137,7 @@ public class ClientServiceDelegate {
     }
   }
 
-  private MRClientProtocol getProxy() throws YarnRemoteException {
+  private MRClientProtocol getProxy() throws YarnRemoteException, IOException {
     if (realProxy != null) {
       return realProxy;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Thu May  9 22:46:39 2013
@@ -362,7 +362,7 @@ public class TestClientServiceDelegate {
   }
 
   private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
-      int noOfRetries) throws YarnRemoteException {
+      int noOfRetries) throws YarnRemoteException, IOException {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
         !isAMReachableFromClient);
@@ -429,7 +429,8 @@ public class TestClientServiceDelegate {
         "N/A", 0.0f);
   }
 
-  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
+  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException,
+      IOException {
     ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
     when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
     return rm;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java?rev=1480824&r1=1480823&r2=1480824&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java Thu May  9 22:46:39 2013
@@ -17,23 +17,26 @@
  */
 package org.apache.hadoop.mapreduce.lib.partition;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
 
 public class TestInputSampler {
 
@@ -47,6 +50,24 @@ public class TestInputSampler {
     public int getInit() { return i; }
   }
 
+  static class MapredSequentialSplit implements org.apache.hadoop.mapred.InputSplit {
+    private int i;
+    MapredSequentialSplit(int i) {
+      this.i = i;
+    }
+    @Override
+    public long getLength() { return 0; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
   static class TestInputSamplerIF
       extends InputFormat<IntWritable,NullWritable> {
 
@@ -90,6 +111,71 @@ public class TestInputSampler {
 
   }
 
+  static class TestMapredInputSamplerIF extends TestInputSamplerIF implements
+      org.apache.hadoop.mapred.InputFormat<IntWritable,NullWritable> {
+
+    TestMapredInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      super(maxDepth, numSplits, splitInit);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
+        int numSplits) throws IOException {
+      List<InputSplit> splits = null;
+      try {
+        splits = getSplits(Job.getInstance(job));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      org.apache.hadoop.mapred.InputSplit[] retVals =
+          new org.apache.hadoop.mapred.InputSplit[splits.size()];
+      for (int i = 0; i < splits.size(); ++i) {
+        MapredSequentialSplit split = new MapredSequentialSplit(
+            ((SequentialSplit) splits.get(i)).getInit());
+        retVals[i] = split;
+      }
+      return retVals;
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
+        getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
+            JobConf job, Reporter reporter) throws IOException {
+      return new org.apache.hadoop.mapred.RecordReader
+          <IntWritable, NullWritable>() {
+        private final IntWritable i =
+            new IntWritable(((MapredSequentialSplit)split).getInit());
+        private int maxVal = i.get() + maxDepth + 1;
+
+        @Override
+        public boolean next(IntWritable key, NullWritable value)
+            throws IOException {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        @Override
+        public IntWritable createKey() {
+          return new IntWritable(i.get());
+        }
+        @Override
+        public NullWritable createValue() {
+          return NullWritable.get();
+        }
+        @Override
+        public long getPos() throws IOException {
+          return 0;
+        }
+        @Override
+        public void close() throws IOException {
+        }
+        @Override
+        public float getProgress() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
   /**
    * Verify SplitSampler contract, that an equal number of records are taken
    * from the first splits.
@@ -119,6 +205,36 @@ public class TestInputSampler {
   }
 
   /**
+   * Verify SplitSampler contract in mapred.lib.InputSampler, which is added
+   * back for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.SplitSampler
+            <IntWritable,NullWritable>(NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Object[] samples = sampler.getSample(
+        new TestMapredInputSamplerIF(100000, TOT_SPLITS, inits),
+        new JobConf());
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      // mapred.lib.InputSampler.SplitSampler has a sampling step
+      assertEquals(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE),
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
+  /**
    * Verify IntervalSampler contract, that samples are taken at regular
    * intervals from the given splits.
    */
@@ -146,4 +262,33 @@ public class TestInputSampler {
     }
   }
 
+  /**
+   * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
+   * for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
+            <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = Job.getInstance();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+          NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i,
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
 }



Mime
View raw message