Implementing Max‑Effort Notification with Spring Boot & RabbitMQ
This article explains the concept of max‑effort notification, contrasts it with reliable messaging, and provides a complete Spring Boot 2.4.9 and RabbitMQ 3.7.4 implementation—including project structure, configuration, entity models, repositories, services, controllers, and message listeners—plus testing steps and results.
What Is Max‑Effort Notification
Environment: Spring Boot 2.4.9 + RabbitMQ 3.7.4
This is a recharge case.
Interaction flow:
1. Account system calls recharge API.
2. Recharge system sends result notification; if it fails, it retries according to a strategy.
3. Account system updates status upon receiving the notification.
4. If the notification is not received, the account system actively queries the recharge result.
The goal of max‑effort notification is for the sender to make every reasonable effort to deliver the business result to the receiver, including a retry mechanism and a verification mechanism where the receiver can query the sender if the message is not received.
Differences from reliable message consistency:
Solution philosophy: reliable messaging requires the sender to guarantee delivery; max‑effort notification relies on the receiver to query when delivery fails.
Application scenarios: reliable messaging focuses on transaction consistency; max‑effort notification focuses on post‑transaction result notification.
Technical direction: reliable messaging ensures end‑to‑end consistency, while max‑effort provides reliability only on the receiver side, with manual query when needed.
Implementing Max‑Effort Notification with RabbitMQ
Related RabbitMQ articles: “SpringBoot RabbitMQ Message Reliable Send/Receive”, “RabbitMQ Confirm Mechanism”.
Project Structure
Two sub‑modules: users‑manager (account) and pay‑manager (payment).
pay‑manager
Configuration file
<code>server:
port: 8080
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
</code>Entity class
<code>@Entity
@Table(name = "t_pay_info")
public class PayInfo implements Serializable{
@Id
private Long id;
private BigDecimal money;
private Long accountId;
}
</code>DAO and Service
<code>public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {
PayInfo findByOrderId(String orderId);
}
@Service
public class PayInfoService {
@Resource
private PayInfoRepository payInfoRepository;
@Resource
private RabbitTemplate rabbitTemplate;
@Transactional
public PayInfo savePayInfo(PayInfo payInfo) {
payInfo.setId(System.currentTimeMillis());
PayInfo result = payInfoRepository.save(payInfo);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", ""));
try {
rabbitTemplate.convertAndSend("pay-exchange", "pay.#",
new ObjectMapper().writeValueAsString(payInfo), correlationData);
} catch (AmqpException | JsonProcessingException e) {
e.printStackTrace();
}
return result;
}
public PayInfo queryByOrderId(String orderId) {
return payInfoRepository.findByOrderId(orderId);
}
}
</code>Controller
<code>@RestController
@RequestMapping("/payInfos")
public class PayInfoController {
@Resource
private PayInfoService payInfoService;
@PostMapping("/pay")
public Object pay(@RequestBody PayInfo payInfo) {
payInfoService.savePayInfo(payInfo);
return "Payment submitted, awaiting result";
}
@GetMapping("/queryPay")
public Object queryPay(String orderId) {
return payInfoService.queryByOrderId(orderId);
}
}
</code>users‑manager
Application configuration
<code>server:
port: 8081
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
concurrency: 5
maxConcurrency: 10
prefetch: 5
acknowledgeMode: MANUAL
retry:
enabled: true
initialInterval: 3000
maxAttempts: 3
defaultRequeueRejected: false
</code>Entity classes
<code>@Entity
@Table(name = "t_users")
public class Users {
@Id
private Long id;
private String name;
private BigDecimal money;
}
@Entity
@Table(name = "t_users_log")
public class UsersLog {
@Id
private Long id;
private String orderId;
@Column(columnDefinition = "int default 0")
private Integer status = 0; // 0: pending, 1: paid, 2: cancelled
private BigDecimal money;
private Date createTime;
}
</code>DAO
<code>public interface UsersRepository extends JpaRepository<Users, Long> {}
public interface UsersLogRepository extends JpaRepository<UsersLog, Long> {
UsersLog findByOrderId(String orderId);
}
</code>Service
<code>@Service
public class UsersService {
@Resource
private UsersRepository usersRepository;
@Resource
private UsersLogRepository usersLogRepository;
@Transactional
public boolean updateMoneyAndLogStatus(Long id, String orderId) {
UsersLog usersLog = usersLogRepository.findByOrderId(orderId);
if (usersLog != null && usersLog.getStatus() == 1) {
throw new RuntimeException("Already paid");
}
Users users = usersRepository.findById(id).orElse(null);
if (users == null) {
throw new RuntimeException("Account not found");
}
users.setMoney(users.getMoney().add(usersLog.getMoney()));
usersRepository.save(users);
usersLog.setStatus(1);
usersLogRepository.save(usersLog);
return true;
}
@Transactional
public boolean saveLog(UsersLog usersLog) {
usersLog.setId(System.currentTimeMillis());
usersLogRepository.save(usersLog);
return true;
}
}
</code>Message Listener
<code>@Component
public class PayMessageListener {
private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class);
@Resource
private UsersService usersService;
@RabbitListener(queues = {"pay-queue"})
@RabbitHandler
public void receive(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
byte[] buf = null;
try {
buf = message.getBody();
logger.info("Received message: {}", new String(buf, "UTF-8"));
Map<String, Object> result = new JsonMapper().readValue(buf, Map.class);
Long id = ((Integer) result.get("accountId")) + 0L;
String orderId = (String) result.get("orderId");
usersService.updateMoneyAndLogStatus(id, orderId);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
logger.error("Message processing error: {}, payload: {}", e.getMessage(),
new String(buf, Charset.forName("UTF-8")));
try {
channel.basicReject(deliveryTag, false);
} catch (IOException e1) {
logger.error("Reject failed: {}", e1.getMessage());
}
}
}
}
</code>Controller
<code>@RestController
@RequestMapping("/users")
public class UsersController {
@Resource
private RestTemplate restTemplate;
@Resource
private UsersService usersService;
@PostMapping("/pay")
public Object pay(Long id, BigDecimal money) throws Exception {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
String orderId = UUID.randomUUID().toString().replaceAll("-", "");
Map<String, String> params = new HashMap<>();
params.put("accountId", String.valueOf(id));
params.put("orderId", orderId);
params.put("money", money.toString());
UsersLog usersLog = new UsersLog();
usersLog.setCreateTime(new Date());
usersLog.setOrderId(orderId);
usersLog.setMoney(money);
usersLog.setStatus(0);
usersService.saveLog(usersLog);
HttpEntity<String> requestEntity = new HttpEntity<>(new ObjectMapper().writeValueAsString(params), headers);
return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class);
}
}
</code>The above includes all code for the two sub‑modules and demonstrates testing with initial data, console output, and final database state.
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.