phoenix-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Accessing phoenix tables in Spark 2
Date Fri, 07 Oct 2016 18:35:51 GMT
thanks  again all.

My primary objective was to write to Hbase directly from Spark streaming
and Phoenix was really the catalyst here.

My point being that if I manage to write directly from Spark streaming to
Hbase would that being a better option.

FYI, I can read from phoenix table on Hbase using IDBC or Zeppelin  on
Phoenix through JDBC with no problem

thanks


Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 7 October 2016 at 17:37, James Taylor <jamestaylor@apache.org> wrote:

> Hi Mich,
> I'd encourage you to use this mechanism mentioned by Josh:
> Another option is to use Phoenix-JDBC from within Spark Streaming. I've
> got a toy example of using Spark streaming with Phoenix DataFrames, but it
> could just as easily be a batched JDBC upsert.
>
> Trying to write directly to HBase in a Phoenix-compliant way is likely
> brittle, especially as Phoenix evolves. Josh's mechanism has the advantage
> of still going through Phoenix APIs.
>
> Thanks,
> James
>
> On Fri, Oct 7, 2016 at 9:24 AM, Mich Talebzadeh <mich.talebzadeh@gmail.com
> > wrote:
>
>> Thanks Josh, I will try your code as well.
>>
>> I wrote this simple program based on some code that directly creates or
>> populates an Hbase table called "new" from Spark 2
>>
>> import org.apache.spark._
>> import org.apache.spark.rdd.NewHadoopRDD
>> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
>> import org.apache.hadoop.hbase.client.HBaseAdmin
>> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>> import org.apache.hadoop.fs.Path
>> import org.apache.hadoop.hbase.HColumnDescriptor
>> import org.apache.hadoop.hbase.util.Bytes
>> import org.apache.hadoop.hbase.client.Put
>> import org.apache.hadoop.hbase.client.HTable
>> import scala.util.Random
>> import scala.math._
>> import org.apache.spark.sql.functions._
>>  val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>   def randomString(chars: String, length: Int): String =
>>     (0 until length).map(_ => chars(Random.nextInt(chars.len
>> gth))).mkString
>>     val chars = ('a' to 'z') ++ ('A' to 'Z')
>>     val tableName = "new"
>>     val conf = HBaseConfiguration.create()
>>     // Add local HBase conf
>>     //conf.addResource(new Path("file:////usr/lib/hbase/c
>> onf/hase-site.xml"))
>>     conf.set(TableInputFormat.INPUT_TABLE, tableName)
>>     // create this table with column family
>>     val admin = new HBaseAdmin(conf)
>>     if(!admin.isTableAvailable(tableName)) {
>>       println("Creating table " + tableName)
>>       val tableDesc = new HTableDescriptor(tableName)
>>       tableDesc.addFamily(new HColumnDescriptor("cf1".getBytes()))
>>       admin.createTable(tableDesc)
>>     }else{
>>       println("Table " + tableName + " already exists!!")
>>     }
>>       println("populating table")
>>     //put data into table
>>     val myTable = new HTable(conf, tableName)
>>     for (i <- 0 to 99) {
>>       val r = scala.util.Random.nextInt(100000000)
>>       val c = randomString(chars.mkString(""),1)
>>       val key = c+r.toString
>>       val data =  randomString(chars.mkString(""),50)
>>       //var p = new Put()
>>       var p = new Put(new String(key).getBytes())
>>           p.add("cf1".getBytes(), "column-1".getBytes(), new
>> String(data).getBytes())
>>           myTable.put(p)
>>         }
>>         myTable.flushCommits()
>>         //create rdd
>>     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],clas
>> sOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],class
>> Of[org.apache.hadoop.hbase.client.Result])
>>     //get the row count
>>     val count = hBaseRDD.count()
>>     print("HBase RDD count:"+count+"\n")
>>   println ("\nFinished at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
>> ").collect.foreach(println)
>>     System.exit(0)
>>   }
>> I am no hands on programmer but it seems to work on Spark shell and with
>> MVN as a jar file built :)
>>
>> hbase(main):265:0> scan 'new', 'LIMIT' => 5
>> ROW                                                       COLUMN+CELL
>>  A10179499
>> column=cf1:column-1, timestamp=1475857020533, value=riEgIrLuHNKLUmMeEnWZwAWd
>> IUMYqOTkDpqpxnKsnlccuDRvEE
>>  A27318405
>> column=cf1:column-1, timestamp=1475857115678, value=zpQWDjvPXobFkPspBxfTOefU
>> LkCidPGTjeLOzuxgLEcfzecVef
>>  A44949791
>> column=cf1:column-1, timestamp=1475856238280, value=kzeuRUCqWYBKXcbPRSWMZLqP
>> psrLvgkOMLjDArtdJkoOlPGKZs
>>  A4682060
>> column=cf1:column-1, timestamp=1475857115666, value=MTXnucpYRxKbYSVmTVaFtPte
>> WAtxZEUeTMXPntsVLIsMGDghcs
>>  A54369308
>> column=cf1:column-1, timestamp=1475856238328, value=HGYCCAefvCTKWbSwlZxgEauI
>> nysLOjXHKauZevnEhZLCLvjDTz
>> 5 row(s) in 0.0050 seconds
>>
>>
>> Cheers
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 7 October 2016 at 16:24, Josh Mahonin <jmahonin@gmail.com> wrote:
>>
>>> Hi Mich,
>>>
>>> You're correct that the rowkey is the primary key, but if you're writing
>>> to HBase directly and bypassing Phoenix, you'll have to be careful about
>>> the construction of your row keys to adhere to the Phoenix data types and
>>> row format. I don't think it's very well documented, but you might have
>>> some luck by checking with the data type implementations here:
>>> https://github.com/apache/phoenix/tree/master/phoenix-core/s
>>> rc/main/java/org/apache/phoenix/schema/types
>>>
>>> Another option is to use Phoenix-JDBC from within Spark Streaming. I've
>>> got a toy example of using Spark streaming with Phoenix DataFrames, but it
>>> could just as easily be a batched JDBC upsert.
>>> https://github.com/jmahonin/spark-streaming-phoenix/blob/mas
>>> ter/src/main/scala/SparkStreamingPhoenix.scala
>>>
>>> Best of luck,
>>>
>>> Josh
>>>
>>> On Fri, Oct 7, 2016 at 10:28 AM, Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Thank you all. very helpful.
>>>>
>>>> I have not tried the method Ciureanu suggested but will do so.
>>>>
>>>> Now I will be using Spark Streaming to populate Hbase table. I was
>>>> hoping to do this through Phoenix but managed to write a script to write
to
>>>> Hbase table from Spark 2 itself.
>>>>
>>>> Having worked with Hbase I take the row key to be primary key, i.e.
>>>> unique much like RDBMS (Oracle). Sounds like phoenix relies on that one
>>>> when creating table on top of Hbase table. Is this assessment correct
>>>> please?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>> On 7 October 2016 at 14:30, Ciureanu Constantin <
>>>> ciureanu.constantin@gmail.com> wrote:
>>>>
>>>>> In Spark 1.4 it worked via JDBC - sure it would work in 1.6 / 2.0
>>>>> without issues.
>>>>>
>>>>> Here's a sample code I used (it was getting data in parallel 24
>>>>> partitions)
>>>>>
>>>>>
>>>>> import org.apache.spark.SparkConf
>>>>> import org.apache.spark.SparkContext
>>>>>
>>>>> import org.apache.spark.rdd.JdbcRDD
>>>>> import java.sql.{Connection, DriverManager, ResultSet}
>>>>>
>>>>> sc.addJar("/usr/lib/hbase/hbase-protocol.jar")
>>>>> sc.addJar("phoenix-x.y.z-bin/phoenix-core-x.y.z.jar")
>>>>> sc.addJar("phoenix-x.y.z-bin/phoenix-x.y.z-client.jar")
>>>>>
>>>>> def createConnection() = {
>>>>> Class.forName("org.apache.phoenix.jdbc.PhoenixDriver").newInstance();
>>>>> DriverManager.getConnection("jdbc:phoenix:hd101.lps.stage,hd
>>>>> 102.lps.stage,hd103.lps.stage"); // the Zookeeper quorum
>>>>> }
>>>>>
>>>>> def extractValues(r: ResultSet) = {
>>>>> (r.getLong(1),    // datum
>>>>> r.getInt(2),  // pg
>>>>> r.getString(3),  // HID
>>>>> ....
>>>>>  )
>>>>> }
>>>>>
>>>>> val data = new JdbcRDD(sc, createConnection,
>>>>> "SELECT DATUM, PG, HID,  ..... WHERE DATUM >= ? * 1000  AND DATUM
<= ?
>>>>> * 1000 and PG = <a value>",
>>>>> lowerBound = 1364774400, upperBound = 1384774400, numPartitions = 24,
>>>>> mapRow = extractValues)
>>>>>
>>>>> data.count()
>>>>>
>>>>> println(data.collect().toList)
>>>>>
>>>>>
>>>>> 2016-10-07 15:20 GMT+02:00 Ted Yu <yuzhihong@gmail.com>:
>>>>>
>>>>>> JIRA on hbase side:
>>>>>> HBASE-16179
>>>>>>
>>>>>> FYI
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 6:07 AM, Josh Mahonin <jmahonin@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Mich,
>>>>>>>
>>>>>>> There's an open ticket about this issue here:
>>>>>>> https://issues.apache.org/jira/browse/PHOENIX-3333
>>>>>>>
>>>>>>> Long story short, Spark changed their API (again), breaking the
>>>>>>> existing integration. I'm not sure the level of effort to get
it working
>>>>>>> with Spark 2.0, but based on examples from other projects, it
looks like
>>>>>>> there's a fair bit of Maven module work to support both Spark
1.x and Spark
>>>>>>> 2.x concurrently in the same project. Patches are very welcome!
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Josh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 8:33 AM, Mich Talebzadeh <
>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Has anyone managed to read phoenix table in Spark 2 by any
chance
>>>>>>>> please?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other
property which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any
monetary damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message