phoenix-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Abel Fernández <>
Subject Load HFiles in Apache Phoenix
Date Wed, 27 Apr 2016 19:17:33 GMT

I am trying to load data into Apache Phoenix using HFiles. I do not have a
CSV, so I need to build the HFiles from an RDD.

My problem is that I am not able to see the rows using the Apache Phoenix
API (select * from ...), but when I do a raw HBase scan of the table I can
see them.

Do I need to include the empty column (the empty KeyValue that Phoenix adds to every row)?

This is the code I am using:

/**
 * Enriches a Spark RDD of Products with Phoenix HFile generation
 * (see toHFile below), on top of Phoenix's ProductRDDFunctions.
 *
 * NOTE(review): this code is quoted from a mailing-list post; the class
 * body below was truncated by the archive and will not compile as-is.
 */
class ExtendedProductRDDFunctions[A <: scala.Product](data:
org.apache.spark.rdd.RDD[A]) extends
  ProductRDDFunctions[A](data) with Serializable with Logging {

Create the HFiles:
  // Converts this RDD into the (row key, family/qualifier/values) pairs
  // required to write Phoenix-compatible HFiles, by round-tripping each
  // row through an uncommitted Phoenix UPSERT (see mapRow below).
  //
  // NOTE(review): truncated in the mail archive — the jdbcUrl assignment
  // (L36 below) and the method's closing brace are missing.
  def toHFile(
              sc: SparkContext,
              tableName: String,
              columns: Seq[String],
              conf: Configuration = new Configuration,
              zkUrl: Option[String] = None
              ): RDD[(ByteArrayWrapper, FamiliesQualifiersValues)] = {

    // Phoenix output configuration for the target table and columns.
    val config = ConfigurationUtil.getOutputConfiguration(tableName,
columns, zkUrl, Some(conf))
    val tableBytes = Bytes.toBytes(tableName)
    // NOTE(review): right-hand side lost in the archive — presumably
    // derived from zkUrl/config; confirm against the original post.
    val jdbcUrl =
    // UPSERT statement used to generate (not commit) the KeyValues.
    val query = QueryUtil.constructUpsertStatement(tableName,
columns.toList.asJava, null)

    // Broadcast the decoded column metadata so executors can reuse it.
    val columnsInfo = ConfigurationUtil.decodeColumns(config)
    val a = sc.broadcast(columnsInfo)

    // WARNING(review): data.count() forces a full extra pass over the RDD
    // just for this log line — expensive on large inputs.
    logInfo("toHFile data size: "+data.count())
    data.flatMap(x => mapRow(x, jdbcUrl, tableBytes, query, a.value))

  // Converts one Product row into Phoenix key/value pairs: runs the UPSERT
  // on a local connection without committing, then drains the uncommitted
  // KeyValues via PhoenixRuntime and maps each one through createPut.
  //
  // NOTE(review): truncated in the archive — the statement parameter
  // binding/execution, the connection close, and the closing braces are
  // missing. Also note: opening a JDBC connection per row is very costly;
  // consider mapPartitions with one connection per partition instead.
  def mapRow(product: Product,
             jdbcUrl: String,
             tableBytes: Array[Byte],
             query: String,
             columnsInfo: List[ColumnInfo]): List[(ByteArrayWrapper,
FamiliesQualifiersValues)] = {

    val conn = DriverManager.getConnection(jdbcUrl)
    var hRows:Iterator[(ByteArrayWrapper, FamiliesQualifiersValues)] = null
    val preparedStatement = conn.prepareStatement(query)

    // Iterator over the KeyValues produced by the uncommitted upsert.
    val uncommittedDataIterator =
PhoenixRuntime.getUncommittedDataIterator(conn, true)
    // NOTE(review): lambda garbled by the archive ("kvPair => => createPut(kf)");
    // the original presumably mapped each KeyValue through createPut.
    hRows = uncommittedDataIterator.asScala
      .flatMap(kvPair => => createPut(kf)))


  // Wraps one Phoenix KeyValue into the (row-key wrapper,
  // family/qualifier/values) shape expected by the HBase bulk-load API.
  //
  // NOTE(review): truncated in the archive — the parameter type, the code
  // populating `family`, and the return value/closing brace are missing.
  private def createPut(keyValue:

    val key = new ByteArrayWrapper(keyValue.getRow)
    val family = new FamiliesQualifiersValues


  Load the HFiles into Apache Phoenix:
  // Collapse duplicate row keys (compared via the string form of the key),
  // keeping only the last value seen for each key, then strip the
  // temporary key again so only the values remain.
  val sortedRdd = rdd
    .keyBy(pair => pair._1.toString)
    .reduceByKey((_, keep) => keep)
    .map(_._2)

  // Writes the prepared RDD out as HFiles under outputPath — presumably
  // by delegating to an HBaseContext bulk-load helper; confirm against the
  // original post.
  //
  // NOTE(review): truncated in the archive — only the `f => f` identity
  // mapper of the bulk-load call survives.
  def apacheBulkSave(hBaseContext: HBaseContext, table: String,outputPath:
String) ={
      f => f,

Un saludo - Best Regards.

View raw message