Synchronizing MySQL Data to Elasticsearch Using Logstash
This tutorial explains how to set up the environment, configure Elasticsearch and Logstash, create the necessary MySQL tables, and use a Logstash pipeline to continuously sync MySQL records into an Elasticsearch index, while also covering common pitfalls and troubleshooting steps.
Environment Preparation
This tutorial uses JDK 1.8, MySQL 8.0.26, Elasticsearch 7.17.11, and Logstash 7.17.10; download packages exist for both Linux (ARM) and Windows.
Elasticsearch Installation
Extract the tarball, create a data directory, and modify config/elasticsearch.yml to set cluster name, node name, data and log paths, network host, HTTP port, and initial master node.
tar -zxvf elasticsearch-7.17.11-linux-aarch64.tar.gz -C /usr/software
mv /usr/software/elasticsearch-7.17.11-linux-aarch64 /usr/software/elasticsearch-7.17.11
cd /usr/software/elasticsearch-7.17.11/
mkdir data
vi config/elasticsearch.yml
# Example configuration snippets
cluster.name: xxx
node.name: node-1
path.data: /usr/software/elasticsearch-7.17.11/data
path.logs: /usr/software/elasticsearch-7.17.11/logs
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["node-1"]
Open the required ports and adjust firewall settings:
firewall-cmd --add-port=9300/tcp --permanent
firewall-cmd --add-port=9200/tcp --permanent
firewall-cmd --reload
systemctl restart firewalld
Create a non‑root user to run Elasticsearch and assign ownership:
# Create user
useradd elastic
# Grant ownership
chown -R elastic /usr/software/elasticsearch-7.17.11/
If the server has limited memory, adjust the JVM heap size in config/jvm.options (e.g., -Xms256m and -Xmx256m).
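As an alternative to editing config/jvm.options directly, Elasticsearch 7.7 and later also read override files ending in .options from config/jvm.options.d/. The sketch below writes such a file. The helper name write_heap_options and the file name heap.options are made up for illustration, and the 256m value is simply the example from the text above.

```shell
# Hypothetical helper: write matching -Xms/-Xmx values to an override file
# under <es_home>/config/jvm.options.d/ (read by Elasticsearch 7.7+).
write_heap_options() {
  es_home="$1"   # Elasticsearch installation directory
  heap="$2"      # heap size, e.g. 256m
  mkdir -p "$es_home/config/jvm.options.d"
  printf '%s\n' "-Xms$heap" "-Xmx$heap" > "$es_home/config/jvm.options.d/heap.options"
}

# Usage with the installation path from this tutorial:
# write_heap_options /usr/software/elasticsearch-7.17.11 256m
```

Keeping the initial and maximum heap equal avoids heap resizing at runtime; restart Elasticsearch for the change to take effect.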
Increase system limits for virtual memory and file descriptors:
# vm.max_map_count
vi /etc/sysctl.conf
# add at the end
vm.max_map_count=655360
sysctl -p
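To confirm that the new value is active, the kernel setting can be read back and compared with the minimum that Elasticsearch's bootstrap check expects (262144). This is a minimal sketch that assumes a Linux host exposing /proc; the variable names are illustrative.

```shell
# Minimum vm.max_map_count required by the Elasticsearch bootstrap check.
required=262144

# Read the live kernel value; fall back to 0 if /proc is not available.
current=$(cat /proc/sys/vm/max_map_count 2>/dev/null || echo 0)

if [ "$current" -ge "$required" ]; then
  echo "vm.max_map_count=$current (ok)"
else
  echo "vm.max_map_count=$current is below the required $required"
fi
```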
# file descriptors
vi /etc/security/limits.conf
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
Start Elasticsearch in the background:
su elastic
cd /usr/software/elasticsearch-7.17.11/bin/
./elasticsearch -d
Logstash Installation
Extract Logstash and create a dedicated directory for MySQL pipeline files:
tar -zxvf logstash-7.17.10-linux-aarch64.tar.gz -C /usr/software
mv /usr/software/logstash-7.17.10-linux-aarch64 /usr/software/logstash-7.17.10
cd /usr/software/logstash-7.17.10
mkdir mysql
Download the MySQL JDBC driver into that directory:
wget -P mysql https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
Logstash Pipeline Configuration
Create a .conf file (e.g., jdbc.conf) inside the mysql folder with the following sections:
input {
  jdbc {
    type => "mytest_user"
    # JDBC driver and connection settings
    jdbc_driver_library => "/usr/software/logstash-7.17.10/mysql/mysql-connector-java-8.0.30.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false"
    jdbc_user => "root"
    jdbc_password => "123456"
    # Fetch results in pages of 500 rows
    jdbc_paging_enabled => "true"
    jdbc_page_size => "500"
    # Persist the last seen update_time so each run only fetches newer rows
    record_last_run => true
    last_run_metadata_path => "/usr/software/logstash-7.17.10/mysql/last_run_metadata_update_time.txt"
    tracking_column => "update_time"
    use_column_value => "true"
    tracking_column_type => "timestamp"
    # Six-field cron expression: run every 5 seconds
    schedule => "*/5 * * * * *"
    # ORDER BY keeps paging stable and makes the last row carry the newest update_time
    statement => "SELECT * FROM user WHERE update_time > :sql_last_value AND update_time < NOW() ORDER BY update_time ASC"
    clean_run => "false"
  }
}
filter {
  # No filters needed for this simple sync
}
output {
  if [type] == "mytest_user" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "user"
      document_id => "%{id}"
    }
  }
  stdout { codec => json_lines }
}
Note that absolute paths must be used for jdbc_driver_library and last_run_metadata_path to avoid permission issues.
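Before running the pipeline for real, it can help to let Logstash parse the configuration and exit. The sketch below uses the --config.test_and_exit flag of the logstash command; the LOGSTASH_HOME and CONF variable names are illustrative, and the paths are the ones assumed throughout this tutorial.

```shell
# Installation directory and pipeline file used in this tutorial (adjust as needed).
LOGSTASH_HOME="${LOGSTASH_HOME:-/usr/software/logstash-7.17.10}"
CONF="$LOGSTASH_HOME/mysql/jdbc.conf"

if [ -x "$LOGSTASH_HOME/bin/logstash" ]; then
  # Parse the pipeline configuration, report syntax errors, then exit without running it.
  "$LOGSTASH_HOME/bin/logstash" -f "$CONF" --config.test_and_exit
else
  echo "logstash not found under $LOGSTASH_HOME"
fi
```

A syntax problem in any of the input, filter, or output sections is reported here instead of surfacing only after the scheduler has started.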
Data Synchronization
Create the MySQL user table with the columns id, username, age, gender, create_time, and update_time. MySQL refreshes update_time automatically on every change to a row, which is what the pipeline's tracking_column relies on.
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'username',
  `age` int DEFAULT NULL COMMENT 'age',
  `gender` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'gender',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'creation time',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
SET FOREIGN_KEY_CHECKS = 1;
Start Logstash with the pipeline configuration:
cd /usr/software/logstash-7.17.10
bin/logstash -f mysql/jdbc.conf
Insert a record into the MySQL user table and verify that it appears in the Elasticsearch user index, using Kibana, the elasticsearch-head plugin, or any other Elasticsearch UI.
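The same check can be done from the command line. The following sketch assumes Elasticsearch is reachable at the address used in the pipeline's hosts setting and that curl is installed; it calls the standard _count and _search REST endpoints on the user index. The variable names are illustrative.

```shell
# Address and index name taken from the output section of jdbc.conf.
ES_URL="${ES_URL:-http://127.0.0.1:9200}"
INDEX="user"

# How many documents the index currently holds.
curl -s --max-time 5 "$ES_URL/$INDEX/_count?pretty" \
  || echo "Elasticsearch not reachable at $ES_URL"

# The five most recently updated documents, newest first.
curl -s --max-time 5 "$ES_URL/$INDEX/_search?pretty&sort=update_time:desc&size=5" \
  || echo "Elasticsearch not reachable at $ES_URL"
```

With the five-second schedule, a newly inserted row should show up within a few seconds; updating the same row later overwrites the existing document, because document_id is set to the row's id.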
Common Issues
Deletion not synced: Elasticsearch does not automatically delete documents when a MySQL row is removed. Implement a soft‑delete flag (e.g., is_deleted) and sync its value.
Memory errors: Adjust JVM heap size or ensure sufficient system memory before starting Elasticsearch.
System limits: Increase vm.max_map_count and file descriptor limits as shown above.
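For the soft-delete approach, the flag reaches Elasticsearch like any other column: changing it updates update_time, so the next poll picks the row up. Searches then need to exclude flagged documents. The sketch below assumes a hypothetical is_deleted column has been added to the user table (it is not part of the table definition above) and that it is indexed as a boolean; with an integer column the term value would be 1 instead of true.

```shell
ES_URL="${ES_URL:-http://127.0.0.1:9200}"

# Query body: match every document except those flagged as deleted.
# "is_deleted" is the assumed soft-delete field name.
QUERY='{"query":{"bool":{"must_not":[{"term":{"is_deleted":true}}]}}}'

curl -s --max-time 5 -X POST "$ES_URL/user/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d "$QUERY" \
  || echo "Elasticsearch not reachable at $ES_URL"
```

Documents without the field also pass this filter, so rows synced before the column existed remain visible.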
Conclusion
The guide demonstrates a complete end‑to‑end setup for near-real-time MySQL‑to‑Elasticsearch synchronization using Logstash (the pipeline polls MySQL every five seconds), covering environment preparation, configuration, execution, and troubleshooting.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
