sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject sqoop git commit: SQOOP-3232: Remove Sqoop dependency on deprecated HBase APIs
Date Mon, 11 Sep 2017 12:18:15 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk 184452908 -> e13dd2120


SQOOP-3232: Remove Sqoop dependency on deprecated HBase APIs

(Szabolcs Vasas via Boglarka Egyed)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e13dd212
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e13dd212
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e13dd212

Branch: refs/heads/trunk
Commit: e13dd21209c26316d43350a23f5d533321b61352
Parents: 1844529
Author: Boglarka Egyed <bogi@apache.org>
Authored: Mon Sep 11 14:15:46 2017 +0200
Committer: Boglarka Egyed <bogi@apache.org>
Committed: Mon Sep 11 14:15:46 2017 +0200

----------------------------------------------------------------------
 .../apache/sqoop/hbase/HBasePutProcessor.java   |  99 +++++++++-----
 .../sqoop/mapreduce/HBaseBulkImportJob.java     |  60 ++++++---
 .../apache/sqoop/mapreduce/HBaseImportJob.java  |  69 +++-------
 .../com/cloudera/sqoop/hbase/HBaseTestCase.java |  31 +++--
 .../sqoop/hbase/TestHBasePutProcessor.java      | 133 +++++++++++++++++++
 5 files changed, 282 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 032fd38..cf97b8a 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -25,8 +25,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -84,12 +87,19 @@ public class HBasePutProcessor implements Closeable, Configurable,
   // into a Put command.
   private PutTransformer putTransformer;
 
-  private String tableName;
-  private HTable table;
+  private Connection hbaseConnection;
+  private BufferedMutator bufferedMutator;
 
   public HBasePutProcessor() {
   }
 
+  HBasePutProcessor(Configuration conf, PutTransformer putTransformer, Connection hbaseConnection,
BufferedMutator bufferedMutator) {
+    this.conf = conf;
+    this.putTransformer = putTransformer;
+    this.hbaseConnection = hbaseConnection;
+    this.bufferedMutator = bufferedMutator;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void setConf(Configuration config) {
@@ -106,15 +116,24 @@ public class HBasePutProcessor implements Closeable, Configurable,
       throw new RuntimeException("Could not instantiate PutTransformer.");
     }
     putTransformer.init(conf);
+    initHBaseMutator();
+  }
 
-    this.tableName = conf.get(TABLE_NAME_KEY, null);
+  private void initHBaseMutator() {
+    String tableName = conf.get(TABLE_NAME_KEY, null);
     try {
-      this.table = new HTable(conf, this.tableName);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Could not access HBase table " + tableName,
-          ioe);
+      hbaseConnection = ConnectionFactory.createConnection(conf);
+      bufferedMutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName));
+    } catch (IOException e) {
+      if (hbaseConnection != null) {
+        try {
+          hbaseConnection.close();
+        } catch (IOException connCloseException){
+          LOG.error("Cannot close HBase connection.", connCloseException);
+        }
+      }
+      throw new RuntimeException("Could not create mutator for HBase table " + tableName,
e);
     }
-    this.table.setAutoFlush(false);
   }
 
   @Override
@@ -131,38 +150,54 @@ public class HBasePutProcessor implements Closeable, Configurable,
       throws IOException, ProcessingException {
     Map<String, Object> fields = record.getFieldMap();
     List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
-    if (null != mutationList) {
-      for (Mutation mutation : mutationList) {
-        if (mutation!=null) {
-            if(mutation instanceof Put) {
-              Put putObject = (Put) mutation;
-              if (putObject.isEmpty()) {
-                LOG.warn("Could not insert row with no columns "
-                      + "for row-key column: " + Bytes.toString(putObject.getRow()));
-                } else {
-                  this.table.put(putObject);
-                }
-              } else if(mutation instanceof Delete) {
-                Delete deleteObject = (Delete) mutation;
-                if (deleteObject.isEmpty()) {
-                  LOG.warn("Could not delete row with no columns "
-                        + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
-                } else {
-                  this.table.delete(deleteObject);
-                }
-            }
-        }
+    if (mutationList == null) {
+      return;
+    }
+    for (Mutation mutation : mutationList) {
+      if (!canAccept(mutation)) {
+        continue;
+      }
+      if (!mutation.isEmpty()) {
+        bufferedMutator.mutate(mutation);
+      } else {
+        logEmptyMutation(mutation);
       }
     }
   }
 
+  private void logEmptyMutation(Mutation mutation) {
+    String action = null;
+    if (mutation instanceof Put) {
+      action = "insert";
+    } else if (mutation instanceof Delete) {
+      action = "delete";
+    }
+    LOG.warn("Could not " + action + " row with no columns "
+        + "for row-key column: " + Bytes.toString(mutation.getRow()));
+  }
+
+  private boolean canAccept(Mutation mutation) {
+    return mutation != null && (mutation instanceof Put || mutation instanceof Delete);
+  }
+
   @Override
   /**
    * Closes the HBase table and commits all pending operations.
    */
   public void close() throws IOException {
-    this.table.flushCommits();
-    this.table.close();
+    try {
+      bufferedMutator.flush();
+    } finally {
+      try {
+        bufferedMutator.close();
+      } finally {
+        try {
+          hbaseConnection.close();
+        } catch (IOException e) {
+          LOG.error("Cannot close HBase connection.", e);
+        }
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
index 2bbfffe..ed89aeb 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java
@@ -26,10 +26,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.mapreduce.Job;
@@ -50,6 +54,8 @@ public class HBaseBulkImportJob extends HBaseImportJob {
   public static final Log LOG = LogFactory.getLog(
       HBaseBulkImportJob.class.getName());
 
+  private Connection hbaseConnection;
+
   public HBaseBulkImportJob(final SqoopOptions opts,
       final ImportJobContext importContext) {
     super(opts, importContext);
@@ -81,8 +87,21 @@ public class HBaseBulkImportJob extends HBaseImportJob {
 
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class);
     FileOutputFormat.setOutputPath(job, getContext().getDestination());
-    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
-    HFileOutputFormat.configureIncrementalLoad(job, hTable);
+    TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
+    hbaseConnection = ConnectionFactory.createConnection(job.getConfiguration());
+
+    try (
+        Table hbaseTable = hbaseConnection.getTable(hbaseTableName)
+    ) {
+      HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
+    } catch (IOException | RuntimeException e) {
+      try {
+        hbaseConnection.close();
+      } catch (IOException ioException) {
+        LOG.error("Cannot close HBase connection.", ioException);
+      }
+      throw e;
+    }
   }
 
   /**
@@ -99,15 +118,16 @@ public class HBaseBulkImportJob extends HBaseImportJob {
     setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir),
       FsPermission.createImmutable((short) 00777));
 
-    HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable());
+    TableName hbaseTableName = TableName.valueOf(options.getHBaseTable());
 
     // Load generated HFiles into table
-    try {
-      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
-        job.getConfiguration());
-      loader.doBulkLoad(bulkLoadDir, hTable);
-    }
-    catch (Exception e) {
+    try (
+        Table hbaseTable = hbaseConnection.getTable(hbaseTableName);
+        Admin hbaseAdmin = hbaseConnection.getAdmin()
+    ) {
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(job.getConfiguration());
+      loader.doBulkLoad(bulkLoadDir, hbaseAdmin, hbaseTable, hbaseConnection.getRegionLocator(hbaseTableName));
+    } catch (Exception e) {
       String errorMessage = String.format("Unrecoverable error while " +
         "performing the bulk load of files in [%s]",
         bulkLoadDir.toString());
@@ -117,11 +137,19 @@ public class HBaseBulkImportJob extends HBaseImportJob {
 
   @Override
   protected void jobTeardown(Job job) throws IOException, ImportException {
-    super.jobTeardown(job);
-    // Delete the hfiles directory after we are finished.
-    Path destination = getContext().getDestination();
-    FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
-    fileSystem.delete(destination, true);
+    try {
+	    super.jobTeardown(job);
+      // Delete the hfiles directory after we are finished.
+      Path destination = getContext().getDestination();
+      FileSystem fileSystem = destination.getFileSystem(job.getConfiguration());
+      fileSystem.delete(destination, true);
+    } finally {
+      try {
+        hbaseConnection.close();
+      } catch (IOException e) {
+        LOG.error("Cannot close HBase connection.", e);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
index 523d0a7..5adb788 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
@@ -19,7 +19,6 @@
 package org.apache.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
 import org.apache.commons.logging.Log;
@@ -28,10 +27,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -160,65 +163,28 @@ public class HBaseImportJob extends DataDrivenImportJob {
       HBaseConfiguration.addHbaseResources(conf);
     }
 
-    HBaseAdmin admin = new HBaseAdmin(conf);
+    Connection hbaseConnection = ConnectionFactory.createConnection(conf);
+    Admin admin = hbaseConnection.getAdmin();
 
     if (!skipDelegationTokens(conf)) {
-      // Add authentication token to the job if we're running on secure cluster.
-      //
-      // We're currently supporting HBase version 0.90 that do not have security
-      // patches which means that it do not have required methods
-      // "isSecurityEnabled" and "obtainAuthTokenForJob".
-      //
-      // We're using reflection API to see if those methods are available and call
-      // them only if they are present.
-      //
-      // After we will remove support for HBase 0.90 we can simplify the code to
-      // following code fragment:
-      /*
       try {
-        if (User.isSecurityEnabled()) {
-          User user = User.getCurrent();
-          user.obtainAuthTokenForJob(conf, job);
+        if (User.isHBaseSecurityEnabled(conf)) {
+          TokenUtil.obtainTokenForJob(hbaseConnection, User.getCurrent(), job);
         }
       } catch(InterruptedException ex) {
         throw new ImportException("Can't get authentication token", ex);
       }
-      */
-      try {
-        // Get method isSecurityEnabled
-        Method isHBaseSecurityEnabled = User.class.getMethod(
-            "isHBaseSecurityEnabled", Configuration.class);
-
-        // Get method obtainAuthTokenForJob
-        Method obtainAuthTokenForJob = User.class.getMethod(
-            "obtainAuthTokenForJob", Configuration.class, Job.class);
-
-        // Get current user
-        User user = User.getCurrent();
-
-        // Obtain security token if needed
-        if ((Boolean)isHBaseSecurityEnabled.invoke(null, conf)) {
-          obtainAuthTokenForJob.invoke(user, conf, job);
-        }
-      } catch (NoSuchMethodException e) {
-        LOG.info("It seems that we're running on HBase without security"
-            + " additions. Security additions will not be used during this job.");
-      } catch (InvocationTargetException e) {
-        throw new ImportException("Can't get authentication token", e);
-      } catch (IllegalAccessException e) {
-        throw new ImportException("Can't get authentication token", e);
-      }
     }
 
     // Check to see if the table exists.
     HTableDescriptor tableDesc = null;
     byte [] familyBytes = Bytes.toBytes(familyName);
     HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
-    if (!admin.tableExists(tableName)) {
+    if (!admin.tableExists(TableName.valueOf(tableName))) {
       if (options.getCreateHBaseTable()) {
         // Create the table.
         LOG.info("Creating missing HBase table " + tableName);
-        tableDesc =  new HTableDescriptor(tableName);
+        tableDesc =  new HTableDescriptor(TableName.valueOf(tableName));
         tableDesc.addFamily(colDesc);
         admin.createTable(tableDesc);
       } else {
@@ -228,16 +194,16 @@ public class HBaseImportJob extends DataDrivenImportJob {
       }
     } else {
       // Table exists, so retrieve their current version
-      tableDesc = admin.getTableDescriptor(Bytes.toBytes(tableName));
+	    tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
 
       // Check if current version do have specified column family
       if (!tableDesc.hasFamily(familyBytes)) {
         if (options.getCreateHBaseTable()) {
           // Create the column family.
           LOG.info("Creating missing column family " + familyName);
-          admin.disableTable(tableName);
-          admin.addColumn(tableName, colDesc);
-          admin.enableTable(tableName);
+          admin.disableTable(TableName.valueOf(tableName));
+          admin.addColumn(TableName.valueOf(tableName), colDesc);
+          admin.enableTable(TableName.valueOf(tableName));
         } else {
           LOG.warn("Could not find column family " + familyName + " in table "
             + tableName);
@@ -250,10 +216,11 @@ public class HBaseImportJob extends DataDrivenImportJob {
     // Make sure we close the connection to HBA, this is only relevant in
     // unit tests
     admin.close();
+    hbaseConnection.close();
 
     // Make sure HBase libraries are shipped as part of the job.
     TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.addDependencyJars(conf, HTable.class);
+    TableMapReduceUtil.addDependencyJars(conf, Table.class);
 
     super.jobSetup(job);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
index 8b29b5f..cf42d31 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
@@ -39,8 +39,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
@@ -206,9 +209,10 @@ public abstract class HBaseTestCase extends ImportJobTestCase {
       String colFamily, String colName, String val) throws IOException {
     Get get = new Get(Bytes.toBytes(rowKey));
     get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(colName));
-    HTable table = new HTable(new Configuration(
-        hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
-    try {
+    try (
+        Connection hbaseConnection = createHBaseConnection();
+        Table table = getHBaseTable(hbaseConnection, tableName)
+    ) {
       Result r = table.get(get);
       byte [] actualVal = r.getValue(Bytes.toBytes(colFamily),
           Bytes.toBytes(colName));
@@ -218,29 +222,34 @@ public abstract class HBaseTestCase extends ImportJobTestCase {
         assertNotNull("No result, but we expected one", actualVal);
         assertEquals(val, Bytes.toString(actualVal));
       }
-    } finally {
-      table.close();
     }
   }
 
   protected int countHBaseTable(String tableName, String colFamily)
       throws IOException {
     int count = 0;
-    HTable table = new HTable(new Configuration(
-        hbaseTestUtil.getConfiguration()), Bytes.toBytes(tableName));
-    try {
+    try (
+        Connection hbaseConnection = createHBaseConnection();
+        Table table = getHBaseTable(hbaseConnection, tableName)
+    ) {
       ResultScanner scanner = table.getScanner(Bytes.toBytes(colFamily));
       for(Result result = scanner.next();
           result != null;
           result = scanner.next()) {
         count++;
       }
-    } finally {
-      table.close();
     }
     return count;
   }
 
+  private Connection createHBaseConnection() throws IOException {
+    return ConnectionFactory.createConnection(new Configuration(hbaseTestUtil.getConfiguration()));
+  }
+
+  private Table getHBaseTable(Connection connection, String tableName) throws IOException
{
+    return connection.getTable(TableName.valueOf(tableName));
+  }
+
   protected boolean isKerberized() {
     return kerberosConfigurationProvider != null;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e13dd212/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
new file mode 100644
index 0000000..73b3177
--- /dev/null
+++ b/src/test/org/apache/sqoop/hbase/TestHBasePutProcessor.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.hbase;
+
+import com.cloudera.sqoop.lib.FieldMappable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.sqoop.util.ExpectedLogMessage;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestHBasePutProcessor {
+
+  @Rule
+  public ExpectedLogMessage expectedLogMessage = new ExpectedLogMessage();
+
+  private Configuration configuration;
+  private Connection hbaseConnection;
+  private PutTransformer putTransformer;
+  private BufferedMutator bufferedMutator;
+  private FieldMappable fieldMappable;
+
+  private HBasePutProcessor hBasePutProcessor;
+
+  @Before
+  public void before() {
+    configuration = mock(Configuration.class);
+    hbaseConnection = mock(Connection.class);
+    putTransformer = mock(PutTransformer.class);
+    bufferedMutator = mock(BufferedMutator.class);
+    fieldMappable = mock(FieldMappable.class);
+
+    hBasePutProcessor = new HBasePutProcessor(configuration, putTransformer, hbaseConnection,
bufferedMutator);
+  }
+
+  @Test
+  public void testNoMutationIsDoneWhenNullListIsReceived() throws Exception {
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(null);
+    verifyNoMoreInteractions(bufferedMutator);
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testNoMutationIsDoneWhenListContainingNullsIsReceived() throws Exception {
+    List<Mutation> inputList = Arrays.asList(null, null, null);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testNoMutationIsDoneWhenListContainingUnknownMutationIsReceived() throws Exception
{
+    List<Mutation> inputList = singletonList(mock(Mutation.class));
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testWarningIsLoggedWhenListContainingEmptyPutIsReceived() throws Exception
{
+    Mutation emptyPutMutation = mock(Put.class);
+    when(emptyPutMutation.getRow()).thenReturn("emptyPutMutation".getBytes());
+    when(emptyPutMutation.isEmpty()).thenReturn(true);
+    List<Mutation> inputList = singletonList(emptyPutMutation);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+    expectedLogMessage.expectWarn("Could not insert row with no columns for row-key column:
emptyPutMutation");
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testWarningIsLoggedWhenListContainingEmptyDeleteIsReceived() throws Exception
{
+    Mutation emptyDeleteMutation = mock(Delete.class);
+    when(emptyDeleteMutation.getRow()).thenReturn("emptyDeleteMutation".getBytes());
+    when(emptyDeleteMutation.isEmpty()).thenReturn(true);
+    List<Mutation> inputList = singletonList(emptyDeleteMutation);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+    verifyNoMoreInteractions(bufferedMutator);
+    expectedLogMessage.expectWarn("Could not delete row with no columns for row-key column:
emptyDeleteMutation");
+
+    hBasePutProcessor.accept(fieldMappable);
+  }
+
+  @Test
+  public void testMutationIsDoneForAllElementsWhenListContainingValidMutationsIsReceived()
throws Exception {
+    Mutation aPutMutation = mock(Put.class);
+    Mutation aDeleteMutation = mock(Delete.class);
+    Mutation anotherPutMutation = mock(Put.class);
+    List<Mutation> inputList = Arrays.asList(aPutMutation, aDeleteMutation, anotherPutMutation);
+    when(putTransformer.getMutationCommand(anyMap())).thenReturn(inputList);
+
+    hBasePutProcessor.accept(fieldMappable);
+
+    verify(bufferedMutator).mutate(aPutMutation);
+    verify(bufferedMutator).mutate(aDeleteMutation);
+    verify(bufferedMutator).mutate(anotherPutMutation);
+  }
+
+}
\ No newline at end of file


Mime
View raw message