sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject svn commit: r1310554 - in /sqoop/trunk/src/test/com/cloudera/sqoop: manager/MySQLLobAvroImportTest.java manager/OracleLobAvroImportTest.java testutil/LobAvroImportTestCase.java
Date Fri, 06 Apr 2012 19:10:18 GMT
Author: blee
Date: Fri Apr  6 19:10:18 2012
New Revision: 1310554

URL: http://svn.apache.org/viewvc?rev=1310554&view=rev
Log:
SQOOP-470 Create tests for blob support for Avro import

Added:
    sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLLobAvroImportTest.java   (with props)
    sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleLobAvroImportTest.java   (with props)
    sqoop/trunk/src/test/com/cloudera/sqoop/testutil/LobAvroImportTestCase.java   (with props)

Added: sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLLobAvroImportTest.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLLobAvroImportTest.java?rev=1310554&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLLobAvroImportTest.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLLobAvroImportTest.java Fri Apr  6 19:10:18 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.LobAvroImportTestCase;
+
+/**
+ * Tests BLOB/CLOB import for Avro with MySQL Db.
+ *
+ * <p>Supplies the MySQL-specific pieces (connect string, credentials, table
+ * cleanup and blob column type) that {@link LobAvroImportTestCase} needs.
+ */
+public class MySQLLobAvroImportTest extends LobAvroImportTestCase {
+
+  /** Logger registered under this test's own class name. */
+  public static final Log LOG = LogFactory.getLog(
+      MySQLLobAvroImportTest.class.getName());
+
+  /**
+   * @return the logger for this test class.
+   */
+  @Override
+  protected Log getLogger() {
+    return LOG;
+  }
+
+  /**
+   * @return the display name of the database under test.
+   */
+  @Override
+  protected String getDbFriendlyName() {
+    return "MySQL";
+  }
+
+  /**
+   * @return the connect string taken from {@link MySQLTestUtils}.
+   */
+  @Override
+  protected String getConnectString() {
+    return MySQLTestUtils.CONNECT_STRING;
+  }
+
+  /**
+   * Builds the options for the import, connecting as the user reported by
+   * {@link MySQLTestUtils#getCurrentUser()}.
+   * @param conf the configuration the options are created from.
+   * @return options with the username set.
+   */
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    opts.setUsername(MySQLTestUtils.getCurrentUser());
+    return opts;
+  }
+
+  /**
+   * Drops the given table if it exists and commits the change.
+   * @param table name of the table to drop.
+   * @throws SQLException if the statement cannot be prepared, executed or
+   * committed.
+   */
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    Connection conn = getManager().getConnection();
+    // A table name cannot be bound as a statement parameter, so it is
+    // concatenated; the name comes from the test harness, not from users.
+    PreparedStatement statement = conn.prepareStatement(
+        "DROP TABLE IF EXISTS " + table,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
+  /**
+   * @return the column type used for blob columns, replacing the base
+   * class's "BLOB".
+   */
+  @Override
+  protected String getBlobType() {
+    return "MEDIUMBLOB";
+  }
+}

Propchange: sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLLobAvroImportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLLobAvroImportTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleLobAvroImportTest.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleLobAvroImportTest.java?rev=1310554&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleLobAvroImportTest.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleLobAvroImportTest.java Fri Apr  6 19:10:18 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.UnsupportedEncodingException;
+import java.sql.SQLException;
+import java.util.Formatter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.testutil.LobAvroImportTestCase;
+
+/**
+ * Tests BLOB/CLOB import for Avro with Oracle Db.
+ *
+ * <p>Supplies the Oracle-specific pieces (connect string, credentials, table
+ * cleanup and blob literal syntax) that {@link LobAvroImportTestCase} needs.
+ */
+public class OracleLobAvroImportTest extends LobAvroImportTestCase {
+
+  /** Logger registered under this test's own class name. */
+  public static final Log LOG = LogFactory.getLog(
+      OracleLobAvroImportTest.class.getName());
+
+  /**
+   * @return the logger for this test class.
+   */
+  @Override
+  protected Log getLogger() {
+    return LOG;
+  }
+
+  /**
+   * @return the display name of the database under test.
+   */
+  @Override
+  protected String getDbFriendlyName() {
+    return "Oracle";
+  }
+
+  /**
+   * @return the connect string taken from {@link OracleUtils}.
+   */
+  @Override
+  protected String getConnectString() {
+    return OracleUtils.CONNECT_STRING;
+  }
+
+  /**
+   * Builds the options for the import and lets {@link OracleUtils} fill in
+   * the authentication settings.
+   * @param conf the configuration the options are created from.
+   * @return the configured options.
+   */
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    OracleUtils.setOracleAuth(opts);
+    return opts;
+  }
+
+  /**
+   * Drops the given table by delegating to {@link OracleUtils}.
+   * @param table name of the table to drop.
+   * @throws SQLException if the drop fails.
+   */
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    OracleUtils.dropTable(table, getManager());
+  }
+
+  /**
+   * Renders blob data as a single-quoted, upper-case hex literal of its
+   * UTF-8 bytes.
+   * @param blobData the text whose bytes should be stored in the blob.
+   * @return the quoted hex string to place in the insert statement.
+   */
+  @Override
+  protected String getBlobInsertStr(String blobData) {
+    // Oracle wants blob data encoded as hex (e.g. '01FCA3B5').
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("'");
+
+    // The formatter appends straight into sb, between the two quotes.
+    Formatter fmt = new Formatter(sb);
+    try {
+      for (byte b : blobData.getBytes("UTF-8")) {
+        // Two hex digits per byte, zero-padded.
+        fmt.format("%02X", b);
+      }
+    } catch (UnsupportedEncodingException uee) {
+      // Should not happen; Java always supports UTF-8.
+      fail("Could not get utf-8 bytes for blob string");
+      return null;
+    }
+    sb.append("'");
+    return sb.toString();
+  }
+}

Propchange: sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleLobAvroImportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleLobAvroImportTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sqoop/trunk/src/test/com/cloudera/sqoop/testutil/LobAvroImportTestCase.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/LobAvroImportTestCase.java?rev=1310554&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/testutil/LobAvroImportTestCase.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/testutil/LobAvroImportTestCase.java Fri Apr  6 19:10:18 2012
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.testutil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.sql.SQLException;
+import java.util.ArrayList;
+
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.io.CodecMap;
+import org.apache.sqoop.lib.BlobRef;
+
+/**
+ * Tests BLOB/CLOB import for Avro.
+ */
+public abstract class LobAvroImportTestCase extends ImportJobTestCase {
+
+  private Log log;
+
+  public LobAvroImportTestCase() {
+    this.log = LogFactory.getLog(LobAvroImportTestCase.class.getName());
+  }
+
+  /**
+   * @return the Log object to use for reporting during this test
+   */
+  protected abstract Log getLogger();
+
+  /**
+   * @return a "friendly" name for the database. e.g "mysql" or "oracle".
+   */
+  protected abstract String getDbFriendlyName();
+
+  @Override
+  protected String getTablePrefix() {
+    return "LOB_" + getDbFriendlyName().toUpperCase() + "_";
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    // Hsqldb does not support BLOB/CLOB
+    return false;
+  }
+
+  @Override
+  public void tearDown() {
+    try {
+      // Clean up the database on our way out.
+      dropTableIfExists(getTableName());
+    } catch (SQLException e) {
+      log.warn("Error trying to drop table '" + getTableName()
+          + "' on tearDown: " + e);
+    }
+    super.tearDown();
+  }
+
+  protected String [] getArgv(String ... additionalArgs) {
+    // Import every column of the table
+    String [] colNames = getColNames();
+    String splitByCol = colNames[0];
+    String columnsString = "";
+    for (String col : colNames) {
+      columnsString += col + ",";
+    }
+
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--table");
+    args.add(getTableName());
+    args.add("--columns");
+    args.add(columnsString);
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--warehouse-dir");
+    args.add(getWarehouseDir());
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("--as-avrodatafile");
+    args.add("--num-mappers");
+    args.add("2");
+
+    for (String arg : additionalArgs) {
+      args.add(arg);
+    }
+
+    return args.toArray(new String[0]);
+  }
+
+  protected String getBlobType() {
+    return "BLOB";
+  }
+
+  protected String getBlobInsertStr(String blobData) {
+    return "'" + blobData + "'";
+  }
+
+  /**
+   * Return the current table number as a string. In test, table number is used
+   * to name .lob files.
+   * @return current table number.
+   */
+  private String getTableNum() {
+    return getTableName().substring(getTablePrefix().length());
+  }
+
+  /**
+   * Return an instance of DataFileReader for the given filename.
+   * @param filename path that we're opening a reader for.
+   * @return instance of DataFileReader.
+   * @throws IOException
+   */
+  private DataFileReader<GenericRecord> read(Path filename)
+      throws IOException {
+    Configuration conf = getConf();
+    if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
+      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+    }
+    FsInput fsInput = new FsInput(filename, conf);
+    DatumReader<GenericRecord> datumReader =
+      new GenericDatumReader<GenericRecord>();
+    return new DataFileReader<GenericRecord>(fsInput, datumReader);
+  }
+
+  /** Import blob data that is smaller than inline lob limit. Blob data
+   * should be saved as Avro bytes.
+   * @throws IOException
+   * @throws SQLException
+   */
+  public void testBlobAvroImportInline() throws IOException, SQLException {
+    String [] types = { getBlobType() };
+    String expectedVal = "This is short BLOB data";
+    String [] vals = { getBlobInsertStr(expectedVal) };
+
+    createTableWithColTypes(types, vals);
+
+    runImport(getArgv());
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    GenericRecord record = reader.next();
+
+    // Verify that blob data is imported as Avro bytes.
+    ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
+    String returnVal = new String(buf.array());
+
+    assertEquals(getColName(0), expectedVal, returnVal);
+  }
+
+  /**
+   * Import blob data that is larger than inline lob limit. The reference file
+   * should be saved as Avro bytes. Blob data should be saved in LOB file
+   * format.
+   * @throws IOException
+   * @throws SQLException
+   */
+  public void testBlobAvroImportExternal() throws IOException, SQLException {
+    String [] types = { getBlobType() };
+    String data = "This is short BLOB data";
+    String [] vals = { getBlobInsertStr(data) };
+
+    createTableWithColTypes(types, vals);
+
+    // Set inline lob limit to a small value so that blob data will be
+    // written to an external file.
+    runImport(getArgv("--inline-lob-limit", "1"));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    GenericRecord record = reader.next();
+
+    // Verify that the reference file is written in Avro bytes.
+    ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
+    String returnVal = new String(buf.array());
+    String expectedVal = "externalLob(lf,_lob/large_obj_task_local_000"
+        + getTableNum() + "_m_0000000.lob,68," + data.length() + ")";
+
+    assertEquals(expectedVal, returnVal);
+
+    // Verify that blob data stored in the external lob file is correct.
+    BlobRef br = BlobRef.parse(returnVal);
+    Path lobFileDir = new Path(getWarehouseDir(), getTableName());
+    InputStream in = br.getDataStream(getConf(), lobFileDir);
+
+    byte [] bufArray = new byte[data.length()];
+    int chars = in.read(bufArray);
+    in.close();
+
+    assertEquals(chars, data.length());
+
+    returnVal = new String(bufArray);
+    expectedVal = data;
+
+    assertEquals(getColName(0), returnVal, expectedVal);
+  }
+
+  /**
+   * Import blob data that is smaller than inline lob limit and compress with
+   * deflate codec. Blob data should be encoded and saved as Avro bytes.
+   * @throws IOException
+   * @throws SQLException
+   */
+  public void testBlobCompressedAvroImportInline()
+      throws IOException, SQLException {
+    String [] types = { getBlobType() };
+    String expectedVal = "This is short BLOB data";
+    String [] vals = { getBlobInsertStr(expectedVal) };
+
+    createTableWithColTypes(types, vals);
+
+    runImport(getArgv("--compression-codec", CodecMap.DEFLATE));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    GenericRecord record = reader.next();
+
+    // Verify that the data block of the Avro file is compressed with deflate
+    // codec.
+    assertEquals(CodecMap.DEFLATE,
+        reader.getMetaString(DataFileConstants.CODEC));
+
+    // Verify that all columns are imported correctly.
+    ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
+    String returnVal = new String(buf.array());
+
+    assertEquals(getColName(0), expectedVal, returnVal);
+  }
+
+  /**
+   * Import blob data that is larger than inline lob limit and compress with
+   * deflate codec. The reference file should be encoded and saved as Avro
+   * bytes. Blob data should be saved in LOB file format without compression.
+   * @throws IOException
+   * @throws SQLException
+   */
+  public void testBlobCompressedAvroImportExternal()
+      throws IOException, SQLException {
+    String [] types = { getBlobType() };
+    String data = "This is short BLOB data";
+    String [] vals = { getBlobInsertStr(data) };
+
+    createTableWithColTypes(types, vals);
+
+    // Set inline lob limit to a small value so that blob data will be
+    // written to an external file.
+    runImport(getArgv(
+        "--inline-lob-limit", "1", "--compression-codec", CodecMap.DEFLATE));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    GenericRecord record = reader.next();
+
+    // Verify that the data block of the Avro file is compressed with deflate
+    // codec.
+    assertEquals(CodecMap.DEFLATE,
+        reader.getMetaString(DataFileConstants.CODEC));
+
+    // Verify that the reference file is written in Avro bytes.
+    ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
+    String returnVal = new String(buf.array());
+    String expectedVal = "externalLob(lf,_lob/large_obj_task_local_000"
+        + getTableNum() + "_m_0000000.lob,68," + data.length() + ")";
+
+    assertEquals(expectedVal, returnVal);
+
+    // Verify that blob data stored in the external lob file is correct.
+    BlobRef br = BlobRef.parse(returnVal);
+    Path lobFileDir = new Path(getWarehouseDir(), getTableName());
+    InputStream in = br.getDataStream(getConf(), lobFileDir);
+
+    byte [] bufArray = new byte[data.length()];
+    int chars = in.read(bufArray);
+    in.close();
+
+    assertEquals(chars, data.length());
+
+    returnVal = new String(bufArray);
+    expectedVal = data;
+
+    assertEquals(getColName(0), returnVal, expectedVal);
+  }
+
+  /**
+   * Import multiple columns of blob data. Blob data should be saved as Avro
+   * bytes.
+   * @throws IOException
+   * @throws SQLException
+   */
+  public void testBlobAvroImportMultiCols() throws IOException, SQLException {
+    String [] types = { getBlobType(), getBlobType(), getBlobType(), };
+    String expectedVal1 = "This is short BLOB data1";
+    String expectedVal2 = "This is short BLOB data2";
+    String expectedVal3 = "This is short BLOB data3";
+    String [] vals = { getBlobInsertStr(expectedVal1),
+                       getBlobInsertStr(expectedVal2),
+                       getBlobInsertStr(expectedVal3), };
+
+    createTableWithColTypes(types, vals);
+
+    runImport(getArgv());
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    GenericRecord record = reader.next();
+
+    // Verify that all columns are imported correctly.
+    ByteBuffer buf = (ByteBuffer) record.get(getColName(0));
+    String returnVal = new String(buf.array());
+
+    assertEquals(getColName(0), expectedVal1, returnVal);
+
+    buf = (ByteBuffer) record.get(getColName(1));
+    returnVal = new String(buf.array());
+
+    assertEquals(getColName(1), expectedVal2, returnVal);
+
+    buf = (ByteBuffer) record.get(getColName(2));
+    returnVal = new String(buf.array());
+
+    assertEquals(getColName(2), expectedVal3, returnVal);
+  }
+
+  public void testClobAvroImportInline() throws IOException, SQLException {
+    // TODO: add tests for CLOB support for Avro import
+  }
+
+  public void testClobAvroImportExternal() throws IOException, SQLException {
+    // TODO: add tests for CLOB support for Avro import
+  }
+
+  public void testClobCompressedAvroImportInline()
+      throws IOException, SQLException {
+    // TODO: add tests for CLOB support for Avro import
+  }
+
+  public void testClobCompressedAvroImportExternal()
+      throws IOException, SQLException {
+    // TODO: add tests for CLOB support for Avro import
+  }
+
+  public void testClobAvroImportMultiCols() throws IOException, SQLException {
+    // TODO: add tests for CLOB support for Avro import
+  }
+}

Propchange: sqoop/trunk/src/test/com/cloudera/sqoop/testutil/LobAvroImportTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sqoop/trunk/src/test/com/cloudera/sqoop/testutil/LobAvroImportTestCase.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message