Boost Your IoT Apps with mica-mqtt: Low‑Latency, High‑Performance MQTT Framework
The article introduces mica-mqtt, an open‑source MQTT component built on t‑io that offers simple, low‑latency, high‑performance messaging, outlines its features, provides Spring Boot and plain Java integration steps, shows configuration examples, and explains monitoring and clustering capabilities for IoT applications.
1. Introduction
mica-mqtt is a simple, low‑latency, high‑performance MQTT open‑source component built on t‑io . See the
mica-mqtt-examplemodule for usage.
2. Features
Supports MQTT v3.1, v3.1.1 and v5.0.
Supports WebSocket sub‑protocol (compatible with mqtt.js).
Provides MQTT client and server implementations.
Supports will messages and retained messages.
Custom message handling and cluster forwarding.
Demo for Alibaba Cloud MQTT connection.
GraalVM native compilation.
Spring Boot starter for quick integration.
Prometheus + Grafana metrics integration.
3. TODO
Optimize MQTT session handling and add some MQTT v5.0 features.
4. Changelog
Added WebSocket sub‑protocol support.
Server IP optional.
Removed CountDownLatch from client.
Added max packet length field.
Added connection listener
IMqttClientConnectListener.
Added
maxClientIdLengthconfig.
Improved decode error handling.
Log enhancements.
Code cleanup (Tio.close → Tio.remove).
Dockerfile added to spring‑boot example.
Enhanced starter with will‑message config.
Upgraded t‑io to 3.7.4.
5. Spring Boot Quick Start
5.1 Add Dependency
<code><dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-spring-boot-starter</artifactId>
<version>1.0.3</version>
</dependency>
</code>5.2 Server Configuration Example
<code>mqtt:
server:
enabled: true
ip: 127.0.0.1
port: 5883
name: Mica-Mqtt-Server
buffer-allocator: HEAP
heartbeat-timeout: 120000
read-buffer-size: 8092
max-bytes-in-message: 8092
debug: true
websocket-enable: true
websocket-port: 8083
</code>5.3 Server Interfaces (register as Spring beans)
IMqttServerAuthHandler – client authentication (required)
IMqttMessageListener – message listening (required)
IMqttConnectStatusListener – connection status (required)
IMqttSessionManager – session management (optional)
IMqttMessageStore – store will and retained messages (cluster mode)
AbstractMqttMessageDispatcher – message forwarding (cluster mode)
IpStatListener – t‑io IP status (optional)
5.4 Optional Server Customizer
<code>@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {
@Bean
public MqttServerCustomizer activeRecordPluginCustomizer() {
return new MqttServerCustomizer() {
@Override
public void customize(MqttServerCreator creator) {
// custom configuration overrides yaml
System.out.println("----MqttServerCustomizer----");
}
};
}
}
</code>5.5 Using MqttServerTemplate
<code>import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.ByteBuffer;
@Service
public class ServerService {
@Autowired
private MqttServerTemplate server;
public boolean publish(String body) {
server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
return true;
}
}
</code>5.6 Cluster Message Broadcasting via MQ
Implement
IMqttConnectStatusListenerfor device status storage.
Implement
IMqttMessageListenerto forward messages to MQ.
Implement
IMqttMessageStorefor will and retained storage.
Implement
AbstractMqttMessageDispatcherto dispatch messages through MQ back to the MQTT cluster.
Business messages are sent to MQ, broadcast to the MQTT cluster, and finally delivered to devices.
5.7 Prometheus + Grafana Monitoring
t‑io provides
t‑iostatmetrics. Supported metrics include connection counts, current connections, handled packets/bytes, received packets/bytes, and sent packets/bytes.
6. Plain Java Project Integration
6.1 Maven Dependency
<code><dependency>
<groupId>net.dreamlu</groupId>
<artifactId>mica-mqtt-core</artifactId>
<version>1.0.3</version>
</dependency>
</code>6.2 Client Example
<code>// Initialize MQTT client
MqttClient client = MqttClient.create()
.ip("127.0.0.1")
.port(1883)
.username("admin")
.password("123456")
.version(MqttVersion.MQTT_5)
.clientId("xxxxxx")
.connect();
// Subscribe
client.subQos0("/test/#", (topic, payload) -> {
logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});
// Publish
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
// Disconnect, reconnect, stop
client.disconnect();
client.reconnect();
client.stop();
</code>6.3 Server Example
<code>// Increase stack size for many connections: -Xss129k
MqttServer mqttServer = MqttServer.create()
.ip("127.0.0.1")
.port(1883)
.readBufferSize(512)
.authHandler((clientId, userName, password) -> true)
.messageListener((clientId, topic, mqttQoS, payload) -> {
logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
})
.useSsl("", "", "")
.connectStatusListener(new IMqttConnectStatusListener() {
@Override public void online(String clientId) {}
@Override public void offline(String clientId) {}
})
.messageDispatcher(new IMqttMessageDispatcher() {
@Override public void config(MqttServer mqttServer) {}
@Override public boolean send(Message message) { return false; }
@Override public boolean send(String clientId, Message message) { return false; }
})
.debug()
.start();
// Publish to a client
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// Publish to all listeners of a topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
// Stop server
mqttServer.stop();
</code>Java Architecture Diary
Committed to sharing original, high‑quality technical articles; no fluff or promotional content.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.