phoenix-user mailing list archives

From James Taylor <jamestay...@apache.org>
Subject Re: Spark-Phoenix Plugin
Date Mon, 06 Aug 2018 15:02:34 GMT
For the UPSERTs on a PreparedStatement that Phoenix does when writing
through the Spark adapter, note that these are *not* doing RPCs to the
HBase server to write data (i.e. they are never committed). Instead, the
UPSERTs are used to ensure that the correct serialization is performed
given the Phoenix schema. We use a PhoenixRuntime API to get the
List<Cell> from the uncommitted data and then perform a rollback. Using
this technique, features like salting, column encoding, row timestamps,
etc. continue to work with the Spark integration.
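
To make that concrete, here is a minimal sketch of the pattern (not the
adapter's actual code), assuming a Phoenix JDBC URL and the DATAX table
and columns from the example quoted below; the exact element type returned
by getUncommittedDataIterator (List<Cell> vs. List<KeyValue>) depends on
the Phoenix version:

import java.sql.DriverManager
import scala.collection.JavaConverters._
import org.apache.phoenix.util.PhoenixRuntime

val conn = DriverManager.getConnection("jdbc:phoenix:host1,host2,host3")
conn.setAutoCommit(false)
try {
  // The UPSERT is only buffered client-side; nothing is sent to HBase yet.
  val stmt = conn.prepareStatement(
    "UPSERT INTO DATAX (O_ORDERKEY, O_CUSTKEY) VALUES (?, ?)")
  stmt.setString(1, "1")
  stmt.setString(2, "42")
  stmt.executeUpdate()

  // Pull the serialized cells Phoenix built for the uncommitted rows,
  // keyed by physical table name.
  val iter = PhoenixRuntime.getUncommittedDataIterator(conn)
  while (iter.hasNext) {
    val pair = iter.next()
    val table = org.apache.hadoop.hbase.util.Bytes.toString(pair.getFirst)
    val cells = pair.getSecond.asScala
    // ... hand the cells to whatever actually writes them ...
  }

  // Discard the buffered mutations; they were only needed for serialization.
  conn.rollback()
} finally {
  conn.close()
}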

Thanks,
James

On Mon, Aug 6, 2018 at 7:44 AM, Jaanai Zhang <cloud.poster@gmail.com> wrote:

> You can get better performance if you read/write HBase directly. You can
> also use spark-phoenix; here is an example that reads data from a CSV
> file and writes it into a Phoenix table:
>
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.types.{StructField, StringType, StructType}
>
> def main(args: Array[String]): Unit = {
>
>   val sc = new SparkContext("local", "phoenix-test")
>   val path = "/tmp/data"
>   val hbaseConnectionString = "host1,host2,host3"
>   // Schema for the pipe-delimited CSV input (all columns read as strings)
>   val customSchema = StructType(Array(
>     StructField("O_ORDERKEY", StringType, true),
>     StructField("O_CUSTKEY", StringType, true),
>     StructField("O_ORDERSTATUS", StringType, true),
>     StructField("O_TOTALPRICE", StringType, true),
>     StructField("O_ORDERDATE", StringType, true),
>     StructField("O_ORDERPRIORITY", StringType, true),
>     StructField("O_CLERK", StringType, true),
>     StructField("O_SHIPPRIORITY", StringType, true),
>     StructField("O_COMMENT", StringType, true)))
>
>   //    import com.databricks.spark.csv._
>   val sqlContext = new SQLContext(sc)
>
>   // Read the CSV file into a DataFrame using spark-csv
>   val df = sqlContext.read
>     .format("com.databricks.spark.csv")
>     .option("delimiter", "|")
>     .option("header", "false")
>     .schema(customSchema)
>     .load(path)
>
>   val start = System.currentTimeMillis()
>   // Write the DataFrame to the Phoenix table via the phoenix-spark connector
>   df.write.format("org.apache.phoenix.spark")
>     .mode("overwrite")
>     .option("table", "DATAX")
>     .option("zkUrl", hbaseConnectionString)
>     .save()
>
>   val end = System.currentTimeMillis()
>   print("taken time:" + ((end - start) / 1000) + "s")
> }
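
For the read side, loading the same table back into a DataFrame with the
phoenix-spark connector looks roughly like this (a sketch reusing the
sqlContext, table name, and zkUrl from the example above):

val readDf = sqlContext.read
  .format("org.apache.phoenix.spark")
  .option("table", "DATAX")
  .option("zkUrl", hbaseConnectionString)
  .load()

// Column filters are pushed down into the Phoenix scan where possible.
readDf.filter(readDf("O_ORDERSTATUS") === "F").show(10)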
>
>
>
>
> ----------------------------------------
>    Yun Zhang
>    Best regards!
>
>
> 2018-08-06 20:10 GMT+08:00 Brandon Geise <brandongeise@gmail.com>:
>
>> Thanks for the reply Yun.
>>
>>
>>
>> I’m not quite clear how exactly this would help on the upsert side. Are
>> you suggesting deriving the types from Phoenix and then doing the
>> encoding/decoding and writing/reading directly against HBase?
>>
>>
>>
>> Thanks,
>>
>> Brandon
>>
>>
>>
>> *From: *Jaanai Zhang <cloud.poster@gmail.com>
>> *Reply-To: *<user@phoenix.apache.org>
>> *Date: *Sunday, August 5, 2018 at 9:34 PM
>> *To: *<user@phoenix.apache.org>
>> *Subject: *Re: Spark-Phoenix Plugin
>>
>>
>>
>> You can get the data types from the Phoenix metadata, then encode/decode
>> the data yourself to write/read it directly (see the sketch below). I
>> think this approach is effective, FYI :)
>>
>>
>>
>>
>> ----------------------------------------
>>
>>    Yun Zhang
>>
>>    Best regards!
>>
>>
>>
>>
>>
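
As a rough sketch of the approach Yun describes above: pull the column
types for a table out of Phoenix's JDBC metadata and use Phoenix's
PDataType serializers to encode values the same way Phoenix would. The
connection string and table name are taken from the example earlier in
the thread, and note that this ignores salting and column encoding, which
is exactly what the upsert-then-rollback technique above handles:

import java.sql.DriverManager
import org.apache.phoenix.schema.types.PDataType

val conn = DriverManager.getConnection("jdbc:phoenix:host1,host2,host3")
// Column names and SQL types for the target table, from Phoenix metadata.
val rs = conn.getMetaData.getColumns(null, null, "DATAX", null)
while (rs.next()) {
  val col      = rs.getString("COLUMN_NAME")
  val sqlType  = rs.getInt("DATA_TYPE")      // java.sql.Types code
  val typeName = rs.getString("TYPE_NAME")

  // Map the SQL type to Phoenix's own serializer so values written
  // directly to HBase stay readable through Phoenix. The value passed to
  // toBytes must match the column's Java type (e.g. a String for VARCHAR).
  val pType = PDataType.fromTypeId(sqlType)
  val bytes = pType.toBytes("example value")
  println(s"$col $typeName -> ${bytes.length} bytes")
}
rs.close()
conn.close()
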
>> 2018-08-04 21:43 GMT+08:00 Brandon Geise <brandongeise@gmail.com>:
>>
>> Good morning,
>>
>>
>>
>> I’m looking at using a combination of HBase, Phoenix, and Spark for a
>> project, and I read that using the Spark-Phoenix plugin directly is more
>> efficient than JDBC. However, it wasn’t entirely clear from the examples
>> whether an upsert is performed when writing a DataFrame, and how many
>> fine-grained options there are for executing the upsert. Any information
>> someone can share would be greatly appreciated!
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Brandon
>>
>>
>>
>
>
