phoenix-user mailing list archives

From James Taylor <jamestay...@apache.org>
Subject Re: Accessing phoenix tables in Spark 2
Date Fri, 07 Oct 2016 16:37:00 GMT
Hi Mich,
I'd encourage you to use this mechanism mentioned by Josh:
Another option is to use Phoenix-JDBC from within Spark Streaming. I've got
a toy example of using Spark streaming with Phoenix DataFrames, but it
could just as easily be a batched JDBC upsert.
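
A batched JDBC upsert along these lines might look like the sketch below. It is only an illustration, not Josh's code: the table `NEW_TABLE`, its columns `ID` and `DATA`, the helper names, and the batch size are all assumptions, and the table would need to be created through Phoenix first. It relies on Phoenix connections leaving auto-commit off unless configured otherwise, so rows are buffered on the client until `commit()`.

```scala
import java.sql.Connection

object PhoenixUpsertSketch {
  // Builds a parameterised Phoenix UPSERT statement for the given table and columns.
  def upsertSql(table: String, columns: Seq[String]): String = {
    val placeholders = columns.map(_ => "?").mkString(", ")
    s"UPSERT INTO $table (${columns.mkString(", ")}) VALUES ($placeholders)"
  }

  // Writes (id, data) rows through an already-open Phoenix JDBC connection,
  // committing every `batchSize` rows and once more at the end.
  // NEW_TABLE, ID and DATA are hypothetical names used for illustration.
  def upsertBatch(conn: Connection,
                  rows: Iterator[(String, String)],
                  batchSize: Int = 1000): Long = {
    conn.setAutoCommit(false)
    val stmt = conn.prepareStatement(upsertSql("NEW_TABLE", Seq("ID", "DATA")))
    var written = 0L
    try {
      rows.foreach { case (id, data) =>
        stmt.setString(1, id)
        stmt.setString(2, data)
        stmt.executeUpdate()
        written += 1
        if (written % batchSize == 0) conn.commit()
      }
      conn.commit() // flush whatever is left in the last partial batch
    } finally stmt.close()
    written
  }
}
```

From Spark, one would typically open a connection per partition inside `foreachPartition` (or `foreachRDD` followed by `foreachPartition` for a DStream), pass the partition's iterator to `upsertBatch`, and close the connection afterwards.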

Trying to write directly to HBase in a Phoenix-compliant way is likely
brittle, especially as Phoenix evolves. Josh's mechanism has the advantage
of still going through Phoenix APIs.
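
To make the brittleness concrete, here is a small sketch of one encoding difference. It assumes, based on a reading of Phoenix's type implementations, that a signed BIGINT is stored with its sign bit flipped so that values sort correctly as unsigned bytes, whereas HBase's `Bytes.toBytes(long)` writes plain big-endian two's complement. The object and function names are made up for illustration, and the code uses only `java.nio`, not the Phoenix or HBase classes themselves.

```scala
import java.nio.ByteBuffer

object RowKeyEncodingSketch {
  // Plain big-endian two's-complement bytes, the layout HBase's
  // Bytes.toBytes(long) produces.
  def rawLongBytes(v: Long): Array[Byte] =
    ByteBuffer.allocate(8).putLong(v).array()

  // The same bytes with the sign bit flipped, so negative values sort
  // before positive ones in unsigned byte order.
  // Assumption: this mirrors how Phoenix serialises a signed BIGINT.
  def signFlippedLongBytes(v: Long): Array[Byte] = {
    val b = rawLongBytes(v)
    b(0) = (b(0) ^ 0x80).toByte
    b
  }

  // Lower-case hex rendering, two digits per byte.
  def hex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString
}
```

Under that assumption, the value 1 comes out as `0000000000000001` in the raw encoding and `8000000000000001` in the sign-flipped one, so a key written with the raw encoding would be read back through Phoenix as a different number.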

Thanks,
James

On Fri, Oct 7, 2016 at 9:24 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Thanks Josh, I will try your code as well.
>
> I wrote this simple program, based on some existing code, that directly
> creates and populates an HBase table called "new" from Spark 2:
>
> import org.apache.spark._
> import org.apache.spark.rdd.NewHadoopRDD
> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
> import org.apache.hadoop.hbase.client.HBaseAdmin
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.hbase.HColumnDescriptor
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.client.Put
> import org.apache.hadoop.hbase.client.HTable
> import scala.util.Random
> import scala.math._
> import org.apache.spark.sql.functions._
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>
> // Build a random string of the given length from the supplied characters
> def randomString(chars: String, length: Int): String =
>   (0 until length).map(_ => chars(Random.nextInt(chars.length))).mkString
>
> val chars = (('a' to 'z') ++ ('A' to 'Z')).mkString
> val tableName = "new"
> val conf = HBaseConfiguration.create()
> // Add local HBase conf
> //conf.addResource(new Path("file:////usr/lib/hbase/conf/hbase-site.xml"))
> conf.set(TableInputFormat.INPUT_TABLE, tableName)
>
> // Create the table with one column family if it does not exist yet
> val admin = new HBaseAdmin(conf)
> if (!admin.isTableAvailable(tableName)) {
>   println("Creating table " + tableName)
>   val tableDesc = new HTableDescriptor(tableName)
>   tableDesc.addFamily(new HColumnDescriptor("cf1".getBytes()))
>   admin.createTable(tableDesc)
> } else {
>   println("Table " + tableName + " already exists!!")
> }
>
> // Put 100 random rows into the table
> println("populating table")
> val myTable = new HTable(conf, tableName)
> for (i <- 0 to 99) {
>   val r = Random.nextInt(100000000)
>   val c = randomString(chars, 1)
>   val key = c + r.toString
>   val data = randomString(chars, 50)
>   val p = new Put(key.getBytes())
>   p.add("cf1".getBytes(), "column-1".getBytes(), data.getBytes())
>   myTable.put(p)
> }
> myTable.flushCommits()
>
> // Create an RDD over the table and get the row count
> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
> val count = hBaseRDD.count()
> println("HBase RDD count:" + count)
> println("\nFinished at")
> HiveContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
> System.exit(0)
>
> I am no hands-on programmer, but it seems to work in the Spark shell and
> as a jar file built with Maven :)
>
> hbase(main):265:0> scan 'new', 'LIMIT' => 5
> ROW         COLUMN+CELL
>  A10179499  column=cf1:column-1, timestamp=1475857020533, value=riEgIrLuHNKLUmMeEnWZwAWdIUMYqOTkDpqpxnKsnlccuDRvEE
>  A27318405  column=cf1:column-1, timestamp=1475857115678, value=zpQWDjvPXobFkPspBxfTOefULkCidPGTjeLOzuxgLEcfzecVef
>  A44949791  column=cf1:column-1, timestamp=1475856238280, value=kzeuRUCqWYBKXcbPRSWMZLqPpsrLvgkOMLjDArtdJkoOlPGKZs
>  A4682060   column=cf1:column-1, timestamp=1475857115666, value=MTXnucpYRxKbYSVmTVaFtPteWAtxZEUeTMXPntsVLIsMGDghcs
>  A54369308  column=cf1:column-1, timestamp=1475856238328, value=HGYCCAefvCTKWbSwlZxgEauInysLOjXHKauZevnEhZLCLvjDTz
> 5 row(s) in 0.0050 seconds
>
>
> Cheers
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 7 October 2016 at 16:24, Josh Mahonin <jmahonin@gmail.com> wrote:
>
>> Hi Mich,
>>
>> You're correct that the rowkey is the primary key, but if you're writing
>> to HBase directly and bypassing Phoenix, you'll have to be careful about
>> the construction of your row keys to adhere to the Phoenix data types and
>> row format. I don't think it's very well documented, but you might have
>> some luck by checking with the data type implementations here:
>> https://github.com/apache/phoenix/tree/master/phoenix-core/src/main/java/org/apache/phoenix/schema/types
>>
>> Another option is to use Phoenix-JDBC from within Spark Streaming. I've
>> got a toy example of using Spark streaming with Phoenix DataFrames, but it
>> could just as easily be a batched JDBC upsert.
>> https://github.com/jmahonin/spark-streaming-phoenix/blob/master/src/main/scala/SparkStreamingPhoenix.scala
>>
>> Best of luck,
>>
>> Josh
>>
>> On Fri, Oct 7, 2016 at 10:28 AM, Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Thank you all. Very helpful.
>>>
>>> I have not tried the method Ciureanu suggested but will do so.
>>>
>>> Now I will be using Spark Streaming to populate an HBase table. I was
>>> hoping to do this through Phoenix, but managed to write a script that
>>> writes to the HBase table from Spark 2 itself.
>>>
>>> Having worked with HBase, I take the row key to be the primary key, i.e.
>>> unique, much like in an RDBMS (Oracle). It sounds like Phoenix relies on
>>> that when creating a table on top of an HBase table. Is this assessment
>>> correct, please?
>>>
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> On 7 October 2016 at 14:30, Ciureanu Constantin <
>>> ciureanu.constantin@gmail.com> wrote:
>>>
>>>> In Spark 1.4 it worked via JDBC, and I'm sure it would work in 1.6 / 2.0
>>>> without issues.
>>>>
>>>> Here's some sample code I used (it read the data in parallel across 24
>>>> partitions):
>>>>
>>>>
>>>> import org.apache.spark.SparkConf
>>>> import org.apache.spark.SparkContext
>>>>
>>>> import org.apache.spark.rdd.JdbcRDD
>>>> import java.sql.{Connection, DriverManager, ResultSet}
>>>>
>>>> sc.addJar("/usr/lib/hbase/hbase-protocol.jar")
>>>> sc.addJar("phoenix-x.y.z-bin/phoenix-core-x.y.z.jar")
>>>> sc.addJar("phoenix-x.y.z-bin/phoenix-x.y.z-client.jar")
>>>>
>>>> def createConnection() = {
>>>> Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance();
>>>> DriverManager.getConnection("jdbc:phoenix:hd101.lps.stage,hd102.lps.stage,hd103.lps.stage"); // the Zookeeper quorum
>>>> }
>>>>
>>>> def extractValues(r: ResultSet) = {
>>>> (r.getLong(1),    // datum
>>>> r.getInt(2),  // pg
>>>> r.getString(3),  // HID
>>>> ....
>>>>  )
>>>> }
>>>>
>>>> val data = new JdbcRDD(sc, createConnection,
>>>> "SELECT DATUM, PG, HID,  ..... WHERE DATUM >= ? * 1000  AND DATUM <= ? * 1000 and PG = <a value>",
>>>> lowerBound = 1364774400, upperBound = 1384774400, numPartitions = 24,
>>>> mapRow = extractValues)
>>>>
>>>> data.count()
>>>>
>>>> println(data.collect().toList)
>>>>
>>>>
>>>> 2016-10-07 15:20 GMT+02:00 Ted Yu <yuzhihong@gmail.com>:
>>>>
>>>>> JIRA on hbase side:
>>>>> HBASE-16179
>>>>>
>>>>> FYI
>>>>>
>>>>> On Fri, Oct 7, 2016 at 6:07 AM, Josh Mahonin <jmahonin@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Mich,
>>>>>>
>>>>>> There's an open ticket about this issue here:
>>>>>> https://issues.apache.org/jira/browse/PHOENIX-3333
>>>>>>
>>>>>> Long story short, Spark changed their API (again), breaking the
>>>>>> existing integration. I'm not sure the level of effort to get it
>>>>>> working with Spark 2.0, but based on examples from other projects, it
>>>>>> looks like there's a fair bit of Maven module work to support both
>>>>>> Spark 1.x and Spark 2.x concurrently in the same project. Patches are
>>>>>> very welcome!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 8:33 AM, Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Has anyone managed to read phoenix table in Spark 2 by any chance
>>>>>>> please?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
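
The `JdbcRDD` call quoted above from Ciureanu binds two `?` placeholders per partition. As far as the Spark source indicates, the inclusive range from `lowerBound` to `upperBound` is cut into `numPartitions` contiguous inclusive sub-ranges, and each partition runs the query with its own pair of bounds. The sketch below reproduces that arithmetic on its own, without Spark; the object and function names are hypothetical, and the splitting rule is an assumption about `JdbcRDD` rather than a documented contract.

```scala
object JdbcRangeSketch {
  // Splits the inclusive range [lowerBound, upperBound] into numPartitions
  // contiguous inclusive sub-ranges, one (start, end) pair per partition.
  // BigInt avoids overflow when the bounds are far apart.
  def ranges(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    val length = BigInt(upperBound) - BigInt(lowerBound) + 1
    (0 until numPartitions).map { i =>
      val start = BigInt(lowerBound) + (BigInt(i) * length) / numPartitions
      val end = BigInt(lowerBound) + (BigInt(i + 1) * length) / numPartitions - 1
      (start.toLong, end.toLong)
    }
  }
}
```

With the bounds from the quoted example, `JdbcRangeSketch.ranges(1364774400L, 1384774400L, 24)` yields 24 pairs; each pair would fill the two `?` placeholders in the `DATUM >= ? * 1000 AND DATUM <= ? * 1000` predicate for one partition.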
