
Real-time Log Processing System Based on Flink and Drools

This article describes a real-time log processing platform that integrates Kafka, Flink, the Drools rule engine, Redis, and Elasticsearch to unify heterogeneous log formats, extract business metrics, and provide configurable, dynamic data processing for large‑scale logging scenarios.

IT Architects Alliance

Background: Logs arrive from many sources, such as filebeat, winbeat, syslog, and business logs delivered through Kafka. Their formats are inconsistent, which makes it difficult to extract useful metrics from them.

To address these issues, a real‑time log processing service was built using Flink and the Drools rule engine.

System Architecture

All logs are aggregated into Kafka as a central hub. Flink consumes the Kafka streams and applies Drools rules, which are distributed to its operators through broadcast map state, to parse and process the logs. The enriched data is stored in Elasticsearch for search and analysis. Processing statistics (e.g., logs per minute, per‑IP counts) are written to Redis for monitoring.
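To make the monitoring statistics concrete, here is a minimal plain-Java sketch of per-minute and per-IP counting. It is not taken from the project: the class name `LogStatsSketch`, the key prefixes `log:minute:` and `log:ip:`, and the use of in-memory maps in place of Redis counters are all assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: in-memory maps stand in for Redis counters.
class LogStatsSketch {
    // Truncates a timestamp to its UTC minute, e.g. 197001010001.
    private static final DateTimeFormatter MINUTE =
            DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

    final Map<String, Long> perMinute = new TreeMap<>();
    final Map<String, Long> perIp = new TreeMap<>();

    // Counts one log entry under its minute bucket and under its source IP.
    void record(long epochMillis, String ip) {
        String minute = MINUTE.format(Instant.ofEpochMilli(epochMillis));
        perMinute.merge("log:minute:" + minute, 1L, Long::sum); // assumed key layout
        perIp.merge("log:ip:" + ip, 1L, Long::sum);             // assumed key layout
    }
}
```

With Redis, each `merge` call would roughly correspond to an increment on the same key, so a dashboard can read the counters without querying Elasticsearch.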

Module Overview

The project, named eagle, consists of:

eagle-api: Spring Boot service exposing APIs for reading and writing Drools rules.

eagle-common: Shared utility module.

eagle-log: Flink‑based log processing service.

Key implementation details for eagle‑log include connections to Kafka, Elasticsearch, and Redis. The Kafka and Elasticsearch connectors use the official Flink connectors (flink‑connector‑kafka‑0.10 and flink‑connector‑elasticsearch6).

Initially, Redis aggregation used a keyBy‑based approach that cached entire groups, leading to high memory usage. The optimized version replaces the group cache with Flink’s aggregate function and accumulator, reducing memory pressure.

// Before optimization: every entry of a key's window is buffered, then emitted as one list.
String name = "redis-agg-log";
DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource
    .keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
    .timeWindow(Time.seconds(windowTime))
    .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
    .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {
        @Override
        public void process(String s, Context context, Iterable<LogEntry> iterable,
                            Collector<Tuple2<String, List<LogEntry>>> collector) {
            // Copies the whole window into memory; this is the source of the high memory usage.
            ArrayList<LogEntry> logs = Lists.newArrayList(iterable);
            if (logs.size() > 0) {
                collector.collect(new Tuple2<>(s, logs));
            }
        }
    })
    .setParallelism(redisSinkParallelism).name(name).uid(name);

After optimization:

String name = "redis-agg-log";
DataStream keyedStream = dataSource
    .keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
    .timeWindow(Time.seconds(windowTime))
    .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
    // Incremental aggregation: only the accumulator is kept per key and window.
    .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
    .setParallelism(redisSinkParallelism).name(name).uid(name);
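The article does not show the body of LogStatAggregateFunction, so the following is only a sketch of the accumulator idea, written against the JDK alone. The method names mirror those of Flink's `AggregateFunction<IN, ACC, OUT>` interface (`createAccumulator`, `add`, `getResult`, `merge`), but the class names `LogStatAcc` and `LogStatAggregateSketch` and the two tracked metrics are hypothetical.

```java
// Hypothetical accumulator: constant-size state per key and window,
// in contrast to buffering a List of every log entry.
class LogStatAcc {
    long count;      // number of entries seen so far
    long totalChars; // sum of message lengths (illustrative metric)
}

// Mirrors the method shape of Flink's AggregateFunction<IN, ACC, OUT>
// without depending on Flink, so the sketch runs on the JDK alone.
class LogStatAggregateSketch {
    LogStatAcc createAccumulator() {
        return new LogStatAcc();
    }

    // Folds one element into the accumulator; the element itself is not retained.
    LogStatAcc add(String message, LogStatAcc acc) {
        acc.count++;
        acc.totalChars += message.length();
        return acc;
    }

    // Combines two partial accumulators, e.g. when windows are merged.
    LogStatAcc merge(LogStatAcc a, LogStatAcc b) {
        a.count += b.count;
        a.totalChars += b.totalChars;
        return a;
    }

    // Produces the value handed to the downstream window function.
    long[] getResult(LogStatAcc acc) {
        return new long[] {acc.count, acc.totalChars};
    }
}
```

Because each incoming entry is folded into the accumulator and then discarded, the state held per key and window stays the same size regardless of how many logs arrive, which is where the memory saving comes from.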

The system also broadcasts Drools rule streams using Flink’s broadcast state, then connects the rule stream with the Kafka data stream for joint processing.

// Broadcast rule stream (RuleBase is assumed here to be the element type of the rule stream)
BroadcastStream<RuleBase> ruleSource = env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
    .broadcast(ruleStateDescriptor);

// Kafka data stream
FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);
DataStream<LogEntry> dataSource = env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);

// Connect streams and process
BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase))
    .setParallelism(processParallelism).name(name).uid(name);
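LogProcessFunction itself is not shown in the article. As a rough illustration of the broadcast pattern it relies on, the plain-Java sketch below keeps a rule map that one method updates and another method reads. The method names follow Flink's `BroadcastProcessFunction` (`processBroadcastElement`, `processElement`), but the class `RuleBroadcastSketch`, the use of `UnaryOperator<String>` as a stand-in for a Drools rule, and the lookup by log type are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java stand-in for a broadcast process function: one method per input stream.
class RuleBroadcastSketch {
    // Stand-in for the broadcast MapState: log type -> transformation rule.
    private final Map<String, UnaryOperator<String>> rules = new HashMap<>();

    // Called for each element of the rule stream: installs or replaces a rule.
    void processBroadcastElement(String logType, UnaryOperator<String> rule) {
        rules.put(logType, rule);
    }

    // Called for each log line: applies the rule registered for its type, if any.
    String processElement(String logType, String rawLine) {
        UnaryOperator<String> rule = rules.get(logType);
        return rule == null ? rawLine : rule.apply(rawLine);
    }
}
```

The point of the pattern is that a rule published on the rule stream takes effect for all subsequent log elements without restarting the job, which is what makes the processing configurable at runtime.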

Conclusion: The system provides a reference implementation for real‑time data processing with Flink, integrating Kafka, Redis, and Elasticsearch, and leveraging a configurable Drools rule engine to achieve dynamic, scalable log parsing, cleaning, and standardization.

For further details and source code, see the GitHub repository: https://github.com/luxiaoxun/eagle
