phoenix-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From James Taylor <jamestay...@apache.org>
Subject Re: Load HFiles in Apache Phoenix
Date Wed, 27 Apr 2016 23:44:46 GMT
Hi Abel,
Yes, you need to either include the empty key value or you need to declare
your table as a view instead of a table (in which case it'd be read-only).
Thanks,
James

On Wed, Apr 27, 2016 at 12:17 PM, Abel Fernández <mevsmyself@gmail.com>
wrote:

> Hi,
>
> I am trying to load files in Apache Phoenix using HFiles. I do not have a
> csv so I need to load the Hfiles from a RDD.
>
> My problem is that I am not able to see the files using the apache api
> (select * from...) but when I do a scan of the table I see the files.
>
> Do I need to include the empty column?
>
> This is the code I am using:
>
> class ExtendedProductRDDFunctions[A <: scala.Product](data:
> org.apache.spark.rdd.RDD[A]) extends
>   ProductRDDFunctions[A](data) with Serializable with Logging {
>
> Create the Hfiles:
> -------------------
>   def toHFile(
>               sc: SparkContext,
>               tableName: String,
>               columns: Seq[String],
>               conf: Configuration = new Configuration,
>               zkUrl: Option[String] = None
>               ): RDD[(ByteArrayWrapper, FamiliesQualifiersValues)] = {
>
>     val config = ConfigurationUtil.getOutputConfiguration(tableName,
> columns, zkUrl, Some(conf))
>     val tableBytes = Bytes.toBytes(tableName)
>     ConfigurationUtil.encodeColumns(config)
>     val jdbcUrl = zkUrl.map(getJdbcUrl).getOrElse(getJdbcUrl(config))
>     val query = QueryUtil.constructUpsertStatement(tableName,
> columns.toList.asJava, null)
>
>     val columnsInfo = ConfigurationUtil.decodeColumns(config)
>     val a = sc.broadcast(columnsInfo)
>
>     logInfo("toHFile data size: "+data.count())
>     data.flatMap(x => mapRow(x, jdbcUrl, tableBytes, query, a.value))
>   }
>
>   def mapRow(product: Product,
>              jdbcUrl: String,
>              tableBytes: Array[Byte],
>              query: String,
>              columnsInfo: List[ColumnInfo]): List[(ByteArrayWrapper,
> FamiliesQualifiersValues)] = {
>
>     val conn = DriverManager.getConnection(jdbcUrl)
>     var hRows:Iterator[(ByteArrayWrapper, FamiliesQualifiersValues)] = null
>     val preparedStatement = conn.prepareStatement(query)
>
>
> columnsInfo.zip(product.productIterator.toList).zipWithIndex.foreach(setInStatement(preparedStatement))
>     preparedStatement.execute()
>
>     val uncommittedDataIterator =
> PhoenixRuntime.getUncommittedDataIterator(conn, true)
>     hRows = uncommittedDataIterator.asScala
>       .flatMap(kvPair => kvPair.getSecond.asScala.map(kf => createPut(kf)))
>
>     conn.rollback()
>     hRows.toList
>   }
>
>   private def createPut(keyValue:
> KeyValue):(ByteArrayWrapper,FamiliesQualifiersValues)={
>
>     val key = new ByteArrayWrapper(keyValue.getRow)
>     val family = new FamiliesQualifiersValues
>
>     family.+=(keyValue.getFamily,keyValue.getQualifier,keyValue.getValue)
>     (key,family)
>   }
>   }
>
>   Load into Apache Phoenix
>   -------------------------
>   val sortedRdd = rdd
>           .keyBy(k => k._1.toString)
>           .reduceByKey((key,value) => value)
>           .map(v => v._2)
>
>   def apacheBulkSave(hBaseContext: HBaseContext, table: String,outputPath:
> String) ={
>     rdd.hbaseBulkLoadThinRows(hBaseContext,
>       TableName.valueOf(table),
>       f => f,
>       outputPath
>     )
>   }
>
> --
> Un saludo - Best Regards.
> Abel
>

Mime
View raw message