Big Data 11 min read

How to Build a Real-Time Customer Behavior Collection System with Storm and NSQ

This article explains the design of a real-time customer behavior collection platform that uses NSQ for messaging, Storm for streaming processing, and HBase for storage, covering architecture, data flow, reliability guarantees, and deployment considerations.

21CTO

Background

The Youzan membership system supports omnichannel customer management. With the rise of social networks, merchants need richer interactions with customers and real-time insight into their behavior, which requires a system that can collect and process customer behavior events in real time.

Behavior Model

Customer behavior events are defined as interactions between a client and the business system. The model records event types, attributes (e.g., product click details, order information) and timestamps, allowing analysis of event sequences, impact on business value, and root‑cause investigation.
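
The model described above can be sketched as a small Java class. This is a minimal illustration, not the actual Youzan schema: the class names `BehaviorEvent` and `BehaviorEvents`, the field names, and the helper `sequenceFor` are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event model: field names are illustrative, not the real schema.
final class BehaviorEvent {
    final String customerId;              // who triggered the event
    final String eventType;               // e.g. "product_click", "order_created"
    final long timestampMs;               // when it happened (epoch milliseconds)
    final Map<String, String> attributes; // type-specific details

    BehaviorEvent(String customerId, String eventType, long timestampMs,
                  Map<String, String> attributes) {
        this.customerId = customerId;
        this.eventType = eventType;
        this.timestampMs = timestampMs;
        // Defensive copy so the event cannot be changed after creation
        this.attributes = Collections.unmodifiableMap(new LinkedHashMap<>(attributes));
    }
}

final class BehaviorEvents {
    // Returns one customer's events ordered by time, the basis for sequence analysis.
    static List<BehaviorEvent> sequenceFor(String customerId, List<BehaviorEvent> events) {
        List<BehaviorEvent> result = new ArrayList<>();
        for (BehaviorEvent e : events) {
            if (e.customerId.equals(customerId)) {
                result.add(e);
            }
        }
        result.sort(Comparator.comparingLong((BehaviorEvent e) -> e.timestampMs));
        return result;
    }
}
```

Keeping the type-specific details in a free-form attribute map is one way to let new event types be added without changing the class, which fits the flexible-schema storage discussed later.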

Logical Architecture

The architecture consists of four parts:

Client: Generates behavior events and may also consume them.

Collector: Receives events from clients (push or pull), transforms them, and publishes them to a distributed queue (NSQ), which feeds the streaming engine.

Storage: Persists events in MySQL or HBase; HBase is preferred for its flexible schema and Hadoop integration.

Service: Provides query access to stored events, currently supporting random reads.

Data Flow

Two main flows exist:

Processing flow: Client events are formatted per the behavior model, sent to NSQ, consumed by Storm, and persisted to HBase/MySQL.

Output flow: Stored events are read by a query service for downstream use or exported to Hive via external tables.

System Design Requirements

Real‑time

High real‑time performance is essential for timely customer insights. Challenges include traffic spikes, component failures requiring retries, data backlog, and bug‑induced reprocessing. Storm is used for its low‑latency continuous streaming model, strong fault tolerance, and easy horizontal scaling.

The Storm topology consumes two NSQ topics: a normal topic for regular events and a retry topic for messages that failed processing. Failed messages are retried according to a retry strategy, backed by delayed offline compensation tasks.
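
The article does not spell out the retry strategy, so the following is only a sketch of one plausible shape. It assumes a bounded number of retries through the retry topic with exponential backoff, after which the message is left to an offline compensation task. The class `RetryPolicy`, its thresholds, and the backoff formula are all hypothetical.

```java
// Hypothetical retry policy: thresholds, names, and backoff are assumptions.
final class RetryPolicy {
    enum Action { REPUBLISH_TO_RETRY_TOPIC, OFFLINE_COMPENSATION }

    private final int maxRetries;   // failures tolerated before giving up online
    private final long baseDelayMs; // delay applied after the first failure
    private final long maxDelayMs;  // upper bound for any single delay

    RetryPolicy(int maxRetries, long baseDelayMs, long maxDelayMs) {
        this.maxRetries = maxRetries;
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    // Decide what to do with a message that has failed `failures` times so far.
    Action actionFor(int failures) {
        return failures <= maxRetries
                ? Action.REPUBLISH_TO_RETRY_TOPIC
                : Action.OFFLINE_COMPENSATION;
    }

    // Exponential backoff: baseDelay * 2^(failures - 1), capped at maxDelay.
    long delayMsFor(int failures) {
        // Clamp the shift so the multiplication cannot overflow for sane base delays
        int shift = Math.min(Math.max(failures - 1, 0), 30);
        return Math.min(baseDelayMs << shift, maxDelayMs);
    }
}
```

A spout's `fail` callback could consult such a policy to choose between republishing to the retry topic and recording the message for the offline task; separating the decision from the spout keeps it easy to test in isolation.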

Message Delivery Guarantees

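Storm supports at-least-once processing through its acker mechanism and exactly-once semantics through Trident. The system must not lose messages and, in some scenarios, needs exactly-once semantics. Because NSQ already guarantees at-least-once delivery to consumers, the implementation adopts at-least-once processing combined with idempotent handling, so that a redelivered message has no additional effect.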

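// Imports omitted: Storm classes (org.apache.storm.*), java.util, java.util.concurrent, SLF4J.
// Consumer, NSQMessage, and NSQException come from the NSQ Java client in use.
public class NSQSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(NSQSpout.class);
    private SpoutOutputCollector collector;
    private transient Consumer consumer;               // created in open()
    private transient Queue<NSQMessage> received;      // assumed: filled by the consumer's handler
    private transient Map<String, NSQMessage> pending; // emitted but not yet acked
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.received = new ConcurrentLinkedQueue<>();
        this.pending = new ConcurrentHashMap<>();
        // create and start the NSQ consumer here; its handler adds messages to `received`
    }
    @Override
    public void nextTuple() {
        NSQMessage message = this.received.poll();
        if (message == null) return; // nothing to emit right now
        String messageId = UUID.randomUUID().toString();
        try {
            // message processing logic
            this.pending.put(messageId, message); // remember it so ack() can finish it
            this.collector.emit("stream_id", new Values(message), messageId);
        } catch (Exception e) {
            this.pending.remove(messageId);
            logger.error("Emit message failed. ", e);
        }
    }
    @Override
    public void ack(Object msgId) {
        // Get origin NSQMessage and finish it
        NSQMessage message = this.pending.remove(msgId);
        if (message == null) return;
        try {
            this.consumer.finish(message);
        } catch (NSQException e) {
            logger.error("Failed to ack message. id: {}", msgId, e);
        }
    }
    @Override
    public void fail(Object msgId) {
        logger.info("msg failed: {}", msgId);
        // retry according to strategy
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("stream_id", new Fields("content"));
    }
}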

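import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BizBolt extends BaseRichBolt {
    private OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }
    @Override
    public void execute(Tuple tuple) {
        try {
            Object obj = tuple.getValue(0); // the single field emitted upstream
            // business logic
            // Anchor the new tuple to the input so a downstream failure triggers a replay
            this.collector.emit("another-stream-id", tuple, new Values(obj));
            this.collector.ack(tuple);
        } catch (Exception e) {
            // Fail the tuple so the spout's fail() can apply the retry strategy
            this.collector.fail(tuple);
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // The declared stream id must match the one used in emit()
        outputFieldsDeclarer.declareStream("another-stream-id", new Fields("content"));
    }
}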
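
The idempotent handling that makes at-least-once delivery safe can be illustrated with a keyed upsert. The sketch below uses an in-memory map as a stand-in for a keyed store and assumes every event carries a unique id; both the class `IdempotentEventStore` and the event id field are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a keyed store such as an HBase table.
final class IdempotentEventStore {
    private final Map<String, String> rowsByEventId = new HashMap<>();

    // Upsert keyed by the event id: writing the same event twice leaves one row.
    // Returns true when the event id was not stored before.
    boolean save(String eventId, String payload) {
        return rowsByEventId.put(eventId, payload) == null;
    }

    int rowCount() {
        return rowsByEventId.size();
    }
}
```

With this shape, a message that is redelivered or replayed after a failure overwrites its own row instead of creating a duplicate, so processing it more than once does not change the stored result.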

Storage

HBase is used for persistence. RowKey design considerations include keeping the key short, hashing or prefixing it to avoid hotspot regions (e.g., an MD5 digest, or a prefix followed by a timestamp), and ensuring uniqueness.

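// Method 1: MD5 digest of a unique identifier (always 16 bytes, evenly distributed)
byte[] md5RowKey = MessageDigest.getInstance("MD5").digest(identifier.getBytes(StandardCharsets.UTF_8));
// Method 2: zero-padded numeric prefix followed by the timestamp
byte[] prefixedRowKey = String.format("%08d%d", prefix, timestamp).getBytes(StandardCharsets.UTF_8);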
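
For reference, the two approaches can be wrapped into a self-contained helper. This is a sketch under assumptions: the class `RowKeys`, the salted layout `bucket|customerId|timestamp`, and the bucket count are illustrative choices, not the layout used in the original system.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

// Hypothetical helper: key layouts are illustrative, not the production design.
final class RowKeys {
    // Method 1: MD5 digest of the identifier; always 16 bytes long.
    static byte[] md5RowKey(String identifier) {
        try {
            return MessageDigest.getInstance("MD5")
                    .digest(identifier.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required on every Java platform, so this should not happen
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Method 2 variant: a salt bucket derived from the customer id spreads writes
    // across regions, while the zero-padded timestamp keeps one customer's
    // events sorted by time within the bucket.
    static String saltedRowKey(String customerId, long timestampMs, int buckets) {
        int bucket = Math.floorMod(customerId.hashCode(), buckets);
        return String.format(Locale.ROOT, "%02d|%s|%013d", bucket, customerId, timestampMs);
    }
}
```

The trade-off is that an MD5 key distributes writes evenly but gives up range scans, whereas the salted layout keeps each customer's events contiguous at the cost of a slightly longer key.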

Deployment

Deploying a Storm topology is straightforward: upload the new JAR and resubmit the topology. NSQ retains the consumer offset, so consumption resumes from the last position; resetting the channel offset forces re-consumption.

Conclusion

The real-time system fills the gap left by offline "T+1" pipelines and must meet stricter requirements for latency, availability, and scalability. Continuously improving data collection efficiency will let it support more business scenarios.
