How to Build a Real‑Time Multi‑Tenant Site Search with MySQL, Kafka, and Elasticsearch
This article explains how to design and implement a real‑time, multi‑tenant site search system that streams MySQL binlog changes through Kafka into Elasticsearch. It covers the architecture decisions, the document schema, Docker deployment, and the core SpringBoot/Kotlin code for event collection and indexing.
We need a site‑wide search feature for a multi‑tenant, multi‑product web application that updates in real time. The article discusses the core infrastructure and technology stack required to achieve this.
Problem Definition and Decisions
Using MySQL as the primary database, two main options were considered for search:
1. Query keywords directly in MySQL with LIKE patterns (e.g., %word1%word2%).
2. Adopt a dedicated search engine such as Elasticsearch.
Because the application is multi‑tenant and its data is spread across many relational tables, querying MySQL directly would mean scanning multiple tables with leading‑wildcard patterns that ordinary indexes cannot serve, and it would not handle real‑time updates or deletions cleanly. The MySQL‑only approach was therefore discarded.
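For concreteness, the discarded MySQL‑only option amounts to building a pattern like the one below and binding it to a LIKE clause. This is only a minimal sketch: likePattern is a hypothetical helper, not code from the application.

```kotlin
// Hypothetical helper: joins keywords into a LIKE pattern such as %word1%word2%.
// LIKE wildcards inside a keyword are escaped with MySQL's default escape
// character, the backslash, so they are matched literally.
fun likePattern(keywords: List<String>): String {
    val escaped = keywords
        .filter { it.isNotBlank() }
        .map { it.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") }
    // With no usable keywords the pattern degenerates to "%", which matches everything.
    return if (escaped.isEmpty()) "%"
    else escaped.joinToString(separator = "%", prefix = "%", postfix = "%")
}

fun main() {
    println(likePattern(listOf("word1", "word2"))) // prints %word1%word2%
    println(likePattern(listOf("50%")))            // prints %50\%%
}
```

The pattern always starts with %, which is why an ordinary B‑tree index cannot help: every candidate row has to be read and compared.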
To keep data synchronized with Elasticsearch in real time, three strategies were evaluated:
1. Periodically poll MySQL and push changes to Elasticsearch.
2. Write to both MySQL and Elasticsearch from the application using the Elasticsearch client.
3. Capture MySQL changes as events (e.g., via binlog) and stream them to a processing layer.
Option 1 is not real‑time and adds heavy load; option 2 couples the application tightly to Elasticsearch and risks inconsistency; the chosen solution is option 3 – an event‑driven pipeline based on MySQL binlog.
Service Overview
The search service exposes a unified API. Search results consist of:
searchable: the title and content that are actually indexed.
metadata: tenant ID, product type, timestamps, and creator information used for filtering.
raw: optional raw content for custom rendering.
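These three parts can be modeled as plain Kotlin types. The sketch below is illustrative only: the class names and the toSource function are assumptions, and the snake_case keys follow the field names of the Elasticsearch document used in this article.

```kotlin
// Hypothetical model of one search result; not taken from the original code base.
data class Searchable(val title: String, val content: String)

data class Metadata(
    val tenantId: Long,
    val type: Long,
    val createdAt: String, // timestamps kept as ISO-8601 strings in this sketch
    val createdBy: String,
    val updatedAt: String,
    val updatedBy: String
)

data class SearchDocument(
    val searchable: Searchable,
    val metadata: Metadata,
    val raw: Map<String, Any?> = emptyMap()
)

// Converts the model into a map keyed by the document's snake_case field names.
fun SearchDocument.toSource(): Map<String, Any?> = mapOf(
    "searchable" to mapOf(
        "title" to searchable.title,
        "content" to searchable.content
    ),
    "metadata" to mapOf(
        "tenant_id" to metadata.tenantId,
        "type" to metadata.type,
        "created_at" to metadata.createdAt,
        "created_by" to metadata.createdBy,
        "updated_at" to metadata.updatedAt,
        "updated_by" to metadata.updatedBy
    ),
    "raw" to raw
)

fun main() {
    val doc = SearchDocument(
        Searchable("Release notes", "Version 2 adds site search."),
        Metadata(1L, 2L, "2021-01-01T00:00:00Z", "alice", "2021-01-02T00:00:00Z", "bob")
    )
    println(doc.toSource())
}
```

Keeping the conversion in one place makes it harder for the field names written by the indexer to drift away from the ones declared in the index mapping.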
The Elasticsearch document structure is:
{
  "searchable": {
    "title": "string",
    "content": "string"
  },
  "metadata": {
    "tenant_id": "long",
    "type": "long",
    "created_at": "date",
    "created_by": "string",
    "updated_at": "date",
    "updated_by": "string"
  },
  "raw": {}
}
Infrastructure
We use Apache Kafka as a durable event stream for MySQL changes. The mysql-binlog-connector-java library reads binlog events and publishes them to Kafka. A separate consumer service reads from Kafka, transforms the payload, and indexes it into Elasticsearch.
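The article does not show the message that travels through Kafka. The sketch below is one plausible shape, inferred from the fields the indexer reads (database, table, eventType, payload). The string constants, toEvents, and messageKey are assumptions made for illustration. Binlog row events carry column values by position only, so the column names are assumed to be known from table metadata kept elsewhere.

```kotlin
// Hypothetical event-type values; the real constants are not shown in the article.
const val EVENT_TYPE_INSERT = "INSERT"
const val EVENT_TYPE_UPDATE = "UPDATE"
const val EVENT_TYPE_DELETE = "DELETE"

// One changed row, with column values addressed by name.
data class BinlogEvent(
    val database: String,
    val table: String,
    val eventType: String,
    val payload: Map<String, Any?>
)

// Pairs positional row values with column names to build one event per row.
fun toEvents(
    database: String,
    table: String,
    eventType: String,
    columns: List<String>,
    rows: List<List<Any?>>
): List<BinlogEvent> = rows.map { row ->
    require(row.size == columns.size) { "column count mismatch for $database.$table" }
    BinlogEvent(database, table, eventType, columns.zip(row).toMap())
}

// A stable per-row key. Messages with the same key go to the same Kafka
// partition under the default partitioner, so changes to one row stay in order.
fun messageKey(event: BinlogEvent): String =
    "${event.database}_${event.table}_${event.payload["id"]}"

fun main() {
    val events = toEvents(
        "app", "article", EVENT_TYPE_INSERT,
        listOf("id", "title"),
        listOf(listOf(7L, "Hello"))
    )
    println(events.first())
    println(messageKey(events.first())) // prints app_article_7
}
```

If the topic has more than one partition, passing such a key, for example through KafkaTemplate's send(topic, key, value) overload, is one way to keep an update from overtaking the insert it depends on.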
Key Q&A:
Q: Why not use an Elasticsearch connector?
A: Our product documents are stored in an external object store, so the connector cannot access the full text.
Q: Why not preprocess data before sending to Kafka?
A: That would duplicate the data on disk and waste Kafka storage.
Q: Why use a custom binlog collector instead of Filebeat?
A: In many environments we cannot install agents on the MySQL host, so a client‑server approach is more portable.
Technology Stack Configuration
All services are containerized with Docker Compose. The development setup runs single‑node MySQL, ZooKeeper, Kafka, and Elasticsearch without authentication.
version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - "3306:3306"
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local
After the containers start, an Elasticsearch index is created via a simple curl request:
curl -X PUT "http://localhost:9200/search" -H 'Content-Type: application/json' -d '{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {"type": "text"},
          "content": {"type": "text"}
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {"type": "long"},
          "type": {"type": "integer"},
          "created_at": {"type": "date"},
          "created_by": {"type": "keyword"},
          "updated_at": {"type": "date"},
          "updated_by": {"type": "keyword"}
        }
      },
      "raw": {"type": "nested"}
    }
  }
}'
Core Code Implementation (SpringBoot + Kotlin)
Binlog Collector – reads MySQL binlog events, filters for table‑map and row‑mutation events, maps them to a domain model, and publishes JSON to Kafka.
override fun run() {
    client.serverId = properties.serverId
    // Deserialize date and time columns as long values instead of java.util.Date objects.
    val eventDeserializer = EventDeserializer()
    eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG)
    client.setEventDeserializer(eventDeserializer)
    client.registerEventListener { event ->
        val header = event.getHeader<EventHeader>()
        val data = event.getData<EventData>()
        if (header.eventType == EventType.TABLE_MAP) {
            // TABLE_MAP events precede row events and identify the table the rows belong to.
            tableRepository.updateTable(Table.of(data as TableMapEventData))
        } else if (EventType.isRowMutation(header.eventType)) {
            val events = when {
                EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                else -> emptyList()
            }
            // Publish each row change to the "binlog" topic as JSON.
            events.forEach { kafkaTemplate.send("binlog", objectMapper.writeValueAsString(it)) }
        }
    }
    // connect() blocks this thread while binlog events are being streamed.
    client.connect()
}
Kafka Listener (Event Processor) – consumes binlog messages and forwards them to a handler.
@Component
class KafkaBinlogTopicListener(val binlogEventHandler: BinlogEventHandler) {
    private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}
Elasticsearch Indexer – creates, updates, or deletes documents in the search index based on the event type.
@Component
class ElasticsearchIndexerBinlogEventHandler(val restHighLevelClient: RestHighLevelClient) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        // The document ID is derived from database, table, and primary key, so replayed events overwrite the same document.
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest().index("search").id(documentId)
            restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT)
        } else {
            // Field names must match the index mapping, which uses snake_case under "metadata".
            val indexRequest = IndexRequest().index("search").id(documentId).source(
                mapOf(
                    "searchable" to mapOf(
                        "title" to payload["title"],
                        "content" to payload["content"]
                    ),
                    "metadata" to mapOf(
                        "tenant_id" to payload["tenantId"],
                        "type" to payload["type"],
                        "created_at" to payload["createdAt"],
                        "created_by" to payload["createdBy"],
                        "updated_at" to payload["updatedAt"],
                        "updated_by" to payload["updatedBy"]
                    )
                )
            )
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT)
        }
    }
}
The overall pipeline ensures that any insert, update, or delete in MySQL is reflected in Elasticsearch in near real time, providing a fast, tenant‑aware search experience.
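To show what tenant‑aware means at query time, here is a minimal sketch of a request body for the search index. It assumes documents are indexed with the field names from the mapping, and tenantSearchQuery and jsonEscape are hypothetical helpers. Because searchable and metadata are mapped as nested, each clause has to be wrapped in a nested query with the matching path.

```kotlin
// Escapes a string for use inside a JSON string literal.
fun jsonEscape(s: String): String = buildString {
    for (c in s) when {
        c == '"' -> append("\\\"")
        c == '\\' -> append("\\\\")
        c < ' ' -> append("\\u%04x".format(c.code)) // control characters
        else -> append(c)
    }
}

// Builds a query that matches keywords in title or content and restricts
// the hits to a single tenant. The filter clause does not affect scoring.
fun tenantSearchQuery(tenantId: Long, keywords: String): String = """
    {
      "query": {
        "bool": {
          "must": [
            { "nested": { "path": "searchable", "query": {
                "multi_match": {
                  "query": "${jsonEscape(keywords)}",
                  "fields": ["searchable.title", "searchable.content"]
                }
            } } }
          ],
          "filter": [
            { "nested": { "path": "metadata", "query": {
                "term": { "metadata.tenant_id": $tenantId }
            } } }
          ]
        }
      }
    }
""".trimIndent()

fun main() {
    println(tenantSearchQuery(42L, "kafka binlog"))
}
```

The resulting JSON can be sent to http://localhost:9200/search/_search with a Content-Type: application/json header. Enforcing the tenant filter inside the search service, rather than accepting it from the caller, keeps one tenant from reading another tenant's documents.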
In summary, while many open‑source binlog processors exist, this manual approach demonstrates a flexible pattern that can be adapted to other databases or custom requirements.
JD Cloud Developers
JD Cloud Developers (Developer of JD Technology) is a JD Technology Group platform offering technical sharing and communication for AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news. Embrace technology and partner with developers to envision the future.
