How to Build a Reliable Email Service with RabbitMQ and Spring Boot
This article walks through creating a robust email‑sending system using Spring Boot and RabbitMQ, covering message confirmation, consumer idempotency, retry mechanisms, scheduled re‑delivery, and detailed code examples to ensure reliable delivery even under failure conditions.
Overview
The design rests on four mechanisms: publisher confirms and returns on the producer side, an idempotency check with manual acknowledgement on the consumer side, retry handling for failed sends, and a scheduled task that re‑delivers messages whose delivery was never confirmed.
Implementation Steps
Obtain the 163.com email authorization code.
Write a mail utility class.
Create RabbitMQ configuration files.
Producer sends messages.
Consumer processes messages and sends emails.
Scheduled task re‑delivers failed messages.
Test various exception scenarios.
Extend with dynamic proxy for idempotency and ack handling.
Project Setup
Spring Boot version: 2.1.5.RELEASE
RabbitMQ version: 3.7.15
MailUtil: email sending utility.
RabbitConfig: RabbitMQ related configuration.
TestServiceImpl: producer that sends messages.
MailConsumer: consumer that receives messages and sends emails.
ResendMsg: scheduled task that re‑delivers failed messages.
Dependencies
<!--mq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--mail-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-mail</artifactId>
</dependency>
RabbitMQ & Mail Configuration
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
# mail
spring.mail.host=smtp.163.com
spring.mail.username=186****[email protected]
spring.mail.password=***
spring.mail.from=186****[email protected]
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
MailUtil
@Component
@Slf4j
public class MailUtil {

    // Sender address, read from the spring.mail.from property
    @Value("${spring.mail.from}")
    private String from;

    @Autowired
    private JavaMailSender mailSender;

    // Sends a plain-text email; returns false instead of throwing when sending fails
    public boolean send(Mail mail) {
        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(from);
        message.setTo(mail.getTo());
        message.setSubject(mail.getTitle());
        message.setText(mail.getContent());
        try {
            mailSender.send(message);
            log.info("Mail sent successfully");
            return true;
        } catch (MailException e) {
            log.error("Failed to send mail, to: {}, title: {}", mail.getTo(), mail.getTitle(), e);
            return false;
        }
    }
}
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig {

    public static final String MAIL_QUEUE_NAME = "mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private MsgLogService msgLogService;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        // Confirm callback: reports whether the broker accepted the message at the exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message delivered to the exchange");
                msgLogService.updateStatus(correlationData.getId(), Constant.MsgLogStatus.DELIVER_SUCCESS);
            } else {
                log.info("Failed to deliver message to the exchange, {}, cause: {}", correlationData, cause);
            }
        });

        // Return callback: fires only when a mandatory message cannot be routed to any queue
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("Failed to route message from exchange to queue: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}",
                    exchange, routingKey, replyCode, replyText, message);
        });
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    // Durable queue
    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    // Durable, non-auto-delete direct exchange
    @Bean
    public DirectExchange mailExchange() {
        return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }
}
Producer (TestServiceImpl)
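The producer generates each message ID with RandomUtil.UUID32(), a helper the article does not show. A minimal sketch, assuming it simply returns a random UUID with the dashes removed (the class body here is a guess for illustration, not the author's code):

```java
import java.util.UUID;

// Hypothetical stand-in for the article's RandomUtil; the original class is not shown.
final class RandomUtil {

    private RandomUtil() {
    }

    // Returns a random UUID with the four dashes removed: 32 lowercase hex characters.
    static String UUID32() {
        return UUID.randomUUID().toString().replace("-", "");
    }
}
```

A fixed-length ID of this kind fits comfortably in the msg_id column and doubles as the CorrelationData ID, so the confirm callback can find the matching log record.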
@Service
public class TestServiceImpl implements TestService {

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public ServerResponse send(Mail mail) {
        String msgId = RandomUtil.UUID32();
        mail.setMsgId(msgId);

        // Persist the message log before publishing so the scheduled task can re-deliver it
        MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
        msgLogMapper.insert(msgLog);

        // The correlation ID lets the confirm callback match the broker's answer to this log record
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,
                MessageHelper.objToMsg(mail), correlationData);
        return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
    }
}
Consumer (MailConsumer)
@Component
@Slf4j
public class MailConsumer {

    @Autowired
    private MsgLogService msgLogService;

    @Autowired
    private MailUtil mailUtil;

    @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
    public void consume(Message message, Channel channel) throws IOException {
        Mail mail = MessageHelper.msgToObj(message, Mail.class);
        String msgId = mail.getMsgId();
        long tag = message.getMessageProperties().getDeliveryTag();

        // Idempotency check: skip messages that have no log record or were already consumed
        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        if (msgLog == null || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {
            log.info("Duplicate consumption, msgId: {}", msgId);
            // In manual acknowledge mode the duplicate must still be acked,
            // otherwise it stays unacknowledged on the channel
            channel.basicAck(tag, false);
            return;
        }

        boolean success = mailUtil.send(mail);
        if (success) {
            msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
            channel.basicAck(tag, false);
        } else {
            // Sending failed: requeue the message so it is not lost
            channel.basicNack(tag, false, true);
        }
    }
}
Scheduled Retry (ResendMsg)
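The scheduled task relies on msgLogService.selectTimeoutMsg() and msgLogService.updateTryCount(), neither of which is shown in the article. The in-memory sketch below gives one plausible reading of them: a message is "timed out" when it is still in the delivering state and its next retry time has passed. The Row class, the one-minute RETRY_DELAY, and the selection rule are all assumptions made for illustration, not the author's implementation.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the msg_log queries used by the scheduled task.
final class TimeoutMsgSketch {

    static final int DELIVERING = 0;                            // value from the msg_log.status column comment
    static final Duration RETRY_DELAY = Duration.ofMinutes(1);  // assumed delay, not from the article

    // Minimal row model holding only the columns the retry logic needs.
    static final class Row {
        final String msgId;
        int status = DELIVERING;
        int tryCount = 0;
        LocalDateTime nextTryTime;

        Row(String msgId, LocalDateTime nextTryTime) {
            this.msgId = msgId;
            this.nextTryTime = nextTryTime;
        }
    }

    // Assumed meaning of selectTimeoutMsg(): rows still "delivering" whose retry time has passed.
    static List<Row> selectTimeoutMsg(List<Row> table, LocalDateTime now) {
        List<Row> due = new ArrayList<>();
        for (Row row : table) {
            if (row.status == DELIVERING && !row.nextTryTime.isAfter(now)) {
                due.add(row);
            }
        }
        return due;
    }

    // Assumed meaning of updateTryCount(): count the attempt and push the next retry time out.
    static void updateTryCount(Row row) {
        row.tryCount++;
        row.nextTryTime = row.nextTryTime.plus(RETRY_DELAY);
    }
}
```

Moving next_try_time forward on every attempt keeps a message from being picked up again by the very next run of the task, which fires every 30 seconds.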
@Component
@Slf4j
public class ResendMsg {

    @Autowired
    private MsgLogService msgLogService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Maximum number of re-delivery attempts
    private static final int MAX_TRY_COUNT = 3;

    // Runs every 30 seconds and re-delivers messages whose delivery timed out
    @Scheduled(cron = "0/30 * * * * ?")
    public void resend() {
        log.info("Scheduled task started (re-delivering messages)");
        List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();
        for (MsgLog msgLog : msgLogs) {
            String msgId = msgLog.getMsgId();
            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
                log.info("Maximum retry count exceeded, message delivery failed, msgId: {}", msgId);
            } else {
                // Record the attempt before publishing again
                msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());
                CorrelationData correlationData = new CorrelationData(msgId);
                rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),
                        MessageHelper.objToMsg(msgLog.getMsg()), correlationData);
                log.info("Re-delivering message, attempt {}", msgLog.getTryCount() + 1);
            }
        }
        log.info("Scheduled task finished (re-delivering messages)");
    }
}
Database Table
CREATE TABLE `msg_log` (
    `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT 'Unique message ID',
    `msg` text COMMENT 'Message body, JSON-formatted',
    `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT 'Exchange',
    `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT 'Routing key',
    `status` int(11) NOT NULL DEFAULT '0' COMMENT 'Status: 0 delivering, 1 delivered, 2 delivery failed, 3 consumed',
    `try_count` int(11) NOT NULL DEFAULT '0' COMMENT 'Retry count',
    `next_try_time` datetime DEFAULT NULL COMMENT 'Next retry time',
    `create_time` datetime DEFAULT NULL COMMENT 'Creation time',
    `update_time` datetime DEFAULT NULL COMMENT 'Update time',
    PRIMARY KEY (`msg_id`),
    UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Message delivery log';
Testing
In the basic flow test the email is delivered, the message status becomes 3 (consumed), and the retry count stays at 0. The request, the backend logs, the database record, and the mailbox all confirm the result.
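The status value 3 corresponds to Constant.MsgLogStatus.CONSUMED_SUCCESS, which the consumer writes after a successful send. The Constant class itself is not shown in the article. A plausible reconstruction, with the values taken from the status column comment of msg_log and the name DELIVERING assumed (the other three names appear in the article's code), might look like this:

```java
// Hypothetical reconstruction of Constant.MsgLogStatus; the original class is not shown.
final class Constant {

    private Constant() {
    }

    static final class MsgLogStatus {

        private MsgLogStatus() {
        }

        static final Integer DELIVERING = 0;        // name assumed; published but not yet confirmed
        static final Integer DELIVER_SUCCESS = 1;   // confirmed by the broker
        static final Integer DELIVER_FAIL = 2;      // retry limit exceeded
        static final Integer CONSUMED_SUCCESS = 3;  // email sent and message acknowledged
    }
}
```

The constants are declared as Integer rather than int because the consumer compares them with msgLog.getStatus().equals(...), which implies a boxed status field.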
Failure Scenarios
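Exchange not found: publishing to a non‑existent exchange invokes the confirm callback with ack = false; the cause reports reply code 404 (NOT_FOUND).
Routing key not found: with mandatory set to true, a message that matches no binding invokes the return callback with reply code 312 (NO_ROUTE).
Manual ack mode: if basicAck is omitted, the message stays in the unacknowledged state and is redelivered only after the consumer's channel or connection closes.
Consumer exception: basicNack(tag, false, true) re‑queues the message so that it is not lost.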
Scheduled retry: messages are retried up to three times; after exceeding the limit, status is set to delivery failure for manual investigation.
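The first two scenarios interact in a way that is easy to miss: RabbitMQ still confirms (ack = true) a mandatory message that it could not route, after returning it to the publisher. A confirm callback that marks DELIVER_SUCCESS on every ack, as the one in RabbitConfig does, therefore cannot tell a routed message from an unroutable one by itself. A minimal sketch of combining both signals follows; the Outcome enum and the classify helper are hypothetical names, and how the returned flag is tracked per message ID is left out.

```java
// Hypothetical helper, not part of the article's code.
final class PublishOutcomeSketch {

    enum Outcome {
        ROUTED,      // confirmed and not returned: the message reached a queue
        UNROUTABLE,  // confirmed but returned (reply code 312, NO_ROUTE)
        REJECTED     // not confirmed, for example because the exchange does not exist (reply code 404)
    }

    // ack: the flag passed to the confirm callback
    // returned: whether the return callback fired for the same message
    static Outcome classify(boolean ack, boolean returned) {
        if (!ack) {
            return Outcome.REJECTED;
        }
        return returned ? Outcome.UNROUTABLE : Outcome.ROUTED;
    }
}
```

Under this reading, only ROUTED would justify setting the log status to delivered; the other two outcomes would leave the record for the scheduled task to retry.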
Extension
The article suggests using dynamic proxies (or AOP) to extract common logic such as idempotency checks, ack handling, and status updates, allowing business code like mailUtil.send(mail) to stay clean.
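As a rough illustration of that idea, the sketch below uses a JDK dynamic proxy to wrap a handler with the idempotency check, the status update, and the ack/nack decision. MsgHandler, Acker, and wrap are hypothetical names; the Set stands in for the msg_log lookup and Acker stands in for Channel.basicAck and Channel.basicNack. It is a sketch under those assumptions, not the article's actual extension code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Set;

final class ConsumerProxySketch {

    // Hypothetical business interface: implementations only do the work, e.g. send the mail.
    interface MsgHandler {
        void handle(String msgId, String body);
    }

    // Hypothetical stand-in for Channel.basicAck / Channel.basicNack.
    interface Acker {
        void ack(String msgId);

        void nack(String msgId);
    }

    // Wraps a handler so every call gets the idempotency check, status update and ack/nack.
    static MsgHandler wrap(MsgHandler target, Set<String> consumedIds, Acker acker) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Pass toString/equals/hashCode straight through to the target
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(target, args);
            }
            String msgId = (String) args[0];
            if (consumedIds.contains(msgId)) {
                acker.ack(msgId);        // duplicate: acknowledge without re-running the business code
                return null;
            }
            try {
                method.invoke(target, args);
                consumedIds.add(msgId);  // stands in for updateStatus(msgId, CONSUMED_SUCCESS)
                acker.ack(msgId);
            } catch (InvocationTargetException e) {
                acker.nack(msgId);       // business code threw: hand the message back to the broker
            }
            return null;
        };
        return (MsgHandler) Proxy.newProxyInstance(
                MsgHandler.class.getClassLoader(), new Class<?>[] {MsgHandler.class}, handler);
    }
}
```

With such a wrapper, the business handler reduces to a single call such as mailUtil.send(mail), and it signals failure by throwing rather than by touching the channel.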
Conclusion
Implementing a reliable email service with RabbitMQ involves careful handling of confirmations, idempotency, retries, and proper configuration; the provided code and tests demonstrate a complete solution that can be adapted to other message‑driven use cases.
Programmer DD
A tinkering programmer and author of "Spring Cloud Microservices in Action"
