Routing and Distribution Layers in EMQX 3.0: Architecture and Implementation Details
This article explains how EMQX 3.0 maintains routing and distribution layers, detailing the subscriber tables, shared‑subscription handling, topic‑trie structures, route look‑ups, and the Erlang code that drives message matching and dispatch across a clustered broker.
The previous article focused on the Conn and Session processes; this one explains how EMQX maintains its routing and distribution layers to organize distributed services and deliver messages.
The routing layer maintains the subscriber and subscription tables and dispatches messages to sessions, which then deliver them according to QoS. It distinguishes ordinary from shared subscriptions, using tables such as emqx_suboption, emqx_subscription, emqx_subscriber, emqx_shared_subscription, emqx_shared_subscriber, and emqx_alive_shared_subscribers. The tables store keys such as {Topic, SPid}, with values that describe the subscription options as maps.
Example subscription data:
subscriptions = #{<<"test/set/#">> => #{nl => 0, pktid => 1, qos => 1, rap => 0, rc => 0, rh => 0, share => <<"[email protected]">>, subid => <<"broker/tcp">>}}
Shared subscriptions use additional tables to track groups and ensure that only one subscriber in a group receives each message.
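The table layout described above can be sketched with plain ETS tables. This is a minimal illustration, not EMQX's own code: the table names (sketch_suboption, sketch_shared_subscription), the group name <<"g1">>, the {Group, Topic} key shape, and the random pick among group members are all assumptions made for the example.

```erlang
-module(sub_tables_sketch).
-export([demo/0]).

%% Hypothetical stand-ins for the emqx_* tables; names and key shapes are assumptions.
demo() ->
    SubOpt = ets:new(sketch_suboption, [set]),
    Shared = ets:new(sketch_shared_subscription, [bag]),
    Topic = <<"test/set/#">>,
    SPid = self(),  %% stand-in for a subscriber pid
    %% Ordinary subscription: key {Topic, SPid}, value is the options map.
    true = ets:insert(SubOpt, {{Topic, SPid}, #{qos => 1, nl => 0, rap => 0, rh => 0}}),
    %% Shared subscription: every member of a group is stored under {Group, Topic}.
    Key = {<<"g1">>, Topic},
    Members = [spawn(fun() -> receive stop -> ok end end) || _ <- lists:seq(1, 3)],
    lists:foreach(fun(M) -> ets:insert(Shared, {Key, M}) end, Members),
    %% Dispatch to the group: pick exactly one member (random choice assumed here).
    Candidates = [P || {_, P} <- ets:lookup(Shared, Key)],
    Picked = lists:nth(rand:uniform(length(Candidates)), Candidates),
    [{_, Opts}] = ets:lookup(SubOpt, {Topic, SPid}),
    %% Clean up the helper processes and tables.
    lists:foreach(fun(M) -> M ! stop end, Members),
    ets:delete(SubOpt),
    ets:delete(Shared),
    {maps:get(qos, Opts), length(Candidates), lists:member(Picked, Members)}.
```

Calling sub_tables_sketch:demo() returns the stored QoS, the number of group members, and whether the picked pid belongs to the group. A bag table is used for the group so that several members can share one key.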
The distribution layer maintains a global topic trie and a route table. The trie stores hierarchical topic nodes; the route table maps topics to destination nodes. When a topic filter contains wildcards, entries are written to emqx_trie and emqx_trie_node.
Insertion of a wildcard topic “test/set/#” results in trie records such as:
#trie{edge = #trie_edge{node_id = root, word = <<"test">>}, node_id = <<"test">>}
#trie{edge = #trie_edge{node_id = <<"test">>, word = <<"set">>}, node_id = <<"test/set">>}
#trie{edge = #trie_edge{node_id = <<"test/set">>, word = '#'}, node_id = <<"test/set/#">>}
Trie nodes are stored in emqx_trie_node with fields node_id, edge_count, topic, etc.
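The insertion of “test/set/#” can be reproduced with a small sketch that keeps the edges in a plain map instead of Mnesia. The module name, the map representation #{ {ParentId, Word} => ChildId }, and the helper functions are assumptions for illustration; only the shape of the resulting edges follows the records shown above.

```erlang
-module(trie_insert_sketch).
-export([words/1, insert/2, demo/0]).

%% Split a topic filter into words; wildcard levels become the atoms '#' and '+',
%% matching the word = '#' field in the records above.
words(Topic) ->
    [word(W) || W <- binary:split(Topic, <<"/">>, [global])].

word(<<"#">>) -> '#';
word(<<"+">>) -> '+';
word(W) -> W.

%% The trie is a map of edges: #{ {ParentId, Word} => ChildId } (assumed layout).
insert(Topic, Trie) ->
    {_, Trie1} = lists:foldl(
        fun(W, {Parent, Acc}) ->
            Child = child_id(Parent, W),
            {Child, Acc#{{Parent, W} => Child}}
        end, {root, Trie}, words(Topic)),
    Trie1.

%% A child id is the path from the root down to that word.
child_id(root, W) -> to_bin(W);
child_id(Parent, W) -> <<Parent/binary, "/", (to_bin(W))/binary>>.

to_bin(W) when is_atom(W) -> atom_to_binary(W, utf8);
to_bin(W) -> W.

demo() ->
    insert(<<"test/set/#">>, #{}).
```

trie_insert_sketch:demo() yields three edges: {root, <<"test">>} to <<"test">>, {<<"test">>, <<"set">>} to <<"test/set">>, and {<<"test/set">>, '#'} to <<"test/set/#">>, mirroring the three #trie records.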
Publishing flow: the broker splits the topic into words, searches the trie and trie_node tables to find the matching topic filters, then looks up emqx_route to obtain the destination nodes for each filter. For an ordinary subscription the message is forwarded to the subscribing session process; for a shared subscription one subscriber is selected from emqx_shared_subscription.
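The first two steps of this flow, finding the matching filters and resolving them to destination nodes, can be sketched as follows. To keep the example short, the trie search is replaced by a linear scan over the filters, and the route table is a plain map; the module name, the node names, and the #{Filter => [Node]} layout are assumptions, not EMQX's data model.

```erlang
-module(publish_flow_sketch).
-export([match_filter/2, route/2, demo/0]).

words(T) -> binary:split(T, <<"/">>, [global]).

%% True when a published topic name matches a topic filter.
match_filter(Topic, Filter) ->
    match_words(words(Topic), words(Filter)).

%% A trailing '#' matches all remaining levels, including none.
match_words(_, [<<"#">>]) -> true;
%% '+' matches exactly one level.
match_words([_ | Ts], [<<"+">> | Fs]) -> match_words(Ts, Fs);
%% Literal levels must be equal.
match_words([W | Ts], [W | Fs]) -> match_words(Ts, Fs);
match_words([], []) -> true;
match_words(_, _) -> false.

%% Routes is #{Filter => [Node]}; return every {Filter, Node} pair the topic reaches.
route(Topic, Routes) ->
    lists:sort([{F, N} || {F, Nodes} <- maps:to_list(Routes),
                          match_filter(Topic, F),
                          N <- Nodes]).

demo() ->
    %% Hypothetical node names.
    Routes = #{<<"test/set/#">> => ['emqx@n1', 'emqx@n2'],
               <<"test/+/x">>   => ['emqx@n1'],
               <<"other/#">>    => ['emqx@n3']},
    route(<<"test/set/x">>, Routes).
```

publish_flow_sketch:demo() returns the pairs for the two matching filters and skips <<"other/#">>. A linear scan costs time proportional to the number of filters, which is the cost the trie lookup avoids.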
Key Erlang functions for trie matching are shown:
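match(Topic) when is_binary(Topic) ->
    TrieNodes = match_node(root, emqx_topic:words(Topic)),
    [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].

%% Entry point: start the walk with an empty accumulator.
match_node(NodeId, Words) ->
    match_node(NodeId, Words, []).

%% All words consumed: collect this node plus any '#' child.
match_node(NodeId, [], ResAcc) ->
    mnesia:read(?TRIE_NODE, NodeId) ++ 'match_#'(NodeId, ResAcc);
%% Otherwise follow both the literal word and the '+' wildcard edge.
match_node(NodeId, [W|Words], ResAcc) ->
    lists:foldl(fun(WArg, Acc) ->
        case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = WArg}) of
            [#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc);
            [] -> Acc
        end
    end, 'match_#'(NodeId, ResAcc), [W, '+']).

%% A '#' edge under this node matches everything below it.
'match_#'(NodeId, ResAcc) ->
    case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = '#'}) of
        [#trie{node_id = ChildId}] -> mnesia:read(?TRIE_NODE, ChildId) ++ ResAcc;
        [] -> ResAcc
    end.
The article concludes with a brief overview of EMQX 3.0’s routing and distribution mechanisms and points to further detailed documentation.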
360 Smart Cloud
Official service account of 360 Smart Cloud, dedicated to building a high-quality, secure, highly available, convenient, and stable one‑stop cloud service platform.