phoenix-user mailing list archives

From Jonathan Leech <jonat...@gmail.com>
Subject Re: Moving column family into new table
Date Thu, 19 Jan 2017 18:09:23 GMT
Do an EXPLAIN on your query to confirm whether it's doing a full scan rather than a skip scan.

I typically use an IN () clause instead of OR, especially with compound keys. I have also
had to hint queries to use a skip scan, e.g. /*+ SKIP_SCAN */.
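
For example (made-up ids; adjust to your actual schema), first check the plan, then force the hint if it still shows a full scan:

    EXPLAIN SELECT data FROM mytable WHERE id IN ('id-0001', 'id-0002', 'id-0003');

    SELECT /*+ SKIP_SCAN */ data FROM mytable WHERE id IN ('id-0001', 'id-0002', 'id-0003');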

Phoenix seems to do a very good job of not reading data from column families that aren't needed
by the query, so I think your schema design is fine.

> On Jan 19, 2017, at 10:30 AM, Mark Heppner <heppner.mark@gmail.com> wrote:
> 
> Thanks for the quick reply, Josh!
> 
> For our demo cluster, we have 5 nodes, so the table was already set to 10 salt buckets.
I know you can increase the salt buckets after the table is created, but how do you change
the split points? The repartition in Spark seemed to be extremely inefficient, so we were
trying to skip it and keep the 400+ default partitions.
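> 
> For reference, the table was created more or less like the trimmed-down DDL below; as far as we
> can tell, explicit split points would have had to be given with SPLIT ON at create time instead
> of SALT_BUCKETS, since the two can't be combined:
> 
>     CREATE TABLE IF NOT EXISTS mytable (
>         id VARCHAR(36) NOT NULL PRIMARY KEY,
>         title VARCHAR
>     ) SALT_BUCKETS = 10;
> 
>     -- pre-split alternative (made-up boundary keys)
>     CREATE TABLE IF NOT EXISTS mytable (
>         id VARCHAR(36) NOT NULL PRIMARY KEY,
>         title VARCHAR
>     ) SPLIT ON ('4', '8', 'c');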
> 
> The biggest issue we're facing is that as Spark goes through the partitions during the
scan, it becomes exponentially slower towards the end. Around task 380/450, it slows down
to a halt, eventually timing out around 410 and getting killed. We have no idea if this is
something with Spark, YARN, or HBase, so that's why we were brainstorming about using the foreign
key-based layout, hoping that the files on HDFS would be better compacted.
> 
> We haven't noticed too much network overhead, nor have we seen CPU or RAM usage get too high.
Our nodes are pretty big, 32 cores and 256 GB RAM each, connected on a 10 GbE network. Even
if our query is for 80-100 rows, the Spark job still slows to a crawl at the end, but that
should really only be about 80 MB of data it would be pulling out of Phoenix into the executors.
I guess we should have verified that the Phoenix+Spark plugin did achieve data locality, but
there isn't anything that says otherwise. Even if it doesn't have data locality, though, we have
no idea why it would progressively slow down as it reaches the end of the scan/filter.
> 
> The images are converted to NumPy arrays, then saved as binary strings in Phoenix.
In Spark, it is fairly quick to convert a binary string back into a NumPy array. This
also allows us to use GET_BYTE() from Phoenix to extract specific values within the array,
without going through Spark at all. Do you have any other architecture recommendations for
our use case? Would storing the images directly in HBase be any better?
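> 
> To illustrate the GET_BYTE() usage mentioned above, something like this works straight from
> Phoenix (made-up id and offsets):
> 
>     SELECT GET_BYTE(data, 0) AS first_byte,
>            GET_BYTE(data, 1024) AS byte_1024
>     FROM mytable
>     WHERE id = 'some-image-id';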
> 
>> On Thu, Jan 19, 2017 at 12:02 PM, Josh Mahonin <jmahonin@gmail.com> wrote:
>> Hi Mark,
>> 
>> At present, the Spark partitions are basically equivalent to the number of regions
in the underlying HBase table. This is typically something you can control yourself, either
using pre-splitting or salting (https://phoenix.apache.org/faq.html#Are_there_any_tips_for_optimizing_Phoenix).
Given that you have 450+ partitions though, it sounds like you should be able to achieve a
decent level of parallelism, but that's a knob you can fiddle with. It might also be useful
to look at Spark's "repartition" operation if you have idle Spark executors.
>> 
>> The partitioning is sort of orthogonal to the primary key layout and the resulting
query efficiency, but the strategy you've taken with your schema seems fairly sensible to
me. Given that your primary key is the 'id' field, the query you're using is going to be much
more efficient than, e.g., filtering on the 'title' column. Iterating on your schema and queries
using straight SQL and then applying that to Spark after is probably a good strategy here
to get more familiar with query performance.
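>> 
>> For example, comparing the plans for a lookup on the key against a filter on an unindexed
>> column should make the difference obvious (made-up values):
>> 
>>     EXPLAIN SELECT title FROM mytable WHERE id = 'some-id';     -- point lookup on the row key
>>     EXPLAIN SELECT id FROM mytable WHERE title = 'some title';  -- full scan with a server-side filter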
>> 
>> If you're reading the binary 'data' column in Spark and seeing a lot of network overhead,
one thing to be aware of is that the present Phoenix MR / Spark code isn't location-aware, so executors
are likely reading big chunks of data from another node. There are a few patches in to address
this, but they aren't in a released version yet:
>> 
>> https://issues.apache.org/jira/browse/PHOENIX-3600
>> https://issues.apache.org/jira/browse/PHOENIX-3601
>> 
>> Good luck!
>> 
>> Josh
>> 
>> 
>> 
>> 
>>> On Thu, Jan 19, 2017 at 11:30 AM, Mark Heppner <heppner.mark@gmail.com>
wrote:
>>> Our use case is to analyze images using Spark. The images are typically ~1MB
each, so in order to prevent the small files problem in HDFS, we went with HBase and Phoenix.
For 20+ million images and metadata, this has been working pretty well so far. Since this
is pretty new to us, we didn't create a robust design:
>>> 
>>> CREATE TABLE IF NOT EXISTS mytable
>>> (
>>>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     title VARCHAR,
>>>     ...
>>>     image.dtype VARCHAR(12),
>>>     image.width UNSIGNED_INT,
>>>     image.height UNSIGNED_INT,
>>>     image.data VARBINARY
>>> )
>>> 
>>> Most queries are on the metadata, so all of that is kept in the default column
family. Only the image data is stored in a secondary column family. Additional indexes are
created anyway, so the main table isn't usually touched.
>>> 
>>> We first run a Phoenix query to check if there are any matches. If so, then we
start a Spark job on the images. The primary keys are sent to the PySpark job, which then
grabs the images based on the primary keys:
>>> 
>>> df = sqlContext.read \
>>>     .format('org.apache.phoenix.spark') \
>>>     .option('table', 'mytable') \
>>>     .option('zkUrl', 'localhost:2181:/hbase-unsecure') \
>>>     .load()
>>> df.registerTempTable('mytable')
>>> 
>>> query = "SELECT DATA FROM mytable WHERE ID = '1' OR ID = '2' ..."
>>> df_imgs = sqlContext.sql(query)
>>> 
>>> When this was first designed, we thought since the lookup was by primary key,
it would be smart enough to do a skip scan, but it appears to be doing a full scan. The df_imgs.rdd.getNumPartitions()
ends up being 450+, which matches up with the number of split files in HDFS.
>>> 
>>> Would it be better to use a foreign key and split the tables:
>>> 
>>> CREATE TABLE IF NOT EXISTS mytable
>>> (
>>>     id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     title VARCHAR,
>>>     image_id VARCHAR(36)
>>> )
>>> CREATE TABLE IF NOT EXISTS images
>>> (
>>>     image_id VARCHAR(36) NOT NULL PRIMARY KEY,
>>>     dtype VARCHAR(12),
>>>     width UNSIGNED_INT,
>>>     height UNSIGNED_INT,
>>>     data VARBINARY
>>> )
>>> 
>>> If the first query grabs the image_ids and sends them to Spark, would Spark be
able to handle the query more efficiently?
>>> 
>>> If this is a better design, is there any way of moving the "image" column family
from "mytable" to the default column family of the new "images" table? Is it possible to create
the new table with the "image_id"s, make the foreign keys, then move the column family into
the new table?
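>>> 
>>> For example, once the new "images" table exists, would something like an UPSERT SELECT
>>> (reusing the existing id as the image_id) be the right way to copy the data over, or is
>>> there a more direct way?
>>> 
>>>     UPSERT INTO images (image_id, dtype, width, height, data)
>>>     SELECT id, dtype, width, height, data
>>>     FROM mytable;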
>>> 
>>> 
>>> -- 
>>> Mark Heppner
>> 
> 
> 
> 
> -- 
> Mark Heppner
