
Boost Your IoT Apps with mica-mqtt: High‑Performance MQTT Server & Client Guide

This article introduces mica-mqtt, a lightweight, low‑latency, high‑performance MQTT component built on t‑io. It covers the component's features, configuration for Spring Boot and plain Java projects, custom interfaces, clustering, and monitoring with Prometheus and Grafana, with code examples for both server and client integration.

Java Architecture Diary

1. Introduction

mica-mqtt is a simple, low‑latency, high‑performance open‑source MQTT component built on t‑io. See the mica-mqtt-example module for usage.

2. Features

Supports MQTT v3.1, v3.1.1 and v5.0.

Provides MQTT client.

Provides MQTT server.

Supports will messages.

Supports retained messages.

Supports custom message handling and forwarding through an MQ to build a cluster.

Includes a demo for connecting to Alibaba Cloud MQTT.

Supports GraalVM native compilation.

Spring Boot starter mica-mqtt-spring-boot-starter for quick integration.

Starter integrates with Prometheus + Grafana.

3. TODO

Add WebSocket support (research completed).

Optimize MQTT session handling and add full v5.0 support.

4. Changelog

Documented the cluster processing steps and the usage scenarios for will and retained messages.

Removed QoS2 parameter from demo to avoid performance loss.

Abstracted internal forwarding of will and retained messages.

Added mica-mqtt-spring-boot-example.

Starter now supports client integration and includes server optimizations.

Starter supports metric collection for Prometheus + Grafana.

Server now disconnects any previous connection that uses the same clientId.

Upgraded mica-auto to 2.1.3 to fix IDE incremental compilation issue.

5. Spring Boot Quick Start

5.1 Add Dependency

<code><dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.0.2</version>
</dependency>
</code>

5.2 Server YML Configuration

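<code>mqtt:
  server:
    enabled: true              # whether the server is enabled, default true
    ip: 127.0.0.1              # bind address, default 127.0.0.1
    port: 5883                 # listen port, default 1883
    name: Mica-Mqtt-Server     # server name
    buffer-allocator: HEAP     # buffer memory type
    heartbeat-timeout: 120000  # heartbeat timeout in milliseconds
    read-buffer-size: 8092     # buffer size for received data
    max-bytes-in-message: 8092 # maximum message length in bytes
    debug: true                # consider disabling when Prometheus metric collection is enabled
</code>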

5.3 Server Interfaces (register as Spring beans)

IMqttServerAuthHandler – client authentication (required).

IMqttMessageListener – message listening (required).

IMqttConnectStatusListener – connection status listening (required).

IMqttSessionManager – session management (optional).

IMqttMessageStore – stores will and retained messages (required in cluster mode).

AbstractMqttMessageDispatcher – message forwarding, including will and retained messages (required in cluster mode).

IpStatListener – t‑io IP status listening (optional).
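To make the role of a will/retained message store more concrete, here is a minimal sketch of the retained-message part using only the JDK. The class `RetainedStoreSketch` and its methods are hypothetical stand-ins and do not mirror the actual IMqttMessageStore method signatures. It relies on one rule from the MQTT specification: a retained publish with an empty payload clears the retained message for that topic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory retained-message store; not the mica-mqtt IMqttMessageStore API. */
final class RetainedStoreSketch {

    // One retained payload per topic; a newer retained publish replaces the older one.
    private final Map<String, byte[]> retainedByTopic = new ConcurrentHashMap<>();

    /** Stores the payload as the topic's retained message; an empty payload clears it. */
    void retain(String topic, byte[] payload) {
        if (payload == null || payload.length == 0) {
            retainedByTopic.remove(topic);
        } else {
            // Defensive copy so later changes to the caller's array do not leak in.
            retainedByTopic.put(topic, payload.clone());
        }
    }

    /** Returns a copy of the retained payload for the topic, or null if there is none. */
    byte[] get(String topic) {
        byte[] payload = retainedByTopic.get(topic);
        return payload == null ? null : payload.clone();
    }

    /** Number of topics that currently hold a retained message. */
    int size() {
        return retainedByTopic.size();
    }
}
```

In cluster mode an in-memory map like this would only be visible to one node, so a real implementation would presumably keep the data in storage shared by all nodes.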

5.4 Optional Server Customizer

<code>@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {

    @Bean
    public MqttServerCustomizer mqttServerCustomizer() {
        return new MqttServerCustomizer() {
            @Override
            public void customize(MqttServerCreator creator) {
                // custom configuration overrides YML settings
                System.out.println("----------------MqttServerCustomizer-----------------");
            }
        };
    }
}
</code>

5.5 Using MqttServerTemplate

<code>import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

@Service
public class ServerService {
    @Autowired
    private MqttServerTemplate server;

    // Publishes the body to every client subscribed to /test/123
    public boolean publish(String body) {
        // Encode explicitly as UTF-8 rather than relying on the platform default charset
        server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)));
        return true;
    }
}
</code>
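The publish call above hands the payload to the server as a ByteBuffer. As a small JDK-only sketch (the class `PayloadSketch` is a hypothetical helper, not part of mica-mqtt), this is one way to encode a text payload with an explicit charset and decode it again on the receiving side without disturbing the buffer's position:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for text payloads; uses only the JDK. */
final class PayloadSketch {

    private PayloadSketch() { }

    /** Encodes the text as UTF-8 and wraps the bytes in a ByteBuffer. */
    static ByteBuffer encode(String body) {
        return ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a UTF-8 payload; duplicate() keeps the caller's position untouched. */
    static String decode(ByteBuffer payload) {
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }
}
```

Using the same explicit charset on both sides avoids surprises when publisher and subscriber run on JVMs with different default encodings.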

5.6 Cluster Message Broadcasting via MQ

Implement IMqttConnectStatusListener to store device status.

Implement IMqttMessageListener to forward messages to MQ for business processing.

Implement IMqttMessageStore to store will and retained messages.

Implement AbstractMqttMessageDispatcher to forward messages to MQ, which then broadcasts back to the MQTT cluster.

Business messages are sent to MQ, broadcast to the MQTT cluster, and finally delivered to devices.
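The flow described above can be sketched with in-memory stand-ins. Everything here is hypothetical and JDK-only: `FakeMq` plays the role of a real message queue with fan-out delivery, and `Node` plays the role of one MQTT server instance that tracks which devices are connected to it. The point is only to show that every node receives the broadcast, but just the node holding the device's connection delivers it.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Hypothetical model of MQ-based cluster broadcasting; not mica-mqtt code. */
final class ClusterBroadcastSketch {

    private ClusterBroadcastSketch() { }

    /** Stand-in for a real MQ topic that fans each message out to all consumers. */
    static final class FakeMq {
        private final List<BiConsumer<String, String>> consumers = new CopyOnWriteArrayList<>();

        void subscribe(BiConsumer<String, String> consumer) {
            consumers.add(consumer);
        }

        void publish(String clientId, String payload) {
            for (BiConsumer<String, String> consumer : consumers) {
                consumer.accept(clientId, payload);
            }
        }
    }

    /** Stand-in for one MQTT server node in the cluster. */
    static final class Node {
        // Device status, as a connect status listener might record it.
        private final Set<String> onlineClients = ConcurrentHashMap.newKeySet();
        // Messages this node actually pushed to its own devices.
        final List<String> delivered = new CopyOnWriteArrayList<>();

        Node(FakeMq mq) {
            mq.subscribe(this::onBroadcast);
        }

        void online(String clientId) {
            onlineClients.add(clientId);
        }

        void offline(String clientId) {
            onlineClients.remove(clientId);
        }

        // Every node sees the broadcast; only the node holding the connection delivers.
        private void onBroadcast(String clientId, String payload) {
            if (onlineClients.contains(clientId)) {
                delivered.add(clientId + ":" + payload);
            }
        }
    }
}
```

With two nodes subscribed to the same `FakeMq`, publishing a message for a device connected to the second node leaves the first node's `delivered` list empty.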

5.7 Prometheus + Grafana Monitoring

Thanks to t‑io’s design, monitoring metrics are exposed via t‑io‑stat. Supported metrics include connection counts, message packets, and byte volumes for both inbound and outbound traffic.

mqtt_connections_accepted – total connections accepted

mqtt_connections_closed – total connections closed

mqtt_connections_size – current active connections

mqtt_messages_handled_packets – processed message packets

mqtt_messages_handled_bytes – processed message bytes

mqtt_messages_received_packets – received message packets

mqtt_messages_received_bytes – received message bytes

mqtt_messages_send_packets – sent message packets

mqtt_messages_send_bytes – sent message bytes

6. Plain Java Project Integration

6.1 Maven Dependency

<code><dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-core</artifactId>
    <version>1.0.2</version>
</dependency>
</code>

6.2 mica-mqtt Client Example

<code>// Initialize MQTT client
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)               // default 1883
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5) // default 3_1_1
    .clientId("xxxxxx")       // optional; by default generated from a prefix plus a nanosecond value
    .connect();

// Subscribe
client.subQos0("/test/#", (topic, payload) -> {
    logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});

// Unsubscribe
client.unSubscribe("/test/#");

// Publish
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Disconnect, reconnect, stop
client.disconnect();
client.reconnect();
client.stop();
</code>
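The subscription above uses the filter `/test/#`. As a JDK-only illustration of what such a filter matches under the MQTT wildcard rules (`#` for any number of trailing levels, `+` for exactly one level), here is a minimal matcher. `TopicFilterSketch` is a hypothetical class written for this article, not the matcher mica-mqtt uses internally, and it assumes the filter itself is well formed.

```java
/** Hypothetical stand-alone matcher for MQTT topic filters; not part of mica-mqtt. */
final class TopicFilterSketch {

    private TopicFilterSketch() { }

    /** Returns true if topicName matches topicFilter under MQTT wildcard rules. */
    static boolean matches(String topicFilter, String topicName) {
        // Filters that start with a wildcard do not match topics beginning with '$'.
        if (topicName.startsWith("$")
                && (topicFilter.startsWith("#") || topicFilter.startsWith("+"))) {
            return false;
        }
        // Limit -1 keeps empty levels, such as the one before a leading '/'.
        String[] filter = topicFilter.split("/", -1);
        String[] topic = topicName.split("/", -1);
        for (int i = 0; i < filter.length; i++) {
            String level = filter[i];
            if (level.equals("#")) {
                // '#' must be the last level; it matches the parent and all child levels.
                return i == filter.length - 1;
            }
            if (i >= topic.length) {
                return false;
            }
            // '+' matches any single level; otherwise the levels must be identical.
            if (!level.equals("+") && !level.equals(topic[i])) {
                return false;
            }
        }
        return filter.length == topic.length;
    }
}
```

Under these rules `/test/#` matches `/test/client`, `/test/a/b`, and also `/test` itself, while `/test/+` matches `/test/client` but not `/test/a/b`.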

6.3 mica-mqtt Server Example

<code>// To accept more connections with less memory, reduce the thread stack size with the JVM flag -Xss129k
MqttServer mqttServer = MqttServer.create()
    .ip("127.0.0.1")
    .port(1883)
    .readBufferSize(512)
    .authHandler((clientId, userName, password) -> true)
    .messageListener((clientId, topic, mqttQoS, payload) -> {
        logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS,
                ByteBufferUtil.toString(payload));
    })
    .useSsl("", "", "")   // SSL configuration; the empty strings are placeholders
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override
        public void online(String clientId) { }
        @Override
        public void offline(String clientId) { }
    })
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override
        public void config(MqttServer mqttServer) { }
        @Override
        public boolean send(Message message) { return false; }
        @Override
        public boolean send(String clientId, Message message) { return false; }
    })
    .debug()
    .start();

// Publish to a specific client (encode explicitly as UTF-8, since the payload is non-ASCII)
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Publish to all subscribers of a topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Stop server
mqttServer.stop();
</code>
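The server example accepts every client because its auth handler always returns true. As a hedged sketch of a stricter check, the code below validates the user name and password against a fixed map. The `AuthHandler` interface is a hypothetical stand-in that only mirrors the `(clientId, userName, password) -> boolean` shape of the lambda above; it assumes string credentials and is not the real IMqttServerAuthHandler type.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;

/** Hypothetical credential check; uses only the JDK. */
final class AuthSketch {

    private AuthSketch() { }

    /** Stand-in mirroring the (clientId, userName, password) -> boolean lambda shape. */
    interface AuthHandler {
        boolean authenticate(String clientId, String userName, String password);
    }

    /** Builds a handler that accepts only the user/password pairs in the given map. */
    static AuthHandler fromCredentials(Map<String, String> passwordsByUser) {
        return (clientId, userName, password) -> {
            if (userName == null || password == null) {
                return false;
            }
            String expected = passwordsByUser.get(userName);
            if (expected == null) {
                return false;
            }
            // MessageDigest.isEqual compares in constant time, which avoids timing leaks.
            return MessageDigest.isEqual(
                    expected.getBytes(StandardCharsets.UTF_8),
                    password.getBytes(StandardCharsets.UTF_8));
        };
    }
}
```

In a real deployment the map would be replaced by a lookup against hashed credentials in a user store; plain-text passwords are used here only to keep the sketch short.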

7. Demo

See the mica-mqtt-example project for a live demonstration.

Tags: Java, Spring Boot, GraalVM, IoT, MQTT, t-io