phoenix-user mailing list archives

From "sunfl@certusnet.com.cn" <su...@certusnet.com.cn>
Subject Local index related data bulkload
Date Tue, 09 Sep 2014 06:18:36 GMT
Hi all and rajeshbabu,
   Recently our job has run into severe problems when loading data into Phoenix tables that
have local indexes. The load performance looks very poor compared with our previous loads
against tables with global indexes. That seems quite strange, because Phoenix local indexes
target write-heavy, space-constrained use cases, which is exactly our application.
   Looking at the stack traces captured while the job was running, we see the following:
   

We then looked at org.apache.phoenix.index.PhoenixIndexBuilder and commented out its batchStarted
method. After recompiling Phoenix and restarting the cluster, our job's loading performance
improved significantly. The original code of the batchStarted method is appended at the end
of this mail.
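Concretely, our local change amounts to leaving the method body empty, roughly like this (a
sketch of our patch, not the upstream code):

@Override
public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    // body commented out in our local build; the original implementation (appended below)
    // pre-loads the rows being indexed into the block cache
}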
Here are my questions:
1 Can the committers of this code explain what this method actually does, especially with
respect to local index data loading?
2 If we modify this code (e.g. comment out the method body as we did), is there any potential
impact on how Phoenix works?
3 Can anyone share code showing how to bulk load data with local indexes when the data files
are stored in HDFS?
I know that CsvBulkload can upsert the index data as well, while the map-reduce bulkload does
not support that. Maybe our job is a better fit for the map-reduce bulkload? So, if anyone has
successfully loaded data through CsvBulkload using Spark and HDFS, please share your
suggestions; a rough sketch of the approach we have in mind is included below.
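For reference, here is a rough sketch of the JDBC-based upsert loop we are imagining for CSV
files stored in HDFS. The table, columns, input path and ZooKeeper host below are placeholders,
not our real job, and we have not tested this; since the upserts go through the normal Phoenix
write path, our understanding is that local indexes would be maintained automatically:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CsvUpsertLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path input = new Path("hdfs:///data/example.csv"); // placeholder input path

        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:zk-host");
             BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(input)))) {
            conn.setAutoCommit(false);
            // EXAMPLE_TABLE and its columns are placeholders; upserts through the JDBC
            // driver take the normal write path, so local indexes are kept up to date.
            PreparedStatement stmt =
                conn.prepareStatement("UPSERT INTO EXAMPLE_TABLE (ID, COL1) VALUES (?, ?)");
            String line;
            long count = 0;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                stmt.setString(1, fields[0]);
                stmt.setString(2, fields[1]);
                stmt.executeUpdate();
                if (++count % 1000 == 0) {
                    conn.commit(); // commit in batches to limit client-side buffering
                }
            }
            conn.commit();
        }
    }
}

Any advice on whether this kind of client-side upserting can scale for our data volume, or
whether a map-reduce based load is the better route, would be very welcome.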

Best Regards,
Sun

/**
 * Index builder for covered-columns index that ties into phoenix for faster use.
 */
public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {

    @Override
    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        // The entire purpose of this method impl is to get the existing rows for the
        // table rows being indexed into the block cache, as the index maintenance code
        // does a point scan per row
        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
        List<IndexMaintainer> maintainers = new ArrayList<IndexMaintainer>();
        for (int i = 0; i < miniBatchOp.size(); i++) {
            Mutation m = miniBatchOp.getOperation(i);
            keys.add(PDataType.VARBINARY.getKeyRange(m.getRow()));
            maintainers.addAll(getCodec().getIndexMaintainers(m.getAttributesMap()));
        }
        Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
        ScanRanges scanRanges = ScanRanges.create(Collections.singletonList(keys), SchemaUtil.VAR_BINARY_SCHEMA);
        scanRanges.setScanStartStopRow(scan);
        scan.setFilter(scanRanges.getSkipScanFilter());
        HRegion region = this.env.getRegion();
        RegionScanner scanner = region.getScanner(scan);
        // Run through the scanner using internal nextRaw method
        region.startRegionOperation();
        try {
            boolean hasMore;
            do {
                List<Cell> results = Lists.newArrayList();
                // Results are potentially returned even when the return value of s.next is false
                // since this is an indication of whether or not there are more values after the
                // ones returned
                hasMore = scanner.nextRaw(results);
            } while (hasMore);
        } finally {
            try {
                scanner.close();
            } finally {
                region.closeRegionOperation();
            }
        }
    }




