Master RocketMQ Message Filtering in Spring Boot: TAG and SQL92 Methods

This tutorial demonstrates how to integrate Spring Boot with RocketMQ and implement two message filtering strategies—TAG-based and SQL expression-based—covering dependencies, configuration, code examples, and testing procedures for reliable message consumption.

Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Spring Full-Stack Practical Cases
Master RocketMQ Message Filtering in Spring Boot: TAG and SQL92 Methods

Dependencies and Configuration

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>
server:
  port: 8082
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq

RocketMQ provides two message filtering methods: TAG and SQL expression.

Method 1: Filter by TAG

The producer can set a single tag, while the consumer can subscribe to multiple tags.

Consumer side

Set multiple tags with “||”:

tag1 || tag2 || tag3 || ...
@RocketMQMessageListener(topic = "filter-topic", consumerGroup = "consumer06-group", 
  selectorExpression = "tag11 || tag12 || tag13", messageModel = MessageModel.CLUSTERING, 
  selectorType = SelectorType.TAG)
@Component
public class ConsumerFilterMessageListener implements RocketMQListener<MessageExt> {

  @Override
  public void onMessage(MessageExt message) {
    System.out.println(Thread.currentThread().getName()) ;
    System.out.println(new String(message.getBody())) ;
    System.out.println(message.getProperties()) ;
  }
}

selectorType indicates TAG filtering (default). messageModel defines consumption mode: CLUSTERING (single consumer) or BROADCASTING (all consumers). selectorExpression specifies which tags the consumer accepts.

Producer side

Send messages with topic and tag concatenated:

public void sendFilterMessage(String topic, String message, String tags) {
  Message<String> msg = MessageBuilder.withPayload(message).build() ;
  rocketMQTemplate.convertAndSend(topic + ":" + tags, msg);
}

Format: topic:tag (only one tag allowed).

@GetMapping("/filter")
public Object sendFilterMessage(String content) {
  ps.sendFilterMessage("filter-topic", content, "tag12") ;
  return "send filter message success" ;
}

Only consumers whose selectorExpression includes tag12 will receive the message.

Changing the tag prevents the consumer from receiving the message.

Method 2: Filter by SQL Expression

SQL expression filtering allows evaluating message properties.

Supported syntax includes numeric, string, null checks, and logical operators (AND, OR, NOT).

Numeric comparisons: >, >=, <, <=, BETWEEN, =

String comparisons: =, <>, IN

NULL checks: IS NULL, IS NOT NULL

Logical operators: AND, OR, NOT

Constants can be numbers, strings (single quotes), NULL, or Boolean TRUE/FALSE.

Consumer side

@RocketMQMessageListener(topic = "filter-topic", consumerGroup = "consumer06-group", 
  selectorExpression = "pack = 'abc' || a = 1", messageModel = MessageModel.CLUSTERING, 
  selectorType = SelectorType.SQL92)
@Component
public class ConsumerFilterMessageListener implements RocketMQListener<MessageExt> {

  @Override
  public void onMessage(MessageExt message) {
    System.out.println(Thread.currentThread().getName()) ;
    System.out.println(new String(message.getBody())) ;
    System.out.println(message.getProperties()) ;
  }
}
selectorExpression

specifies that only messages with header pack='abc' or a=1 are accepted. selectorType is SQL92.

Producer side

public void sendFilterMessage(String topic, String message, String tags) {
  Message<String> msg = MessageBuilder.withPayload(message).build() ;
  Map<String, Object> headers = new HashMap<>() ;
  headers.put("pack", "abc") ;
  headers.put("a", 10) ;
  rocketMQTemplate.convertAndSend(topic, msg, headers);
}

The Map sets message headers for filtering.

@GetMapping("/filter")
public Object sendFilterMessage(String content) {
  ps.sendFilterMessage("filter-topic", content, null) ;
  return "send filter message success" ;
}

Testing shows successful receipt; modifying selectorExpression to use AND prevents receipt.

These are the two RocketMQ message filtering methods.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

RocketMQMessage FilteringTag Filteringsql expression
Spring Full-Stack Practical Cases
Written by

Spring Full-Stack Practical Cases

Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.