Synchronizing MySQL Data to Elasticsearch Using Logstash

This tutorial explains how to set up the environment, configure Elasticsearch and Logstash, create the necessary MySQL tables, and use a Logstash pipeline to continuously sync MySQL records into an Elasticsearch index, while also covering common pitfalls and troubleshooting steps.

Code Ape Tech Column

Environment Preparation

This guide uses JDK 1.8, MySQL 8.0.26, Elasticsearch 7.17.11, and Logstash 7.17.10. Each is available for both Linux (ARM) and Windows; the commands below use the Linux aarch64 packages.

Elasticsearch Installation

Extract the tarball, create a data directory, and modify config/elasticsearch.yml to set cluster name, node name, data and log paths, network host, HTTP port, and initial master node.

tar -zxvf elasticsearch-7.17.11-linux-aarch64.tar.gz -C /usr/software
mv /usr/software/elasticsearch-7.17.11-linux-aarch64 /usr/software/elasticsearch-7.17.11
cd /usr/software/elasticsearch-7.17.11/
mkdir data
vi config/elasticsearch.yml
# Set these values in config/elasticsearch.yml (replace xxx with your cluster name)
cluster.name: xxx
node.name: node-1
path.data: /usr/software/elasticsearch-7.17.11/data
path.logs: /usr/software/elasticsearch-7.17.11/logs
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["node-1"]

Open the HTTP port (9200) and the transport port (9300) in firewalld, then reload the rules:

firewall-cmd --add-port=9300/tcp --permanent
firewall-cmd --add-port=9200/tcp --permanent
firewall-cmd --reload
systemctl restart firewalld

Elasticsearch refuses to start as root, so create a dedicated user and give it ownership of the installation directory:

# Create user
useradd elastic
# Grant ownership
chown -R elastic /usr/software/elasticsearch-7.17.11/

If the server has limited memory, adjust the JVM heap size in config/jvm.options (e.g., -Xms256m and -Xmx256m).
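
As a rough sketch of that heap adjustment: besides editing config/jvm.options directly, Elasticsearch 7.x also reads extra JVM flags from files ending in .options under config/jvm.options.d/, which leaves the stock file untouched. The helper name write_heap_options and the file name heap.options are illustrative choices, not part of the original setup.

```shell
# Hypothetical helper: write matching min/max heap flags into an options file.
# Usage: write_heap_options <elasticsearch-home> <heap-size, e.g. 256m>
write_heap_options() {
  es_home="$1"
  heap="$2"
  mkdir -p "$es_home/config/jvm.options.d"
  # -Xms and -Xmx get the same value so the heap is not resized at runtime.
  printf '%s\n' "-Xms$heap" "-Xmx$heap" > "$es_home/config/jvm.options.d/heap.options"
}

# Example call (path from this guide), run before starting Elasticsearch:
# write_heap_options /usr/software/elasticsearch-7.17.11 256m
```

Run the helper before the chown step, or as the elastic user, so the new file is readable by the account that starts Elasticsearch.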

Increase system limits for virtual memory and file descriptors:

# vm.max_map_count
vi /etc/sysctl.conf
# add at the end
vm.max_map_count=655360
sysctl -p

# file descriptor and process limits
vi /etc/security/limits.conf
# add at the end (takes effect at the next login)
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
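
To confirm the new limits took effect, something like the following can be run as the elastic user after logging in again. It is only a sketch: check_min is a made-up helper, and the thresholds in the example calls are assumed from the minimums that the Elasticsearch 7.x bootstrap checks report (262144 for vm.max_map_count, 65535 for open files).

```shell
# Hypothetical helper: compare a current value against a required minimum.
# Usage: check_min <label> <actual> <required>
check_min() {
  if [ "$2" -ge "$3" ]; then
    echo "OK  $1=$2 (minimum $3)"
  else
    echo "LOW $1=$2 (minimum $3)"
    return 1
  fi
}

# Example calls on the Elasticsearch host:
# check_min vm.max_map_count "$(sysctl -n vm.max_map_count)" 262144
# check_min nofile "$(ulimit -n)" 65535
```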

Start Elasticsearch in the background:

su elastic
cd /usr/software/elasticsearch-7.17.11/bin/
./elasticsearch -d
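
Because -d detaches the process, the prompt returns before the node is ready. A small polling loop, sketched below, can confirm that the HTTP endpoint answers. The ES_URL variable and the wait_for_es helper are illustrative names; the address assumes the host and port configured earlier.

```shell
# Assumption: Elasticsearch listens on the host and port configured above.
ES_URL="${ES_URL:-http://127.0.0.1:9200}"

# Hypothetical helper: poll the root endpoint until it answers.
# Usage: wait_for_es [attempts] [seconds-between-attempts]
wait_for_es() {
  attempts="${1:-30}"
  pause="${2:-2}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -f makes curl return non-zero on HTTP errors; -s silences progress output.
    if curl -fs "$ES_URL" >/dev/null 2>&1; then
      echo "Elasticsearch is up at $ES_URL"
      return 0
    fi
    sleep "$pause"
    i=$((i + 1))
  done
  echo "No response from $ES_URL after $attempts attempts" >&2
  return 1
}

# Example call: wait_for_es 30 2
```

If the loop gives up, the log file under the path.logs directory usually names the failing bootstrap check.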

Logstash Installation

Extract Logstash and create a dedicated directory for MySQL pipeline files:

tar -zxvf logstash-7.17.10-linux-aarch64.tar.gz -C /usr/software
mv /usr/software/logstash-7.17.10-linux-aarch64 /usr/software/logstash-7.17.10
cd /usr/software/logstash-7.17.10
mkdir mysql

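Download the MySQL JDBC driver into the mysql directory, so that it matches the jdbc_driver_library path used in the pipeline configuration:

wget -P mysql https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar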

Logstash Pipeline Configuration

Create a .conf file (e.g., jdbc.conf) inside the mysql folder with the following sections:

input {
  jdbc {
    type => "mytest_user"
    jdbc_driver_library => "/usr/software/logstash-7.17.10/mysql/mysql-connector-java-8.0.30.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "500"
    record_last_run => true
    last_run_metadata_path => "/usr/software/logstash-7.17.10/mysql/last_run_metadata_update_time.txt"
    tracking_column => "update_time"
    use_column_value => "true"
    tracking_column_type => "timestamp"
    schedule => "*/5 * * * * *"
    statement => "SELECT * FROM user WHERE update_time > :sql_last_value AND update_time < NOW()"
    clean_run => "false"
  }
}
filter {
  # No filters needed for this simple sync
}
output {
  if [type] == "mytest_user" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "user"
      document_id => "%{id}"
    }
  }
  stdout { codec => json_lines }
}

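Use absolute paths for jdbc_driver_library and last_run_metadata_path; relative paths are resolved against the directory Logstash is started from and tend to fail with file-not-found or permission errors. The account running Logstash also needs write access to the last_run_metadata_path location.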

Data Synchronization

Create the user table in the mytest database (the database named in jdbc_connection_string). It has id, username, age, gender, create_time, and update_time columns; update_time is the column the pipeline tracks.

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'username',
  `age` int DEFAULT NULL COMMENT 'age',
  `gender` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'gender',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'creation time',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
SET FOREIGN_KEY_CHECKS = 1;
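
The table starts empty, so a few rows are needed before anything can be synced. A minimal sketch with made-up sample values follows; create_time and update_time are filled in by the column defaults, and the file name sample_user.sql is an arbitrary choice.

```shell
# Write a few illustrative rows to a SQL file (the values are made up).
cat > sample_user.sql <<'SQL'
INSERT INTO `user` (`username`, `age`, `gender`) VALUES
  ('alice', 28, 'female'),
  ('bob', 35, 'male');
SQL

# Load the file into the mytest database (prompts for the password):
# mysql -u root -p mytest < sample_user.sql
```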

Start Logstash with the pipeline configuration:

cd /usr/software/logstash-7.17.10
bin/logstash -f mysql/jdbc.conf
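
If startup fails, or before the first long-running start, the pipeline file can be checked for syntax errors. Logstash's --config.test_and_exit flag parses the configuration and exits without running it. The LOGSTASH_BIN variable and the check_pipeline helper below are illustrative names, and the default path assumes the install location used in this guide.

```shell
# Assumption: Logstash is installed at the path used in this guide.
LOGSTASH_BIN="${LOGSTASH_BIN:-/usr/software/logstash-7.17.10/bin/logstash}"

# Hypothetical helper: parse a pipeline file and exit without starting it.
# Usage: check_pipeline <path-to-conf>
check_pipeline() {
  "$LOGSTASH_BIN" -f "$1" --config.test_and_exit
}

# Example call:
# check_pipeline /usr/software/logstash-7.17.10/mysql/jdbc.conf
```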

Insert a record into the MySQL user table; within a few seconds (the schedule polls every five seconds) it should appear in the Elasticsearch user index. Check with Kibana, the elasticsearch-head plugin, or any other Elasticsearch client.
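
Without a UI, the same check can be done from the command line. The sketch below asks the _count API how many documents the index holds; ES_URL and es_doc_count are illustrative names, and the address assumes the host and port configured earlier.

```shell
# Assumption: Elasticsearch listens on the host and port configured above.
ES_URL="${ES_URL:-http://127.0.0.1:9200}"

# Hypothetical helper: print the number of documents in an index.
# Usage: es_doc_count <index>
es_doc_count() {
  # The response looks like {"count":3,"_shards":{...}}; sed extracts the number.
  curl -fs "$ES_URL/$1/_count" | sed -n 's/.*"count":\([0-9][0-9]*\).*/\1/p'
}

# Example calls:
# es_doc_count user
# curl -s "$ES_URL/user/_search?pretty"   # show the synced documents themselves
```

The count should match the number of rows in the MySQL table once a scheduled run has completed.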

Common Issues

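Deletion not synced: the pipeline only sees rows returned by the SELECT statement, so a row deleted from MySQL never reaches Logstash and its document stays in Elasticsearch. Use a soft-delete flag instead (e.g., an is_deleted column): update the flag rather than deleting the row, and let the normal sync carry the new value to the index.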

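Memory errors: if Elasticsearch fails to start or is killed for lack of memory, lower -Xms and -Xmx in config/jvm.options or free up system memory before starting it.

System limits: if startup aborts on the bootstrap checks, raise vm.max_map_count and the nofile and nproc limits as described in the Elasticsearch installation section.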
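
For the deletion issue, the MySQL side of a soft delete could look like the sketch below. The column name is_deleted comes from the suggestion above; its type, default, the example id, and the file name are assumptions. Because update_time is declared with ON UPDATE CURRENT_TIMESTAMP(3), flagging a row also advances update_time, so the existing statement should pick up the change on the next scheduled run.

```shell
# Write the schema change and an example soft delete to a SQL file.
cat > soft_delete.sql <<'SQL'
-- Assumed column definition: 0 = live row, 1 = deleted row.
ALTER TABLE `user`
  ADD COLUMN `is_deleted` tinyint NOT NULL DEFAULT 0 COMMENT 'soft-delete flag';

-- Instead of DELETE FROM `user` WHERE `id` = 1, flag the row.
UPDATE `user` SET `is_deleted` = 1 WHERE `id` = 1;
SQL

# Apply it to the mytest database (prompts for the password):
# mysql -u root -p mytest < soft_delete.sql
```

Search queries against the index then need to exclude documents whose is_deleted value is 1.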

Conclusion

This guide walks through an end-to-end setup for near-real-time MySQL-to-Elasticsearch synchronization with Logstash (polling every five seconds in this configuration), covering environment preparation, configuration, execution, and troubleshooting.
