sqoop-commits mailing list archives

From arv...@apache.org
Subject svn commit: r1170979 - in /incubator/sqoop/trunk/src: java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java test/com/cloudera/sqoop/TestAvroImport.java
Date Thu, 15 Sep 2011 07:05:25 GMT
Author: arvind
Date: Thu Sep 15 07:05:25 2011
New Revision: 1170979

URL: http://svn.apache.org/viewvc?rev=1170979&view=rev
Log:
SQOOP-336. Avro import does not support varbinary types.

(Tom White via Arvind Prabhakar)
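
Before this patch, the generator's SQL-to-Avro type switch mapped BINARY to Avro's BYTES but let VARBINARY fall through to the IllegalArgumentException default, so any table containing a VARBINARY column could not be imported as an Avro data file. A minimal sketch of the mapping after the fix (the method name toAvroType and the cases surrounding the hunk below are assumptions beyond what the diff shows):

    // Sketch of the JDBC-to-Avro mapping in AvroSchemaGenerator after this
    // commit. Types is java.sql.Types; Type is org.apache.avro.Schema.Type.
    private Type toAvroType(int sqlType) {
      switch (sqlType) {
        case Types.TIMESTAMP:
          return Type.LONG;
        case Types.BINARY:
        case Types.VARBINARY: // SQOOP-336: VARBINARY now also maps to BYTES
          return Type.BYTES;
        default:
          throw new IllegalArgumentException("Cannot convert SQL type "
              + sqlType);
      }
    }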

Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java?rev=1170979&r1=1170978&r2=1170979&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/orm/AvroSchemaGenerator.java Thu Sep 15 07:05:25 2011
@@ -101,6 +101,7 @@ public class AvroSchemaGenerator {
       case Types.TIMESTAMP:
         return Type.LONG;
       case Types.BINARY:
+      case Types.VARBINARY:
         return Type.BYTES;
       default:
         throw new IllegalArgumentException("Cannot convert SQL type "

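The rewritten test below asserts that every imported column is wrapped in a nullable union; for the new VARBINARY(2) column, checkField expects a union of BYTES and NULL. Built with the plain Avro API, that field schema looks roughly like this (a sketch; Schema is org.apache.avro.Schema):

    // Sketch: the nullable-union field schema the test's checkField
    // expects for the VARBINARY column, i.e. a union of BYTES and NULL.
    Schema varbinarySchema = Schema.createUnion(java.util.Arrays.asList(
        Schema.create(Schema.Type.BYTES),
        Schema.create(Schema.Type.NULL)));
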
Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java?rev=1170979&r1=1170978&r2=1170979&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImport.java Thu Sep 15 07:05:25 2011
@@ -19,19 +19,20 @@
 package com.cloudera.sqoop;
 
 import java.io.IOException;
-import java.sql.Connection;
+import java.nio.ByteBuffer;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -63,7 +64,7 @@ public class TestAvroImport extends Impo
     }
 
     args.add("--table");
-    args.add(HsqldbTestServer.getTableName());
+    args.add(getTableName());
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--warehouse-dir");
@@ -75,13 +76,14 @@ public class TestAvroImport extends Impo
     return args.toArray(new String[0]);
   }
 
-  // this test just uses the two int table.
-  protected String getTableName() {
-    return HsqldbTestServer.getTableName();
-  }
-
   public void testAvroImport() throws IOException {
 
+    String [] types = { "BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE",
+        "VARCHAR(6)", "VARBINARY(2)", };
+    String [] vals = { "true", "100", "200", "1.0", "2.0",
+        "'s'", "'0102'", };
+    createTableWithColTypes(types, vals);
+
     runImport(getOutputArgv(true));
 
     Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
@@ -89,72 +91,50 @@ public class TestAvroImport extends Impo
     Schema schema = reader.getSchema();
     assertEquals(Schema.Type.RECORD, schema.getType());
     List<Field> fields = schema.getFields();
-    assertEquals(2, fields.size());
+    assertEquals(types.length, fields.size());
 
-    assertEquals("INTFIELD1", fields.get(0).name());
-    assertEquals(Schema.Type.UNION, fields.get(0).schema().getType());
-    assertEquals(Schema.Type.INT,
-            fields.get(0).schema().getTypes().get(0).getType());
-    assertEquals(Schema.Type.NULL,
-            fields.get(0).schema().getTypes().get(1).getType());
-
-    assertEquals("INTFIELD2", fields.get(1).name());
-    assertEquals(Schema.Type.UNION, fields.get(1).schema().getType());
-    assertEquals(Schema.Type.INT,
-            fields.get(1).schema().getTypes().get(0).getType());
-    assertEquals(Schema.Type.NULL,
-            fields.get(1).schema().getTypes().get(1).getType());
+    checkField(fields.get(0), "DATA_COL0", Schema.Type.BOOLEAN);
+    checkField(fields.get(1), "DATA_COL1", Schema.Type.INT);
+    checkField(fields.get(2), "DATA_COL2", Schema.Type.LONG);
+    checkField(fields.get(3), "DATA_COL3", Schema.Type.FLOAT);
+    checkField(fields.get(4), "DATA_COL4", Schema.Type.DOUBLE);
+    checkField(fields.get(5), "DATA_COL5", Schema.Type.STRING);
+    checkField(fields.get(6), "DATA_COL6", Schema.Type.BYTES);
 
     GenericRecord record1 = reader.next();
-    assertEquals(1, record1.get("INTFIELD1"));
-    assertEquals(8, record1.get("INTFIELD2"));
+    assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
+    assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
+    assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
+    assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
+    assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
+    assertEquals("DATA_COL5", new Utf8("s"), record1.get("DATA_COL5"));
+    Object object = record1.get("DATA_COL6");
+    assertTrue(object instanceof ByteBuffer);
+    ByteBuffer b = ((ByteBuffer) object);
+    assertEquals((byte) 1, b.get(0));
+    assertEquals((byte) 2, b.get(1));
+  }
+
+  private void checkField(Field field, String name, Type type) {
+    assertEquals(name, field.name());
+    assertEquals(Schema.Type.UNION, field.schema().getType());
+    assertEquals(type, field.schema().getTypes().get(0).getType());
+    assertEquals(Schema.Type.NULL, field.schema().getTypes().get(1).getType());
   }
 
   public void testNullableAvroImport() throws IOException, SQLException {
-    addNullRecord(); // Add a pair of NULL values to twointtable.
+    String [] types = { "INT" };
+    String [] vals = { null };
+    createTableWithColTypes(types, vals);
+
     runImport(getOutputArgv(true));
 
     Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
     DataFileReader<GenericRecord> reader = read(outputFile);
-    boolean foundNullRecord = false;
-
-    // Iterate thru the records in the output file til we find one that
-    // matches (NULL, NULL).
-    for (GenericRecord record : reader) {
-      LOG.debug("Input record: " + record);
-      if (record.get("INTFIELD1") == null && record.get("INTFIELD2") == null) {
-        LOG.debug("Got null record");
-        foundNullRecord = true;
-      }
-    }
 
-    assertTrue(foundNullRecord);
-  }
+    GenericRecord record1 = reader.next();
+    assertNull(record1.get("DATA_COL0"));
 
-  /**
-   * Add a record to the TWOINTTABLE that contains (NULL, NULL).
-   *
-   * @throws SQLException if there's a problem doing the INSERT statement.
-   */
-  private void addNullRecord() throws SQLException {
-    Connection connection = null;
-    Statement st = null;
-    try {
-      connection = this.getManager().getConnection();
-      st = connection.createStatement();
-      st.executeUpdate("INSERT INTO " + getTableName()
-          + " VALUES(NULL, NULL)");
-
-      connection.commit();
-    } finally {
-      if (null != st) {
-        st.close();
-      }
-
-      if (null != connection) {
-        connection.close();
-      }
-    }
   }
 
   private DataFileReader<GenericRecord> read(Path filename) throws IOException {



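On the reading side, a VARBINARY value comes back from the Avro file as a java.nio.ByteBuffer, which is what the test checks byte by byte. A minimal reader sketch along the lines of the test's (truncated) read helper, with the path and configuration as placeholders:

    // Sketch: reading an imported record back with the generic Avro reader;
    // "DATA_COL6" is the VARBINARY column from the test, path/conf are
    // placeholders.
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/warehouse/part-m-00000.avro");
    DatumReader<GenericRecord> datumReader =
        new GenericDatumReader<GenericRecord>();
    DataFileReader<GenericRecord> reader =
        new DataFileReader<GenericRecord>(new FsInput(path, conf), datumReader);
    GenericRecord record = reader.next();
    ByteBuffer bytes = (ByteBuffer) record.get("DATA_COL6"); // Avro BYTES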