How RocketMQ Producer Works: A Deep Dive into Init, Startup, and Message Sending

This article walks through the RocketMQ client's Producer implementation using the release‑4.7.1 source (the latest release when the article was written) and its unit tests. It covers the facade pattern, the startup sequence, state management, and the three message‑sending modes, with code walkthroughs and design notes.

JavaEdge

Overview

The RocketMQ client source for release-4.7.1 lives in the rocketmq/client module. The unit‑test class DefaultMQProducerTest is a convenient entry point for exploring the Producer API, because each testXXX method isolates one sending scenario.

Key Classes and Design Patterns

MQProducer – façade interface that hides internal complexity.

DefaultMQProducer – implements MQProducer and delegates to DefaultMQProducerImpl.

DefaultMQProducerImpl – contains the actual business logic for producing messages.

MQClientInstance – singleton that holds client‑wide state (producers, consumers, services).

MQClientAPIImpl – encapsulates RPC communication; uses NettyRemotingClient for network I/O.
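The delegation between the facade and its implementation can be illustrated with a small, dependency-free sketch. The names below (Producer, ProducerImpl, DefaultProducer, and the "SEND_OK#" result string) are hypothetical stand-ins, not RocketMQ's actual classes. The only point is that the public type forwards every call to an internal object that owns the logic.

```java
import java.util.ArrayList;
import java.util.List;

final class FacadeSketch {
    /** Facade: the only type callers need to know about (stand-in for MQProducer). */
    interface Producer {
        void start();
        String send(String body);
    }

    /** Owns the real logic (stand-in for DefaultMQProducerImpl). */
    static final class ProducerImpl {
        private boolean started;
        private final List<String> sent = new ArrayList<>();

        void start() {
            started = true;
        }

        String send(String body) {
            if (!started) {
                throw new IllegalStateException("producer not started");
            }
            sent.add(body);
            // Hypothetical result format, used only for this illustration.
            return "SEND_OK#" + sent.size();
        }
    }

    /** Thin facade implementation: forwards every call (stand-in for DefaultMQProducer). */
    static final class DefaultProducer implements Producer {
        private final ProducerImpl impl = new ProducerImpl();

        @Override
        public void start() {
            impl.start();
        }

        @Override
        public String send(String body) {
            return impl.send(body);
        }
    }

    public static void main(String[] args) {
        Producer producer = new DefaultProducer();
        producer.start();
        System.out.println(producer.send("hello"));
    }
}
```

Callers depend only on the Producer interface, so the internal class can change freely without breaking them.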

Producer Startup Process

The init() method in the test creates a DefaultMQProducer, configures parameters, and calls start(). DefaultMQProducer.start() forwards to DefaultMQProducerImpl.start(), which manages a serviceState variable using a switch‑case implementation of the State Pattern.

Important State Values

Normal states: RUNNING, SHUTDOWN_ALREADY

Initial state: CREATE_JUST

Exceptional state: START_FAILED

Only CREATE_JUST may transition to RUNNING, so a producer that is already running, has failed to start, or has been shut down cannot be started again.
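The start guard can be sketched as a switch over the state, as a simplified model rather than the actual DefaultMQProducerImpl code. The enum constants match the state names above. The Service class, doStart(), and the choice of IllegalStateException are assumptions made for this illustration. Marking the state as START_FAILED before doing the work is one way to make sure a half-finished start can never be mistaken for a usable producer.

```java
final class ServiceStateSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    /** Hypothetical service guarded by a state variable. */
    static final class Service {
        private ServiceState state = ServiceState.CREATE_JUST;

        synchronized void start() {
            switch (state) {
                case CREATE_JUST:
                    // Assume failure first; only a fully successful start reaches RUNNING.
                    state = ServiceState.START_FAILED;
                    doStart();
                    state = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    // Every other state refuses to start, which prevents double starts.
                    throw new IllegalStateException("cannot start from state " + state);
                default:
                    break;
            }
        }

        synchronized void shutdown() {
            if (state == ServiceState.RUNNING) {
                state = ServiceState.SHUTDOWN_ALREADY;
            }
        }

        synchronized ServiceState state() {
            return state;
        }

        private void doStart() {
            // Placeholder for the real startup work (registering, starting services, ...).
        }
    }

    public static void main(String[] args) {
        Service service = new Service();
        service.start();
        System.out.println(service.state());
        try {
            service.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```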

Detailed Startup Steps

Obtain (or create) a singleton MQClientInstance via MQClientManager.

Register the producer with the instance.

Start the MQClientInstance, which in turn starts internal services.

Send heartbeat messages to all brokers.

The MQClientInstance class centralises client state and delegates RPC calls to MQClientAPIImpl, which uses Netty for transport.
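The first two startup steps (obtain or create the shared instance, then register the producer) can be modelled with a concurrent map. This is a sketch under assumptions: ClientManager, ClientInstance, the client ID string, and the boolean return value of registerProducer are all hypothetical, and it deliberately omits starting services and sending heartbeats.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ClientManagerSketch {
    /** Hypothetical shared client state (stand-in for MQClientInstance). */
    static final class ClientInstance {
        final String clientId;
        private final ConcurrentMap<String, Object> producers = new ConcurrentHashMap<>();

        ClientInstance(String clientId) {
            this.clientId = clientId;
        }

        /** Returns false when a producer is already registered under this group. */
        boolean registerProducer(String group, Object producer) {
            return producers.putIfAbsent(group, producer) == null;
        }
    }

    /** Hypothetical manager that hands out one shared instance per client ID. */
    static final class ClientManager {
        private final ConcurrentMap<String, ClientInstance> table = new ConcurrentHashMap<>();

        ClientInstance getOrCreate(String clientId) {
            // Atomic get-or-create: concurrent callers with the same ID share one instance.
            return table.computeIfAbsent(clientId, ClientInstance::new);
        }
    }

    public static void main(String[] args) {
        ClientManager manager = new ClientManager();
        ClientInstance first = manager.getOrCreate("client-1");
        ClientInstance second = manager.getOrCreate("client-1");
        System.out.println(first == second);
        System.out.println(first.registerProducer("groupA", new Object()));
        System.out.println(first.registerProducer("groupA", new Object()));
    }
}
```

Sharing one instance lets several producers and consumers in the same process reuse the same network connections and background services instead of each creating its own.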

Message Sending Process

The MQProducer interface defines 19 overloads grouped into three categories:

Oneway – fire‑and‑forget, no response.

Sync – blocks until a response or timeout.

Async – returns immediately; a SendCallback handles the result.

All categories eventually invoke the same core logic. The asynchronous path uses an ExecutorService to run sendSelectImpl() in a separate thread.

@Deprecated
public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
    final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    // Record the call time so that waiting in the executor queue counts against the timeout.
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                // Time already spent before this task started running.
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        try {
                            // Send with only the remaining time budget.
                            sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC,
                                sendCallback, timeout - costTime);
                        } catch (MQBrokerException e) {
                            throw new MQClientException("unknownn exception", e);
                        }
                    } catch (Exception e) {
                        // Failures are reported through the callback, not thrown to the caller.
                        sendCallback.onException(e);
                    }
                } else {
                    // The budget was used up while the task sat in the queue.
                    sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
                }
            }
        });
    } catch (RejectedExecutionException e) {
        // The executor refused the task, for example because its queue is full.
        throw new MQClientException("exector rejected ", e);
    }
}

Synchronous and oneway calls invoke sendSelectImpl() directly in the calling thread.
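The timeout handling in the asynchronous path can be isolated in a short sketch. It is a simplified model, not RocketMQ code: Callback, remaining(), and sendAsync() are hypothetical names, the "send" is reduced to reporting the remaining budget, and rejection by the executor is not modelled.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncBudgetSketch {
    /** Hypothetical callback, loosely modelled on the idea of SendCallback. */
    interface Callback {
        void onSuccess(long remainingMillis);
        void onException(Throwable e);
    }

    /** Remaining budget after the time already spent (the `timeout - costTime` idea). */
    static long remaining(long timeoutMillis, long beginMillis, long nowMillis) {
        return timeoutMillis - (nowMillis - beginMillis);
    }

    static void sendAsync(Executor executor, long timeoutMillis, Callback callback) {
        final long begin = System.currentTimeMillis();
        executor.execute(() -> {
            // Queue wait time is charged to the caller's timeout.
            long left = remaining(timeoutMillis, begin, System.currentTimeMillis());
            if (left > 0) {
                callback.onSuccess(left); // stand-in for the real send with the reduced timeout
            } else {
                callback.onException(new TimeoutException("call timeout"));
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        sendAsync(pool, 3000, new Callback() {
            @Override
            public void onSuccess(long remainingMillis) {
                System.out.println("remaining budget (ms): " + remainingMillis);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("failed: " + e.getMessage());
            }
        });
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Measuring from the moment of the call, rather than from the moment the task starts, keeps the caller's timeout meaningful even when the executor is busy.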

sendSelectImpl()

This method selects a target MessageQueue using a MessageQueueSelector (Strategy Pattern) and then calls sendKernelImpl().

MessageQueue mq = null;
try {
    // Candidate queues for the topic, as presented to user code.
    List<MessageQueue> messageQueueList =
        mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
    // The selector receives a clone whose topic has the namespace stripped.
    Message userMessage = MessageAccessor.cloneMessage(msg);
    String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(),
        mQClientFactory.getClientConfig().getNamespace());
    userMessage.setTopic(userTopic);
    // Strategy Pattern: the selector picks the queue; the namespace is then re-applied.
    mq = mQClientFactory.getClientConfig().queueWithNamespace(
        selector.select(messageQueueList, userMessage, arg));
} catch (Throwable e) {
    throw new MQClientException("select message queue throwed exception.", e);
}
if (mq != null) {
    // costTime is computed earlier in the method (not shown in this excerpt).
    return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null,
        timeout - costTime);
} else {
    throw new MQClientException("select message queue return null.", null);
}

sendKernelImpl()

This method builds a SendMessageRequestHeader and a SendMessageContext, then delegates to MQClientAPIImpl.sendMessage(), which serialises the request and transmits it to the broker via Netty.

Configuration Parameters

defaultTopicQueueNums – limits the number of queues a producer will use for a given topic, reducing unnecessary client‑side load.

writeQueueNums / readQueueNums – server‑side limits for how many queues a client may write to or read from; they override the client‑side defaultTopicQueueNums.

perm – topic permission flag (read/write).

MessageQueueSelector Implementations

RocketMQ provides several built‑in selectors (random, hash, same‑room) and allows custom implementations. Strict ordering of messages that share a key requires selecting the queue from that key (the hash selector), because messages with the same key must always land on the same queue.
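A key-based selector can be sketched as follows. This is a minimal illustration of the idea, not the code of RocketMQ's built-in hash selector: QueueSelector and HashSelector are hypothetical types, queues are represented by plain strings, and Math.floorMod is used here to keep the index non-negative.

```java
import java.util.Arrays;
import java.util.List;

final class HashSelectorSketch {
    /** Hypothetical strategy interface, loosely modelled on MessageQueueSelector. */
    interface QueueSelector<Q> {
        Q select(List<Q> queues, Object key);
    }

    /** Maps equal keys to the same queue as long as the queue list does not change. */
    static final class HashSelector<Q> implements QueueSelector<Q> {
        @Override
        public Q select(List<Q> queues, Object key) {
            // floorMod keeps the index in [0, size) even for negative hash codes.
            int index = Math.floorMod(key.hashCode(), queues.size());
            return queues.get(index);
        }
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        QueueSelector<String> selector = new HashSelector<>();
        // An order ID used as the key: every message of order 42 goes to the same queue.
        System.out.println(selector.select(queues, 42)); // prints q2
    }
}
```

The mapping is only stable while the number of queues stays the same; if the queue list grows or shrinks, the same key may map to a different queue.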

Summary

The Producer lifecycle consists of:

Initialization (init() → start()), which sets up MQClientInstance and registers the producer.

State management via serviceState to ensure a single start.

Message sending, where oneway, sync, and async paths converge on sendSelectImpl() → sendKernelImpl() → MQClientAPIImpl.sendMessage().

Both synchronous and asynchronous sending share the same underlying flow; the only difference is the thread that executes the final network call.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
