phoenix-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akhil jain <akhilcancer...@gmail.com>
Subject Phoenix custom UDF
Date Sat, 01 Oct 2016 16:19:55 GMT
I am using hbase 1.1 with phoenix 4.8. I have a table with columns whose
datatype is 'VARBINARY'.
The data in these columns is compressed float[] in form of ByteBuffer
called DenseVector which is an ordered set of 16 bit IEEE floats of
cardinality no more than 3996.
I have loaded data into phoenix tables through spark-phoenix plugin. Just
to give an idea the mapreduce jobs write data in hive in parquet gzip
format. I read data into a dataframe using sqlContext.parquetFile() ,
register it as temp table and run a sqlContext.sql("select query ...")
query and finally calling res.save("org.apache.phoenix.spark",
SaveMode.Overwrite, Map("table" -> "SITEFLOWTABLE" ,"zkUrl" ->
"localhost:2181"))
We have a hive/shark UDF(code pasted below) that can decode these
ByteBuffer columns and display them in readable float[]. So this UDF works
on spark-shell.
Now I just want to write a similar UDF in phoenix and run queries as "
select uplinkcostbuffer,DENSEVECTORUDF(uplinkcostbuffer) from
siteflowtable" and further write UDAFs over it.
How do I make phoenix UDF return float[] ?? I have tried a lot many things
but none seem to work.

Below is the code for hive/shark UDF-
------------------------------------------------------------
------------------------------
package com.ABCD.densevectorudf;

import java.nio.ByteBuffer;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.
BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.
PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.
StringObjectInspector;
import org.apache.hadoop.io.FloatWritable;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.Utility;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

@Description(name = "DenseVectorUDF",
value = "Dense Vector UDF in Hive / Shark\n"
+ "_FUNC_(binary|hex) - "
+ "Returns the dense vector array<float> value\n",
extended = "Dense Vector UDAF in Hive / Shark")

public class DenseVectorUDF extends GenericUDF {
private static final Log LOG = LogFactory.getLog(
DenseVectorUDF.class.getName());
private ObjectInspector inputOI;
private ListObjectInspector outputOI;

@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("densevectorudf(");
for (int i = 0; i < children.length; i++) {
sb.append(children[i]);
if (i + 1 != children.length) {
sb.append(",");
}
}
sb.append(")");
return sb.toString();
}

@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws
UDFArgumentException {
if (arguments.length == 1) {
ObjectInspector first = arguments[0];
if (!(first instanceof StringObjectInspector) && !(first instanceof
BinaryObjectInspector)) {
LOG.error("first argument must be a either binary or hex buffer");
throw new UDFArgumentException("first argument must be a either binary or
hex buffer");
}
inputOI = first;
outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
} else {
throw new UDFArgumentLengthException("Wrong argument length is passed.
Arguments length is NOT supported.");
}
return outputOI;
}

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
Object object = arguments[0].get();
Vector<Float> floatVector = null;
ByteBuffer buff = null;
if (inputOI instanceof StringObjectInspector) {
String hex = ((StringObjectInspector) inputOI).
getPrimitiveJavaObject(object);
buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
} else if (inputOI instanceof BinaryObjectInspector) {
byte[] bytes = ((BinaryObjectInspector) inputOI).
getPrimitiveJavaObject(object);
buff = ByteBuffer.wrap(bytes);
}
floatVector = idv.getElements(buff);
Object red [] = new Object[floatVector.size()];
for(int index = 0; index < red.length; index++){
red[index] = new FloatWritable(floatVector.get(index));
}
LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
return red;
}
}

------------------------------------------------------------
------------------------------


Following is the code I have written for Phoenix UDF-
------------------------------------------------------------
------------------------------
package org.apache.phoenix.expression.function;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.
PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.FloatWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
import org.apache.phoenix.parse.FunctionParseNode.Argument;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PFloatArray;
import org.apache.phoenix.schema.types.PVarbinary;

import java.nio.ByteBuffer;
import java.nio.FloatBuffer;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;

@BuiltInFunction(name = DenseVectorFunction.NAME, args = {
        @Argument(allowedTypes = {PVarbinary.class})})
public class DenseVectorFunction extends ScalarFunction {
    public static final String NAME = "DenseVectorFunction";
    private ListObjectInspector outputOI;

    public DenseVectorFunction() {
    }

    public DenseVectorFunction(List<Expression> children) throws
SQLException {
        super(children);
    }

    @Override
    public String getName() {
        return NAME;
    }

    public Expression getElementExpr() {
        return children.get(0);
    }

    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        if (!getElementExpr().evaluate(tuple, ptr)) {
            return false;
        }
        Object element = getElementExpr().getDataType().toObject(ptr,
getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
getElementExpr().getScale());
        IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
        PhoenixBinaryObjectInspector pboi = new
PhoenixBinaryObjectInspector();
        byte[] bytes = pboi.getPrimitiveJavaObject(element);
        Object object = ptr.get();
        Vector<Float> floatVector = null;
        ByteBuffer buff = null;
        buff = ByteBuffer.wrap(bytes);
        floatVector = idv.getElements(buff);

        Object[] red = new Object[floatVector.size()];
        for (int index = 0; index < red.length; index++) {
            red[index] = new FloatWritable(floatVector.get(index));
            System.out.println("" + floatVector.get(index));
        }
        System.out.println("Buffer header = " +
BufferOperations.stringifyBuffer(buff));
// This prints header info in ByteBuffer which is correct
//HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY
        ptr.set(??);
        return true;
    }

    @Override
    public SortOrder getSortOrder() {
        return children.get(0).getSortOrder();
    }

    @Override
    public PDataType getDataType() {
        return PFloatArray.INSTANCE;
    }
}
------------------------------------------------------------
------------------------------

Any help will be much appreciated.

Mime
View raw message