Design and Implementation of a Go-Based MySQL-to-MySQL Data Synchronization System
This article presents a Go-based MySQL-to-MySQL data synchronization solution built on go-mysql-transfer, detailing its architecture, multi‑threaded design, binlog handling, DDL/DML ordering, and code implementation for reliable, ordered, and fault‑tolerant replication.
The article begins by describing the common need for data synchronization in modern software architectures, especially for decoupling microservices, handling high concurrency, and supporting data analysis. It distinguishes heterogeneous synchronization (e.g., MySQL → ES/Redis) from same‑type synchronization (MySQL → MySQL) and notes the limitations of existing tools such as Alibaba's open‑source Canal.
To address these issues, the authors forked the open‑source go-mysql-transfer project and added multi‑threaded MySQL‑to‑MySQL synchronization with row‑level ordered consistency. The source code is available at github.com/j262965682/goMysqlSync .
Logical Architecture: The system starts a Go‑based Canal service to pull binlog events from the source, parses a configuration file to define the synchronization scope, and routes events to either a DDL channel or one of multiple DML channels based on a hash of the primary key. DDL events block the DML channels to preserve ordering, and a SQL merging step improves execution performance on the target side.
Code Analysis – Program Entry:
func main() {
	// parse flags
	if helpFlag {
		flag.Usage()
		return
	}

	// start Prometheus metrics web server
	go http.ListenAndServe(":9999", nil)

	// initialize services and start sync
	err := service.InitApplication(cfgPath)
	if err != nil {
		log.Printf("%+v", errors.WithStack(err))
		return
	}
	global.StartMonitor()

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Kill, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	if global.Cfg().NotCluster() {
		service.StartApplication()
	} else {
		service.BootCluster()
	}

	select {
	case sig := <-signalChan:
		log.Printf("Application Stop, Signal: %s\n", sig.String())
	case <-service.CtxDone():
		log.Printf("context is done with %v, closing", service.CtxErr())
	}
	service.CloseApplication()
}

Service Initialization parses the configuration, sets up logging, initializes storage (BoltDB is used to persist binlog positions), and creates the TransferService, which in turn initializes the Canal instance, rule engine, and endpoint.
func InitApplication(cfgPath string) error {
	cfg, err := global.NewConfigWithFile(cfgPath)
	if err != nil { return err }
	err = logutil.InitGlobalLogger(cfg.LoggerConfig)
	if err != nil { return err }
	err = storage.InitStorage(cfg)
	if err != nil { return err }
	transferService := &TransferService{config: cfg}
	err = transferService.initialize()
	if err != nil { return err }
	return nil
}

Transfer Service Initialization creates the Canal instance, loads table‑level rules, starts the endpoint, and prepares a hash map of channels for concurrent DML processing.
func (s *TransferService) initialize() error {
	if err := s.initCanal(); err != nil { return errors.WithStack(err) }
	if err := s.initRules(); err != nil { return errors.WithStack(err) }
	_endpoint := endpoint.NewEndpoint(s.config, s.canal)
	if err := _endpoint.Start(); err != nil { return errors.WithStack(err) }
	global.SetDestinationState(global.MetricsStateOK)
	s.endpoint = _endpoint
	// start full-load sync, init dumper, create hash map, etc.
	return nil
}

Service Run launches a goroutine to listen on the request queue, then runs the Canal from the saved binlog position. DDL and DML events are dispatched to their respective worker goroutines.
func (s *TransferService) run() error {
	s.wg.Add(1)
	s.handler.startRequestQueueListener()
	// current holds the binlog position restored from storage
	if err := s.canal.RunFrom(current); err != nil {
		logutil.Errorf("start transfer : %v", err)
		s.cancelFunc()
		return errors.Trace(err)
	}
	s.running.Store(false)
	logutil.Info("Canal is Closed")
	return nil
}

Request Queue Listener creates a dedicated DDL channel and multiple DML channels (one per thread). It uses a global flag protected by a mutex to ensure DDL events are processed exclusively before any DML events resume.
go func() {
	DDLMessage := make(chan *global.RowRequest, bulkSize)
	global.GlobalChangeChan = global.ChangeChan{DdlControl: false, DdlControlChan: make(chan struct{})}
	// one consumer goroutine per DML channel
	for i := 0; i < h.transfer.config.Threads; i++ {
		go h.transfer.endpoint.Consume(i, h.hashMap.Array[i], global.GlobalChangeChan)
	}
	// a dedicated consumer for the DDL channel
	go h.transfer.endpoint.Consume(100, DDLMessage, global.GlobalChangeChan)
	for {
		if global.GlobalChangeChan.DdlControl {
			// handle DDL
		} else {
			// handle DML, record position, flush to storage, etc.
		}
	}
}()

Overall Synchronization Logic consists of three stages:
Schema Synchronization: Checks information_schema.COLUMNS for primary keys, extracts CREATE TABLE statements, and applies them on the target with SET FOREIGN_KEY_CHECKS = 0/1 to avoid FK conflicts.
Full Data Load: Records the current binlog position, then pulls data in batches using a paginated SELECT id FROM table ORDER BY id LIMIT offset, batchSize query. Rows are converted to bulk INSERT statements and dispatched to a thread pool (configured via dump_threads and dump_record_rows) using the ants library.
Incremental Sync: Uses the saved position to resume binlog streaming, separates DDL and DML into different goroutine groups, applies row‑level idempotent transformations (e.g., INSERT IGNORE, DELETE/UPDATE by primary key), and employs a hash‑based write‑set algorithm to achieve row‑level ordered concurrency.
The article also discusses MySQL’s native parallel replication (logical‑clock based) and how the presented solution mimics its behavior by hashing rows to threads, merging compatible SQL statements, and ensuring ordered execution to avoid conflicts.
In summary, the solution demonstrates a complete, multi‑threaded, ordered, and fault‑tolerant MySQL‑to‑MySQL data synchronization framework that can serve as a reference for building custom replication tools.
ZCY Technology
ZCY Technology Team (Zero), based in Hangzhou, is a growth-oriented team passionate about technology and craftsmanship. With around 500 members, we are building comprehensive engineering, project management, and talent development systems. We are committed to innovation and creating a cloud service ecosystem for government and enterprise procurement. We look forward to your joining us.