How to Parse MySQL Binlog with Go: Build a Simple Replication Demo
This article demonstrates how to use Go to parse MySQL binlog events and synchronize data by implementing a lightweight replication client. It covers configuration, the server module, packet handling, the handshake, slave registration, and the dump command, and provides complete source code examples with explanations.
The author demonstrates how to use Go to parse MySQL binlog and synchronize data.
Background
This is a demo‑oriented binlog parsing tool written in Go. In simple terms, it speaks the MySQL replication protocol: it acts as a MySQL replica and receives binlog events from the master, similar to the Java project Canal.
Practice
Process and Structure
Execution is driven by the server module. It first connects to MySQL (the connection code is based on the kingshard middleware), disables binlog checksums, registers as a slave, and sends the binlog dump command. It then listens for binlog data, parses each event with go-mysql, and prints it.
Code
The configuration part specifies the master MySQL credentials and the binlog file and position to start from.
package app
type Config struct {
Host string
Port int
User string
Pass string
ServerId int
LogFile string
Position int
}Server module – the core part that handles connection, registration, command sending, and binlog retrieval. Parsing uses the go-mysql library.
package app
import (
"bufio"
"bytes"
"context"
"crypto/sha1"
"encoding/binary"
"errors"
"fmt"
"github.com/siddontang/go-mysql/replication"
"io"
"net"
"os"
"time"
)
// MySQL client/server protocol constants used by the replication client.
const (
	// MinProtocolVersion is the oldest protocol version accepted in the
	// first byte of the server's initial handshake packet.
	MinProtocolVersion byte = 0x0a

	// First-byte markers of response packets. OK_HEADER also prefixes each
	// binlog event packet in the dump stream.
	OK_HEADER          byte = 0x00
	ERR_HEADER         byte = 0xff
	EOF_HEADER         byte = 0xfe
	LocalInFile_HEADER byte = 0xfb
)

// MaxPayloadLength is the largest payload a single packet can carry:
// the packet length field is 24 bits wide.
const MaxPayloadLength = 0xffffff
// Server is a minimal replication client: it connects to a MySQL master,
// registers as a replica, requests a binlog dump and prints the events
// it receives.
type Server struct {
	Cfg *Config         // connection settings and binlog start coordinates
	Ctx context.Context // NOTE(review): not read by any method shown here

	conn         net.Conn  // TCP connection to the master, set by handshake
	io           *PacketIo // packet reader/writer over conn, set by handshake
	registerSucc bool      // true once COM_REGISTER_SLAVE was answered with OK
}
// Run executes one replication session. Quit is deferred so the
// connection is shut down when dump returns or panics.
func (s *Server) Run() {
	defer s.Quit()
	s.dump()
}
// dump drives a replication session. It performs the handshake, disables
// binlog checksums, registers as a replica and requests the binlog stream.
// It then parses and prints every event until reading from the master
// fails.
//
// It panics if the handshake fails. A read error (for example the master
// closing the stream, or Quit closing the connection) is reported and ends
// the loop. Retrying would only spin on the same error at full CPU.
func (s *Server) dump() {
	if err := s.handshake(); err != nil {
		panic(err)
	}
	s.invalidChecksum()
	fmt.Println("dump ...")
	s.register()
	s.writeDumpCommand()

	parser := replication.NewBinlogParser()
	for {
		data, err := s.io.readPacket()
		if err != nil {
			fmt.Printf("read binlog packet: %v\n", err)
			return
		}
		if len(data) == 0 {
			continue
		}
		if data[0] != OK_HEADER {
			// ERR packet from the master (e.g. error 1236); the next read
			// will fail once the master drops the stream.
			s.io.HandleError(data)
			continue
		}
		// Each event packet carries a leading 0x00 status byte before the event.
		e, err := parser.Parse(data[1:])
		if err != nil {
			fmt.Println(err)
			continue
		}
		e.Dump(os.Stdout)
	}
}
// invalidChecksum disables binlog checksums for this session by running
// SET @master_binlog_checksum='NONE', then consumes the server's reply.
//
// Failures are printed but not fatal, matching the other setup steps.
func (s *Server) invalidChecksum() {
	sql := `SET @master_binlog_checksum='NONE'`
	if err := s.query(sql); err != nil {
		// The query was not sent, so there is no reply to wait for.
		fmt.Println(err)
		return
	}
	res, err := s.io.readPacket()
	if err != nil {
		fmt.Println(err)
		return
	}
	if len(res) > 0 && res[0] == ERR_HEADER {
		s.io.HandleError(res)
	}
}
// handshake dials the master (10s timeout) and wraps the TCP connection in
// a PacketIo. It then reads the server's initial handshake packet,
// rejecting empty packets, ERR packets and protocol versions older than
// MinProtocolVersion.
//
// The remaining handshake steps are elided in this listing (presumably
// greeting parsing and authentication).
func (s *Server) handshake() error {
	addr := fmt.Sprintf("%s:%d", s.Cfg.Host, s.Cfg.Port)
	conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
	if err != nil {
		return err
	}
	tc := conn.(*net.TCPConn) // a "tcp" dial always yields *net.TCPConn
	// Socket options are tuning only; failure to set them is not fatal.
	_ = tc.SetKeepAlive(true)
	_ = tc.SetNoDelay(true)
	s.conn = tc
	s.io = &PacketIo{}
	s.io.r = bufio.NewReaderSize(s.conn, 16*1024)
	s.io.w = tc

	data, err := s.io.readPacket()
	if err != nil {
		return err
	}
	if len(data) == 0 {
		return errors.New("empty handshake packet")
	}
	if data[0] == ERR_HEADER {
		// ERR packet: 0xff, 2-byte little-endian error code, then the message.
		if len(data) >= 3 {
			return fmt.Errorf("error packet %d: %s", binary.LittleEndian.Uint16(data[1:3]), data[3:])
		}
		return errors.New("error packet")
	}
	if data[0] < MinProtocolVersion {
		return fmt.Errorf("protocol version %d is too old (minimum %d)", data[0], MinProtocolVersion)
	}
	// ... (handshake parsing omitted for brevity)
	return nil
}
// writeDumpCommand sends COM_BINLOG_DUMP, asking the master to stream
// events from Cfg.LogFile starting at Cfg.Position. It then reads one reply
// to check that the request was accepted.
//
// Payload layout after the 4-byte header: command (1), binlog position (4),
// flags (2, zero here), server id (4), binlog file name (rest).
func (s *Server) writeDumpCommand() {
	s.io.seq = 0
	data := make([]byte, 4+1+4+2+4+len(s.Cfg.LogFile))
	pos := 4
	data[pos] = 18 // COM_BINLOG_DUMP
	pos++
	binary.LittleEndian.PutUint32(data[pos:], uint32(s.Cfg.Position))
	pos += 4
	binary.LittleEndian.PutUint16(data[pos:], 0) // flags
	pos += 2
	binary.LittleEndian.PutUint32(data[pos:], uint32(s.Cfg.ServerId))
	pos += 4
	copy(data[pos:], s.Cfg.LogFile)

	if err := s.io.writePacket(data); err != nil {
		fmt.Println(err)
		return
	}
	// NOTE(review): the master answers a dump request with the event stream
	// itself rather than a separate OK packet, so this read consumes the
	// first (0x00-prefixed) event.
	res, err := s.io.readPacket()
	if err != nil {
		fmt.Println(err)
		return
	}
	switch {
	case len(res) == 0:
		fmt.Println("empty reply to binlog dump command")
	case res[0] == OK_HEADER:
		fmt.Println("send dump command return ok.")
	default:
		s.io.HandleError(res)
	}
}
// register sends COM_REGISTER_SLAVE so the master records this client as a
// replica with id Cfg.ServerId. It sets registerSucc when the master
// answers with an OK packet.
//
// Payload layout after the 4-byte header: command (1), server id (4),
// hostname, user and password (each prefixed by a 1-byte length), port (2),
// replication rank (4), master id (4).
func (s *Server) register() {
	s.io.seq = 0
	hostname, _ := os.Hostname() // on error the hostname is simply sent empty

	fields := []string{hostname, s.Cfg.User, s.Cfg.Pass}
	size := 4 + 1 + 4 + 2 + 4 + 4
	for _, f := range fields {
		// A 1-byte length prefix cannot describe a longer field; sending it
		// anyway would produce a malformed packet.
		if len(f) > 255 {
			fmt.Printf("register: field of %d bytes exceeds the 255-byte limit\n", len(f))
			return
		}
		size += 1 + len(f)
	}

	data := make([]byte, size)
	pos := 4
	data[pos] = 21 // COM_REGISTER_SLAVE
	pos++
	binary.LittleEndian.PutUint32(data[pos:], uint32(s.Cfg.ServerId))
	pos += 4
	for _, f := range fields {
		data[pos] = uint8(len(f))
		pos++
		pos += copy(data[pos:], f)
	}
	// NOTE(review): this reports the master's port; the field is meant for
	// the replica's own port, which this client does not listen on.
	binary.LittleEndian.PutUint16(data[pos:], uint16(s.Cfg.Port))
	pos += 2
	binary.LittleEndian.PutUint32(data[pos:], 0) // replication rank
	pos += 4
	binary.LittleEndian.PutUint32(data[pos:], 0) // master id

	if err := s.io.writePacket(data); err != nil {
		fmt.Println(err)
		return
	}
	res, err := s.io.readPacket()
	if err != nil {
		fmt.Println(err)
		return
	}
	switch {
	case len(res) == 0:
		fmt.Println("empty reply to register command")
	case res[0] == OK_HEADER:
		fmt.Println("register success.")
		s.registerSucc = true
	default:
		// Report the master's reply, not the request that was sent.
		s.io.HandleError(res)
	}
}
// writeCommand sends a command with no arguments (e.g. COM_QUIT) as the
// first packet of a new sequence. Write errors are deliberately ignored.
func (s *Server) writeCommand(command byte) {
	s.io.seq = 0
	// Four header bytes followed by the single command byte.
	packet := []byte{1, 0, 0, 0, command}
	_ = s.io.writePacket(packet)
}
// query sends q as a COM_QUERY command, starting a new packet sequence.
// It only writes the request; reading the reply is left to the caller.
func (s *Server) query(q string) error {
	s.io.seq = 0
	// Zeroed 4-byte header, presumably filled in by writePacket.
	packet := make([]byte, 4, 4+1+len(q))
	packet = append(packet, 3) // COM_QUERY
	packet = append(packet, q...)
	return s.io.writePacket(packet)
}
func (s *Server) Quit() {
s.writeCommand(1)
if err := s.conn.Close(); err != nil {
fmt.Printf("error in close :%v
", err)
}
}Main
package main
import (
"flag"
"fmt"
"github.com/igoso/gbinlog/app"
"os"
"os/signal"
"runtime"
"syscall"
)
var myHost = flag.String("host", "127.0.0.1", "MySQL replication host")
var myPort = flag.Int("port", 3306, "MySQL replication port")
var myUser = flag.String("user", "root", "MySQL replication user")
var myPass = flag.String("pass", "****", "MySQL replication pass")
var serverId = flag.Int("server_id", 1111, "MySQL replication server id")
func main() {
sc := make(chan os.Signal, 1)
signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
runtime.GOMAXPROCS(runtime.NumCPU()/4 + 1)
flag.Parse()
cfg := &app.Config{*myHost, *myPort, *myUser, *myPass, *serverId, "mysql-bin.000032", 3070}
srv := &app.Server{Cfg: cfg}
go srv.Run()
select {
case n := <-sc:
srv.Quit()
fmt.Printf("receive signal %v, closing", n)
}
}go.mod
module github.com/igoso/gbinlog
go 1.15
require (
github.com/siddontang/go-mysql v1.1.0
)
Other
Note that if you send a QUIT command during a binlog dump, the connection on the MySQL side stays in the CLOSE_WAIT state until a new connection arrives. You may also see error 1236 about a duplicate server_id; it does not affect usage.
The author originally tried to implement a full binlog parser from scratch, but found it would be a lot of work because there are so many event types. The go-mysql package already provides a convenient BinlogSyncer with complete parsing capabilities, which can be used instead.
Below is an example using BinlogSyncer:
package main
import (
"context"
"flag"
"fmt"
"os"
"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
)
var host = flag.String("host", "127.0.0.1", "MySQL host")
var port = flag.Int("port", 3306, "MySQL port")
var user = flag.String("user", "root", "MySQL user, must have replication privilege")
var password = flag.String("password", "****", "MySQL password")
var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
var file = flag.String("file", "mysql-bin.000032", "Binlog filename")
var pos = flag.Int("pos", 3070, "Binlog position")
var semiSync = flag.Bool("semisync", false, "Support semi sync")
var backupPath = flag.String("backup_path", "", "Backup path to store binlog files")
var rawMode = flag.Bool("raw", false, "Use raw mode")
func main() {
flag.Parse()
cfg := replication.BinlogSyncerConfig{
ServerID: 101,
Flavor: *flavor,
Host: *host,
Port: uint16(*port),
User: *user,
Password: *password,
RawModeEnabled: *rawMode,
SemiSyncEnabled: *semiSync,
UseDecimal: true,
}
b := replication.NewBinlogSyncer(cfg)
startPos := mysql.Position{Name: *file, Pos: uint32(*pos)}
if len(*backupPath) > 0 {
if err := b.StartBackup(*backupPath, startPos, 0); err != nil {
fmt.Printf("Start backup error: %v
", errors.ErrorStack(err))
return
}
} else {
s, err := b.StartSync(startPos)
if err != nil {
fmt.Printf("Start sync error: %v
", errors.ErrorStack(err))
return
}
for {
e, err := s.GetEvent(context.Background())
if err != nil {
events := s.DumpEvents()
for _, ev := range events {
ev.Dump(os.Stdout)
}
fmt.Printf("Get event error: %v
", errors.ErrorStack(err))
return
}
e.Dump(os.Stdout)
}
}
}That concludes the entire content of this issue.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
MaGe Linux Operations
Founded in 2009, MaGe Education is a top Chinese high‑end IT training brand. Its graduates earn 12K+ RMB salaries, and the school has trained tens of thousands of students. It offers high‑pay courses in Linux cloud operations, Python full‑stack, automation, data analysis, AI, and Go high‑concurrency architecture. Thanks to quality courses and a solid reputation, it has talent partnerships with numerous internet firms.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
