
Master Spring Boot Messaging: RabbitMQ & Kafka Integration Guide

This article explains how to configure Spring Boot 2.7.10 for AMQP‑based RabbitMQ and Apache Kafka messaging, covering property settings, connection factories, template customization, sending and receiving messages, listener containers, retry mechanisms, and stream processing with code examples.

Spring Full-Stack Practical Cases

RabbitMQ Support

Advanced Message Queuing Protocol (AMQP) is a platform‑independent, wire‑level protocol for message middleware. Spring AMQP applies Spring core concepts to AMQP‑based solutions, and Spring Boot provides the spring-boot-starter-amqp starter for easy RabbitMQ integration.

RabbitMQ is a lightweight, reliable, scalable, and portable message broker based on the AMQP protocol. Its configuration is driven by the spring.rabbitmq.* properties. For example, in application.yml:

<code>spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "guest"
    password: "guest"
</code>

Alternatively, you can configure the connection with the addresses property:

<code>spring:
  rabbitmq:
    addresses: "amqp://guest:guest@localhost"
</code>
When addresses is set, the host and port properties are ignored. If an address uses the amqps scheme, SSL support is enabled automatically.
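As a rough illustration of what such an address carries, the following plain-Java sketch breaks an AMQP URI into its parts with java.net.URI. It is not Spring Boot's own parser. The class and method names are hypothetical, and the fallback ports (5672 for amqp, 5671 for amqps) are the conventional AMQP defaults assumed here.

```java
import java.net.URI;

/** Illustrative only: shows how an AMQP address URI breaks down; not Spring Boot's parser. */
final class AmqpAddressSketch {

    // Conventional default ports for plain AMQP and AMQP over TLS (assumed here).
    static final int DEFAULT_AMQP_PORT = 5672;
    static final int DEFAULT_AMQPS_PORT = 5671;

    /** Returns "user@host:port ssl=true|false" for the given address. */
    static String describe(String address) {
        URI uri = URI.create(address);
        boolean ssl = "amqps".equalsIgnoreCase(uri.getScheme());
        // URI.getPort() returns -1 when the address has no explicit port.
        int port = uri.getPort() != -1
                ? uri.getPort()
                : (ssl ? DEFAULT_AMQPS_PORT : DEFAULT_AMQP_PORT);
        String userInfo = uri.getUserInfo(); // "guest:guest", or null if absent
        String user = userInfo == null ? "" : userInfo.split(":", 2)[0];
        return user + "@" + uri.getHost() + ":" + port + " ssl=" + ssl;
    }

    public static void main(String[] args) {
        System.out.println(describe("amqp://guest:guest@localhost"));
        System.out.println(describe("amqps://guest:guest@localhost"));
    }
}
```

Running the sketch prints guest@localhost:5672 ssl=false followed by guest@localhost:5671 ssl=true, which mirrors the scheme-driven SSL switch described above.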

See RabbitProperties for the other supported property‑based options. To customize lower‑level details of the underlying RabbitMQ ConnectionFactory, define a ConnectionFactoryCustomizer bean. If a ConnectionNameStrategy bean exists in the context, it is used to name connections created by the auto‑configured CachingConnectionFactory.

Sending Messages

The auto‑configured AmqpTemplate and AmqpAdmin can be injected directly:

<code>@Component
public class MyBean {
    private final AmqpAdmin amqpAdmin;
    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }
    // ...
}
</code>
RabbitMessagingTemplate can be injected in a similar manner. If a MessageConverter bean is defined, it is automatically associated with the auto‑configured AmqpTemplate.

Any bean of type org.springframework.amqp.core.Queue will cause a corresponding queue to be declared on the RabbitMQ broker.

To retry operations (for example, when the broker connection is lost), enable retries on the AmqpTemplate via configuration:

<code>spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"
</code>

By default, retry is disabled. You can also customize the RetryTemplate programmatically by providing a RabbitRetryTemplateCustomizer bean.
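The retry settings also include max-attempts, multiplier, and max-interval under spring.rabbitmq.template.retry. To make the resulting wait times concrete, here is a minimal plain-Java sketch of the exponential backoff arithmetic. It assumes the first attempt is immediate and each later wait is the previous one times the multiplier, capped at the maximum. The class name and the chosen values (multiplier 2, cap 10s, 5 attempts) are illustrative assumptions, and this is not Spring Retry's implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: computes the waits an exponential backoff would insert between attempts. */
final class RetryBackoffSketch {

    /** Returns maxAttempts - 1 waits, since the first attempt happens immediately. */
    static List<Duration> waits(Duration initial, double multiplier, Duration max, int maxAttempts) {
        List<Duration> result = new ArrayList<>();
        long next = initial.toMillis();
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Cap each wait at the configured maximum interval.
            result.add(Duration.ofMillis(Math.min(next, max.toMillis())));
            next = (long) (next * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // initial-interval 2s as in the configuration above; other values are assumptions.
        System.out.println(waits(Duration.ofSeconds(2), 2.0, Duration.ofSeconds(10), 5));
    }
}
```

With these inputs the sketch prints [PT2S, PT4S, PT8S, PT10S]: the fourth wait would be 16 seconds but is capped at the 10-second maximum.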

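To create additional RabbitTemplate instances or override the default, use the RabbitTemplateConfigurer bean that Spring Boot provides. It initializes a RabbitTemplate with the same settings as the one created by the auto‑configuration.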

Sending to Streams

Specify the stream name to send messages to a particular stream:

<code>spring:
  rabbitmq:
    stream:
      name: "my-stream"
</code>

If a MessageConverter, StreamMessageConverter, or ProducerCustomizer bean is defined, it is automatically associated with the auto‑configured RabbitStreamTemplate. To create additional RabbitStreamTemplate instances or override the default, use the RabbitStreamTemplateConfigurer bean.

Receiving Messages

Any bean can be annotated with @RabbitListener to create a listener endpoint. If no RabbitListenerContainerFactory is defined, a default SimpleRabbitListenerContainerFactory is auto‑configured; set spring.rabbitmq.listener.type to direct to switch to a direct container. If a MessageConverter or MessageRecoverer bean is defined, it is automatically associated with the default factory. The following component creates a listener endpoint on the someQueue queue:

<code>@Component
public class MyBean {
    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }
}
</code>

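To create additional RabbitListenerContainerFactory instances or override the default, use the SimpleRabbitListenerContainerFactoryConfigurer or DirectRabbitListenerContainerFactoryConfigurer bean. Each one initializes a factory of the matching type with the same settings as the auto‑configured factory. The following configuration exposes another factory that uses a specific MessageConverter: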

<code>@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }
    private ConnectionFactory getCustomConnectionFactory() {
        return ...;
    }
}
</code>
The factory can then be referenced by name from any @RabbitListener method:
<code>@Component
public class MyBean {
    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }
}
</code>
Retries can also be enabled for listeners, to handle cases where the listener throws an exception; they are disabled by default. RejectAndDontRequeueRecoverer is used by default, and you can define a MessageRecoverer bean of your own instead. When retries are exhausted, the message is rejected and is then either dropped or routed to a dead‑letter exchange, if the broker is configured with one.
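The retry-then-recover flow can be modeled without a broker. The following plain-Java sketch is a simplification under stated assumptions: the handler and recoverer are ordinary Consumer callbacks, backoff is omitted, and the "dead letters" list is a hypothetical stand-in for whatever the broker does with a rejected message. None of these names come from Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: models "retry the listener, then hand over to a recoverer". */
final class ListenerRetrySketch {

    /** Returns true if the handler succeeded within maxAttempts, false if the recoverer was invoked. */
    static <T> boolean deliver(T message, int maxAttempts, Consumer<T> handler, Consumer<T> recoverer) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true; // handled, no recovery needed
            } catch (RuntimeException ex) {
                // Listener failed; loop to retry (backoff omitted for brevity).
            }
        }
        recoverer.accept(message); // retries exhausted
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>(); // hypothetical stand-in for a dead-letter route
        int[] calls = {0};
        boolean handled = deliver("order-42", 3,
                msg -> { calls[0]++; throw new IllegalStateException("always fails"); },
                deadLetters::add);
        System.out.println(handled + " " + calls[0] + " " + deadLetters);
    }
}
```

The sketch prints false 3 [order-42]: the handler is invoked three times, and only then does the recoverer receive the message.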

Kafka Support

Apache Kafka is supported via the spring‑kafka project’s auto‑configuration.

Kafka settings are controlled by spring.kafka.* properties, for example:

<code>spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
</code>
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored.

For more of the supported options, see KafkaProperties.

Sending Messages

The auto‑configured KafkaTemplate can be injected directly:

<code>@Component
public class MyBean {
    private final KafkaTemplate<String, String> kafkaTemplate;
    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    // ...
}
</code>
If the spring.kafka.producer.transaction-id-prefix property is set, a KafkaTransactionManager is auto‑configured. If a RecordMessageConverter bean is defined, it is automatically associated with the auto‑configured KafkaTemplate.

Receiving Messages

Any bean can be annotated with @KafkaListener to create a listener endpoint. If no KafkaListenerContainerFactory is defined, a default one is auto‑configured with the keys defined in spring.kafka.listener.*. The following component creates a listener endpoint on the someTopic topic:

<code>@Component
public class MyBean {
    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }
}
</code>

If a KafkaTransactionManager, RecordFilterStrategy, CommonErrorHandler, AfterRollbackProcessor, or ConsumerAwareRebalanceListener bean is defined, it is automatically associated with the default container factory. Depending on the listener type, a RecordMessageConverter or BatchMessageConverter bean is associated with the factory as well.

Kafka Streams

Spring for Apache Kafka provides a factory bean that creates a StreamsBuilder object and manages the lifecycle of its streams. Spring Boot auto‑configures the required KafkaStreamsConfiguration bean as long as kafka-streams is on the classpath and Kafka Streams is enabled with the @EnableKafkaStreams annotation.

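Enabling Kafka Streams requires an application ID and bootstrap servers. The application ID is set with spring.kafka.streams.application-id and defaults to spring.application.name when not set. The bootstrap servers can be set globally with spring.kafka.bootstrap-servers or overridden for streams only with spring.kafka.streams.bootstrap-servers. Additional properties can be set under the spring.kafka.streams.properties namespace.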

To use the factory bean, inject the StreamsBuilder into a @Bean method:

<code>@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }
    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }
}
</code>
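To see what the map step in this topology does to each record without starting a broker, the same transformation can be modeled on plain collections. This is only a sketch: Map.Entry stands in for a Kafka record, the class name and sample data are hypothetical, and no Kafka Streams API is involved. Locale.ROOT is used so that upper-casing does not depend on the machine's locale.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: models the uppercase map step on in-memory key/value pairs. */
final class UppercaseTopologySketch {

    /** Same idea as uppercaseValue in the topology: key unchanged, value upper-cased. */
    static Map.Entry<Integer, String> uppercaseValue(Map.Entry<Integer, String> record) {
        return Map.entry(record.getKey(), record.getValue().toUpperCase(Locale.ROOT));
    }

    /** Applies the map step to every record, preserving order. */
    static List<Map.Entry<Integer, String>> process(List<Map.Entry<Integer, String>> input) {
        return input.stream()
                .map(UppercaseTopologySketch::uppercaseValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(process(List.of(Map.entry(1, "hello"), Map.entry(2, "kafka"))));
    }
}
```

The sketch prints [1=HELLO, 2=KAFKA], which corresponds to the records the topology would write to ks1Out for the same input on ks1In.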

By default, streams managed by the StreamsBuilder start automatically; this can be customized with the spring.kafka.streams.auto-startup property.
