phoenix-user mailing list archives

From Alex Warshavsky <awarshav...@salesforce.com>
Subject Re: Apache Pig Integration with Phoenix
Date Thu, 03 Sep 2015 18:05:50 GMT
Hello all,

Expanding earlier email conversation to wider audience to share the wealth.
Please read email chain below to get the context if needed.

Followup question. Here's a simple PIG script that loads data from Phoenix,
performs aggregation, and writes aggregations back to Phoenix. Idea is to
rollup counts every hour, hence the WHERE clause by EVENT_TIME.

raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015';

by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID);

count_by_user = FOREACH by_org_and_app GENERATE FLATTEN(group) AS
(TENANT_ID, USER_ID), COUNT(raw);

STORE count_by_user INTO 'hbase://METRICS';

For the script above, since GROUP follows LOAD in PIG, is the best approach
to push the grouping filter into the WHERE clause of the SELECT? I want to
ensure that the subsequent rollups are correct for each TENANT_ID/USER_ID
without loading the whole table.


raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015 AND TENANT_ID =
\'123456789\'';


by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID);

The flip side is that the idea was to use PIG to Map/Reduce aggregations
(rollups) across tenants rather than processing one tenant at a time. If
tenants have to be processed one at a time, a loop will have to be created,
which I was trying to avoid.
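
To picture the GROUP and COUNT step at the map/reduce level, here is a
minimal sketch in plain Python (not Pig, and not any Phoenix API). The rows,
split contents, and function names are made up for illustration. The only
assumption is that the shuffle sends every row with the same key to the same
reducer, whichever split it was read from.

```python
from collections import defaultdict

# Hypothetical rows as they might come back from the time-bounded SELECT,
# already divided into two input splits. Each row is (TENANT_ID, USER_ID).
splits = [
    [("t1", "u1"), ("t1", "u2"), ("t2", "u9")],
    [("t1", "u1"), ("t2", "u9"), ("t2", "u9")],
]


def map_phase(split):
    """Emit ((TENANT_ID, USER_ID), 1) for every row in one split."""
    for tenant_id, user_id in split:
        yield (tenant_id, user_id), 1


def shuffle(mapped):
    """Gather all values for a key together, regardless of source split."""
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Sum the values per key: the COUNT per (TENANT_ID, USER_ID) group."""
    return {key: sum(values) for key, values in groups.items()}


mapped = (pair for split in splits for pair in map_phase(split))
login_counts = reduce_phase(shuffle(mapped))

# DISTINCT-style rollup: the set of users for a tenant is only complete
# once rows from every split have been combined.
users_by_tenant = defaultdict(set)
for split in splits:
    for tenant_id, user_id in split:
        users_by_tenant[tenant_id].add(user_id)
distinct_users = {t: len(users) for t, users in users_by_tenant.items()}
```

In this sketch login_counts holds one total per (TENANT_ID, USER_ID) across
both splits, and all tenants are handled in a single pass. distinct_users
covers the count(distinct) case, where each tenant's set is built from rows
in every split.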

Thanks,
--Alex.

On Thu, Aug 27, 2015 at 7:44 PM, Ravi Kiran wrote:

> Hi Alex,
>    I believe the way Pig works is to apply predicate pushdown to LOAD and,
> once the data is loaded, apply operators like GROUP and COUNT.
> Hence, the rollup in your example above would happen as a subsequent step.
>    Currently, the PhoenixHBaseLoader doesn't implement
> LoadPredicatePushdown; hence, any FILTER operators after LOAD don't get
> pushed to HBase. In these cases, it's good to go with the SELECT query
> approach and use the WHERE clause to get the predicate pushdown feature.
>
> I also would recommend running the EXPLAIN operator on the script to see
> the execution plan.
>
> Sorry for the delay in my response, just got back from office.
>
> Regards
> Ravi
>
> On Thu, Aug 27, 2015 at 12:54 PM, Alex Warshavsky wrote:
>
>> Hi Ravi,
>>
>> Thanks a lot for your response. I wanted to clarify the relationship
>> between splits and PIG operations like GROUP, COUNT, and DISTINCT.
>>
>> Let's talk about an example.
>>
>> In the example below, GROUPing happens to be by TENANT_ID and USER_ID.
>> When LOAD happens, does the split take care of the rollup being calculated
>> for all rows for a given TENANT_ID? Is the data read in chunks (by
>> split logic) and then rolled up using map/reduce for each of the groups?
>> What about cases like DISTINCT (count(distinct) equivalent) where *all*
>> of the data for a given group needs to be available to perform the final
>> calculation?
>>
>> Effectively on Map/Reduce level we're talking about TENANT_ID and USER_ID being
>> the keys and Reduce happening for each key (COUNT).
>>
>> -- Calculate number of logins per user for a given day. Rollup by
>> -- TENANT_ID.
>>
>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
>> EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015';
>>
>> by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID);
>>
>> count_by_user = FOREACH by_org_and_app GENERATE FLATTEN(group) AS
>> (TENANT_ID, USER_ID), COUNT(raw);
>>
>> STORE count_by_user INTO 'hbase://METRICS';
>>
>> Cheers,
>> --Alex.
>>
>> On Wed, Aug 26, 2015 at 7:42 PM, Ravi Kiran wrote:
>>
>>> Hi Soumen,
>>>
>>>    Below is the series of steps that happen for the given LOAD operator:
>>>
>>> a) Parse the information passed to the LOAD operator and generate a
>>> SELECT query with the necessary columns requested. You can also pass a
>>> SELECT query directly.
>>> b) From the SELECT query, we generate the QueryPlan and thereby get the
>>> splits.  This is all done in the PhoenixInputFormat.
>>> c) Each split pulls records from the table. Kindly note the splits don't
>>> map to region boundaries. They just have a start and a stop key.
>>> d) We then transform the returned columns into Pig data types. You can
>>> look further into TypeUtil.
>>>
>>> Coming to your questions,
>>>
>>>
>>>    - Could you suggest how the data is being loaded into Pig from
>>>    Phoenix/HBase? We are trying to evaluate whether there is data movement
>>>    from HBase to MapReduce HDFS during the Load.
>>>    *RAVI*: No data movement happens from HBase to HDFS. It's a direct
>>>    SCAN from the HBase table.
>>>
>>>    - Is the data processed in chunks? Is there a concept of batchSize
>>>    in LOAD function similar to STORE batchSize?
>>>    *RAVI*:  Yes. 1 chunk is 1 split.  I am not sure on the use of
>>>    batchSize as PIG cannot execute any downstream operators like GROUP or
>>>    JOIN till all data is loaded. Please feel free to get back to me if I am
>>>    not on the same page as you on this question.
>>>
>>>    - Can the Load be reused for multi-query on Phoenix? We are looking
>>>    to process the same load data using multiple Group+Store
>>>    *RAVI*:   I am hoping you are looking into using the multi-query
>>>    optimization of Pig: https://issues.apache.org/jira/browse/PIG-627
>>>    I would definitely recommend using that if you have multiple STOREs.
>>>
>>> Please feel free to reach out to me if you need any information.
>>> HTH.
>>>
>>> Regards
>>> Ravi
>>>
>>> On Wed, Aug 26, 2015 at 4:31 PM, Soumen Bandyopadhyay wrote:
>>>
>>>> Hello Ravi,
>>>>
>>>> We are developers at Salesforce looking into the Pig integration
>>>> <https://phoenix.apache.org/pig_integration.html> with Phoenix. We are
>>>> looking into the LOAD function and have some questions around its
>>>> implementation.
>>>>
>>>>
>>>>    - Could you suggest how the data is being loaded into Pig from
>>>>    Phoenix/HBase? We are trying to evaluate whether there is data movement
>>>>    from HBase to MapReduce hdfs during the Load.
>>>>    - Is the data processed in chunks? Is there a concept of batchSize
>>>>    in LOAD function similar to STORE batchSize?
>>>>    - Can the Load be reused for multi-query on Phoenix? We are looking
>>>>    to process the same load data using multiple Group+Store.
>>>>
>>>> Thank you for your input in this regard.
>>>>
>>>> --
>>>> Thanks,
>>>>
>>>> Soumen
>>>>
>>>
>>>
>>
>
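
The LOAD steps (a) to (d) described in the quoted reply above can be pictured
with a small plain-Python simulation. This is only a sketch under stated
assumptions: the table contents, the split boundaries, and the helper names
scan and to_tuple are hypothetical, and nothing here calls Phoenix, HBase, or
Pig. It assumes that a split is just a start key (inclusive) and a stop key
(exclusive), and that each split scans its own key range independently.

```python
from bisect import bisect_left

# Hypothetical table contents, kept sorted by row key as in HBase.
table = [("a1", 10), ("b7", 20), ("c3", 30), ("m2", 40), ("x9", 50)]
keys = [key for key, _ in table]

# Hypothetical splits: (start_key, stop_key), start inclusive, stop
# exclusive. None means the range is open on that side.
splits = [(None, "c"), ("c", "n"), ("n", None)]


def scan(start, stop):
    """Return the rows whose key falls in [start, stop)."""
    lo = 0 if start is None else bisect_left(keys, start)
    hi = len(keys) if stop is None else bisect_left(keys, stop)
    return table[lo:hi]


def to_tuple(row):
    """Stand-in for step (d): convert returned columns to consumer types."""
    key, value = row
    return (str(key), int(value))


# One chunk per split; together the chunks cover the table exactly once.
chunks = [[to_tuple(row) for row in scan(start, stop)]
          for start, stop in splits]
```

Each entry in chunks corresponds to "1 chunk is 1 split" from the reply.
Downstream operators such as GROUP then work on the combined output of all
chunks.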
