
Creating a Test Table in Phoenix/HBase and Implementing a Custom Bitmap Aggregation Function in Spark

This tutorial demonstrates how to create a VARBINARY test table in HBase using Phoenix, serialize its data with RoaringBitmap, implement a custom Spark aggregation function to merge bitmap values, and query the table via Spark SQL, showcasing a practical big-data processing workflow.

Big Data Technology & Architecture

Create Test Table

Use Phoenix to create a test table in HBase with a VARBINARY column.

CREATE TABLE IF NOT EXISTS test_binary (
    date VARCHAR NOT NULL,
    dist_mem VARBINARY,
    CONSTRAINT test_binary_pk PRIMARY KEY (date)
) SALT_BUCKETS=6;

After the table is created, data is serialized with RoaringBitmap before being stored in the database.

Implement Custom Bitmap Aggregation Function

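The following Spark UserDefinedAggregateFunction deserializes each VARBINARY value into a RoaringBitmap, ORs the bitmaps together, and returns the cardinality of the union as a long. Note that UserDefinedAggregateFunction is deprecated as of Spark 3.0 in favor of Aggregator; the class below targets the older API.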

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.RoaringBitmap;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
/**
 * Custom aggregate function that ORs serialized RoaringBitmap values
 * and returns the cardinality of the resulting bitmap.
 */
public class UdafBitMap extends UserDefinedAggregateFunction {
    @Override
    public StructType inputSchema() {
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
        return DataTypes.createStructType(structFields);
    }
    @Override
    public StructType bufferSchema() {
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("field", DataTypes.BinaryType, true));
        return DataTypes.createStructType(structFields);
    }
    @Override
    public DataType dataType() {
        return DataTypes.LongType;
    }
    @Override
    public boolean deterministic() {
        // the same input always yields the same cardinality, so report the function as deterministic
        return true;
    }
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        // initialization
        buffer.update(0, null);
    }
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        // fold one input row into the partial buffer of the current partition
        Object in = input.get(0);
        if (in == null) {
            return;
        }
        byte[] inBytes = (byte[]) in;
        Object out = buffer.get(0);
        if (out == null) {
            buffer.update(0, inBytes);
            return;
        }
        // both buffer and input are non-null: OR the two bitmaps, which also deduplicates members
        byte[] outBytes = (byte[]) out;
        byte[] result = outBytes;
        RoaringBitmap outRR = new RoaringBitmap();
        RoaringBitmap inRR = new RoaringBitmap();
        try {
            outRR.deserialize(new DataInputStream(new ByteArrayInputStream(outBytes)));
            inRR.deserialize(new DataInputStream(new ByteArrayInputStream(inBytes)));
            outRR.or(inRR);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            outRR.serialize(new DataOutputStream(bos));
            result = bos.toByteArray();
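        } catch (IOException e) {
            // fail the task instead of silently dropping the input bitmap
            throw new UncheckedIOException("Failed to merge serialized RoaringBitmap values", e);
        }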
        buffer.update(0, result);
    }
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        // combine two partial buffers (for example from different partitions); the buffer row has the same layout as an input row
        update(buffer1, buffer2);
    }
    @Override
    public Object evaluate(Row buffer) {
        // compute final result from buffer
        long r = 0L;
        Object val = buffer.get(0);
        if (val != null) {
            RoaringBitmap rr = new RoaringBitmap();
            try {
                rr.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) val)));
                r = rr.getLongCardinality();
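            } catch (IOException e) {
                // surface corrupt data instead of reporting a cardinality of 0
                throw new UncheckedIOException("Failed to deserialize the aggregated RoaringBitmap", e);
            }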
        }
        return r;
    }
}
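The buffer handling in update(), merge(), and evaluate() can be easier to follow outside Spark. The sketch below is a stand-in, not the author's code: it swaps RoaringBitmap for java.util.BitSet so that it runs with the JDK alone, and the class and method names (BitmapMergeSketch, toBytes, fold, cardinality) are hypothetical. It shows the same three rules as the UDAF: a null input is skipped, the first non-null value becomes the buffer, and any later value is ORed into it.

```java
import java.util.BitSet;

/**
 * Stand-in sketch: java.util.BitSet replaces RoaringBitmap so the example
 * runs without third-party libraries. All names here are hypothetical.
 */
class BitmapMergeSketch {

    /** Serializes a set of member IDs to bytes (stand-in for RoaringBitmap.serialize). */
    static byte[] toBytes(int... memberIds) {
        BitSet bits = new BitSet();
        for (int id : memberIds) {
            bits.set(id);
        }
        return bits.toByteArray();
    }

    /** Mirrors update()/merge(): folds `in` into `buffer`, treating null as "no data yet". */
    static byte[] fold(byte[] buffer, byte[] in) {
        if (in == null) {
            return buffer;              // nothing to add
        }
        if (buffer == null) {
            return in;                  // first non-null value becomes the buffer
        }
        BitSet merged = BitSet.valueOf(buffer);
        merged.or(BitSet.valueOf(in));  // the union removes duplicate IDs
        return merged.toByteArray();
    }

    /** Mirrors evaluate(): number of distinct IDs, or 0 for an empty buffer. */
    static long cardinality(byte[] buffer) {
        return buffer == null ? 0L : BitSet.valueOf(buffer).cardinality();
    }

    public static void main(String[] args) {
        // Partition A sees two rows; partition B sees a NULL and then one row.
        byte[] bufferA = fold(fold(null, toBytes(1, 2, 3)), toBytes(3, 4));
        byte[] bufferB = fold(fold(null, null), toBytes(4, 5));
        // Partial buffers are combined with the same fold, as merge() does.
        byte[] total = fold(bufferA, bufferB);
        System.out.println(cardinality(total)); // prints 5 (distinct IDs 1 to 5)
    }
}
```

Because the buffer and the input share the same binary layout, one fold routine serves both steps, which is why the UDAF's merge() can simply delegate to update().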

Usage Example

Register the UDAF in Spark, read the Phoenix table via JDBC, and run a SQL query that applies the bitmap aggregation.

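/**
 * Registers the bitmap UDAF and uses it to count the members of each stored bitmap.
 * Requires imports of java.util.Properties and org.apache.spark.sql.SparkSession.
 * PropUtil and the DB_PHOENIX_* constants are project-specific and not shown in this article.
 */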
private static void udafBitmap(SparkSession sparkSession) {
    try {
        Properties prop = PropUtil.loadProp(DB_PHOENIX_CONF_FILE);
        // JDBC connection properties
        Properties connProp = new Properties();
        connProp.put("driver", prop.getProperty(DB_PHOENIX_DRIVER));
        connProp.put("user", prop.getProperty(DB_PHOENIX_USER));
        connProp.put("password", prop.getProperty(DB_PHOENIX_PASS));
        connProp.put("fetchsize", prop.getProperty(DB_PHOENIX_FETCHSIZE));
        // Register custom aggregation function
        sparkSession.udf().register("bitmap", new UdafBitMap());
        sparkSession.read()
            .jdbc(prop.getProperty(DB_PHOENIX_URL), "test_binary", connProp)
            // In SQL the table must be referenced as global_temp.tableName
            .createOrReplaceGlobalTempView("test_binary");
        // Example query
        sparkSession.sql("select date, bitmap(dist_mem) memNum from global_temp.test_binary group by date").show();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

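The result lists, for each date, the cardinality of the merged bitmap, that is, the number of distinct members. Because date is the table's primary key, each group here holds a single row; the merge only comes into play when the query groups on a coarser key or aggregates across dates.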

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
