phoenix-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From James Taylor <jamestay...@apache.org>
Subject Re: Phoenix custom UDF
Date Sun, 02 Oct 2016 01:33:47 GMT
Hi Akhil,
You want to create an Array, convert it to its byte[] representation, and
set the ptr argument to point to it. Take a look at ArrayIT for examples of
creating an Array:

    // Create Array of FLOAT
    Float[] floatArr =  new Float[2];
    floatArr[0] = 64.87f;
    floatArr[1] = 89.96f;
    Array array = conn.createArrayOf("FLOAT", floatArr);
    // Convert to byte[]
    byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
    // Set ptr to byte[]
    ptr.set(arrayAsBytes);

Thanks,
James


On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <akhilcancerian@gmail.com> wrote:

> I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
> datatype is 'VARBINARY'.
> The data in these columns is a compressed float[] stored as a ByteBuffer
> (called DenseVector): an ordered set of 16-bit IEEE floats with a
> cardinality of no more than 3996.
> I have loaded the data into Phoenix tables through the spark-phoenix
> plugin. To give an idea: the MapReduce jobs write data to Hive in Parquet
> gzip format. I read the data into a DataFrame using
> sqlContext.parquetFile(), register it as a temp table, run a
> sqlContext.sql("select query ...") query, and finally call
> res.save("org.apache.phoenix.spark",
> SaveMode.Overwrite, Map("table" -> "SITEFLOWTABLE" ,"zkUrl" ->
> "localhost:2181")).
> We have a Hive/Shark UDF (code pasted below) that can decode these
> ByteBuffer columns and display them as a readable float[]. This UDF works
> in spark-shell.
> Now I just want to write a similar UDF in Phoenix and run queries such as
> "select uplinkcostbuffer,DENSEVECTORUDF(uplinkcostbuffer) from
> siteflowtable", and then write UDAFs over it.
> How do I make a Phoenix UDF return float[]? I have tried many things,
> but none seem to work.
>
> Below is the code for hive/shark UDF-
> ------------------------------------------------------------
> ------------------------------
> package com.ABCD.densevectorudf;
>
> import java.nio.ByteBuffer;
> import java.util.Vector;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.hive.ql.exec.Description;
> import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
> import org.apache.hadoop.hive.ql.metadata.HiveException;
> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto
> rFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Bina
> ryObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim
> itiveObjectInspectorFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Stri
> ngObjectInspector;
> import org.apache.hadoop.io.FloatWritable;
>
> import com.ABCD.common.attval.IDenseVectorOperator;
> import com.ABCD.common.attval.Utility;
> import com.ABCD.common.attval.array.BufferOperations;
> import com.ABCD.common.attval.array.FloatArrayFactory;
>
> @Description(name = "DenseVectorUDF",
> value = "Dense Vector UDF in Hive / Shark\n"
> + "_FUNC_(binary|hex) - "
> + "Returns the dense vector array<float> value\n",
> extended = "Dense Vector UDAF in Hive / Shark")
>
> public class DenseVectorUDF extends GenericUDF {
> private static final Log LOG = LogFactory.getLog(DenseVectorU
> DF.class.getName());
> private ObjectInspector inputOI;
> private ListObjectInspector outputOI;
>
> @Override
> public String getDisplayString(String[] children) {
> StringBuilder sb = new StringBuilder();
> sb.append("densevectorudf(");
> for (int i = 0; i < children.length; i++) {
> sb.append(children[i]);
> if (i + 1 != children.length) {
> sb.append(",");
> }
> }
> sb.append(")");
> return sb.toString();
> }
>
> @Override
> public ObjectInspector initialize(ObjectInspector[] arguments) throws
> UDFArgumentException {
> if (arguments.length == 1) {
> ObjectInspector first = arguments[0];
> if (!(first instanceof StringObjectInspector) && !(first instanceof
> BinaryObjectInspector)) {
> LOG.error("first argument must be a either binary or hex buffer");
> throw new UDFArgumentException("first argument must be a either binary or
> hex buffer");
> }
> inputOI = first;
> outputOI = ObjectInspectorFactory.getStandardListObjectInspector(Primit
> iveObjectInspectorFactory.writableFloatObjectInspector);
> } else {
> throw new UDFArgumentLengthException("Wrong argument length is passed.
> Arguments length is NOT supported.");
> }
> return outputOI;
> }
>
> @Override
> public Object evaluate(DeferredObject[] arguments) throws HiveException {
> IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
> Object object = arguments[0].get();
> Vector<Float> floatVector = null;
> ByteBuffer buff = null;
> if (inputOI instanceof StringObjectInspector) {
> String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObjec
> t(object);
> buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
> } else if (inputOI instanceof BinaryObjectInspector) {
> byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObjec
> t(object);
> buff = ByteBuffer.wrap(bytes);
> }
> floatVector = idv.getElements(buff);
> Object red [] = new Object[floatVector.size()];
> for(int index = 0; index < red.length; index++){
> red[index] = new FloatWritable(floatVector.get(index));
> }
> LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
> return red;
> }
> }
>
> ------------------------------------------------------------
> ------------------------------
>
>
> The following is the code I have written for the Phoenix UDF:
> ------------------------------------------------------------
> ------------------------------
> package org.apache.phoenix.expression.function;
>
> import com.ABCD.common.attval.IDenseVectorOperator;
> import com.ABCD.common.attval.array.BufferOperations;
> import com.ABCD.common.attval.array.FloatArrayFactory;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto
> rFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim
> itiveObjectInspectorFactory;
> import org.apache.hadoop.io.FloatWritable;
> import org.apache.phoenix.expression.Expression;
> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectI
> nspector;
> import org.apache.phoenix.parse.FunctionParseNode.Argument;
> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
> import org.apache.phoenix.schema.SortOrder;
> import org.apache.phoenix.schema.tuple.Tuple;
> import org.apache.phoenix.schema.types.PDataType;
> import org.apache.phoenix.schema.types.PFloatArray;
> import org.apache.phoenix.schema.types.PVarbinary;
>
> import java.nio.ByteBuffer;
> import java.nio.FloatBuffer;
> import java.sql.SQLException;
> import java.util.List;
> import java.util.Vector;
>
> @BuiltInFunction(name = DenseVectorFunction.NAME, args = {
>         @Argument(allowedTypes = {PVarbinary.class})})
> public class DenseVectorFunction extends ScalarFunction {
>     public static final String NAME = "DenseVectorFunction";
>     private ListObjectInspector outputOI;
>
>     public DenseVectorFunction() {
>     }
>
>     public DenseVectorFunction(List<Expression> children) throws
> SQLException {
>         super(children);
>     }
>
>     @Override
>     public String getName() {
>         return NAME;
>     }
>
>     public Expression getElementExpr() {
>         return children.get(0);
>     }
>
>     @Override
>     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
>         if (!getElementExpr().evaluate(tuple, ptr)) {
>             return false;
>         }
>         Object element = getElementExpr().getDataType().toObject(ptr,
> getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
> getElementExpr().getScale());
>         IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>         PhoenixBinaryObjectInspector pboi = new
> PhoenixBinaryObjectInspector();
>         byte[] bytes = pboi.getPrimitiveJavaObject(element);
>         Object object = ptr.get();
>         Vector<Float> floatVector = null;
>         ByteBuffer buff = null;
>         buff = ByteBuffer.wrap(bytes);
>         floatVector = idv.getElements(buff);
>
>         Object[] red = new Object[floatVector.size()];
>         for (int index = 0; index < red.length; index++) {
>             red[index] = new FloatWritable(floatVector.get(index));
>             System.out.println("" + floatVector.get(index));
>         }
>         System.out.println("Buffer header = " +
> BufferOperations.stringifyBuffer(buff)); // This prints header info in
> ByteBuffer which is correct
> //HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY
>         ptr.set(??);
>         return true;
>     }
>
>     @Override
>     public SortOrder getSortOrder() {
>         return children.get(0).getSortOrder();
>     }
>
>     @Override
>     public PDataType getDataType() {
>         return PFloatArray.INSTANCE;
>     }
> }
> ------------------------------------------------------------
> ------------------------------
>
> Any help will be much appreciated.
>

Mime
View raw message