Boost Your IoT Apps with mica-mqtt: Low‑Latency, High‑Performance MQTT Framework

This article introduces mica-mqtt, an open‑source MQTT component built on t‑io that aims for simple, low‑latency, high‑performance messaging. It outlines the feature set, walks through Spring Boot and plain Java integration with configuration examples, and describes the monitoring and clustering support for IoT applications.

Java Architecture Diary

1. Introduction

mica-mqtt is a simple, low‑latency, high‑performance open‑source MQTT component built on t‑io. See the mica-mqtt-example module for usage examples.

2. Features

Supports MQTT v3.1, v3.1.1 and v5.0.

Supports WebSocket sub‑protocol (compatible with mqtt.js).

Provides MQTT client and server implementations.

Supports will messages and retained messages.

Custom message handling and cluster forwarding.

Demo for Alibaba Cloud MQTT connection.

GraalVM native compilation.

Spring Boot starter for quick integration.

Prometheus + Grafana metrics integration.

3. TODO

Optimize MQTT session handling and add some MQTT v5.0 features.

4. Changelog

Added WebSocket sub‑protocol support.

Server IP optional.

Removed CountDownLatch from client.

Added max packet length field.

Added connection listener IMqttClientConnectListener.

Added maxClientIdLength config.

Improved decode error handling.

Log enhancements.

Code cleanup (Tio.close → Tio.remove).

Dockerfile added to spring‑boot example.

Enhanced starter with will‑message config.

Upgraded t‑io to 3.7.4.

5. Spring Boot Quick Start

5.1 Add Dependency

<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-spring-boot-starter</artifactId>
    <version>1.0.3</version>
</dependency>

5.2 Server Configuration Example

mqtt:
  server:
    enabled: true
    ip: 127.0.0.1
    port: 5883
    name: Mica-Mqtt-Server
    buffer-allocator: HEAP
    heartbeat-timeout: 120000
    read-buffer-size: 8092
    max-bytes-in-message: 8092
    debug: true
    websocket-enable: true
    websocket-port: 8083

5.3 Server Interfaces (register as Spring beans)

IMqttServerAuthHandler – client authentication (required)

IMqttMessageListener – message listening (required)

IMqttConnectStatusListener – connection status (required)

IMqttSessionManager – session management (optional)

IMqttMessageStore – store will and retained messages (cluster mode)

AbstractMqttMessageDispatcher – message forwarding (cluster mode)

IpStatListener – t‑io IP status (optional)
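As a rough illustration of the authentication hook, the sketch below shows one way a credential check could be structured. AuthCheck is a hypothetical stand-in whose three arguments (clientId, userName, password) mirror the authHandler lambda in the plain Java server example in section 6.3; it is not the library's IMqttServerAuthHandler, and the in-memory user map is assumed purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;

// Hypothetical stand-in for the three-argument auth callback; not the library interface.
@FunctionalInterface
interface AuthCheck {
    boolean authenticate(String clientId, String userName, String password);
}

// Checks credentials against an in-memory map (assumption: illustration only).
final class InMemoryAuth implements AuthCheck {
    private final Map<String, String> users; // userName -> password

    InMemoryAuth(Map<String, String> users) {
        this.users = Map.copyOf(users);
    }

    @Override
    public boolean authenticate(String clientId, String userName, String password) {
        // Reject connections with a missing client id or missing credentials.
        if (clientId == null || clientId.isEmpty() || userName == null || password == null) {
            return false;
        }
        String expected = users.get(userName);
        if (expected == null) {
            return false;
        }
        // Constant-time comparison avoids leaking how many leading bytes matched.
        return MessageDigest.isEqual(
            expected.getBytes(StandardCharsets.UTF_8),
            password.getBytes(StandardCharsets.UTF_8));
    }
}
```

In a real deployment the map would more likely hold password hashes loaded from a database, and the check would be wired into whatever bean implements the server's authentication interface.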

5.4 Optional Server Customizer

@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {
    @Bean
    public MqttServerCustomizer activeRecordPluginCustomizer() {
        return new MqttServerCustomizer() {
            @Override
            public void customize(MqttServerCreator creator) {
                // Settings applied here override the values from the YAML configuration
                System.out.println("----MqttServerCustomizer----");
            }
        };
    }
}

5.5 Using MqttServerTemplate

import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

@Service
public class ServerService {
    @Autowired
    private MqttServerTemplate server;

    // Publishes the body to all listeners of the /test/123 topic.
    public boolean publish(String body) {
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)));
        return true;
    }
}

5.6 Cluster Message Broadcasting via MQ

Implement IMqttConnectStatusListener for device status storage.

Implement IMqttMessageListener to forward messages to MQ.

Implement IMqttMessageStore for will and retained storage.

Implement AbstractMqttMessageDispatcher to dispatch messages through MQ back to the MQTT cluster.

Business messages are sent to MQ, broadcast to the MQTT cluster, and finally delivered to devices.
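The downstream half of this flow can be sketched with plain Java. Everything here is hypothetical: FanOutQueue stands in for a real message queue with broadcast delivery, and ClusterNode stands in for one MQTT server instance that tracks which devices are connected to it locally. The sketch only shows why a broadcast is needed: the sender does not know which node holds the device's connection, so every node receives the message and only the owning node delivers it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical message envelope passed through the queue.
final class ClusterMessage {
    final String clientId;
    final String topic;
    final String payload;

    ClusterMessage(String clientId, String topic, String payload) {
        this.clientId = clientId;
        this.topic = topic;
        this.payload = payload;
    }
}

// Hypothetical in-memory stand-in for an MQ with fan-out (broadcast) delivery.
final class FanOutQueue {
    private final List<Consumer<ClusterMessage>> subscribers = new ArrayList<>();

    void subscribe(Consumer<ClusterMessage> subscriber) {
        subscribers.add(subscriber);
    }

    // Every subscribed node receives every message.
    void broadcast(ClusterMessage message) {
        for (Consumer<ClusterMessage> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

// Hypothetical stand-in for one MQTT server instance in the cluster.
final class ClusterNode {
    private final String name;
    private final Set<String> localClients = new HashSet<>();
    final List<String> delivered = new ArrayList<>();

    ClusterNode(String name, FanOutQueue queue) {
        this.name = name;
        queue.subscribe(this::onClusterMessage);
    }

    // Called when a device connects to or disconnects from this node.
    void online(String clientId) { localClients.add(clientId); }
    void offline(String clientId) { localClients.remove(clientId); }

    // Deliver only if the target device is connected to this node.
    private void onClusterMessage(ClusterMessage m) {
        if (localClients.contains(m.clientId)) {
            delivered.add(name + " -> " + m.clientId + " " + m.topic + " " + m.payload);
        }
    }
}
```

With two nodes subscribed to the same queue, a message addressed to a device on the second node is ignored by the first and delivered once by the second.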

5.7 Prometheus + Grafana Monitoring

t‑io exposes its statistics as t‑iostat metrics for Prometheus and Grafana. The supported metrics cover connection counts, current connections, and the packets and bytes handled, received, and sent.
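To make the metric categories concrete, here is a minimal sketch of how such counters could be kept in plain Java. ServerStats and its method names are hypothetical and do not reflect how t‑io records its statistics; the sketch also assumes that current connections can be derived as accepted minus closed.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical counter holder; not the t-io implementation.
final class ServerStats {
    // LongAdder keeps contention low when many I/O threads update the same counter.
    private final LongAdder acceptedCount = new LongAdder();
    private final LongAdder closedCount = new LongAdder();
    private final LongAdder receivedPacketCount = new LongAdder();
    private final LongAdder receivedByteCount = new LongAdder();
    private final LongAdder sentPacketCount = new LongAdder();
    private final LongAdder sentByteCount = new LongAdder();

    void connectionAccepted() { acceptedCount.increment(); }
    void connectionClosed() { closedCount.increment(); }

    void packetReceived(int bytes) {
        receivedPacketCount.increment();
        receivedByteCount.add(bytes);
    }

    void packetSent(int bytes) {
        sentPacketCount.increment();
        sentByteCount.add(bytes);
    }

    // Total connections ever accepted.
    long totalConnections() { return acceptedCount.sum(); }

    // Assumption: connections still open = accepted - closed.
    long currentConnections() { return acceptedCount.sum() - closedCount.sum(); }

    long receivedPackets() { return receivedPacketCount.sum(); }
    long receivedBytes() { return receivedByteCount.sum(); }
    long sentPackets() { return sentPacketCount.sum(); }
    long sentBytes() { return sentByteCount.sum(); }
}
```

Counters of this shape map naturally onto a Prometheus scrape: totals become monotonically increasing counters, while the current connection count is a gauge.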

6. Plain Java Project Integration

6.1 Maven Dependency

<dependency>
    <groupId>net.dreamlu</groupId>
    <artifactId>mica-mqtt-core</artifactId>
    <version>1.0.3</version>
</dependency>

6.2 Client Example

// Initialize MQTT client
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5)
    .clientId("xxxxxx")
    .connect();

// Subscribe
client.subQos0("/test/#", (topic, payload) -> {
    logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});

// Publish
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Disconnect, reconnect, stop
client.disconnect();
client.reconnect();
client.stop();
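The subscription to /test/# above relies on MQTT topic wildcards: + matches exactly one topic level and # matches the parent level plus everything beneath it. The sketch below is a standalone illustration of that matching rule as defined by the MQTT specification; TopicFilter is a hypothetical helper, not a class from mica-mqtt, and it does not validate malformed filters beyond requiring # to be the last level.

```java
// Hypothetical helper illustrating MQTT topic filter matching.
final class TopicFilter {
    private TopicFilter() {}

    static boolean matches(String filter, String topic) {
        // Topics starting with '$' are not matched by filters that start with a wildcard.
        if (topic.startsWith("$") && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        // Limit -1 keeps trailing empty levels, so "a/" has two levels.
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' must be the last level; it matches the parent and all descendants.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // Without a trailing '#', the level counts must be equal.
        return f.length == t.length;
    }
}
```

Under this rule the message published to /test/client in the example is delivered back to the same client, because /test/# covers every topic under /test.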

6.3 Server Example

// To accept more connections with less memory, reduce the thread stack size with the JVM option -Xss129k
MqttServer mqttServer = MqttServer.create()
    .ip("127.0.0.1")
    .port(1883)
    .readBufferSize(512)
    .authHandler((clientId, userName, password) -> true)
    .messageListener((clientId, topic, mqttQoS, payload) -> {
        logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
    })
    // Optional: enable SSL (the empty strings are placeholders for real values)
    .useSsl("", "", "")
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override public void online(String clientId) {}
        @Override public void offline(String clientId) {}
    })
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override public void config(MqttServer mqttServer) {}
        @Override public boolean send(Message message) { return false; }
        @Override public boolean send(String clientId, Message message) { return false; }
    })
    .debug()
    .start();

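// Publish to a single client (UTF-8 is set explicitly so the payload does not depend on the platform charset)
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
// Publish to all listeners of a topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));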
// Stop server
mqttServer.stop();