phoenix-user mailing list archives

From Alex Warshavsky <awarshav...@salesforce.com>
Subject Re: Apache Pig Integration with Phoenix
Date Tue, 08 Sep 2015 18:17:33 GMT
James,

Is the async UPSERT SELECT route through the query server an option now? Or is
that something that will have to be developed over time?

Cheers,
--Alex.

On Fri, Sep 4, 2015 at 10:44 PM, James Taylor <jamestaylor@apache.org>
wrote:

> Hi Alex,
> You can execute that pig script in a single UPSERT SELECT call in Phoenix
> and it'll perform far better. What's the motivation for using Pig? If the
> answer is "I don't want to tie up client threads for that amount of time",
> then one answer to that is to use our query server[1]. That'd still use one
> client thread, but with an UPSERT SELECT you'd be ok running it
> asynchronously (please file a JIRA for this if you're interested in
> pursuing it).
>
> FYI, our Pig integration does not support any aggregation in the LOAD
> command as Phoenix expects to do the final merge. It'd be interesting to
> explore being able to invoke the server-side aggregation and have the
> client-side final merge be done by Pig. I believe Pig exposes an API for
> that, but Phoenix doesn't have an API for a partial aggregation. We're
> looking into integrating with Apache Drill and I suspect this kind of thing
> will be possible through our Calcite integration.
>
> Thanks,
> James
>
> [1] https://phoenix.apache.org/server.html
>
> On Thu, Sep 3, 2015 at 11:05 AM, Alex Warshavsky <
> awarshavsky@salesforce.com> wrote:
>
>> Hello all,
>>
>> Expanding earlier email conversation to wider audience to share the
>> wealth. Please read email chain below to get the context if needed.
>>
>> Followup question. Here's a simple PIG script that loads data from
>> Phoenix, performs aggregation, and writes aggregations back to Phoenix.
>> Idea is to rollup counts every hour, hence the WHERE clause by EVENT_TIME.
>>
>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
>> EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015'
>>
>>
>> by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID)
>>
>>
>> count_by_user = FOREACH by_org_and_app GENERATE FLATTEN(group) AS
>> (TENANT_ID, USER_ID), COUNT(raw)
>>
>>
>> STORE count_by_user into 'hbase://METRICS'
>>
>> For the script above, since GROUP follows the LOAD in Pig, is the best
>> approach to push the GROUP filter into the SELECT's WHERE clause? I want to
>> ensure that the subsequent rollups are correct for each TENANT_ID/USER_ID
>> without loading the whole table.
>>
>>
>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
>> EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015' *AND TENANT_ID =
>> '123456789'*
>>
>>
>> by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID)
>>
>> The flip side is that the idea was to use Pig to Map/Reduce aggregations
>> (rollups) across tenants rather than processing one tenant at a time. If
>> tenants have to be processed one at a time, a loop will have to be created,
>> which I was trying to avoid.
>>
>> Thanks,
>> --Alex.
>>
>> On Thu, Aug 27, 2015 at 7:44 PM, Ravi Kiran wrote:
>>
>>> Hi Alex,
>>>    I believe the way Pig works is to apply predicate pushdown to LOAD
>>> and, once the data is loaded, apply operators like GROUP and COUNT.
>>> Hence, the rollup in your example above would happen as a subsequent step.
>>>    Currently, the PhoenixHBaseLoader doesn't implement
>>> LoadPredicatePushdown; hence, any FILTER operators after LOAD don't get
>>> pushed to HBase. In these cases, it's good to go with the SELECT query
>>> approach and use the WHERE clause to get the predicate pushdown.
>>>
>>> I also would recommend running the EXPLAIN operator on the script to see
>>> the execution plan.
>>>
>>> Sorry for the delay in my response, just got back from office.
>>>
>>> Regards
>>> Ravi
>>>
>>> On Thu, Aug 27, 2015 at 12:54 PM, Alex Warshavsky wrote:
>>>
>>>> Hi Ravi,
>>>>
>>>> Thanks a lot for your response. I wanted to clarify relationship
>>>> between splits and PIG operations like GROUP, COUNT, and DISTINCT.
>>>>
>>>> Let's talk about an example.
>>>>
>>>> In the example below, GROUPing happens to be by TENANT_ID and USER_ID.
>>>> When LOAD happens, does the split take care of the rollup being calculated
>>>> for all rows for a given TENANT_ID? Is the data read in chunks (by
>>>> split logic) and then rolled up using map/reduce for each of the groups?
>>>> What about cases like DISTINCT (count(distinct) equivalent), where
>>>> *all* of the data for a given group needs to be available to perform the
>>>> final calculation?
>>>>
>>>> Effectively on Map/Reduce level we're talking about TENANT_ID and
>>>> USER_ID being the keys and Reduce happening for each key (COUNT).
>>>>
>>>> # Calculate number of logins per user for a given day. Rollup by
>>>> TENANT_ID.
>>>>
>>>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE
>>>> EVENT_TIME > 8/1/2015 AND EVENT_TIME < 8/2/2015'
>>>>
>>>> by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID)
>>>>
>>>> count_by_user = FOREACH by_org_and_app GENERATE FLATTEN(group) AS
>>>> (TENANT_ID, USER_ID), COUNT(raw)
>>>>
>>>> STORE count_by_user into 'hbase://METRICS'
>>>>
>>>> Cheers,
>>>> --Alex.
>>>>
>>>> On Wed, Aug 26, 2015 at 7:42 PM, Ravi Kiran wrote:
>>>>
>>>>> Hi Soumen,
>>>>>
>>>>>    Below is the series of steps that happen for the given LOAD
>>>>> operator:
>>>>>
>>>>> a) Parse the information passed to the LOAD operator and generate a
>>>>> SELECT query with the necessary columns requested. You can also pass a
>>>>> SELECT query directly.
>>>>> b) From the SELECT query, we generate the QueryPlan and thereby get
>>>>> the splits. This is all done in the PhoenixInputFormat.
>>>>> c) Each split pulls records from the table. Kindly note that the splits
>>>>> don't map to region boundaries. They just have start and stop keys.
>>>>> d) We then transform the returned columns onto Pig datatypes. You can
>>>>> look further into TypeUtil.
>>>>>
>>>>> Coming to your questions,
>>>>>
>>>>>
>>>>>    - Could you suggest how the data is being loaded into Pig from
>>>>>    Phoenix/HBase? We are trying to evaluate whether there is data movement
>>>>>    from HBase to MapReduce hdfs during the Load.
>>>>>    *RAVI*: No data movement happens from HBase to HDFS. It's a direct
>>>>>    SCAN from the HBase table.
>>>>>
>>>>>    - Is the data processed in chunks? Is there a concept of batchSize
>>>>>    in LOAD function similar to STORE batchSize?
>>>>>    *RAVI*:  Yes. 1 chunk is 1 split.  I am not sure about the use of
>>>>>    batchSize, as Pig cannot execute any downstream operators like GROUP
>>>>>    or JOIN until all data is loaded. Please feel free to reply if I am
>>>>>    not on the same page as you on this question.
>>>>>
>>>>>    - Can the Load be reused for multi-query on Phoenix? We are
>>>>>    looking to process the same load data using multiple Group+Store
>>>>>    *RAVI*:   I am hoping you are looking into using the multi-query
>>>>>    optimization of Pig: https://issues.apache.org/jira/browse/PIG-627
>>>>>    I would definitely recommend using that if you have multiple STOREs.
>>>>>
>>>>> Please feel free to reach out to me if you need any more information.
>>>>> HTH.
>>>>>
>>>>> Regards
>>>>> Ravi
>>>>>
>>>>> On Wed, Aug 26, 2015 at 4:31 PM, Soumen Bandyopadhyay wrote:
>>>>>
>>>>>> Hello Ravi,
>>>>>>
>>>>>> We are developers at Salesforce looking into the Pig integration
>>>>>> <https://phoenix.apache.org/pig_integration.html> with Phoenix. We
>>>>>> are looking into the LOAD function and have some questions around its
>>>>>> implementation.
>>>>>>
>>>>>>
>>>>>>    - Could you suggest how the data is being loaded into Pig from
>>>>>>    Phoenix/HBase? We are trying to evaluate whether there is data
>>>>>>    movement from HBase to MapReduce hdfs during the Load.
>>>>>>    - Is the data processed in chunks? Is there a concept of
>>>>>>    batchSize in LOAD function similar to STORE batchSize?
>>>>>>    - Can the Load be reused for multi-query on Phoenix? We are
>>>>>>    looking to process the same load data using multiple Group+Store.
>>>>>>
>>>>>> Thank you for your input in this regard.
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>>
>>>>>> Soumen
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
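
For reference, the single UPSERT SELECT that James suggests in place of the
Pig script might look roughly like the sketch below. It is only an
illustration under assumptions not stated in the thread: that METRICS has
columns TENANT_ID, USER_ID and a counter column (called LOGIN_COUNT here, a
hypothetical name), that TENANT_ID and USER_ID form its primary key, and that
EVENT_TIME is a DATE or TIMESTAMP column. The lower bound uses >= so that
events at exactly midnight are counted, which differs slightly from the > in
the original script.

```sql
-- Sketch only: LOGIN_COUNT and the METRICS column layout are assumptions.
-- Roll up one day of logins per tenant and user, entirely inside Phoenix.
UPSERT INTO METRICS (TENANT_ID, USER_ID, LOGIN_COUNT)
SELECT TENANT_ID, USER_ID, COUNT(*)
FROM LOGINS
WHERE EVENT_TIME >= TO_DATE('2015-08-01 00:00:00')
  AND EVENT_TIME <  TO_DATE('2015-08-02 00:00:00')
GROUP BY TENANT_ID, USER_ID;
```

Because the GROUP BY covers every tenant in one statement, this form would
also avoid the per-tenant loop discussed above.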
