
Design and Implementation of Hourly Feature Coverage Metrics Using Spark and Elasticsearch

This article describes a high‑throughput solution for calculating hourly feature coverage, positive‑sample ratio and negative‑sample ratio on billions of records by streaming data with Spark, indexing per experiment‑hour in Elasticsearch, and executing parallel aggregation tasks with Java code.

Big Data Technology & Architecture

Background – The team needed to monitor, on an hourly basis, the coverage, positive‑sample proportion and negative‑sample proportion of individual or combined features across experiments. Each hour generates nearly 100 million records, and seven days of data would exceed 10 billion records.

Technical Options – Three approaches were considered: (1) Spark streaming to compute all possible feature combinations (rejected due to combinatorial explosion), (2) offline HDFS scans on request (too slow for user experience), and (3) indexing data by experiment and hour in Elasticsearch and performing real‑time aggregation on request (chosen).

Architecture – Raw data is ingested from Kafka into Spark, parsed and transformed, then written to Elasticsearch with one index per experiment and hour. When a client request arrives, asynchronous tasks query the relevant indices in parallel and aggregate the requested metrics; the results are then merged and returned to the frontend.
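To make the experiment+hour indexing concrete, here is a minimal sketch of how a request window could be mapped to the hourly indices it covers. The class name HourlyIndexNames, the "coverage_" prefix and the yyyyMMddHH suffix are assumptions for illustration; the article does not show the real naming scheme behind getIndexName.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class HourlyIndexNames {
    // Hypothetical suffix format: one index per experiment per hour.
    private static final DateTimeFormatter HOUR_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHH");

    /** Builds the (assumed) index name for one experiment and one hour. */
    static String indexName(String exptId, LocalDateTime time) {
        // Elasticsearch index names must be lowercase.
        return "coverage_" + exptId.toLowerCase(Locale.ROOT) + "_" + time.format(HOUR_FORMAT);
    }

    /** Lists the hourly indices touched by the window [from, to). */
    static List<String> indexNames(String exptId, LocalDateTime from, LocalDateTime to) {
        List<String> names = new ArrayList<>();
        LocalDateTime hour = from.truncatedTo(ChronoUnit.HOURS);
        while (hour.isBefore(to)) {
            names.add(indexName(exptId, hour));
            hour = hour.plusHours(1);
        }
        return names;
    }

    public static void main(String[] args) {
        LocalDateTime from = LocalDateTime.of(2020, 1, 15, 22, 30);
        System.out.println(indexNames("Expt42", from, from.plusHours(3)));
    }
}
```

With this layout a request only touches the indices for the hours it asks about, so each per-hour query stays small and can run independently of the others.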

Code Implementation – Asynchronous Tasks

// Launch parallel tasks
final Map<String,List<Future<GetCoverageTask.Result>>> futures = Maps.newHashMap();
for(String metric : metrics) {
    final SampleRatio sampleRatio = getSampleRatio(metric);
    for (String exptId : expts) {
        for (String id : features) {
            final String name = getMetricsName(exptId, sampleRatio, id);
            // The field name does not depend on the hour, so resolve it once per feature.
            final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);
            final List<Future<GetCoverageTask.Result>> resultList = Lists.newArrayList();
            // Submit one task per covered hour; each task queries that hour's index.
            for (Date hour : coveredHours) {
                final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);
                resultList.add(TaskExecutor.submit(task));
            }
            futures.put(name, resultList);
        }
    }
}
final QueryRes queryRes = new QueryRes();
final Iterator<Map.Entry<String, List<Future<GetCoverageTask.Result>>>> it = futures.entrySet().iterator();
while (it.hasNext()){
    // result processing omitted
}
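
The result-processing loop is omitted in the listing above. As a rough sketch of what such a step might look like, the following self-contained example waits on each future with a timeout and groups the hourly values by metric name. It uses plain Float values in place of GetCoverageTask.Result, whose fields the article does not show, and the names FutureCollector and collect are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureCollector {
    /**
     * Waits for every future and returns, per metric name, the hourly values
     * in submission order. A failed, timed-out or interrupted task yields null
     * for that hour. The timeout applies to each future, not to the whole batch.
     */
    static Map<String, List<Float>> collect(Map<String, List<Future<Float>>> futures, long timeoutMillis) {
        Map<String, List<Float>> out = new LinkedHashMap<>();
        for (Map.Entry<String, List<Future<Float>>> entry : futures.entrySet()) {
            List<Float> values = new ArrayList<>();
            for (Future<Float> future : entry.getValue()) {
                try {
                    values.add(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
                } catch (ExecutionException | TimeoutException e) {
                    future.cancel(true);
                    values.add(null); // mark this hour as missing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    values.add(null);
                }
            }
            out.put(entry.getKey(), values);
        }
        return out;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Float>> hourly = new ArrayList<>();
            for (int h = 0; h < 3; h++) {
                final float value = 0.25f * h; // stand-in for a per-hour coverage query
                Callable<Float> task = () -> value;
                hourly.add(pool.submit(task));
            }
            Map<String, List<Future<Float>>> futures = new LinkedHashMap<>();
            futures.put("expt1_ALL_featureA", hourly);
            System.out.println(collect(futures, 1000)); // {expt1_ALL_featureA=[0.0, 0.25, 0.5]}
        } finally {
            pool.shutdown();
        }
    }
}
```

Iterating with a for-each loop over entrySet() avoids the explicit Iterator, and bounding each get() keeps one slow hourly query from stalling the whole response.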

Code Implementation – Metric Calculation

// 1. Aggregate documents to obtain total and target counts
final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);
final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);
// 2. Retrieve coverage
final String indexName = getIndexName(exptId, hour);
final Search search = new Search.Builder(searchBuilder.toString())
    .addIndex(indexName).addType(getType()).build();
final SearchResult result = jestClient.execute(search);
if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){
    log.warn(result.getErrorMessage());
    return 0f;
}
final MetricAggregation aggregations = result.getAggregations();
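// 3. Parse results
// Note the naming: Constants.DIVIDEND labels the total count (the denominator of the ratio)
// and Constants.DIVISOR labels the matching count (the numerator).
final long total;
if (SampleRatio.ALL == sampleRatio) {
    total = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();
} else {
    total = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();
}
// Guard against division by zero when the total count is empty.
if (total <= 0) {
    return 0f;
}
final long matched = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();
return matched / (float) total;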

Code Implementation – Aggregation Builder

final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);
final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
final List<QueryBuilder> must = boolQuery.must();
// Positive samples are matched on label 1 and negative samples on label 0;
// SampleRatio.ALL adds no label clause.
if (SampleRatio.POSITIVE == sampleRatio) {
    must.add(QueryBuilders.termQuery(Constants.LABEL, 1));
} else if (SampleRatio.NEGATIVE == sampleRatio) {
    must.add(QueryBuilders.termQuery(Constants.LABEL, 0));
}
must.add(existsQuery);
final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());
existsCountAgg.field(fieldName);
final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);
filterAgg.subAggregation(existsCountAgg);
return filterAgg;
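
For readers less familiar with the Elasticsearch builder classes, the sketch below assembles by hand roughly the kind of request body that a filter aggregation with a bool/must query and a value_count sub-aggregation corresponds to. It is an illustration under assumptions, not the service's actual output: the class CoverageQueryBody, the sub-aggregation name exists_count and the sample field names are hypothetical, and the helper assumes names contain no characters that need JSON escaping.

```java
class CoverageQueryBody {
    /**
     * Builds a search body with size 0 and one filter aggregation.
     * The filter requires fieldName to exist and, when label is non-null,
     * the label field to equal that value (null means all samples).
     */
    static String build(String aggName, String labelField, Integer label, String fieldName) {
        StringBuilder must = new StringBuilder();
        if (label != null) {
            must.append("{\"term\":{\"").append(labelField).append("\":").append(label).append("}},");
        }
        must.append("{\"exists\":{\"field\":\"").append(fieldName).append("\"}}");

        String filter = "{\"bool\":{\"must\":[" + must + "]}}";
        // Sub-aggregation counting values of the feature field inside the filtered bucket.
        String sub = "{\"exists_count\":{\"value_count\":{\"field\":\"" + fieldName + "\"}}}";
        return "{\"size\":0,\"aggs\":{\"" + aggName + "\":{\"filter\":" + filter + ",\"aggs\":" + sub + "}}}";
    }

    public static void main(String[] args) {
        // Positive-sample variant for a hypothetical feature field.
        System.out.println(build("divisor", "label", 1, "feature_collect.f1"));
    }
}
```

Setting size to 0 asks Elasticsearch to return only aggregation results and no hits, which matches the size(0) call in the metric calculation code.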

Deployment Results – After launch, the service meets expectations with an average response time of about 3 seconds, providing a smooth user experience. Screenshots of the Elasticsearch index and performance metrics are included in the original article.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
