How to Build a Real‑Time Multi‑Tenant Site Search with MySQL, Kafka, and Elasticsearch

This article explains how to design and implement a real‑time, multi‑tenant site search system by migrating data from MySQL to Elasticsearch using Kafka and MySQL binlog, covering architecture decisions, data schema, Docker deployment, and core SpringBoot/Kotlin code for event collection and indexing.

JD Cloud Developers
JD Cloud Developers
JD Cloud Developers
How to Build a Real‑Time Multi‑Tenant Site Search with MySQL, Kafka, and Elasticsearch

We need a site‑wide search feature for a multi‑tenant, multi‑product web application that updates in real time. The article discusses the core infrastructure and technology stack required to achieve this.

Problem Definition and Decisions

Using MySQL as the primary database, two main options were considered for search:

Query keywords directly in MySQL with LIKE patterns (e.g., %word1%word2%).

Adopt a dedicated search engine such as Elasticsearch.

Because the application is multi‑tenant and involves many relational tables, direct MySQL queries would require scanning multiple tables and cannot handle real‑time updates or deletions cleanly. Therefore, the MySQL‑only approach was discarded.

To keep data synchronized with Elasticsearch in real time, three strategies were evaluated:

Periodically poll MySQL and push changes to Elasticsearch.

Write to both MySQL and Elasticsearch from the application using the Elasticsearch client.

Capture MySQL changes as events (e.g., via binlog) and stream them to a processing layer.

Option 1 is not real‑time and adds heavy load; option 2 couples the application tightly to Elasticsearch and risks inconsistency; the chosen solution is option 3 – an event‑driven pipeline based on MySQL binlog.

Service Overview

The search service exposes a unified API. Search results consist of:

searchable : the title and content that are actually indexed.

metadata : tenant ID, product type, timestamps, and creator information used for filtering.

raw : optional raw content for custom rendering.

The Elasticsearch document structure is:

{
  "searchable": {
    "title": "string",
    "content": "string"
  },
  "metadata": {
    "tenant_id": "long",
    "type": "long",
    "created_at": "date",
    "created_by": "string",
    "updated_at": "date",
    "updated_by": "string"
  },
  "raw": {}
}

Infrastructure

We use Apache Kafka as a durable event stream for MySQL changes. The mysql-binlog-connector-java library reads binlog events and publishes them to Kafka. A separate consumer service reads from Kafka, transforms the payload, and indexes it into Elasticsearch.

Key Q&A:

Q: Why not use an Elasticsearch connector?
A: Our product documents are stored in an external object store, so the connector cannot access the full text.

Q: Why not preprocess data before sending to Kafka?
A: That would duplicate the data on disk and waste Kafka storage.

Q: Why use a custom binlog collector instead of Filebeat?
A: In many environments we cannot install agents on the MySQL host, so a client‑server approach is more portable.

Technology Stack Configuration

All services are containerized with Docker‑Compose. The development setup uses a single‑node MySQL, Zookeeper, Kafka, and Elasticsearch without authentication.

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - "3306:3306"
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

After the containers start, an Elasticsearch index is created via a simple curl request:

curl "http://localhost:9200/search" -XPUT -d '{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {"type": "text"},
          "content": {"type": "text"}
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {"type": "long"},
          "type": {"type": "integer"},
          "created_at": {"type": "date"},
          "created_by": {"type": "keyword"},
          "updated_at": {"type": "date"},
          "updated_by": {"type": "keyword"}
        }
      },
      "raw": {"type": "nested"}
    }
  }
}'

Core Code Implementation (SpringBoot + Kotlin)

Binlog Collector – reads MySQL binlog events, filters for table‑map and row‑mutation events, maps them to a domain model, and publishes JSON to Kafka.

override fun run() {
    client.serverId = properties.serverId
    val eventDeserializer = EventDeserializer()
    eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG)
    client.setEventDeserializer(eventDeserializer)
    client.registerEventListener { event ->
        val header = event.getHeader<EventHeader>()
        val data = event.getData<EventData>()
        if (header.eventType == EventType.TABLE_MAP) {
            tableRepository.updateTable(Table.of(data as TableMapEventData))
        } else if (EventType.isRowMutation(header.eventType)) {
            val events = when {
                EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                else -> emptyList()
            }
            events.forEach { kafkaTemplate.send("binlog", objectMapper.writeValueAsString(it)) }
        }
    }
    client.connect()
}

Kafka Listener (Event Processor) – consumes binlog messages and forwards them to a handler.

@Component
class KafkaBinlogTopicListener(val binlogEventHandler: BinlogEventHandler) {
    private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

Elasticsearch Indexer – creates, updates, or deletes documents in the search index based on the event type.

@Component
class ElasticsearchIndexerBinlogEventHandler(val restHighLevelClient: RestHighLevelClient) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest().index("search").id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            val indexRequest = IndexRequest().index("search").id(documentId).source(
                mapOf(
                    "searchable" to mapOf(
                        "title" to payload["title"],
                        "content" to payload["content"]
                    ),
                    "metadata" to mapOf(
                        "tenantId" to payload["tenantId"],
                        "type" to payload["type"],
                        "createdAt" to payload["createdAt"],
                        "createdBy" to payload["createdBy"],
                        "updatedAt" to payload["updatedAt"],
                        "updatedBy" to payload["updatedBy"]
                    )
                )
            )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

The overall pipeline ensures that any insert, update, or delete in MySQL is reflected in Elasticsearch almost instantly, providing a fast, tenant‑aware search experience.

In summary, while many open‑source binlog processors exist, this manual approach demonstrates a flexible pattern that can be adapted to other databases or custom requirements.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

DockerElasticsearchKafkaKotlinmysqlSpringBootSearch
JD Cloud Developers
Written by

JD Cloud Developers

JD Cloud Developers (Developer of JD Technology) is a JD Technology Group platform offering technical sharing and communication for AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news. Embrace technology and partner with developers to envision the future.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.