Backend Development 10 min read

Boost Your IoT Apps with mica-mqtt: Low‑Latency, High‑Performance MQTT Framework

The article introduces mica-mqtt, an open‑source MQTT component built on t‑io that offers simple, low‑latency, high‑performance messaging, outlines its features, provides Spring Boot and plain Java integration steps, shows configuration examples, and explains monitoring and clustering capabilities for IoT applications.

Java Architecture Diary
Java Architecture Diary
Java Architecture Diary
Boost Your IoT Apps with mica-mqtt: Low‑Latency, High‑Performance MQTT Framework

1. Introduction

mica-mqtt is a simple, low‑latency, high‑performance MQTT open‑source component built on t‑io . See the

mica-mqtt-example

module for usage.

2. Features

Supports MQTT v3.1, v3.1.1 and v5.0.

Supports WebSocket sub‑protocol (compatible with mqtt.js).

Provides MQTT client and server implementations.

Supports will messages and retained messages.

Custom message handling and cluster forwarding.

Demo for Alibaba Cloud MQTT connection.

GraalVM native compilation.

Spring Boot starter for quick integration.

Prometheus + Grafana metrics integration.

3. TODO

Optimize MQTT session handling and add some MQTT v5.0 features.

4. Changelog

Added WebSocket sub‑protocol support.

Server IP optional.

Removed CountDownLatch from client.

Added max packet length field.

Added connection listener

IMqttClientConnectListener

.

Added

maxClientIdLength

config.

Improved decode error handling.

Log enhancements.

Code cleanup (Tio.close → Tio.remove).

Dockerfile added to spring‑boot example.

Enhanced starter with will‑message config.

Upgraded t‑io to 3.7.4.

5. Spring Boot Quick Start

5.1 Add Dependency

<code>&lt;dependency&gt;
    &lt;groupId&gt;net.dreamlu&lt;/groupId&gt;
    &lt;artifactId&gt;mica-mqtt-spring-boot-starter&lt;/artifactId&gt;
    &lt;version&gt;1.0.3&lt;/version&gt;
&lt;/dependency&gt;
</code>

5.2 Server Configuration Example

<code>mqtt:
  server:
    enabled: true
    ip: 127.0.0.1
    port: 5883
    name: Mica-Mqtt-Server
    buffer-allocator: HEAP
    heartbeat-timeout: 120000
    read-buffer-size: 8092
    max-bytes-in-message: 8092
    debug: true
    websocket-enable: true
    websocket-port: 8083
</code>

5.3 Server Interfaces (register as Spring beans)

IMqttServerAuthHandler – client authentication (required)

IMqttMessageListener – message listening (required)

IMqttConnectStatusListener – connection status (required)

IMqttSessionManager – session management (optional)

IMqttMessageStore – store will and retained messages (cluster mode)

AbstractMqttMessageDispatcher – message forwarding (cluster mode)

IpStatListener – t‑io IP status (optional)

5.4 Optional Server Customizer

<code>@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {
    @Bean
    public MqttServerCustomizer activeRecordPluginCustomizer() {
        return new MqttServerCustomizer() {
            @Override
            public void customize(MqttServerCreator creator) {
                // custom configuration overrides yaml
                System.out.println("----MqttServerCustomizer----");
            }
        };
    }
}
</code>

5.5 Using MqttServerTemplate

<code>import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;

@Service
public class ServerService {
    @Autowired
    private MqttServerTemplate server;

    public boolean publish(String body) {
        server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
        return true;
    }
}
</code>

5.6 Cluster Message Broadcasting via MQ

Implement

IMqttConnectStatusListener

for device status storage.

Implement

IMqttMessageListener

to forward messages to MQ.

Implement

IMqttMessageStore

for will and retained storage.

Implement

AbstractMqttMessageDispatcher

to dispatch messages through MQ back to the MQTT cluster.

Business messages are sent to MQ, broadcast to the MQTT cluster, and finally delivered to devices.

5.7 Prometheus + Grafana Monitoring

t‑io provides

t‑iostat

metrics. Supported metrics include connection counts, current connections, handled packets/bytes, received packets/bytes, and sent packets/bytes.

6. Plain Java Project Integration

6.1 Maven Dependency

<code>&lt;dependency&gt;
    &lt;groupId&gt;net.dreamlu&lt;/groupId&gt;
    &lt;artifactId&gt;mica-mqtt-core&lt;/artifactId&gt;
    &lt;version&gt;1.0.3&lt;/version&gt;
&lt;/dependency&gt;
</code>

6.2 Client Example

<code>// Initialize MQTT client
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5)
    .clientId("xxxxxx")
    .connect();

// Subscribe
client.subQos0("/test/#", (topic, payload) -> {
    logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});

// Publish
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Disconnect, reconnect, stop
client.disconnect();
client.reconnect();
client.stop();
</code>

6.3 Server Example

<code>// Increase stack size for many connections: -Xss129k
MqttServer mqttServer = MqttServer.create()
    .ip("127.0.0.1")
    .port(1883)
    .readBufferSize(512)
    .authHandler((clientId, userName, password) -> true)
    .messageListener((clientId, topic, mqttQoS, payload) -> {
        logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
    })
    .useSsl("", "", "")
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override public void online(String clientId) {}
        @Override public void offline(String clientId) {}
    })
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override public void config(MqttServer mqttServer) {}
        @Override public boolean send(Message message) { return false; }
        @Override public boolean send(String clientId, Message message) { return false; }
    })
    .debug()
    .start();

// Publish to a client
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// Publish to all listeners of a topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// Stop server
mqttServer.stop();
</code>
JavaSpring BootGraalVMIoTMQTTt-io
Java Architecture Diary
Written by

Java Architecture Diary

Committed to sharing original, high‑quality technical articles; no fluff or promotional content.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.