hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1611413 [2/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...
Date Thu, 17 Jul 2014 17:45:01 GMT
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/platform/custom/CustomPlatform.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.nativetask.platform.custom;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.Platform;
+import org.apache.hadoop.nativetask.serde.custom.CustomWritable;
+import org.apache.hadoop.nativetask.serde.custom.CustomWritableSerializer;
+
+/**
+ * Example platform that registers {@link CustomWritable} as a key type
+ * together with the serializer class to use for it.
+ */
+public class CustomPlatform extends Platform {
+
+  /** Returns the fixed name of this platform, {@code "CustomPlatform"}. */
+  @Override
+  public String name() {
+    return "CustomPlatform";
+  }
+
+  /**
+   * Registers the {@link CustomWritable} class name with
+   * {@link CustomWritableSerializer}.
+   *
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  public void init() throws IOException {
+    final String keyClassName = CustomWritable.class.getName();
+    registerKey(keyClassName, CustomWritableSerializer.class);
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritable.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.nativetask.serde.custom;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class CustomWritable implements WritableComparable<CustomWritable>{
+  
+  private int Id_a;
+  private long Id_b;
+
+  public CustomWritable() {
+    this.Id_a = 0;
+    this.Id_b = 0;
+  }
+  
+  public CustomWritable(int a, long b){
+    this.Id_a = a;
+    this.Id_b = b;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    Id_a = in.readInt();
+    Id_b = in.readLong();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(Id_a);
+    out.writeLong(Id_b);
+  }
+
+  @Override
+  public int compareTo(CustomWritable that) {
+    if(Id_a > that.Id_a){
+      return 1;
+    }
+    if(Id_a < that.Id_a){
+      return -1;
+    }
+    if(Id_b > that.Id_b){
+      return 1;
+    }
+    if(Id_b < that.Id_b){
+      return -1;
+    }
+    return 0;
+  }
+  
+  @Override
+  public String toString() {
+    return Id_a + "\t" + Id_b;
+  }
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/java/org/apache/hadoop/nativetask/serde/custom/CustomWritableSerializer.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.nativetask.serde.custom;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+import org.apache.hadoop.mapred.nativetask.serde.DefaultSerializer;
+
+public class CustomWritableSerializer extends DefaultSerializer implements
+  INativeComparable{
+
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return 12;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/sdk/example/CustomModule/src/main/native/src/CustomComparator.cpp Thu Jul 17 17:44:55 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include "NativeTask.h"
+
+using namespace NativeTask;
+
+namespace Custom {
+using namespace std;
+
+/**
+ * Reverses the byte order of a 32-bit value.
+ *
+ * Implemented with shifts and masks rather than x86 inline assembly so the
+ * function compiles for any target architecture.
+ */
+inline uint32_t bswap(uint32_t val) {
+  return ((val & 0x000000ffU) << 24) |
+         ((val & 0x0000ff00U) << 8) |
+         ((val & 0x00ff0000U) >> 8) |
+         ((val & 0xff000000U) >> 24);
+}
+
+/**
+ * Reads a 32-bit big-endian signed integer from the 4 bytes at src.
+ *
+ * The value is assembled byte by byte, so src does not need to be aligned
+ * and the result does not depend on the byte order of the host.
+ *
+ * @param src pointer to at least 4 readable bytes
+ * @return the decoded value
+ */
+int32_t ReadInt(const char * src) {
+  const unsigned char * p = reinterpret_cast<const unsigned char *>(src);
+  uint32_t v = (static_cast<uint32_t>(p[0]) << 24) |
+               (static_cast<uint32_t>(p[1]) << 16) |
+               (static_cast<uint32_t>(p[2]) << 8) |
+               static_cast<uint32_t>(p[3]);
+  return static_cast<int32_t>(v);
+}
+
+/**
+ * Reverses the byte order of a 64-bit value.
+ *
+ * Uses a single portable implementation: swap adjacent bytes, then adjacent
+ * 16-bit groups, then the two 32-bit halves.  This avoids both the
+ * architecture-specific inline assembly and the dependency on a
+ * build-defined macro to select it.
+ */
+inline uint64_t bswap64(uint64_t val) {
+  val = ((val & 0x00ff00ff00ff00ffULL) << 8) |
+        ((val >> 8) & 0x00ff00ff00ff00ffULL);
+  val = ((val & 0x0000ffff0000ffffULL) << 16) |
+        ((val >> 16) & 0x0000ffff0000ffffULL);
+  return (val << 32) | (val >> 32);
+}
+
+/**
+ * Reads a 64-bit big-endian signed integer from the 8 bytes at src.
+ *
+ * The value is assembled byte by byte, so src does not need to be aligned
+ * (the comparator in this file calls it at an offset of 4) and the result
+ * does not depend on the byte order of the host.
+ *
+ * @param src pointer to at least 8 readable bytes
+ * @return the decoded value
+ */
+int64_t ReadLong(const char * src) {
+  const unsigned char * p = reinterpret_cast<const unsigned char *>(src);
+  uint64_t v = 0;
+  for (int i = 0; i < 8; i++) {
+    v = (v << 8) | p[i];
+  }
+  return static_cast<int64_t>(v);
+}
+
+/**
+ * Compares two serialized records.
+ *
+ * Each record is decoded as a 32-bit value at offset 0 followed by a 64-bit
+ * value at offset 4.  The 32-bit values decide the order; the 64-bit values
+ * break ties.  srcLength and destLength are not consulted.
+ *
+ * @return 1 if src sorts after dest, -1 if it sorts before, 0 if equal
+ */
+int CustomComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  const int32_t lhsA = ReadInt(src);
+  const int64_t lhsB = ReadLong(src + 4);
+  const int32_t rhsA = ReadInt(dest);
+  const int64_t rhsB = ReadLong(dest + 4);
+
+  if (lhsA != rhsA) {
+    return lhsA > rhsA ? 1 : -1;
+  }
+  if (lhsB != rhsB) {
+    return lhsB > rhsB ? 1 : -1;
+  }
+  return 0;
+}
+
+// Library registration block.  DEFINE_NATIVE_LIBRARY and REGISTER_FUNCTION are
+// macros that are not defined in this file (presumably they come from
+// NativeTask.h - confirm).  The block names the library "Custom" and passes
+// CustomComparator to REGISTER_FUNCTION for that library.
+DEFINE_NATIVE_LIBRARY(Custom) {
+  REGISTER_FUNCTION(CustomComparator,Custom);
+}
+
+}
+
+
+
+
+
+

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/CMakeLists.txt Thu Jul 17 17:44:55 2014
@@ -0,0 +1,279 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# Default to release builds unless the caller already chose a build type.
+# The variable name must not be followed by a comma: "CMAKE_BUILD_TYPE,"
+# would define a different, unused variable and leave the build type unset.
+if (NOT CMAKE_BUILD_TYPE)
+    set(CMAKE_BUILD_TYPE Release)
+endif (NOT CMAKE_BUILD_TYPE)
+
+# Architecture and JNI detection lives in a separate file next to this one.
+include(JNIFlags.cmake NO_POLICY_SCOPE)
+
+# add_dual_library(LIBNAME <sources...>)
+# Builds the given sources twice: as shared library target LIBNAME and as
+# static library target LIBNAME_static.  The static target's output name is
+# set to LIBNAME.
+function(add_dual_library LIBNAME)
+    set(_dual_sources ${ARGN})
+    add_library(${LIBNAME} SHARED ${_dual_sources})
+    add_library(${LIBNAME}_static STATIC ${_dual_sources})
+    set_target_properties(${LIBNAME}_static
+        PROPERTIES OUTPUT_NAME ${LIBNAME})
+endfunction(add_dual_library)
+
+# target_link_dual_libraries(LIBNAME <libs...>)
+# Links the given libraries into both LIBNAME and LIBNAME_static.
+function(target_link_dual_libraries LIBNAME)
+    foreach (_dual_target ${LIBNAME} ${LIBNAME}_static)
+        target_link_libraries(${_dual_target} ${ARGN})
+    endforeach ()
+endfunction(target_link_dual_libraries)
+
+# output_directory(TGT DIR)
+# Places every kind of build output of target TGT (runtime, archive and
+# library) in DIR, taken relative to the top of the build tree.
+function(output_directory TGT DIR)
+    set(_out_dir "${CMAKE_BINARY_DIR}/${DIR}")
+    set_target_properties(${TGT} PROPERTIES
+        RUNTIME_OUTPUT_DIRECTORY "${_out_dir}"
+        ARCHIVE_OUTPUT_DIRECTORY "${_out_dir}"
+        LIBRARY_OUTPUT_DIRECTORY "${_out_dir}")
+endfunction(output_directory TGT DIR)
+
+# dual_output_directory(TGT DIR)
+# Applies output_directory to both TGT and TGT_static.
+function(dual_output_directory TGT DIR)
+    foreach (_dual_target ${TGT} ${TGT}_static)
+        output_directory(${_dual_target} "${DIR}")
+    endforeach ()
+endfunction(dual_output_directory TGT DIR)
+
+#
+# set_find_shared_library_version(LVERS)
+#
+# Changes what find_package and find_library will match by overwriting the
+# global CMAKE_FIND_LIBRARY_SUFFIXES variable.  Save that variable before
+# invoking this macro and restore it once the lookup is done.
+#
+# Effect of the new suffix list:
+# 1. Only shared libraries are found, never static ones;
+# 2. Only shared libraries carrying the given version number are found
+#    (FreeBSD is the exception: the plain ".so" suffix is used there).
+#
+# On Windows this macro is a no-op.  Windows does not encode version number
+# information into library path names.
+#
+macro(set_find_shared_library_version LVERS)
+    if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
+        # Mac OS: versioned .dylib
+        set(CMAKE_FIND_LIBRARY_SUFFIXES ".${LVERS}.dylib")
+    elseif (${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")
+        # FreeBSD always has the unversioned .so installed
+        set(CMAKE_FIND_LIBRARY_SUFFIXES ".so")
+    elseif (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+        # Nothing to do: shared libraries cannot be found by version here
+    else ()
+        # Most UNIX variants: .so followed by the version
+        set(CMAKE_FIND_LIBRARY_SUFFIXES ".so.${LVERS}")
+    endif ()
+endmacro(set_find_shared_library_version LVERS)
+
+# The location of the generated JNI headers has to be supplied by the caller;
+# stop immediately when it is missing.
+if (NOT GENERATED_JAVAH)
+    message(FATAL_ERROR "You must set the cmake variable GENERATED_JAVAH")
+endif ()
+find_package(JNI REQUIRED)
+
+# Save the current suffix list, switch to version-"1" shared libraries, then
+# put the saved list back.  Both set() calls must dereference the variable
+# with ${...}; without it the literal variable name is stored and later
+# written into CMAKE_FIND_LIBRARY_SUFFIXES.
+set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+set_find_shared_library_version("1")
+set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+# primitive configs
+# PRFLAGS carries extra preprocessor defines that are appended to the C flags
+# (and, through CMAKE_C_FLAGS, to the C++ flags below).
+set(PRFLAGS "-DSIMPLE_MEMCPY")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${PRFLAGS} -Wall")
+# NOTE(review): the value below spans two lines, so it contains a newline and
+# tabs; it also hard-codes the amd64 JVM directory.  CMAKE_LD_FLAGS does not
+# appear to be read anywhere else in this file - confirm it has an effect.
+set(CMAKE_LD_FLAGS "${CMAKE_LD_FLAGS} -no-undefined -version-info 0:1:0 
+		-L${_JAVA_HOME}/jre/lib/amd64/server -ljvm")
+# C++ flags = existing C++ flags + all C flags + fixed optimisation/debug set.
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_C_FLAGS} -g -O2 -DNDEBUG -fPIC")
+# D is the prefix of the native source tree; it already ends in "/", so the
+# "${D}/..." paths used below contain a double slash.
+set(D main/native/)
+
+# Save the current suffix list, switch to version-"1" shared libraries, then
+# put the saved list back.  Both set() calls must dereference the variable
+# with ${...}; without it the literal variable name is stored and later
+# written into CMAKE_FIND_LIBRARY_SUFFIXES.
+set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+set_find_shared_library_version("1")
+set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+
+# Feature probes.  Each check stores its result in the named HAVE_* variable.
+include(CheckFunctionExists)
+include(CheckCSourceCompiles)
+#include(CheckLibraryExists)
+include(CheckIncludeFiles)
+
+#check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE)
+#check_function_exists(posix_fadvise HAVE_POSIX_FADVISE)
+#check_library_exists(dl dlopen "" NEED_LINK_DL)
+
+# Headers
+check_include_files(fcntl.h HAVE_FCNTL_H)
+check_include_files(malloc.h HAVE_MALLOC_H)
+check_include_files(mach/mach.h HAVE_MACH_MACH_H)
+check_include_files(memory.h HAVE_MEMORY_H)
+check_include_files(stddef.h HAVE_STDDEF_H)
+check_include_files(stdint.h HAVE_STDINT_H)
+check_include_files(stdlib.h HAVE_STDLIB_H)
+check_include_files(string.h HAVE_STRING_H)
+# NOTE(review): the result variable is spelled HAVE_UNITSTD_H (not
+# HAVE_UNISTD_H).  Left unchanged because its consumers are not visible here.
+check_include_files(unistd.h HAVE_UNITSTD_H)
+
+# Functions
+check_function_exists(clock_gettime HAVE_CLOCK_GETTIME)
+check_function_exists(localtime_r HAVE_LOCALTIME_R)
+check_function_exists(memset HAVE_MEMSET)
+check_function_exists(strchr HAVE_STRCHR)
+check_function_exists(strtoul HAVE_STRTOUL)
+
+
+
+# Locate the version-"1" shared snappy library.  The suffix list is saved
+# before the lookup and restored afterwards; both set() calls dereference the
+# variable with ${...} so the real list (not the variable's name) is kept.
+set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+set_find_shared_library_version("1")
+find_library(SNAPPY_LIBRARY
+    NAMES snappy
+    PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/lib
+          ${CUSTOM_SNAPPY_PREFIX}/lib64 ${CUSTOM_SNAPPY_LIB})
+set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+find_path(SNAPPY_INCLUDE_DIR
+    NAMES snappy.h
+    PATHS ${CUSTOM_SNAPPY_PREFIX} ${CUSTOM_SNAPPY_PREFIX}/include
+          ${CUSTOM_SNAPPY_INCLUDE})
+if (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
+    # HADOOP_SNAPPY_LIBRARY is the bare file name of the library found.
+    get_filename_component(HADOOP_SNAPPY_LIBRARY ${SNAPPY_LIBRARY} NAME)
+    set(SNAPPY_SOURCE_FILES
+        "${D}/src/codec/SnappyCodec.cc")
+else (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
+    # Report the failure before clearing anything, so the message shows what
+    # the lookups actually produced.
+    if (REQUIRE_SNAPPY)
+        message(FATAL_ERROR "Required snappy library could not be found.  SNAPPY_LIBRARY=${SNAPPY_LIBRARY}, SNAPPY_INCLUDE_DIR=${SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_INCLUDE_DIR=${CUSTOM_SNAPPY_INCLUDE_DIR}, CUSTOM_SNAPPY_PREFIX=${CUSTOM_SNAPPY_PREFIX}, CUSTOM_SNAPPY_INCLUDE=${CUSTOM_SNAPPY_INCLUDE}")
+    endif (REQUIRE_SNAPPY)
+    # Snappy is optional: build without it.  SNAPPY_LIBRARY is cleared as
+    # well, because it is passed to target_link_libraries further down and a
+    # "-NOTFOUND" value there stops the configuration.
+    set(SNAPPY_LIBRARY "")
+    set(SNAPPY_INCLUDE_DIR "")
+    set(SNAPPY_SOURCE_FILES "")
+endif (SNAPPY_LIBRARY AND SNAPPY_INCLUDE_DIR)
+
+# Header search path, in lookup order.
+include_directories(
+    ${GENERATED_JAVAH}
+    ${D}
+    ${D}/src
+    ${D}/src/util
+    ${D}/src/lib
+    ${D}/test
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    #${CMAKE_CURRENT_SOURCE_DIR}/src
+    #${CMAKE_BINARY_DIR}
+    ${JNI_INCLUDE_DIRS}
+    ${SNAPPY_INCLUDE_DIR}
+)
+
+# Generate config.h in the build tree from the template in the source tree.
+#SET(CMAKE_SOURCE_DIR "/cygdrive/c/Users/tianlunz/repo/hadoop-2.2.0-src/hadoop-common-project/hadoop-common/src")
+configure_file(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
+
+
+set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
+
+# The nativetask library, built as both a shared and a static target.
+add_dual_library(nativetask
+    # bundled third-party sources
+    ${D}/lz4/lz4.c
+    ${D}/cityhash/city.cc
+    # codecs
+    ${D}/src/codec/BlockCodec.cc
+    ${D}/src/codec/GzipCodec.cc
+    ${D}/src/codec/Lz4Codec.cc
+    ${SNAPPY_SOURCE_FILES}
+    # handlers
+    ${D}/src/handler/BatchHandler.cc
+    ${D}/src/handler/MCollectorOutputHandler.cc
+    ${D}/src/handler/AbstractMapHandler.cc
+    ${D}/src/handler/CombineHandler.cc
+    # core library
+    ${D}/src/lib/Buffers.cc
+    ${D}/src/lib/BufferStream.cc
+    ${D}/src/lib/Compressions.cc
+    ${D}/src/lib/PartitionBucket.cc
+    ${D}/src/lib/PartitionBucketIterator.cc
+    ${D}/src/lib/FileSystem.cc
+    ${D}/src/lib/IFile.cc
+    ${D}/src/lib/jniutils.cc
+    ${D}/src/lib/Log.cc
+    ${D}/src/lib/MapOutputCollector.cc
+    ${D}/src/lib/MapOutputSpec.cc
+    ${D}/src/lib/MemoryBlock.cc
+    ${D}/src/lib/Merge.cc
+    ${D}/src/lib/NativeLibrary.cc
+    ${D}/src/lib/Iterator.cc
+    ${D}/src/lib/NativeObjectFactory.cc
+    ${D}/src/lib/NativeRuntimeJniImpl.cc
+    ${D}/src/lib/NativeTask.cc
+    ${D}/src/lib/SpillInfo.cc
+    ${D}/src/lib/Path.cc
+    ${D}/src/lib/Streams.cc
+    ${D}/src/lib/Combiner.cc
+    ${D}/src/lib/TaskCounters.cc
+    # utilities
+    ${D}/src/util/Checksum.cc
+    ${D}/src/util/Hash.cc
+    ${D}/src/util/Random.cc
+    ${D}/src/util/StringUtil.cc
+    ${D}/src/util/SyncUtils.cc
+    ${D}/src/util/Timer.cc
+    ${D}/src/util/WritableUtils.cc
+)
+
+# Link dependencies are attached to the shared target only; the static
+# target nativetask_static gets none here.
+target_link_libraries(nativetask
+    #${LIB_DL}
+    dl
+    rt
+    pthread
+    z
+    ${SNAPPY_LIBRARY}
+    ${JAVA_JVM_LIBRARY}
+)
+
+# The nttest executable: the bundled gtest sources plus every native test.
+add_executable(nttest
+    ${D}/gtest/gtest-all.cc
+    # lib tests
+    ${D}/test/lib/TestByteArray.cc
+    ${D}/test/lib/TestByteBuffer.cc
+    ${D}/test/lib/TestComparatorForDualPivotQuickSort.cc
+    ${D}/test/lib/TestComparatorForStdSort.cc
+    ${D}/test/lib/TestFixSizeContainer.cc
+    ${D}/test/lib/TestMemoryPool.cc
+    ${D}/test/lib/TestIterator.cc
+    ${D}/test/lib/TestKVBuffer.cc
+    ${D}/test/lib/TestMemBlockIterator.cc
+    ${D}/test/lib/TestMemoryBlock.cc
+    ${D}/test/lib/TestPartitionBucket.cc
+    ${D}/test/lib/TestReadBuffer.cc
+    ${D}/test/lib/TestReadWriteBuffer.cc
+    ${D}/test/lib/TestTrackingCollector.cc
+    # util tests
+    ${D}/test/util/TestChecksum.cc
+    ${D}/test/util/TestHash.cc
+    ${D}/test/util/TestStringUtil.cc
+    ${D}/test/util/TestSyncUtils.cc
+    ${D}/test/util/TestWritableUtils.cc
+    # top-level tests
+    ${D}/test/TestCommand.cc
+    ${D}/test/TestConfig.cc
+    ${D}/test/TestCounter.cc
+    ${D}/test/TestCompressions.cc
+    ${D}/test/TestFileSystem.cc
+    ${D}/test/TestIFile.cc
+    ${D}/test/TestPrimitives.cc
+    ${D}/test/TestSort.cc
+    ${D}/test/TestMain.cc
+    ${D}/test/test_commons.cc)
+
+# The JVM is linked through the executable linker flags instead of
+# ${JAVA_JVM_LIBRARY}.  This replaces any previous value of the variable and
+# names the amd64 server directory explicitly.
+set(CMAKE_EXE_LINKER_FLAGS "-L${_JAVA_HOME}/jre/lib/amd64/server -ljvm")
+target_link_libraries(nttest
+    nativetask_static
+    dl
+    rt
+    pthread
+    z
+    ${SNAPPY_LIBRARY}
+    #	  ${JAVA_JVM_LIBRARY}
+)
+#if (NEED_LINK_DL)
+#   set(LIB_DL dl)
+#endif (NEED_LINK_DL)
+
+if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+    #
+    # Put '$ORIGIN' into the RPATH of libnativetask.so so that dlopen also
+    # searches the directory that contains libnativetask.so.  Restricted to
+    # Linux because not every operating system supports $ORIGIN.
+    #
+    set_target_properties(nativetask PROPERTIES INSTALL_RPATH "\$ORIGIN/")
+endif ()
+
+# Shared-object version of the library and final locations of the outputs.
+set(LIBNATIVETASK_VERSION "1.0.0")
+set_target_properties(nativetask PROPERTIES SOVERSION ${LIBNATIVETASK_VERSION})
+dual_output_directory(nativetask target/usr/local/lib)
+output_directory(nttest test)

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/JNIFlags.cmake Thu Jul 17 17:44:55 2014
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cmake_minimum_required(VERSION 2.6 FATAL_ERROR)
+
+# JVM_ARCH_DATA_MODEL is set by maven.  When it is 32, every binary is
+# compiled as 32-bit.
+if (JVM_ARCH_DATA_MODEL EQUAL 32)
+    # On amd64/x86_64, ppc64, sparc64 with GCC: force 32-bit code generation
+    # by appending -m32 to the C, C++ and LD flag variables.
+    if (CMAKE_COMPILER_IS_GNUCC AND CMAKE_SYSTEM_PROCESSOR MATCHES ".*64")
+        foreach (_flags_var CMAKE_C_FLAGS CMAKE_CXX_FLAGS CMAKE_LD_FLAGS)
+            set(${_flags_var} "${${_flags_var}} -m32")
+        endforeach ()
+    endif ()
+    # Present the processor as i686 so that find_package(JNI) picks the
+    # 32-bit version of libjvm.so.
+    if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        set(CMAKE_SYSTEM_PROCESSOR "i686")
+    endif ()
+endif ()
+
+# ARM Linux only: work out which float ABI the JVM uses and, for a
+# soft-float JVM, compile C code with -mfloat-abi=softfp.
+# NOTE(review): JAVA_JVM_LIBRARY is passed to readelf here, but within this
+# file it is only located further down - confirm it is already set when this
+# block runs.
+if (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm" AND CMAKE_SYSTEM_NAME STREQUAL "Linux")
+    find_program(READELF readelf)
+    if (NOT READELF MATCHES "NOTFOUND")
+        execute_process(
+            COMMAND ${READELF} -A ${JAVA_JVM_LIBRARY}
+            OUTPUT_VARIABLE JVM_ELF_ARCH
+            ERROR_QUIET)
+        if (NOT JVM_ELF_ARCH MATCHES "Tag_ABI_VFP_args: VFP registers")
+            message("Soft-float JVM detected")
+
+            # Try compiling against an arbitrary libc symbol with
+            # -mfloat-abi=softfp.  Without the soft-float dev libraries this
+            # typically fails with
+            # "fatal error: bits/predefs.h: No such file or directory".
+            include(CMakePushCheckState)
+            include(CheckSymbolExists)
+            cmake_push_check_state()
+            set(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -mfloat-abi=softfp")
+            check_symbol_exists(exit stdlib.h SOFTFP_AVAILABLE)
+            if (NOT SOFTFP_AVAILABLE)
+                message(FATAL_ERROR "Soft-float dev libraries required (e.g. 'apt-get install libc6-dev-armel' on Debian/Ubuntu)")
+            endif ()
+            cmake_pop_check_state()
+
+            set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -mfloat-abi=softfp")
+        endif ()
+    else ()
+        message(WARNING "readelf not found; JVM float ABI detection disabled")
+    endif ()
+endif ()
+
+if ("${CMAKE_SYSTEM}" MATCHES "Linux")
+    #
+    # Locate JNI_INCLUDE_DIRS and JNI_LIBRARIES.
+    # Since we were invoked from Maven, we know that the JAVA_HOME environment
+    # variable is valid.  So we ignore system paths here and just use JAVA_HOME.
+    #
+    file(TO_CMAKE_PATH "$ENV{JAVA_HOME}" _JAVA_HOME)
+
+    # Map the processor name to the directory name used under jre/lib.
+    if (CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$")
+        set(_java_libarch "i386")
+    elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
+        set(_java_libarch "amd64")
+    elseif (CMAKE_SYSTEM_PROCESSOR MATCHES "^arm")
+        set(_java_libarch "arm")
+    else ()
+        set(_java_libarch ${CMAKE_SYSTEM_PROCESSOR})
+    endif ()
+
+    # Candidate directories, most specific first.
+    set(_JDK_DIRS "${_JAVA_HOME}/jre/lib/${_java_libarch}/*"
+                  "${_JAVA_HOME}/jre/lib/${_java_libarch}"
+                  "${_JAVA_HOME}/jre/lib/*"
+                  "${_JAVA_HOME}/jre/lib"
+                  "${_JAVA_HOME}/lib/*"
+                  "${_JAVA_HOME}/lib"
+                  "${_JAVA_HOME}/include/*"
+                  "${_JAVA_HOME}/include"
+                  "${_JAVA_HOME}"
+    )
+    find_path(JAVA_INCLUDE_PATH
+        NAMES jni.h
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    # In IBM java, it's jniport.h instead of jni_md.h
+    find_path(JAVA_INCLUDE_PATH2
+        NAMES jni_md.h jniport.h
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    set(JNI_INCLUDE_DIRS ${JAVA_INCLUDE_PATH} ${JAVA_INCLUDE_PATH2})
+    find_library(JAVA_JVM_LIBRARY
+        NAMES rt jvm
+        PATHS ${_JDK_DIRS}
+        NO_DEFAULT_PATH)
+    set(JNI_LIBRARIES ${JAVA_JVM_LIBRARY})
+    # Report the JAVA_HOME value that was actually used for the search
+    # (_JAVA_HOME, taken from the environment above).
+    message("JAVA_HOME=${_JAVA_HOME}, JAVA_JVM_LIBRARY=${JAVA_JVM_LIBRARY}")
+    message("JAVA_INCLUDE_PATH=${JAVA_INCLUDE_PATH}, JAVA_INCLUDE_PATH2=${JAVA_INCLUDE_PATH2}")
+    if (JAVA_JVM_LIBRARY AND JAVA_INCLUDE_PATH AND JAVA_INCLUDE_PATH2)
+        message("Located all JNI components successfully.")
+    else ()
+        message(FATAL_ERROR "Failed to find a viable JVM installation under JAVA_HOME.")
+    endif ()
+else ()
+    # Other systems: rely on CMake's own JNI detection.
+    find_package(JNI REQUIRED)
+endif ()

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/config.h.cmake Thu Jul 17 17:44:55 2014
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+/* Include guard for the generated config.h. */
+#ifndef CONFIG_H
+#define CONFIG_H
+
+/*
+ * Template line for CMake's configure_file.  The build script sets
+ * HADOOP_SNAPPY_LIBRARY to the file name of the snappy library when snappy
+ * is found; the value is substituted between the quotes.
+ */
+#cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
+
+#endif

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Command.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * A command identified by an integer id, optionally carrying a
+ * human-readable description.
+ *
+ * Two commands are equal when their ids are equal; the description takes no
+ * part in equality or hashing.
+ */
+public class Command {
+
+  private final int id;
+  private final String description;
+
+  /**
+   * Creates a command without a description.
+   *
+   * @param id numeric identifier of the command
+   */
+  public Command(int id) {
+    this(id, null);
+  }
+
+  /**
+   * @param id numeric identifier of the command
+   * @param description text describing the command, may be null
+   */
+  public Command(int id, String description) {
+    this.id = id;
+    this.description = description;
+  }
+
+  /**
+   * @return the numeric identifier of this command
+   */
+  public int id() {
+    return this.id;
+  }
+
+  /**
+   * @return the description, or null when none was supplied
+   */
+  public String description() {
+    return this.description;
+  }
+
+  /**
+   * @return true when other is a Command with the same id
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof Command) {
+      return this.id == ((Command)other).id;
+    }
+    return false;
+  }
+
+  /**
+   * Derived from the id only, matching {@link #equals(Object)}, so that equal
+   * commands behave correctly as keys of hash-based collections.
+   */
+  @Override
+  public int hashCode() {
+    return id;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/CommandDispatcher.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * Callback that receives a {@link Command} coming from upstream and carries
+ * out the operation that command stands for.
+ */
+public interface CommandDispatcher {
+
+  /**
+   * Handles a single command.
+   *
+   * @param command the command to execute
+   * @param parameter arguments accompanying the command, may be null
+   * @return the outcome of the command, may be null
+   * @throws IOException if the command cannot be carried out
+   */
+  ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException;
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Constants.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Shared constants of the native task module: string keys (for example
+ * {@link #NATIVE_PROCESSOR_BUFFER_KB}, which is looked up in the job
+ * configuration) together with their numeric defaults and field sizes.
+ *
+ * Every value is final so that no caller can alter a default for the whole
+ * JVM by assigning to it.
+ */
+public class Constants {
+
+  public static final String MAP_SORT_CLASS = "map.sort.class";
+  public static final String MAPRED_COMBINER_CLASS = "mapred.combiner.class";
+
+  public static final String MAPRED_MAPTASK_DELEGATOR_CLASS = "mapreduce.map.task.delegator.class";
+  public static final String MAPRED_REDUCETASK_DELEGATOR_CLASS = "mapreduce.reduce.task.delegator.class";
+  public static final String NATIVE_TASK_ENABLED = "native.task.enabled";
+  public static final String NATIVE_LOG_DEVICE = "native.log.device";
+  public static final String NATIVE_HADOOP_VERSION = "native.hadoop.version";
+
+  public static final String NATIVE_MAPPER_CLASS = "native.mapper.class";
+  public static final String NATIVE_REDUCER_CLASS = "native.reducer.class";
+  public static final String NATIVE_PARTITIONER_CLASS = "native.partitioner.class";
+  public static final String NATIVE_COMBINER_CLASS = "native.combiner.class";
+  public static final String NATIVE_INPUT_SPLIT = "native.input.split";
+
+  public static final String NATIVE_RECORDREADER_CLASS = "native.recordreader.class";
+  public static final String NATIVE_RECORDWRITER_CLASS = "native.recordwriter.class";
+  public static final String NATIVE_OUTPUT_FILE_NAME = "native.output.file.name";
+
+  // processor buffer size key and its defaults, expressed in KB as the names say
+  public static final String NATIVE_PROCESSOR_BUFFER_KB = "native.processor.buffer.kb";
+  public static final int NATIVE_PROCESSOR_BUFFER_KB_DEFAULT = 64;
+  public static final int NATIVE_ASYNC_PROCESSOR_BUFFER_KB_DEFAULT = 1024;
+
+  // NOTE(review): the unit of the interval default is not stated here -- confirm
+  public static final String NATIVE_STATUS_UPDATE_INTERVAL = "native.update.interval";
+  public static final int NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL = 3000;
+
+  public static final String SERIALIZATION_FRAMEWORK = "SerializationFramework";
+  public static final int SIZEOF_PARTITION_LENGTH = 4;
+  public static final int SIZEOF_KEY_LENGTH = 4;
+  public static final int SIZEOF_VALUE_LENGTH = 4;
+  // combined size of the key-length and value-length fields
+  public static final int SIZEOF_KV_LENGTH = SIZEOF_KEY_LENGTH + SIZEOF_VALUE_LENGTH;
+
+  public static final String NATIVE_CLASS_LIBRARY = "native.class.library";
+  public static final String NATIVE_CLASS_LIBRARY_CUSTOM = "native.class.library.custom";
+  public static final String NATIVE_CLASS_LIBRARY_BUILDIN = "native.class.library.buildin";
+  public static final String NATIVE_MAPOUT_KEY_COMPARATOR = "native.map.output.key.comparator";
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataChannel.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Direction in which data is exchanged over a channel.
+ */
+public enum DataChannel {
+  /**
+   * We will only read data from this channel
+   */
+  IN,
+  /**
+   * We will only write data to this channel
+   */
+  OUT,
+  /**
+   * We will both read from and write to this channel
+   */
+  INOUT,
+  /**
+   * There is no data exchange
+   */
+  NONE
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/DataReceiver.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * A DataReceiver pulls in arriving data; one example is
+ * {@link org.apache.hadoop.mapred.nativetask.handlers.BufferPuller}.
+ */
+public interface DataReceiver {
+
+  /**
+   * Signals the receiver that data has arrived. The data itself is
+   * transferred out of band rather than through this call.
+   *
+   * @return NOTE(review): the meaning of the returned flag is not visible
+   *         from this interface -- confirm against the implementations
+   * @throws IOException if receiving the data fails
+   */
+  boolean receiveData() throws IOException;
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/HadoopPlatform.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.*;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link Platform} named "Hadoop" that registers a native serializer for
+ * each of the Hadoop Writable key classes listed in {@link #init()}.
+ */
+public class HadoopPlatform extends Platform {
+  private static final Logger LOG = Logger.getLogger(HadoopPlatform.class);
+
+  public HadoopPlatform() throws IOException {
+  }
+
+  /**
+   * Registers one serializer class per key class handled by this platform.
+   */
+  @Override
+  public void init() throws IOException {
+    registerKey(NullWritable.class.getName(), NullWritableSerializer.class);
+    registerKey(Text.class.getName(), TextSerializer.class);
+    registerKey(LongWritable.class.getName(), LongWritableSerializer.class);
+    registerKey(IntWritable.class.getName(), IntWritableSerializer.class);
+    registerKey(Writable.class.getName(), DefaultSerializer.class);
+    registerKey(BytesWritable.class.getName(), BytesWritableSerializer.class);
+    registerKey(BooleanWritable.class.getName(), BoolWritableSerializer.class);
+    registerKey(ByteWritable.class.getName(), ByteWritableSerializer.class);
+    registerKey(FloatWritable.class.getName(), FloatWritableSerializer.class);
+    registerKey(DoubleWritable.class.getName(), DoubleWritableSerializer.class);
+    registerKey(VIntWritable.class.getName(), VIntWritableSerializer.class);
+    registerKey(VLongWritable.class.getName(), VLongWritableSerializer.class);
+
+    LOG.info("Hadoop platform inited");
+  }
+
+  /**
+   * @return true only when keyClassName is contained in keyClassNames and
+   *         the serializer is an {@link INativeComparable}
+   */
+  @Override
+  public boolean support(String keyClassName, INativeSerializer serializer, JobConf job) {
+    final boolean knownKeyClass = keyClassNames.contains(keyClassName);
+    return knownKeyClass && serializer instanceof INativeComparable;
+  }
+
+  /**
+   * @return always false
+   */
+  @Override
+  public boolean define(Class comparatorClass) {
+    return false;
+  }
+
+  /**
+   * @return the fixed platform name "Hadoop"
+   */
+  @Override
+  public String name() {
+    return "Hadoop";
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/ICombineHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+/**
+ * Interacts with the native side to support a Java Combiner.
+ */
+public interface ICombineHandler {
+
+  /**
+   * Runs the combiner.
+   *
+   * @throws IOException if combining fails
+   */
+  void combine() throws IOException;
+
+  /**
+   * @return id of this handler
+   */
+  long getId();
+
+  /**
+   * Closes handlers, buffer pullers and pushers.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeComparable.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+/**
+ * Marker interface: any key type that is comparable at the native side must
+ * implement it.
+ *
+ * A native comparator function should have the ComparatorPtr type:
+ *
+ * <pre>
+ *   typedef int (*ComparatorPtr)(const char * src, uint32_t srcLength,
+ *   const char * dest,  uint32_t destLength);
+ * </pre>
+ *
+ * Keys are in serialized format at the native side. The function is passed
+ * the keys' locations and lengths so that it can compare them with the same
+ * logic as their Java comparator.
+ *
+ * For example, a HiveKey (see {@code HiveKey#write}) is serialized as an
+ * int field (containing the length of the raw bytes) followed by the raw
+ * bytes. When comparing two HiveKeys, we first read the length field and then
+ * compare the raw bytes by invoking the BytesComparator provided by our
+ * library, passing it the location and length of the raw bytes:
+ *
+ * <pre>
+ *   int HivePlatform::HiveKeyComparator(const char * src, uint32_t srcLength,
+ *   const char * dest, uint32_t destLength) {
+ *     uint32_t sl = bswap(*(uint32_t*)src);
+ *     uint32_t dl = bswap(*(uint32_t*)dest);
+ *     return NativeObjectFactory::BytesComparator(src + 4, sl, dest + 4, dl);
+ *   }
+ * </pre>
+ */
+public interface INativeComparable {
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/INativeHandler.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * A handler that accepts input and gives output; it can be used to transfer
+ * both commands and data.
+ */
+public interface INativeHandler extends NativeDataTarget, NativeDataSource {
+
+  /**
+   * @return the name of this handler
+   */
+  String name();
+
+  /**
+   * @return the value identifying the native handler
+   */
+  long getNativeHandler();
+
+  /**
+   * Initializes the native handler.
+   *
+   * @param conf configuration handed to the handler
+   * @throws IOException if initialization fails
+   */
+  void init(Configuration conf) throws IOException;
+
+  /**
+   * Closes the native handler.
+   *
+   * @throws IOException if closing fails
+   */
+  void close() throws IOException;
+
+  /**
+   * Calls a command on the downstream side.
+   *
+   * @param command the command to invoke
+   * @param parameter arguments accompanying the command
+   * @return the result produced for the command
+   * @throws IOException if the call fails
+   */
+  ReadWriteBuffer call(Command command, ReadWriteBuffer parameter) throws IOException;
+
+  /**
+   * @param handler the dispatcher to install on this handler
+   */
+  void setCommandDispatcher(CommandDispatcher handler);
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeBatchProcessor.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.DirectBufferPool;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+
+/**
+ * Creates the channel between Java and native code and transfers data and
+ * commands across it.
+ *
+ * Several private members of this class have no Java caller here; the static
+ * initializer caches JNI field and method ids, so their names and signatures
+ * must stay as they are.
+ */
+public class NativeBatchProcessor implements INativeHandler {
+  private static final Log LOG = LogFactory.getLog(NativeBatchProcessor.class);
+
+  private final String nativeHandlerName;
+  private long nativeHandlerAddr;
+
+  private boolean isInputFinished = false;
+  // set once the input buffer has been handed back to the pool, so that a
+  // repeated close() cannot return the same buffer twice
+  private boolean inputBufferReturned = false;
+
+  // << Field used directly in Native, the name must NOT be changed
+  private ByteBuffer rawOutputBuffer;
+  private ByteBuffer rawInputBuffer;
+  // >>
+
+  private InputBuffer in;
+  private OutputBuffer out;
+
+  private CommandDispatcher commandDispatcher;
+  private DataReceiver dataReceiver;
+
+  static {
+    if (NativeRuntime.isNativeLibraryLoaded()) {
+      InitIDs();
+    }
+  }
+
+  /**
+   * Builds and initializes a handler whose buffers match the given channel.
+   *
+   * @param nativeHandlerName name of the native object to create
+   * @param conf configuration; {@link Constants#NATIVE_PROCESSOR_BUFFER_KB}
+   *          gives the buffer size in KB, 1024 when unset
+   * @param channel which of the input/output buffers to allocate
+   * @return the initialized handler
+   * @throws IOException if initialization fails
+   */
+  public static INativeHandler create(String nativeHandlerName,
+      Configuration conf, DataChannel channel) throws IOException {
+
+    final int bufferSize = conf.getInt(Constants.NATIVE_PROCESSOR_BUFFER_KB,
+        1024) * 1024;
+
+    LOG.info("NativeHandler: direct buffer size: " + bufferSize);
+
+    OutputBuffer out = null;
+    InputBuffer in = null;
+
+    switch (channel) {
+    case IN:
+      in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case OUT:
+      out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case INOUT:
+      in = new InputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      out = new OutputBuffer(BufferType.DIRECT_BUFFER, bufferSize);
+      break;
+    case NONE:
+    default:
+      // no data exchange, so no buffer is allocated
+      break;
+    }
+
+    final INativeHandler handler = new NativeBatchProcessor(nativeHandlerName,
+        in, out);
+    handler.init(conf);
+    return handler;
+  }
+
+  /**
+   * @param nativeHandlerName name of the native object to create in init
+   * @param input buffer receiving data from native, or null for none
+   * @param output buffer carrying data to native, or null for none
+   */
+  protected NativeBatchProcessor(String nativeHandlerName, InputBuffer input,
+      OutputBuffer output) throws IOException {
+    this.nativeHandlerName = nativeHandlerName;
+
+    if (null != input) {
+      this.in = input;
+      this.rawInputBuffer = input.getByteBuffer();
+    }
+    if (null != output) {
+      this.out = output;
+      this.rawOutputBuffer = output.getByteBuffer();
+    }
+  }
+
+  @Override
+  public void setCommandDispatcher(CommandDispatcher handler) {
+    this.commandDispatcher = handler;
+  }
+
+  /**
+   * Creates the native object and sets it up with the serialized
+   * configuration.
+   *
+   * @throws RuntimeException if the native object cannot be created
+   */
+  @Override
+  public void init(Configuration conf) throws IOException {
+    this.nativeHandlerAddr = NativeRuntime
+        .createNativeObject(nativeHandlerName);
+    if (this.nativeHandlerAddr == 0) {
+      throw new RuntimeException("Native object create failed, class: "
+          + nativeHandlerName);
+    }
+    setupHandler(nativeHandlerAddr, ConfigUtil.toBytes(conf));
+  }
+
+  /**
+   * Releases the native object and hands a direct input buffer back to the
+   * buffer pool. Safe to call more than once: each resource is released at
+   * most one time.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (nativeHandlerAddr != 0) {
+      NativeRuntime.releaseNativeObject(nativeHandlerAddr);
+      nativeHandlerAddr = 0;
+    }
+    if (!inputBufferReturned && null != in && null != in.getByteBuffer()
+        && in.getByteBuffer().isDirect()) {
+      DirectBufferPool.getInstance().returnBuffer(in.getByteBuffer());
+      inputBufferReturned = true;
+    }
+  }
+
+  @Override
+  public long getNativeHandler() {
+    return nativeHandlerAddr;
+  }
+
+  /**
+   * Sends a command to the native side and wraps the returned bytes.
+   *
+   * @param command command whose id is passed to native
+   * @param parameter arguments for the command, may be null
+   * @return buffer over the returned bytes with its write point at their end
+   */
+  @Override
+  public ReadWriteBuffer call(Command command, ReadWriteBuffer parameter)
+      throws IOException {
+    final byte[] bytes = nativeCommand(nativeHandlerAddr, command.id(),
+        null == parameter ? null : parameter.getBuff());
+
+    final ReadWriteBuffer result = new ReadWriteBuffer(bytes);
+    result.setWritePoint(bytes.length);
+    return result;
+  }
+
+  /**
+   * Lets the native side process everything written to the output buffer so
+   * far, then rewinds that buffer. Requires an output buffer; without one the
+   * call fails with a NullPointerException.
+   */
+  @Override
+  public void sendData() throws IOException {
+    nativeProcessInput(nativeHandlerAddr, rawOutputBuffer.position());
+    rawOutputBuffer.position(0);
+  }
+
+  /**
+   * Sends the remaining data and tells the native side that input is
+   * finished. Does nothing without an output buffer or when already finished.
+   */
+  @Override
+  public void finishSendData() throws IOException {
+    if (null == rawOutputBuffer || isInputFinished) {
+      return;
+    }
+
+    sendData();
+    nativeFinish(nativeHandlerAddr);
+    isInputFinished = true;
+  }
+
+  /**
+   * Forwards a command to the registered {@link CommandDispatcher}.
+   * No Java code in this class calls it; presumably it is invoked from the
+   * native side through JNI -- keep the name and signature.
+   *
+   * @param command id of the command
+   * @param data raw parameter bytes, may be null
+   * @return the dispatcher's result bytes, or null when there is no
+   *         dispatcher or it returned nothing
+   * @throws IOException wrapping any failure raised while dispatching
+   */
+  private byte[] sendCommandToJava(int command, byte[] data) throws IOException {
+    try {
+      final Command cmd = new Command(command);
+      ReadWriteBuffer param = null;
+
+      if (null != data) {
+        param = new ReadWriteBuffer();
+        param.reset(data);
+        param.setWritePoint(data.length);
+      }
+
+      if (null == commandDispatcher) {
+        return null;
+      }
+      final ReadWriteBuffer result = commandDispatcher.onCall(cmd, param);
+      return null == result ? null : result.getBuff();
+    } catch (Exception e) {
+      LOG.error("Failed to dispatch command " + command, e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Called by native side, clean output buffer so native side can continue
+   * processing. Exposes the first length bytes of the input buffer and
+   * notifies the registered {@link DataReceiver}, if any.
+   */
+  private void flushOutput(int length) throws IOException {
+    if (null == rawInputBuffer) {
+      return;
+    }
+    rawInputBuffer.position(0);
+    rawInputBuffer.limit(length);
+
+    if (null != dataReceiver) {
+      try {
+        dataReceiver.receiveData();
+      } catch (IOException e) {
+        LOG.error("Data receiver failed, length: " + length, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Cache JNI field & method ids
+   */
+  private static native void InitIDs();
+
+  /**
+   * Setup native side BatchHandler
+   */
+  private native void setupHandler(long nativeHandlerAddr, byte[][] configs);
+
+  /**
+   * Let native side to process data in inputBuffer
+   * 
+   * @param handler
+   *          address of the native handler
+   * @param length
+   *          amount of data to process
+   */
+  private native void nativeProcessInput(long handler, int length);
+
+  /**
+   * Notice native side input is finished
+   * 
+   * @param handler
+   *          address of the native handler
+   */
+  private native void nativeFinish(long handler);
+
+  /**
+   * Send control message to native side
+   * 
+   * @param handler
+   *          address of the native handler
+   * @param cmd
+   *          command id
+   * @param parameter
+   *          command data
+   * @return return value
+   */
+  private native byte[] nativeCommand(long handler, int cmd, byte[] parameter);
+
+  /**
+   * Load data from native
+   * 
+   * @param handler
+   *          address of the native handler
+   */
+  private native void nativeLoadData(long handler);
+
+  /**
+   * Empty hook. NOTE(review): has no caller in this class; presumably invoked
+   * from the native side or overridden -- confirm before changing.
+   */
+  protected void finishOutput() {
+  }
+
+  @Override
+  public InputBuffer getInputBuffer() {
+    return this.in;
+  }
+
+  @Override
+  public OutputBuffer getOutputBuffer() {
+    return this.out;
+  }
+
+  @Override
+  public void loadData() throws IOException {
+    nativeLoadData(nativeHandlerAddr);
+  }
+
+  @Override
+  public void setDataReceiver(DataReceiver handler) {
+    this.dataReceiver = handler;
+  }
+
+  @Override
+  public String name() {
+    return nativeHandlerName;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataSource.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * A NativeDataSource loads data from upstream.
+ */
+public interface NativeDataSource {
+
+  /**
+   * Gets the input buffer.
+   *
+   * @return the input buffer of this data source
+   */
+  InputBuffer getInputBuffer();
+
+  /**
+   * Sets the listener. When data from upstream arrives, the listener will be activated.
+   *
+   * @param handler the listener to activate when data arrives
+   */
+  void setDataReceiver(DataReceiver handler);
+
+  /**
+   * Loads data from upstream. Nothing is returned to the caller; see
+   * {@link #setDataReceiver(DataReceiver)} for how arriving data is announced.
+   *
+   * @throws IOException if loading fails
+   */
+  void loadData() throws IOException;
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeDataTarget.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+
+/**
+ * A NativeDataTarget sends data to downstream.
+ */
+public interface NativeDataTarget {
+
+  /**
+   * Sends a signal to indicate that the data has been stored in the output buffer.
+   *
+   * @throws IOException if the signal cannot be sent
+   */
+  void sendData() throws IOException;
+
+  /**
+   * Sends a signal that there is no more data.
+   *
+   * @throws IOException if the signal cannot be sent
+   */
+  void finishSendData() throws IOException;
+
+  /**
+   * Gets the output buffer.
+   *
+   * @return the output buffer of this data target
+   */
+  OutputBuffer getOutputBuffer();
+
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeMapOutputCollectorDelegator.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputCollector;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.nativetask.handlers.NativeCollectorOnlyHandler;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * native map output collector wrapped in Java interface
+ */
+public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollector<K, V> {
+
+  private static Log LOG = LogFactory.getLog(NativeMapOutputCollectorDelegator.class);
+  private JobConf job;
+  private NativeCollectorOnlyHandler<K, V> handler;
+
+  private StatusReportChecker updater;
+
+  @Override
+  public void collect(K key, V value, int partition) throws IOException, InterruptedException {
+    handler.collect(key, value, partition);
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException {
+    handler.close();
+    if (null != updater) {
+      updater.stop();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException, InterruptedException, ClassNotFoundException {
+    handler.flush();
+  }
+
+  @Override
+  public void init(Context context) throws IOException, ClassNotFoundException {
+    this.job = context.getJobConf();
+
+    Platforms.init(job);
+
+    if (job.getNumReduceTasks() == 0) {
+      String message = "There is no reducer, no need to use native output collector";
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    Class comparatorClass = job.getClass(MRJobConfig.KEY_COMPARATOR, null, RawComparator.class);
+    if (comparatorClass != null && !Platforms.define(comparatorClass)) {
+      String message = "Native output collector don't support customized java comparator "
+        + job.get(MRJobConfig.KEY_COMPARATOR);
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    if (job.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false) == true) {
+      if (!isCodecSupported(job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC))) {
+        String message = "Native output collector don't support compression codec "
+          + job.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC) + ", We support Gzip, Lz4, snappy";
+        LOG.error(message);
+        throw new InvalidJobConfException(message);
+      }
+    }
+
+    if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) {
+      String message = "Native-Task don't support sort class " + job.get(Constants.MAP_SORT_CLASS);
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    if (job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, false) == true) {
+      String message = "Native-Task don't support secure shuffle";
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    final Class<?> keyCls = job.getMapOutputKeyClass();
+    try {
+      @SuppressWarnings("rawtypes")
+      final INativeSerializer serializer = NativeSerialization.getInstance().getSerializer(keyCls);
+      if (null == serializer) {
+        String message = "Key type not supported. Cannot find serializer for " + keyCls.getName();
+        LOG.error(message);
+        throw new InvalidJobConfException(message);
+      } else if (!Platforms.support(keyCls.getName(), serializer, job)) {
+        String message = "Native output collector don't support this key, this key is not comparable in native "
+          + keyCls.getName();
+        LOG.error(message);
+        throw new InvalidJobConfException(message);
+      }
+    } catch (final IOException e) {
+      String message = "Cannot find serializer for " + keyCls.getName();
+      LOG.error(message);
+      throw new IOException(message);
+    }
+
+    final boolean ret = NativeRuntime.isNativeLibraryLoaded();
+    if (ret) {
+      NativeRuntime.configure(job);
+
+      final long updateInterval = job.getLong(Constants.NATIVE_STATUS_UPDATE_INTERVAL,
+          Constants.NATIVE_STATUS_UPDATE_INTERVAL_DEFVAL);
+      updater = new StatusReportChecker(context.getReporter(), updateInterval);
+      updater.start();
+
+    } else {
+      String message = "Nativeruntime cannot be loaded, please check the libnativetask.so is in hadoop library dir";
+      LOG.error(message);
+      throw new InvalidJobConfException(message);
+    }
+
+    this.handler = null;
+    try {
+      final Class<K> oKClass = (Class<K>) job.getMapOutputKeyClass();
+      final Class<K> oVClass = (Class<K>) job.getMapOutputValueClass();
+      final TaskAttemptID id = context.getMapTask().getTaskID();
+      final TaskContext taskContext = new TaskContext(job, null, null, oKClass, oVClass,
+          context.getReporter(), id);
+      handler = NativeCollectorOnlyHandler.create(taskContext);
+    } catch (final IOException e) {
+      String message = "Native output collector cannot be loaded;";
+      LOG.error(message);
+      throw new IOException(message, e);
+    }
+
+    LOG.info("Native output collector can be successfully enabled!");
+  }
+
+  private boolean isCodecSupported(String string) {
+    if ("org.apache.hadoop.io.compress.SnappyCodec".equals(string)
+        || "org.apache.hadoop.io.compress.GzipCodec".equals(string)
+        || "org.apache.hadoop.io.compress.Lz4Codec".equals(string)) {
+      return true;
+    }
+    return false;
+  }
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/NativeRuntime.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
+import org.apache.hadoop.mapred.nativetask.util.ConfigUtil;
+import org.apache.hadoop.mapred.nativetask.util.SnappyUtil;
+import org.apache.hadoop.util.VersionInfo;
+
+/**
+ * This class stands for the native runtime. It has three functions:
+ * <ol>
+ * <li>Create native handlers for map, reduce, outputcollector, etc.</li>
+ * <li>Configure native task with provided MR configs.</li>
+ * <li>Provide file system api to native space, so that it can use File system like HDFS.</li>
+ * </ol>
+ *
+ * <p>The nativetask JNI library is loaded once in the static initializer. A load failure is
+ * logged, not thrown; callers can query {@link #isNativeLibraryLoaded()}.
+ */
+public class NativeRuntime {
+  private static final Log LOG = LogFactory.getLog(NativeRuntime.class);
+  private static boolean nativeLibraryLoaded = false;
+
+  private static Configuration conf = new Configuration();
+
+  static {
+    try {
+      if (!SnappyUtil.isNativeSnappyLoaded(conf)) {
+        throw new IOException("Snappy library cannot be loaded");
+      }
+      LOG.info("Snappy native library is available");
+      System.loadLibrary("nativetask");
+      LOG.info("Nativetask JNI library loaded.");
+      nativeLibraryLoaded = true;
+    } catch (final Throwable t) {
+      // Deliberately not rethrown: nativeLibraryLoaded stays false and the other methods
+      // of this class fail via assertNativeLibraryLoaded(). The throwable is passed to the
+      // logger so the stack trace of the load failure is kept.
+      LOG.error("Failed to load nativetask JNI library with error: " + t, t);
+      LOG.info("java.library.path=" + System.getProperty("java.library.path"));
+      LOG.info("LD_LIBRARY_PATH=" + System.getenv("LD_LIBRARY_PATH"));
+    }
+  }
+
+  /**
+   * @throws RuntimeException
+   *           if the static initializer did not manage to load the native library
+   */
+  private static void assertNativeLibraryLoaded() {
+    if (!nativeLibraryLoaded) {
+      throw new RuntimeException("Native runtime library not loaded");
+    }
+  }
+
+  /**
+   * @return true if the static initializer loaded the nativetask JNI library
+   */
+  public static boolean isNativeLibraryLoaded() {
+    return nativeLibraryLoaded;
+  }
+
+  /**
+   * Copies the given configuration, records the Hadoop version in the copy under
+   * {@code Constants.NATIVE_HADOOP_VERSION} and passes the copy to the native side.
+   *
+   * @param jobConf
+   *          configuration to copy; it is not modified
+   * @throws RuntimeException
+   *           if the native library is not loaded
+   */
+  public static void configure(Configuration jobConf) {
+    assertNativeLibraryLoaded();
+    conf = new Configuration(jobConf);
+    conf.set(Constants.NATIVE_HADOOP_VERSION, VersionInfo.getVersion());
+    JNIConfigure(ConfigUtil.toBytes(conf));
+  }
+
+  /**
+   * Creates a native object. We use it to create native handlers.
+   *
+   * @param clazz
+   *          name of the native class to create
+   * @return the value returned by the native side; 0 is logged as a failure to create the object
+   * @throws RuntimeException
+   *           if the native library is not loaded
+   */
+  public synchronized static long createNativeObject(String clazz) {
+    assertNativeLibraryLoaded();
+    final long ret = JNICreateNativeObject(BytesUtil.toBytes(clazz));
+    if (ret == 0) {
+      LOG.warn("Can't create NativeObject for class " + clazz + ", probably not exist.");
+    }
+    return ret;
+  }
+
+  /**
+   * Registers a customized library.
+   *
+   * @param libraryName
+   *          passed to the native side as the module path
+   * @param clazz
+   *          passed to the native side as the module name
+   * @return the value returned by the native side; any non-zero value is logged as a failure
+   * @throws RuntimeException
+   *           if the native library is not loaded
+   */
+  public synchronized static long registerLibrary(String libraryName, String clazz) {
+    assertNativeLibraryLoaded();
+    final long ret = JNIRegisterModule(BytesUtil.toBytes(libraryName), BytesUtil.toBytes(clazz));
+    if (ret != 0) {
+      // The previous text was copied from createNativeObject and never named the library.
+      LOG.warn("Can't register library " + libraryName + " with name " + clazz
+          + ", native return code: " + ret);
+    }
+    return ret;
+  }
+
+  /**
+   * Destroys a native object. We use it to destroy native handlers.
+   *
+   * @param addr
+   *          value identifying the native object, as returned by {@link #createNativeObject}
+   * @throws RuntimeException
+   *           if the native library is not loaded
+   */
+  public synchronized static void releaseNativeObject(long addr) {
+    assertNativeLibraryLoaded();
+    JNIReleaseNativeObject(addr);
+  }
+
+  /**
+   * Gets the status report from native space and applies it to the reporter: progress, then
+   * the status text if it is non-empty, then every counter increment. The whole update runs
+   * while holding the reporter's monitor.
+   *
+   * @param reporter
+   *          reporter to update
+   * @throws IOException
+   *           if the bytes returned by the native side cannot be decoded
+   * @throws RuntimeException
+   *           if the native library is not loaded
+   */
+  public static void reportStatus(TaskReporter reporter) throws IOException {
+    assertNativeLibraryLoaded();
+    synchronized (reporter) {
+      final byte[] statusBytes = JNIUpdateStatus();
+      final DataInputBuffer ib = new DataInputBuffer();
+      ib.reset(statusBytes, statusBytes.length);
+
+      final FloatWritable progress = new FloatWritable();
+      progress.readFields(ib);
+      reporter.setProgress(progress.get());
+
+      final Text status = new Text();
+      status.readFields(ib);
+      if (status.getLength() > 0) {
+        reporter.setStatus(status.toString());
+      }
+
+      final IntWritable numCounters = new IntWritable();
+      numCounters.readFields(ib);
+      final int counterCount = numCounters.get();
+
+      // The holders are reused for every counter; the loop body does not run for a count of 0.
+      final Text group = new Text();
+      final Text name = new Text();
+      final LongWritable amount = new LongWritable();
+      for (int i = 0; i < counterCount; i++) {
+        group.readFields(ib);
+        name.readFields(ib);
+        amount.readFields(ib);
+        reporter.incrCounter(group.toString(), name.toString(), amount.get());
+      }
+    }
+  }
+
+
+  /*******************************************************
+   *** The following are JNI Apis
+   ********************************************************/
+
+  /**
+   * Configures the native runtime with mapreduce job configurations.
+   *
+   * @param configs
+   *          the configuration as produced by {@code ConfigUtil.toBytes}
+   */
+  private native static void JNIConfigure(byte[][] configs);
+
+  /**
+   * Creates a native object in native space.
+   *
+   * @param clazz
+   *          native class name as bytes
+   * @return value identifying the created object; {@link #createNativeObject} treats 0 as failure
+   */
+  private native static long JNICreateNativeObject(byte[] clazz);
+
+  /**
+   * Creates the default native object for a certain type. Not called from this class.
+   *
+   * @param type
+   *          type name as bytes
+   * @return value identifying the created object
+   */
+  @Deprecated
+  private native static long JNICreateDefaultNativeObject(byte[] type);
+
+  /**
+   * Destroys a native object in native space.
+   *
+   * @param addr
+   *          value identifying the native object
+   */
+  private native static void JNIReleaseNativeObject(long addr);
+
+  /**
+   * Gets the status update from the native side. Encoding, in order:
+   * <ul>
+   * <li>progress: float</li>
+   * <li>status: Text</li>
+   * <li>counter number: int, the count of the counters</li>
+   * <li>counters: array of [group:Text, name:Text, incrCount:Long]</li>
+   * </ul>
+   *
+   * @return the encoded status
+   */
+  private native static byte[] JNIUpdateStatus();
+
+  /**
+   * Not used.
+   */
+  private native static void JNIRelease();
+
+  /**
+   * Registers a module in native space; called by {@link #registerLibrary}.
+   *
+   * @param path
+   *          library name as bytes
+   * @param name
+   *          module name as bytes
+   * @return a status code; {@link #registerLibrary} treats any non-zero value as failure
+   */
+  private native static int JNIRegisterModule(byte[] path, byte[] name);
+}

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/Platform.java Thu Jul 17 17:44:55 2014
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.nativetask.serde.INativeSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
+
+/**
+ * Base class for platforms. A platform is a framework running on top of
+ * MapReduce, like Hadoop, Hive, Pig, Mahout. Each framework defines its
+ * own key type and value type across a MapReduce job.
+ *
+ * <p>For each platform we should implement serializers, so that data can be
+ * exchanged with the native side, and native comparators, so that the native
+ * output collectors can sort the data and write it out. The
+ * {@link HadoopPlatform} already supports all key types of Hadoop; users can
+ * implement their own custom platform.
+ */
+public abstract class Platform {
+  /** Registry shared with the rest of the native task code; obtained once per platform. */
+  private final NativeSerialization serialization = NativeSerialization.getInstance();
+
+  /** Names of the key classes registered through {@link #registerKey}. */
+  protected Set<String> keyClassNames = new HashSet<String>();
+
+  public Platform() {
+    // nothing to do: all fields are set by their initializers
+  }
+
+  /**
+   * Initializes a platform; implementations should call {@link #registerKey} here.
+   *
+   * @throws IOException if initialization fails
+   */
+  public abstract void init() throws IOException;
+
+  /**
+   * @return name of a Platform, useful for logs and debug
+   */
+  public abstract String name();
+
+
+  /**
+   * Associates a key class with its serializer and with this platform. The
+   * serializer is registered first; the key class name is recorded in
+   * {@link #keyClassNames} only if that registration did not throw.
+   *
+   * @param keyClassName map output key class name
+   * @param key          key serializer class
+   * @throws IOException if the serializer registration fails
+   */
+  protected void registerKey(String keyClassName, Class key) throws IOException {
+    this.serialization.register(keyClassName, key);
+    this.keyClassNames.add(keyClassName);
+  }
+
+  /**
+   * Tells whether this platform supports a specific key. A supported key
+   * should satisfy at least two conditions:
+   *
+   * <ol>
+   * <li>the key belongs to the platform</li>
+   * <li>the associated serializer implements the {@link INativeComparable} interface</li>
+   * </ol>
+   *
+   * @param keyClassName map output key class name
+   * @param serializer   serializer associated with the key via registerKey
+   * @param job          job configuration
+   * @return             true if the platform has implemented native comparators of the key and
+   *                     false otherwise
+   */
+  protected abstract boolean support(String keyClassName, INativeSerializer serializer, JobConf job);
+
+
+  /**
+   * Tells whether it is this platform that has defined a custom Java comparator.
+   *
+   * <p>NativeTask doesn't support a custom Java comparator (set with
+   * mapreduce.job.output.key.comparator.class), but a platform (e.g. Pig) could also set
+   * that conf and implement native comparators, so we shouldn't bail out in that case.
+   *
+   * @param keyComparator comparator set with mapreduce.job.output.key.comparator.class
+   * @return true if the comparator was defined by this platform
+   */
+  protected abstract boolean define(Class keyComparator);
+}



Mime
View raw message