sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1195674 - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/hbase/ com/cloudera/sqoop/hive/ com/cloudera/sqoop/io/ org/apache/sqoop/hbase/ org/apache/sqoop/hive/
Date Mon, 31 Oct 2011 21:15:29 GMT
Author: arvind
Date: Mon Oct 31 21:15:29 2011
New Revision: 1195674

URL: http://svn.apache.org/viewvc?rev=1195674&view=rev
Log:
SQOOP-380. Migrate hive and hbase packages to new namespace.

(Bilung Lee via Arvind Prabhakar)

Added:
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java
Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBasePutProcessor.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,116 +18,19 @@
 
 package com.cloudera.sqoop.hbase;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.sqoop.lib.FieldMappable;
-import com.cloudera.sqoop.lib.FieldMapProcessor;
-import com.cloudera.sqoop.lib.ProcessingException;
-
 /**
- * SqoopRecordProcessor that performs an HBase "put" operation
- * that contains all the fields of the record.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class HBasePutProcessor implements Closeable, Configurable,
-    FieldMapProcessor {
-
-  /** Configuration key specifying the table to insert into. */
-  public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table";
+public class HBasePutProcessor
+    extends org.apache.sqoop.hbase.HBasePutProcessor {
 
-  /** Configuration key specifying the column family to insert into. */
+  public static final String TABLE_NAME_KEY =
+      org.apache.sqoop.hbase.HBasePutProcessor.TABLE_NAME_KEY;
   public static final String COL_FAMILY_KEY =
-      "sqoop.hbase.insert.column.family";
-
-  /** Configuration key specifying the column of the input whose value
-   * should be used as the row id.
-   */
+      org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
   public static final String ROW_KEY_COLUMN_KEY =
-      "sqoop.hbase.insert.row.key.column";
-
-  /**
-   * Configuration key specifying the PutTransformer implementation to use.
-   */
+      org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;
   public static final String TRANSFORMER_CLASS_KEY =
-      "sqoop.hbase.insert.put.transformer.class";
-
-  private Configuration conf;
-
-  // An object that can transform a map of fieldName->object
-  // into a Put command.
-  private PutTransformer putTransformer;
-
-  private String tableName;
-  private HTable table;
-
-  public HBasePutProcessor() {
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public void setConf(Configuration config) {
-    this.conf = config;
-
-    // Get the implementation of PutTransformer to use.
-    // By default, we call toString() on every non-null field.
-    Class<? extends PutTransformer> xformerClass =
-        (Class<? extends PutTransformer>)
-        this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
-    this.putTransformer = (PutTransformer)
-        ReflectionUtils.newInstance(xformerClass, this.conf);
-    if (null == putTransformer) {
-      throw new RuntimeException("Could not instantiate PutTransformer.");
-    }
-
-    this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
-    this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
-
-    this.tableName = conf.get(TABLE_NAME_KEY, null);
-    try {
-      this.table = new HTable(conf, this.tableName);
-    } catch (IOException ioe) {
-      throw new RuntimeException("Could not access HBase table " + tableName,
-          ioe);
-    }
-    this.table.setAutoFlush(false);
-  }
-
-  @Override
-  public Configuration getConf() {
-    return this.conf;
-  }
-
-  @Override
-  /**
-   * Processes a record by extracting its field map and converting
-   * it into a list of Put commands into HBase.
-   */
-  public void accept(FieldMappable record)
-      throws IOException, ProcessingException {
-    Map<String, Object> fields = record.getFieldMap();
-
-    List<Put> putList = putTransformer.getPutCommand(fields);
-    if (null != putList) {
-      for (Put put : putList) {
-        this.table.put(put);
-      }
-    }
-  }
+      org.apache.sqoop.hbase.HBasePutProcessor.TRANSFORMER_CLASS_KEY;
 
-  @Override
-  /**
-   * Closes the HBase table and commits all pending operations.
-   */
-  public void close() throws IOException {
-    this.table.flushCommits();
-    this.table.close();
-  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/HBaseUtil.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,33 +19,21 @@
 package com.cloudera.sqoop.hbase;
 
 /**
- * This class provides a method that checks if HBase jars are present in the
- * current classpath. It also provides a setAlwaysNoHBaseJarMode mechanism for
- * testing and simulation the condition where the is on HBase jar (since hbase
- * is pulled automatically by ivy)
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public final class HBaseUtil {
-  private static boolean testingMode = false;
 
-  private HBaseUtil() {
-  }
+  private HBaseUtil() { }
 
   /**
    * This is a way to make this always return false for testing.
    */
   public static void setAlwaysNoHBaseJarMode(boolean mode) {
-    testingMode = mode;
+    org.apache.sqoop.hbase.HBaseUtil.setAlwaysNoHBaseJarMode(mode);
   }
 
   public static boolean isHBaseJarPresent() {
-    if (testingMode) {
-      return false;
-    }
-    try {
-      Class.forName("org.apache.hadoop.hbase.client.HTable");
-    } catch (ClassNotFoundException cnfe) {
-      return false;
-    }
-    return true;
+    return org.apache.sqoop.hbase.HBaseUtil.isHBaseJarPresent();
   }
+
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/PutTransformer.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,58 +18,9 @@
 
 package com.cloudera.sqoop.hbase;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.client.Put;
-
 /**
- * Interface that takes a map of jdbc field names to values
- * and converts them to a Put command for HBase.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public abstract class PutTransformer {
-
-  public PutTransformer() {
-  }
-
-  private String columnFamily;
-  private String rowKeyColumn;
-
-  /**
-   * @return the default column family to insert into.
-   */
-  public String getColumnFamily() {
-    return this.columnFamily;
-  }
-
-  /**
-   * Set the default column family to insert into.
-   */
-  public void setColumnFamily(String colFamily) {
-    this.columnFamily = colFamily;
-  }
-
-  /**
-   * @return the field name identifying the value to use as the row id.
-   */
-  public String getRowKeyColumn() {
-    return this.rowKeyColumn;
-  }
-
-  /**
-   * Set the column of the input fields which should be used to calculate
-   * the row id.
-   */
-  public void setRowKeyColumn(String rowKeyCol) {
-    this.rowKeyColumn = rowKeyCol;
-  }
-
-  /**
-   * Returns a list of Put commands that inserts the fields into a row in HBase.
-   * @param fields a map of field names to values to insert.
-   * @return A list of Put commands that inserts these into HBase.
-   */
-  public abstract List<Put> getPutCommand(Map<String, Object> fields)
-      throws IOException;
+public abstract class PutTransformer
+    extends org.apache.sqoop.hbase.PutTransformer {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hbase/ToStringPutTransformer.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -33,70 +31,8 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * PutTransformer that calls toString on all non-null fields.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class ToStringPutTransformer extends PutTransformer {
-
-  public static final Log LOG = LogFactory.getLog(
-      ToStringPutTransformer.class.getName());
-
-  // A mapping from field name -> bytes for that field name.
-  // Used to cache serialization work done for fields names.
-  private Map<String, byte[]> serializedFieldNames;
-
-  public ToStringPutTransformer() {
-    serializedFieldNames = new TreeMap<String, byte[]>();
-  }
-
-  /**
-   * Return the serialized bytes for a field name, using
-   * the cache if it's already in there.
-   */
-  private byte [] getFieldNameBytes(String fieldName) {
-    byte [] cachedName = serializedFieldNames.get(fieldName);
-    if (null != cachedName) {
-      // Cache hit. We're done.
-      return cachedName;
-    }
-
-    // Do the serialization and memoize the result.
-    byte [] nameBytes = Bytes.toBytes(fieldName);
-    serializedFieldNames.put(fieldName, nameBytes);
-    return nameBytes;
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public List<Put> getPutCommand(Map<String, Object> fields)
-      throws IOException {
-
-    String rowKeyCol = getRowKeyColumn();
-    String colFamily = getColumnFamily();
-    byte [] colFamilyBytes = Bytes.toBytes(colFamily);
-
-    Object rowKey = fields.get(rowKeyCol);
-    if (null == rowKey) {
-      // If the row-key column is null, we don't insert this row.
-      LOG.warn("Could not insert row with null value for row-key column: "
-          + rowKeyCol);
-      return null;
-    }
-
-    Put put = new Put(Bytes.toBytes(rowKey.toString()));
-
-    for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
-      String colName = fieldEntry.getKey();
-      if (!colName.equals(rowKeyCol)) {
-        // This is a regular field, not the row key.
-        // Add it if it's not null.
-        Object val = fieldEntry.getValue();
-        if (null != val) {
-          put.add(colFamilyBytes, getFieldNameBytes(colName),
-              Bytes.toBytes(val.toString()));
-        }
-      }
-    }
-
-    return Collections.singletonList(put);
-  }
+public class ToStringPutTransformer
+    extends org.apache.sqoop.hbase.ToStringPutTransformer {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveImport.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,334 +18,20 @@
 
 package com.cloudera.sqoop.hive;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.io.CodecMap;
 import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.util.Executor;
-import com.cloudera.sqoop.util.ExitSecurityException;
-import com.cloudera.sqoop.util.LoggingAsyncSink;
-import com.cloudera.sqoop.util.SubprocessSecurityManager;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.util.Tool;
 
 /**
- * Utility to import a table into the Hive metastore. Manages the connection
- * to Hive itself as well as orchestrating the use of the other classes in this
- * package.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class HiveImport {
-
-  public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
-
-  private SqoopOptions options;
-  private ConnManager connManager;
-  private Configuration configuration;
-  private boolean generateOnly;
-
-  /** Entry point through which Hive invocation should be attempted. */
-  private static final String HIVE_MAIN_CLASS =
-      "org.apache.hadoop.hive.cli.CliDriver";
+public class HiveImport
+    extends org.apache.sqoop.hive.HiveImport {
 
   public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
       final Configuration conf, final boolean generateOnly) {
-    this.options = opts;
-    this.connManager = connMgr;
-    this.configuration = conf;
-    this.generateOnly = generateOnly;
-  }
-
-
-  /**
-   * @return the filename of the hive executable to run to do the import
-   */
-  private String getHiveBinPath() {
-    // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
-    // exists.
-    // Fall back to just plain 'hive' and hope it's in the path.
-
-    String hiveHome = options.getHiveHome();
-    if (null == hiveHome) {
-      return "hive";
-    }
-
-    Path p = new Path(hiveHome);
-    p = new Path(p, "bin");
-    p = new Path(p, "hive");
-    String hiveBinStr = p.toString();
-    if (new File(hiveBinStr).exists()) {
-      return hiveBinStr;
-    } else {
-      return "hive";
-    }
-  }
-
-  /**
-   * If we used a MapReduce-based upload of the data, remove the _logs dir
-   * from where we put it, before running Hive LOAD DATA INPATH.
-   */
-  private void removeTempLogs(String tableName) throws IOException {
-    FileSystem fs = FileSystem.get(configuration);
-    Path tablePath;
-    if (null != tableName) {
-        String warehouseDir = options.getWarehouseDir();
-        if (warehouseDir != null) {
-          tablePath = new Path(new Path(warehouseDir), tableName);
-        } else {
-          tablePath = new Path(tableName);
-        }
-    } else {
-        // --table option is not used, so use the target dir instead
-        tablePath = new Path(options.getTargetDir());
-    }
-
-    Path logsPath = new Path(tablePath, "_logs");
-    if (fs.exists(logsPath)) {
-      LOG.info("Removing temporary files from import process: " + logsPath);
-      if (!fs.delete(logsPath, true)) {
-        LOG.warn("Could not delete temporary files; "
-            + "continuing with import, but it may fail.");
-      }
-    }
-  }
-
-  /**
-   * @return true if we're just generating the DDL for the import, but
-   * not actually running it (i.e., --generate-only mode). If so, don't
-   * do any side-effecting actions in Hive.
-   */
-  private boolean isGenerateOnly() {
-    return generateOnly;
-  }
-
-  /**
-   * @return a File object that can be used to write the DDL statement.
-   * If we're in gen-only mode, this should be a file in the outdir, named
-   * after the Hive table we're creating. If we're in import mode, this should
-   * be a one-off temporary file.
-   */
-  private File getScriptFile(String outputTableName) throws IOException {
-    if (!isGenerateOnly()) {
-      return File.createTempFile("hive-script-", ".txt",
-          new File(options.getTempDir()));
-    } else {
-      return new File(new File(options.getCodeOutputDir()),
-          outputTableName + ".q");
-    }
+    super(opts, connMgr, conf, generateOnly);
   }
 
-  /**
-   * Perform the import of data from an HDFS path to a Hive table.
-   *
-   * @param inputTableName the name of the table as loaded into HDFS
-   * @param outputTableName the name of the table to create in Hive.
-   * @param createOnly if true, run the CREATE TABLE statement but not
-   * LOAD DATA.
-   */
-  public void importTable(String inputTableName, String outputTableName,
-      boolean createOnly) throws IOException {
-
-    if (!isGenerateOnly()) {
-      removeTempLogs(inputTableName);
-      LOG.info("Loading uploaded data into Hive");
-    }
-
-    if (null == outputTableName) {
-      outputTableName = inputTableName;
-    }
-    LOG.debug("Hive.inputTable: " + inputTableName);
-    LOG.debug("Hive.outputTable: " + outputTableName);
-
-    // For testing purposes against our mock hive implementation,
-    // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
-    // environment variable for the child hive process. We also disable
-    // timestamp comments so that we have deterministic table creation scripts.
-    String expectedScript = System.getProperty("expected.script");
-    List<String> env = Executor.getCurEnvpStrings();
-    boolean debugMode = expectedScript != null;
-    if (debugMode) {
-      env.add("EXPECTED_SCRIPT=" + expectedScript);
-      env.add("TMPDIR=" + options.getTempDir());
-    }
-
-    // generate the HQL statements to run.
-    TableDefWriter tableWriter = new TableDefWriter(options, connManager,
-        inputTableName, outputTableName,
-        configuration, !debugMode);
-    String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
-    String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
-
-    if (!isGenerateOnly()) {
-      String codec = options.getCompressionCodec();
-      if (codec != null && (codec.equals(CodecMap.LZOP)
-              || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
-        try {
-          String finalPathStr = tableWriter.getFinalPathStr();
-          Tool tool = ReflectionUtils.newInstance(Class.
-                  forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
-                  asSubclass(Tool.class), configuration);
-          ToolRunner.run(configuration, tool, new String[] { finalPathStr });
-        } catch (Exception ex) {
-          LOG.error("Error indexing lzo files", ex);
-          throw new IOException("Error indexing lzo files", ex);
-        }
-      }
-    }
-
-    // write them to a script file.
-    File scriptFile = getScriptFile(outputTableName);
-    try {
-      String filename = scriptFile.toString();
-      BufferedWriter w = null;
-      try {
-        FileOutputStream fos = new FileOutputStream(scriptFile);
-        w = new BufferedWriter(new OutputStreamWriter(fos));
-        w.write(createTableStr, 0, createTableStr.length());
-        if (!createOnly) {
-          w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
-        }
-      } catch (IOException ioe) {
-        LOG.error("Error writing Hive load-in script: " + ioe.toString());
-        ioe.printStackTrace();
-        throw ioe;
-      } finally {
-        if (null != w) {
-          try {
-            w.close();
-          } catch (IOException ioe) {
-            LOG.warn("IOException closing stream to Hive script: "
-                + ioe.toString());
-          }
-        }
-      }
-
-      if (!isGenerateOnly()) {
-        executeScript(filename, env);
-
-        LOG.info("Hive import complete.");
-      }
-    } finally {
-      if (!isGenerateOnly()) {
-        // User isn't interested in saving the DDL. Remove the file.
-        if (!scriptFile.delete()) {
-          LOG.warn("Could not remove temporary file: " + scriptFile.toString());
-          // try to delete the file later.
-          scriptFile.deleteOnExit();
-        }
-      }
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  /**
-   * Execute the script file via Hive.
-   * If Hive's jars are on the classpath, run it in the same process.
-   * Otherwise, execute the file with 'bin/hive'.
-   *
-   * @param filename The script file to run.
-   * @param env the environment strings to pass to any subprocess.
-   * @throws IOException if Hive did not exit successfully.
-   */
-  private void executeScript(String filename, List<String> env)
-      throws IOException {
-    SubprocessSecurityManager subprocessSM = null;
-
-    try {
-      Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
-
-      // We loaded the CLI Driver in this JVM, so we will just
-      // call it in-process. The CliDriver class has a method:
-      // void main(String [] args) throws Exception.
-      //
-      // We'll call that here to invoke 'hive -f scriptfile'.
-      // Because this method will call System.exit(), we use
-      // a SecurityManager to prevent this.
-      LOG.debug("Using in-process Hive instance.");
-
-      subprocessSM = new SubprocessSecurityManager();
-      subprocessSM.install();
-
-      // Create the argv for the Hive Cli Driver.
-      String [] argArray = new String[2];
-      argArray[0] = "-f";
-      argArray[1] = filename;
-
-      // And invoke the static method on this array.
-      Method mainMethod = cliDriverClass.getMethod("main", argArray.getClass());
-      mainMethod.invoke(null, (Object) argArray);
-
-    } catch (ClassNotFoundException cnfe) {
-      // Hive is not on the classpath. Run externally.
-      // This is not an error path.
-      LOG.debug("Using external Hive process.");
-      executeExternalHiveScript(filename, env);
-    } catch (NoSuchMethodException nsme) {
-      // Could not find a handle to the main() method.
-      throw new IOException("Could not access CliDriver.main()", nsme);
-    } catch (IllegalAccessException iae) {
-      // Error getting a handle on the main() method.
-      throw new IOException("Could not access CliDriver.main()", iae);
-    } catch (InvocationTargetException ite) {
-      // We ran CliDriver.main() and an exception was thrown from within Hive.
-      // This may have been the ExitSecurityException triggered by the
-      // SubprocessSecurityManager. If so, handle it. Otherwise, wrap in
-      // an IOException and rethrow.
-
-      Throwable cause = ite.getCause();
-      if (cause instanceof ExitSecurityException) {
-        ExitSecurityException ese = (ExitSecurityException) cause;
-        int status = ese.getExitStatus();
-        if (status != 0) {
-          throw new IOException("Hive CliDriver exited with status=" + status);
-        }
-      } else {
-        throw new IOException("Exception thrown in Hive", ite);
-      }
-    } finally {
-      if (null != subprocessSM) {
-        // Uninstall the SecurityManager used to trap System.exit().
-        subprocessSM.uninstall();
-      }
-    }
-  }
-
-  /**
-   * Execute Hive via an external 'bin/hive' process.
-   * @param filename the Script file to run.
-   * @param env the environment strings to pass to any subprocess.
-   * @throws IOException if Hive did not exit successfully.
-   */
-  private void executeExternalHiveScript(String filename, List<String> env)
-      throws IOException {
-    // run Hive on the script and note the return code.
-    String hiveExec = getHiveBinPath();
-    ArrayList<String> args = new ArrayList<String>();
-    args.add(hiveExec);
-    args.add("-f");
-    args.add(filename);
-
-    LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
-    int ret = Executor.exec(args.toArray(new String[0]),
-        env.toArray(new String[0]), logSink, logSink);
-    if (0 != ret) {
-      throw new IOException("Hive exited with status " + ret);
-    }
-  }
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/HiveTypes.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,71 +18,19 @@
 
 package com.cloudera.sqoop.hive;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.sql.Types;
-
 /**
- * Defines conversion between SQL types and Hive types.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public final class HiveTypes {
 
-  public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
+  private HiveTypes() { }
 
-  private HiveTypes() {
-  }
-
-  /**
-   * Given JDBC SQL types coming from another database, what is the best
-   * mapping to a Hive-specific type?
-   */
   public static String toHiveType(int sqlType) {
-
-      switch (sqlType) {
-          case Types.INTEGER:
-          case Types.SMALLINT:
-              return "INT";
-          case Types.VARCHAR:
-          case Types.CHAR:
-          case Types.LONGVARCHAR:
-          case Types.NVARCHAR:
-          case Types.NCHAR:
-          case Types.LONGNVARCHAR:
-          case Types.DATE:
-          case Types.TIME:
-          case Types.TIMESTAMP:
-          case Types.CLOB:
-              return "STRING";
-          case Types.NUMERIC:
-          case Types.DECIMAL:
-          case Types.FLOAT:
-          case Types.DOUBLE:
-          case Types.REAL:
-              return "DOUBLE";
-          case Types.BIT:
-          case Types.BOOLEAN:
-              return "BOOLEAN";
-          case Types.TINYINT:
-              return "TINYINT";
-          case Types.BIGINT:
-              return "BIGINT";
-          default:
-        // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
-        // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
-        return null;
-      }
+    return org.apache.sqoop.hive.HiveTypes.toHiveType(sqlType);
   }
 
-  /**
-   * @return true if a sql type can't be translated to a precise match
-   * in Hive, and we have to cast it to something more generic.
-   */
   public static boolean isHiveTypeImprovised(int sqlType) {
-    return sqlType == Types.DATE || sqlType == Types.TIME
-        || sqlType == Types.TIMESTAMP
-        || sqlType == Types.DECIMAL
-        || sqlType == Types.NUMERIC;
+    return org.apache.sqoop.hive.HiveTypes.isHiveTypeImprovised(sqlType);
   }
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/hive/TableDefWriter.java Mon Oct 31 21:15:29 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,268 +19,25 @@
 package com.cloudera.sqoop.hive;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.io.CodecMap;
 import com.cloudera.sqoop.manager.ConnManager;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Date;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-
-import java.util.Properties;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
- * Creates (Hive-specific) SQL DDL statements to create tables to hold data
- * we're importing from another source.
- *
- * After we import the database into HDFS, we can inject it into Hive using
- * the CREATE TABLE and LOAD DATA INPATH statements generated by this object.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class TableDefWriter {
-
-  public static final Log LOG = LogFactory.getLog(
-      TableDefWriter.class.getName());
+public class TableDefWriter
+    extends org.apache.sqoop.hive.TableDefWriter {
 
-  private SqoopOptions options;
-  private ConnManager connManager;
-  private Configuration configuration;
-  private String inputTableName;
-  private String outputTableName;
-  private boolean commentsEnabled;
-
-  /**
-   * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
-   * @param opts program-wide options
-   * @param connMgr the connection manager used to describe the table.
-   * @param inputTable the name of the table to load.
-   * @param outputTable the name of the Hive table to create.
-   * @param config the Hadoop configuration to use to connect to the dfs
-   * @param withComments if true, then tables will be created with a
-   *        timestamp comment.
-   */
   public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
       final String inputTable, final String outputTable,
       final Configuration config, final boolean withComments) {
-    this.options = opts;
-    this.connManager = connMgr;
-    this.inputTableName = inputTable;
-    this.outputTableName = outputTable;
-    this.configuration = config;
-    this.commentsEnabled = withComments;
+    super(opts, connMgr, inputTable, outputTable, config, withComments);
   }
 
-  private Map<String, Integer> externalColTypes;
-
-  /**
-   * Set the column type map to be used.
-   * (dependency injection for testing; not used in production.)
-   */
-  void setColumnTypes(Map<String, Integer> colTypes) {
-    this.externalColTypes = colTypes;
-    LOG.debug("Using test-controlled type map");
-  }
-
-  /**
-   * Get the column names to import.
-   */
-  private String [] getColumnNames() {
-    String [] colNames = options.getColumns();
-    if (null != colNames) {
-      return colNames; // user-specified column names.
-    } else if (null != externalColTypes) {
-      // Test-injection column mapping. Extract the col names from this.
-      ArrayList<String> keyList = new ArrayList<String>();
-      for (String key : externalColTypes.keySet()) {
-        keyList.add(key);
-      }
-
-      return keyList.toArray(new String[keyList.size()]);
-    } else if (null != inputTableName) {
-      return connManager.getColumnNames(inputTableName);
-    } else {
-      return connManager.getColumnNamesForQuery(options.getSqlQuery());
-    }
+  public static String getHiveOctalCharCode(int charNum) {
+    return org.apache.sqoop.hive.TableDefWriter.getHiveOctalCharCode(charNum);
   }
 
-  /**
-   * @return the CREATE TABLE statement for the table to load into hive.
-   */
-  public String getCreateTableStmt() throws IOException {
-    Map<String, Integer> columnTypes;
-    Properties userMapping = options.getMapColumnHive();
-
-    if (externalColTypes != null) {
-      // Use pre-defined column types.
-      columnTypes = externalColTypes;
-    } else {
-      // Get these from the database.
-      if (null != inputTableName) {
-        columnTypes = connManager.getColumnTypes(inputTableName);
-      } else {
-        columnTypes = connManager.getColumnTypesForQuery(options.getSqlQuery());
-      }
-    }
-
-    String [] colNames = getColumnNames();
-    StringBuilder sb = new StringBuilder();
-    if (options.doFailIfHiveTableExists()) {
-      sb.append("CREATE TABLE `").append(outputTableName).append("` ( ");
-    } else {
-      sb.append("CREATE TABLE IF NOT EXISTS `");
-      sb.append(outputTableName).append("` ( ");
-    }
-
-    // Check that all explicitly mapped columns are present in result set
-    for(Object column : userMapping.keySet()) {
-      boolean found = false;
-      for(String c : colNames) {
-        if(c.equals(column)) {
-          found = true;
-          break;
-        }
-      }
-
-      if(!found) {
-        throw new IllegalArgumentException("No column by the name " + column
-                + "found while importing data");
-      }
-    }
-
-    boolean first = true;
-    for (String col : colNames) {
-      if (!first) {
-        sb.append(", ");
-      }
-
-      first = false;
-
-      Integer colType = columnTypes.get(col);
-      String hiveColType = userMapping.getProperty(col);
-      if(hiveColType == null) { hiveColType = connManager.toHiveType(colType); }
-      if (null == hiveColType) {
-        throw new IOException("Hive does not support the SQL type for column "
-            + col);
-      }
-
-      sb.append('`').append(col).append("` ").append(hiveColType);
-
-      if (HiveTypes.isHiveTypeImprovised(colType)) {
-        LOG.warn(
-            "Column " + col + " had to be cast to a less precise type in Hive");
-      }
-    }
-
-    sb.append(") ");
-
-    if (commentsEnabled) {
-      DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
-      String curDateStr = dateFormat.format(new Date());
-      sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
-    }
-
-    if (options.getHivePartitionKey() != null) {
-      sb.append("PARTITIONED BY (")
-        .append(options.getHivePartitionKey())
-        .append(" STRING) ");
-     }
-
-    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
-    sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
-    sb.append("' LINES TERMINATED BY '");
-    sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
-    String codec = options.getCompressionCodec();
-    if (codec != null && (codec.equals(CodecMap.LZOP)
-            || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
-      sb.append("' STORED AS INPUTFORMAT "
-              + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
-      sb.append(" OUTPUTFORMAT "
-              + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
-    } else {
-      sb.append("' STORED AS TEXTFILE");
-    }
-
-    LOG.debug("Create statement: " + sb.toString());
-    return sb.toString();
-  }
-
-  private static final int DEFAULT_HDFS_PORT =
-      org.apache.hadoop.hdfs.server.namenode.NameNode.DEFAULT_PORT;
-
-  /**
-   * @return the LOAD DATA statement to import the data in HDFS into hive.
-   */
-  public String getLoadDataStmt() throws IOException {
-    String finalPathStr = getFinalPathStr();
-
-    StringBuilder sb = new StringBuilder();
-    sb.append("LOAD DATA INPATH '");
-    sb.append(finalPathStr + "'");
-    if (options.doOverwriteHiveTable()) {
-      sb.append(" OVERWRITE");
-    }
-    sb.append(" INTO TABLE `");
-    sb.append(outputTableName);
-    sb.append('`');
-
-    if (options.getHivePartitionKey() != null) {
-      sb.append(" PARTITION (")
-        .append(options.getHivePartitionKey())
-        .append("='").append(options.getHivePartitionValue())
-        .append("')");
-    }
-
-    LOG.debug("Load statement: " + sb.toString());
-    return sb.toString();
-  }
-
-  public String getFinalPathStr() throws IOException {
-    String warehouseDir = options.getWarehouseDir();
-    if (null == warehouseDir) {
-      warehouseDir = "";
-    } else if (!warehouseDir.endsWith(File.separator)) {
-      warehouseDir = warehouseDir + File.separator;
-    }
-
-    String tablePath;
-    if (null != inputTableName) {
-      tablePath = warehouseDir + inputTableName;
-    } else {
-      tablePath = options.getTargetDir();
-    }
-    FileSystem fs = FileSystem.get(configuration);
-    Path finalPath = new Path(tablePath).makeQualified(fs);
-    return finalPath.toString();
-  }
-
-  /**
-   * Return a string identifying the character to use as a delimiter
-   * in Hive, in octal representation.
-   * Hive can specify delimiter characters in the form '\ooo' where
-   * ooo is a three-digit octal number between 000 and 177. Values
-   * may not be truncated ('\12' is wrong; '\012' is ok) nor may they
-   * be zero-prefixed (e.g., '\0177' is wrong).
-   *
-   * @param charNum the character to use as a delimiter
-   * @return a string of the form "\ooo" where ooo is an octal number
-   * in [000, 177].
-   * @throws IllegalArgumentException if charNum &gt; 0177.
-   */
-  static String getHiveOctalCharCode(int charNum) {
-    if (charNum > 0177) {
-      throw new IllegalArgumentException(
-          "Character " + charNum + " is an out-of-range delimiter");
-    }
-
-    return String.format("\\%03o", charNum);
-  }
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java?rev=1195674&r1=1195673&r2=1195674&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/io/SplittableBufferedWriter.java Mon Oct 31 21:15:29 2011
@@ -18,9 +18,6 @@
 
 package com.cloudera.sqoop.io;
 
-import org.apache.sqoop.io.SplittingOutputStream;
-
-
 /**
  * A BufferedWriter implementation that wraps around a SplittingOutputStream
  * and allows splitting of the underlying stream.

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.ProcessingException;
+
+/**
+ * SqoopRecordProcessor that performs an HBase "put" operation
+ * that contains all the fields of the record.
+ */
+public class HBasePutProcessor implements Closeable, Configurable,
+    FieldMapProcessor {
+
+  /** Configuration key specifying the table to insert into. */
+  public static final String TABLE_NAME_KEY = "sqoop.hbase.insert.table";
+
+  /** Configuration key specifying the column family to insert into. */
+  public static final String COL_FAMILY_KEY =
+      "sqoop.hbase.insert.column.family";
+
+  /** Configuration key specifying the column of the input whose value
+   * should be used as the row id.
+   */
+  public static final String ROW_KEY_COLUMN_KEY =
+      "sqoop.hbase.insert.row.key.column";
+
+  /**
+   * Configuration key specifying the PutTransformer implementation to use.
+   */
+  public static final String TRANSFORMER_CLASS_KEY =
+      "sqoop.hbase.insert.put.transformer.class";
+
+  private Configuration conf;
+
+  // An object that can transform a map of fieldName->object
+  // into a Put command.
+  private PutTransformer putTransformer;
+
+  private String tableName;
+  private HTable table;
+
+  public HBasePutProcessor() {
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setConf(Configuration config) {
+    this.conf = config;
+
+    // Get the implementation of PutTransformer to use.
+    // By default, we call toString() on every non-null field.
+    Class<? extends PutTransformer> xformerClass =
+        (Class<? extends PutTransformer>)
+        this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class);
+    this.putTransformer = (PutTransformer)
+        ReflectionUtils.newInstance(xformerClass, this.conf);
+    if (null == putTransformer) {
+      throw new RuntimeException("Could not instantiate PutTransformer.");
+    }
+
+    this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+    this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+
+    this.tableName = conf.get(TABLE_NAME_KEY, null);
+    try {
+      this.table = new HTable(conf, this.tableName);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Could not access HBase table " + tableName,
+          ioe);
+    }
+    this.table.setAutoFlush(false);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  /**
+   * Processes a record by extracting its field map and converting
+   * it into a list of Put commands into HBase.
+   */
+  public void accept(FieldMappable record)
+      throws IOException, ProcessingException {
+    Map<String, Object> fields = record.getFieldMap();
+
+    List<Put> putList = putTransformer.getPutCommand(fields);
+    if (null != putList) {
+      for (Put put : putList) {
+        this.table.put(put);
+      }
+    }
+  }
+
+  @Override
+  /**
+   * Closes the HBase table and commits all pending operations.
+   */
+  public void close() throws IOException {
+    this.table.flushCommits();
+    this.table.close();
+  }
+
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/HBaseUtil.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+/**
+ * This class provides a method that checks if HBase jars are present in the
+ * current classpath. It also provides a setAlwaysNoHBaseJarMode mechanism for
+ * testing and simulation the condition where the is on HBase jar (since hbase
+ * is pulled automatically by ivy)
+ */
+public final class HBaseUtil {
+
+  private static boolean testingMode = false;
+
+  private HBaseUtil() {
+  }
+
+  /**
+   * This is a way to make this always return false for testing.
+   */
+  public static void setAlwaysNoHBaseJarMode(boolean mode) {
+    testingMode = mode;
+  }
+
+  public static boolean isHBaseJarPresent() {
+    if (testingMode) {
+      return false;
+    }
+    try {
+      Class.forName("org.apache.hadoop.hbase.client.HTable");
+    } catch (ClassNotFoundException cnfe) {
+      return false;
+    }
+    return true;
+  }
+
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/PutTransformer.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Put;
+
+/**
+ * Interface that takes a map of jdbc field names to values
+ * and converts them to a Put command for HBase.
+ */
+public abstract class PutTransformer {
+
+  private String columnFamily;
+  private String rowKeyColumn;
+
+  /**
+   * @return the default column family to insert into.
+   */
+  public String getColumnFamily() {
+    return this.columnFamily;
+  }
+
+  /**
+   * Set the default column family to insert into.
+   */
+  public void setColumnFamily(String colFamily) {
+    this.columnFamily = colFamily;
+  }
+
+  /**
+   * @return the field name identifying the value to use as the row id.
+   */
+  public String getRowKeyColumn() {
+    return this.rowKeyColumn;
+  }
+
+  /**
+   * Set the column of the input fields which should be used to calculate
+   * the row id.
+   */
+  public void setRowKeyColumn(String rowKeyCol) {
+    this.rowKeyColumn = rowKeyCol;
+  }
+
+  /**
+   * Returns a list of Put commands that inserts the fields into a row in HBase.
+   * @param fields a map of field names to values to insert.
+   * @return A list of Put commands that inserts these into HBase.
+   */
+  public abstract List<Put> getPutCommand(Map<String, Object> fields)
+      throws IOException;
+
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hbase;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.cloudera.sqoop.hbase.PutTransformer;
+
+/**
+ * PutTransformer that calls toString on all non-null fields.
+ */
+public class ToStringPutTransformer extends PutTransformer {
+
+  public static final Log LOG = LogFactory.getLog(
+      ToStringPutTransformer.class.getName());
+
+  // A mapping from field name -> bytes for that field name.
+  // Used to cache serialization work done for fields names.
+  private Map<String, byte[]> serializedFieldNames;
+
+  public ToStringPutTransformer() {
+    serializedFieldNames = new TreeMap<String, byte[]>();
+  }
+
+  /**
+   * Return the serialized bytes for a field name, using
+   * the cache if it's already in there.
+   */
+  private byte [] getFieldNameBytes(String fieldName) {
+    byte [] cachedName = serializedFieldNames.get(fieldName);
+    if (null != cachedName) {
+      // Cache hit. We're done.
+      return cachedName;
+    }
+
+    // Do the serialization and memoize the result.
+    byte [] nameBytes = Bytes.toBytes(fieldName);
+    serializedFieldNames.put(fieldName, nameBytes);
+    return nameBytes;
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public List<Put> getPutCommand(Map<String, Object> fields)
+      throws IOException {
+
+    String rowKeyCol = getRowKeyColumn();
+    String colFamily = getColumnFamily();
+    byte [] colFamilyBytes = Bytes.toBytes(colFamily);
+
+    Object rowKey = fields.get(rowKeyCol);
+    if (null == rowKey) {
+      // If the row-key column is null, we don't insert this row.
+      LOG.warn("Could not insert row with null value for row-key column: "
+          + rowKeyCol);
+      return null;
+    }
+
+    Put put = new Put(Bytes.toBytes(rowKey.toString()));
+
+    for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
+      String colName = fieldEntry.getKey();
+      if (!colName.equals(rowKeyCol)) {
+        // This is a regular field, not the row key.
+        // Add it if it's not null.
+        Object val = fieldEntry.getValue();
+        if (null != val) {
+          put.add(colFamilyBytes, getFieldNameBytes(colName),
+              Bytes.toBytes(val.toString()));
+        }
+      }
+    }
+
+    return Collections.singletonList(put);
+  }
+
+}

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveImport.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
+import org.apache.sqoop.io.CodecMap;
+import org.apache.sqoop.util.Executor;
+import org.apache.sqoop.util.LoggingAsyncSink;
+import org.apache.sqoop.util.SubprocessSecurityManager;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.util.ExitSecurityException;
+
+/**
+ * Utility to import a table into the Hive metastore. Manages the connection
+ * to Hive itself as well as orchestrating the use of the other classes in this
+ * package.
+ */
+public class HiveImport {
+
+  public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
+
+  private SqoopOptions options;
+  private ConnManager connManager;
+  private Configuration configuration;
+  private boolean generateOnly;
+
+  /** Entry point through which Hive invocation should be attempted. */
+  private static final String HIVE_MAIN_CLASS =
+      "org.apache.hadoop.hive.cli.CliDriver";
+
+  public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
+      final Configuration conf, final boolean generateOnly) {
+    this.options = opts;
+    this.connManager = connMgr;
+    this.configuration = conf;
+    this.generateOnly = generateOnly;
+  }
+
+
+  /**
+   * @return the filename of the hive executable to run to do the import
+   */
+  private String getHiveBinPath() {
+    // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
+    // exists.
+    // Fall back to just plain 'hive' and hope it's in the path.
+
+    String hiveHome = options.getHiveHome();
+    if (null == hiveHome) {
+      return "hive";
+    }
+
+    Path p = new Path(hiveHome);
+    p = new Path(p, "bin");
+    p = new Path(p, "hive");
+    String hiveBinStr = p.toString();
+    if (new File(hiveBinStr).exists()) {
+      return hiveBinStr;
+    } else {
+      return "hive";
+    }
+  }
+
+  /**
+   * If we used a MapReduce-based upload of the data, remove the _logs dir
+   * from where we put it, before running Hive LOAD DATA INPATH.
+   */
+  private void removeTempLogs(String tableName) throws IOException {
+    FileSystem fs = FileSystem.get(configuration);
+    Path tablePath;
+    if (null != tableName) {
+        String warehouseDir = options.getWarehouseDir();
+        if (warehouseDir != null) {
+          tablePath = new Path(new Path(warehouseDir), tableName);
+        } else {
+          tablePath = new Path(tableName);
+        }
+    } else {
+        // --table option is not used, so use the target dir instead
+        tablePath = new Path(options.getTargetDir());
+    }
+
+    Path logsPath = new Path(tablePath, "_logs");
+    if (fs.exists(logsPath)) {
+      LOG.info("Removing temporary files from import process: " + logsPath);
+      if (!fs.delete(logsPath, true)) {
+        LOG.warn("Could not delete temporary files; "
+            + "continuing with import, but it may fail.");
+      }
+    }
+  }
+
+  /**
+   * @return true if we're just generating the DDL for the import, but
+   * not actually running it (i.e., --generate-only mode). If so, don't
+   * do any side-effecting actions in Hive.
+   */
+  private boolean isGenerateOnly() {
+    return generateOnly;
+  }
+
+  /**
+   * @return a File object that can be used to write the DDL statement.
+   * If we're in gen-only mode, this should be a file in the outdir, named
+   * after the Hive table we're creating. If we're in import mode, this should
+   * be a one-off temporary file.
+   */
+  private File getScriptFile(String outputTableName) throws IOException {
+    if (!isGenerateOnly()) {
+      return File.createTempFile("hive-script-", ".txt",
+          new File(options.getTempDir()));
+    } else {
+      return new File(new File(options.getCodeOutputDir()),
+          outputTableName + ".q");
+    }
+  }
+
+  /**
+   * Perform the import of data from an HDFS path to a Hive table.
+   *
+   * @param inputTableName the name of the table as loaded into HDFS
+   * @param outputTableName the name of the table to create in Hive.
+   * @param createOnly if true, run the CREATE TABLE statement but not
+   * LOAD DATA.
+   */
+  public void importTable(String inputTableName, String outputTableName,
+      boolean createOnly) throws IOException {
+
+    if (!isGenerateOnly()) {
+      removeTempLogs(inputTableName);
+      LOG.info("Loading uploaded data into Hive");
+    }
+
+    if (null == outputTableName) {
+      outputTableName = inputTableName;
+    }
+    LOG.debug("Hive.inputTable: " + inputTableName);
+    LOG.debug("Hive.outputTable: " + outputTableName);
+
+    // For testing purposes against our mock hive implementation,
+    // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
+    // environment variable for the child hive process. We also disable
+    // timestamp comments so that we have deterministic table creation scripts.
+    String expectedScript = System.getProperty("expected.script");
+    List<String> env = Executor.getCurEnvpStrings();
+    boolean debugMode = expectedScript != null;
+    if (debugMode) {
+      env.add("EXPECTED_SCRIPT=" + expectedScript);
+      env.add("TMPDIR=" + options.getTempDir());
+    }
+
+    // generate the HQL statements to run.
+    TableDefWriter tableWriter = new TableDefWriter(options, connManager,
+        inputTableName, outputTableName,
+        configuration, !debugMode);
+    String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
+    String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
+
+    if (!isGenerateOnly()) {
+      String codec = options.getCompressionCodec();
+      if (codec != null && (codec.equals(CodecMap.LZOP)
+              || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+        try {
+          String finalPathStr = tableWriter.getFinalPathStr();
+          Tool tool = ReflectionUtils.newInstance(Class.
+                  forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
+                  asSubclass(Tool.class), configuration);
+          ToolRunner.run(configuration, tool, new String[] { finalPathStr });
+        } catch (Exception ex) {
+          LOG.error("Error indexing lzo files", ex);
+          throw new IOException("Error indexing lzo files", ex);
+        }
+      }
+    }
+
+    // write them to a script file.
+    File scriptFile = getScriptFile(outputTableName);
+    try {
+      String filename = scriptFile.toString();
+      BufferedWriter w = null;
+      try {
+        FileOutputStream fos = new FileOutputStream(scriptFile);
+        w = new BufferedWriter(new OutputStreamWriter(fos));
+        w.write(createTableStr, 0, createTableStr.length());
+        if (!createOnly) {
+          w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
+        }
+      } catch (IOException ioe) {
+        LOG.error("Error writing Hive load-in script: " + ioe.toString());
+        ioe.printStackTrace();
+        throw ioe;
+      } finally {
+        if (null != w) {
+          try {
+            w.close();
+          } catch (IOException ioe) {
+            LOG.warn("IOException closing stream to Hive script: "
+                + ioe.toString());
+          }
+        }
+      }
+
+      if (!isGenerateOnly()) {
+        executeScript(filename, env);
+
+        LOG.info("Hive import complete.");
+      }
+    } finally {
+      if (!isGenerateOnly()) {
+        // User isn't interested in saving the DDL. Remove the file.
+        if (!scriptFile.delete()) {
+          LOG.warn("Could not remove temporary file: " + scriptFile.toString());
+          // try to delete the file later.
+          scriptFile.deleteOnExit();
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  /**
+   * Execute the script file via Hive.
+   * If Hive's jars are on the classpath, run it in the same process.
+   * Otherwise, execute the file with 'bin/hive'.
+   *
+   * @param filename The script file to run.
+   * @param env the environment strings to pass to any subprocess.
+   * @throws IOException if Hive did not exit successfully.
+   */
+  private void executeScript(String filename, List<String> env)
+      throws IOException {
+    SubprocessSecurityManager subprocessSM = null;
+
+    try {
+      Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
+
+      // We loaded the CLI Driver in this JVM, so we will just
+      // call it in-process. The CliDriver class has a method:
+      // void main(String [] args) throws Exception.
+      //
+      // We'll call that here to invoke 'hive -f scriptfile'.
+      // Because this method will call System.exit(), we use
+      // a SecurityManager to prevent this.
+      LOG.debug("Using in-process Hive instance.");
+
+      subprocessSM = new SubprocessSecurityManager();
+      subprocessSM.install();
+
+      // Create the argv for the Hive Cli Driver.
+      String [] argArray = new String[2];
+      argArray[0] = "-f";
+      argArray[1] = filename;
+
+      // And invoke the static method on this array.
+      Method mainMethod = cliDriverClass.getMethod("main", argArray.getClass());
+      mainMethod.invoke(null, (Object) argArray);
+
+    } catch (ClassNotFoundException cnfe) {
+      // Hive is not on the classpath. Run externally.
+      // This is not an error path.
+      LOG.debug("Using external Hive process.");
+      executeExternalHiveScript(filename, env);
+    } catch (NoSuchMethodException nsme) {
+      // Could not find a handle to the main() method.
+      throw new IOException("Could not access CliDriver.main()", nsme);
+    } catch (IllegalAccessException iae) {
+      // Error getting a handle on the main() method.
+      throw new IOException("Could not access CliDriver.main()", iae);
+    } catch (InvocationTargetException ite) {
+      // We ran CliDriver.main() and an exception was thrown from within Hive.
+      // This may have been the ExitSecurityException triggered by the
+      // SubprocessSecurityManager. If so, handle it. Otherwise, wrap in
+      // an IOException and rethrow.
+
+      Throwable cause = ite.getCause();
+      if (cause instanceof ExitSecurityException) {
+        ExitSecurityException ese = (ExitSecurityException) cause;
+        int status = ese.getExitStatus();
+        if (status != 0) {
+          throw new IOException("Hive CliDriver exited with status=" + status);
+        }
+      } else {
+        throw new IOException("Exception thrown in Hive", ite);
+      }
+    } finally {
+      if (null != subprocessSM) {
+        // Uninstall the SecurityManager used to trap System.exit().
+        subprocessSM.uninstall();
+      }
+    }
+  }
+
+  /**
+   * Execute Hive via an external 'bin/hive' process.
+   * @param filename the Script file to run.
+   * @param env the environment strings to pass to any subprocess.
+   * @throws IOException if Hive did not exit successfully.
+   */
+  private void executeExternalHiveScript(String filename, List<String> env)
+      throws IOException {
+    // run Hive on the script and note the return code.
+    String hiveExec = getHiveBinPath();
+    ArrayList<String> args = new ArrayList<String>();
+    args.add(hiveExec);
+    args.add("-f");
+    args.add(filename);
+
+    LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
+    int ret = Executor.exec(args.toArray(new String[0]),
+        env.toArray(new String[0]), logSink, logSink);
+    if (0 != ret) {
+      throw new IOException("Hive exited with status " + ret);
+    }
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/HiveTypes.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import java.sql.Types;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Defines conversion between SQL types and Hive types.
+ */
+public final class HiveTypes {
+
+  public static final Log LOG = LogFactory.getLog(HiveTypes.class.getName());
+
+  private HiveTypes() { }
+
+  /**
+   * Given JDBC SQL types coming from another database, what is the best
+   * mapping to a Hive-specific type?
+   */
+  public static String toHiveType(int sqlType) {
+
+      switch (sqlType) {
+          case Types.INTEGER:
+          case Types.SMALLINT:
+              return "INT";
+          case Types.VARCHAR:
+          case Types.CHAR:
+          case Types.LONGVARCHAR:
+          case Types.NVARCHAR:
+          case Types.NCHAR:
+          case Types.LONGNVARCHAR:
+          case Types.DATE:
+          case Types.TIME:
+          case Types.TIMESTAMP:
+          case Types.CLOB:
+              return "STRING";
+          case Types.NUMERIC:
+          case Types.DECIMAL:
+          case Types.FLOAT:
+          case Types.DOUBLE:
+          case Types.REAL:
+              return "DOUBLE";
+          case Types.BIT:
+          case Types.BOOLEAN:
+              return "BOOLEAN";
+          case Types.TINYINT:
+              return "TINYINT";
+          case Types.BIGINT:
+              return "BIGINT";
+          default:
+        // TODO(aaron): Support BINARY, VARBINARY, LONGVARBINARY, DISTINCT,
+        // BLOB, ARRAY, STRUCT, REF, JAVA_OBJECT.
+        return null;
+      }
+  }
+
+  /**
+   * @return true if a sql type can't be translated to a precise match
+   * in Hive, and we have to cast it to something more generic.
+   */
+  public static boolean isHiveTypeImprovised(int sqlType) {
+    return sqlType == Types.DATE || sqlType == Types.TIME
+        || sqlType == Types.TIMESTAMP
+        || sqlType == Types.DECIMAL
+        || sqlType == Types.NUMERIC;
+  }
+}
+

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java?rev=1195674&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/hive/TableDefWriter.java Mon Oct 31 21:15:29 2011
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Date;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.io.CodecMap;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+
+/**
+ * Creates (Hive-specific) SQL DDL statements to create tables to hold data
+ * we're importing from another source.
+ *
+ * After we import the database into HDFS, we can inject it into Hive using
+ * the CREATE TABLE and LOAD DATA INPATH statements generated by this object.
+ */
+public class TableDefWriter {
+
+  public static final Log LOG = LogFactory.getLog(
+      TableDefWriter.class.getName());
+
+  private SqoopOptions options;
+  private ConnManager connManager;
+  private Configuration configuration;
+  private String inputTableName;
+  private String outputTableName;
+  private boolean commentsEnabled;
+
+  /**
+   * Creates a new TableDefWriter to generate a Hive CREATE TABLE statement.
+   * @param opts program-wide options
+   * @param connMgr the connection manager used to describe the table.
+   * @param inputTable the name of the table to load.
+   * @param outputTable the name of the Hive table to create.
+   * @param config the Hadoop configuration to use to connect to the dfs
+   * @param withComments if true, then tables will be created with a
+   *        timestamp comment.
+   */
+  public TableDefWriter(final SqoopOptions opts, final ConnManager connMgr,
+      final String inputTable, final String outputTable,
+      final Configuration config, final boolean withComments) {
+    this.options = opts;
+    this.connManager = connMgr;
+    this.inputTableName = inputTable;
+    this.outputTableName = outputTable;
+    this.configuration = config;
+    this.commentsEnabled = withComments;
+  }
+
+  private Map<String, Integer> externalColTypes;
+
+  /**
+   * Set the column type map to be used.
+   * (dependency injection for testing; not used in production.)
+   */
+  public void setColumnTypes(Map<String, Integer> colTypes) {
+    this.externalColTypes = colTypes;
+    LOG.debug("Using test-controlled type map");
+  }
+
+  /**
+   * Get the column names to import.
+   */
+  private String [] getColumnNames() {
+    String [] colNames = options.getColumns();
+    if (null != colNames) {
+      return colNames; // user-specified column names.
+    } else if (null != externalColTypes) {
+      // Test-injection column mapping. Extract the col names from this.
+      ArrayList<String> keyList = new ArrayList<String>();
+      for (String key : externalColTypes.keySet()) {
+        keyList.add(key);
+      }
+
+      return keyList.toArray(new String[keyList.size()]);
+    } else if (null != inputTableName) {
+      return connManager.getColumnNames(inputTableName);
+    } else {
+      return connManager.getColumnNamesForQuery(options.getSqlQuery());
+    }
+  }
+
+  /**
+   * @return the CREATE TABLE statement for the table to load into hive.
+   */
+  public String getCreateTableStmt() throws IOException {
+    Map<String, Integer> columnTypes;
+    Properties userMapping = options.getMapColumnHive();
+
+    if (externalColTypes != null) {
+      // Use pre-defined column types.
+      columnTypes = externalColTypes;
+    } else {
+      // Get these from the database.
+      if (null != inputTableName) {
+        columnTypes = connManager.getColumnTypes(inputTableName);
+      } else {
+        columnTypes = connManager.getColumnTypesForQuery(options.getSqlQuery());
+      }
+    }
+
+    String [] colNames = getColumnNames();
+    StringBuilder sb = new StringBuilder();
+    if (options.doFailIfHiveTableExists()) {
+      sb.append("CREATE TABLE `").append(outputTableName).append("` ( ");
+    } else {
+      sb.append("CREATE TABLE IF NOT EXISTS `");
+      sb.append(outputTableName).append("` ( ");
+    }
+
+    // Check that all explicitly mapped columns are present in result set
+    for(Object column : userMapping.keySet()) {
+      boolean found = false;
+      for(String c : colNames) {
+        if(c.equals(column)) {
+          found = true;
+          break;
+        }
+      }
+
+      if(!found) {
+        throw new IllegalArgumentException("No column by the name " + column
+                + "found while importing data");
+      }
+    }
+
+    boolean first = true;
+    for (String col : colNames) {
+      if (!first) {
+        sb.append(", ");
+      }
+
+      first = false;
+
+      Integer colType = columnTypes.get(col);
+      String hiveColType = userMapping.getProperty(col);
+      if(hiveColType == null) { hiveColType = connManager.toHiveType(colType); }
+      if (null == hiveColType) {
+        throw new IOException("Hive does not support the SQL type for column "
+            + col);
+      }
+
+      sb.append('`').append(col).append("` ").append(hiveColType);
+
+      if (HiveTypes.isHiveTypeImprovised(colType)) {
+        LOG.warn(
+            "Column " + col + " had to be cast to a less precise type in Hive");
+      }
+    }
+
+    sb.append(") ");
+
+    if (commentsEnabled) {
+      DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+      String curDateStr = dateFormat.format(new Date());
+      sb.append("COMMENT 'Imported by sqoop on " + curDateStr + "' ");
+    }
+
+    if (options.getHivePartitionKey() != null) {
+      sb.append("PARTITIONED BY (")
+        .append(options.getHivePartitionKey())
+        .append(" STRING) ");
+     }
+
+    sb.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '");
+    sb.append(getHiveOctalCharCode((int) options.getOutputFieldDelim()));
+    sb.append("' LINES TERMINATED BY '");
+    sb.append(getHiveOctalCharCode((int) options.getOutputRecordDelim()));
+    String codec = options.getCompressionCodec();
+    if (codec != null && (codec.equals(CodecMap.LZOP)
+            || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
+      sb.append("' STORED AS INPUTFORMAT "
+              + "'com.hadoop.mapred.DeprecatedLzoTextInputFormat'");
+      sb.append(" OUTPUTFORMAT "
+              + "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'");
+    } else {
+      sb.append("' STORED AS TEXTFILE");
+    }
+
+    LOG.debug("Create statement: " + sb.toString());
+    return sb.toString();
+  }
+
+  /**
+   * @return the LOAD DATA statement to import the data in HDFS into hive.
+   */
+  public String getLoadDataStmt() throws IOException {
+    String finalPathStr = getFinalPathStr();
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("LOAD DATA INPATH '");
+    sb.append(finalPathStr + "'");
+    if (options.doOverwriteHiveTable()) {
+      sb.append(" OVERWRITE");
+    }
+    sb.append(" INTO TABLE `");
+    sb.append(outputTableName);
+    sb.append('`');
+
+    if (options.getHivePartitionKey() != null) {
+      sb.append(" PARTITION (")
+        .append(options.getHivePartitionKey())
+        .append("='").append(options.getHivePartitionValue())
+        .append("')");
+    }
+
+    LOG.debug("Load statement: " + sb.toString());
+    return sb.toString();
+  }
+
+  public String getFinalPathStr() throws IOException {
+    String warehouseDir = options.getWarehouseDir();
+    if (null == warehouseDir) {
+      warehouseDir = "";
+    } else if (!warehouseDir.endsWith(File.separator)) {
+      warehouseDir = warehouseDir + File.separator;
+    }
+
+    String tablePath;
+    if (null != inputTableName) {
+      tablePath = warehouseDir + inputTableName;
+    } else {
+      tablePath = options.getTargetDir();
+    }
+    FileSystem fs = FileSystem.get(configuration);
+    Path finalPath = new Path(tablePath).makeQualified(fs);
+    return finalPath.toString();
+  }
+
+  /**
+   * Return a string identifying the character to use as a delimiter
+   * in Hive, in octal representation.
+   * Hive can specify delimiter characters in the form '\ooo' where
+   * ooo is a three-digit octal number between 000 and 177. Values
+   * may not be truncated ('\12' is wrong; '\012' is ok) nor may they
+   * be zero-prefixed (e.g., '\0177' is wrong).
+   *
+   * @param charNum the character to use as a delimiter
+   * @return a string of the form "\ooo" where ooo is an octal number
+   * in [000, 177].
+   * @throws IllegalArgumentException if charNum &gt; 0177.
+   */
+  public static String getHiveOctalCharCode(int charNum) {
+    if (charNum > 0177) {
+      throw new IllegalArgumentException(
+          "Character " + charNum + " is an out-of-range delimiter");
+    }
+
+    return String.format("\\%03o", charNum);
+  }
+
+}
+



Mime
View raw message