Databases 16 min read

How to Parse MySQL Binlog with Go: Build a Simple Replication Demo

This article shows how to parse MySQL binlog events in Go and synchronize data by implementing a lightweight replication client. It covers the configuration, the server module, packet handling, the handshake, replica registration and the dump command, with source code examples and explanations.

MaGe Linux Operations
MaGe Linux Operations
MaGe Linux Operations
How to Parse MySQL Binlog with Go: Build a Simple Replication Demo

The author demonstrates how to use Go to parse MySQL binlog and synchronize data.

Background

This is a demo-oriented binlog parsing tool written in Go. In short, it implements the client side of the MySQL replication protocol: it connects to MySQL as if it were a replica and receives binlog events, similar to the Java project Canal.

Practice

Process and Structure

Most of the work happens in the server module. It first connects to MySQL (the connection code is adapted from the kingshard middleware). It then disables binlog checksums, registers itself as a slave (replica) and sends the COM_BINLOG_DUMP command. Finally it listens for binlog data and uses go-mysql to parse and print each event.

Code

Configuration: the master's address and credentials, the replica's server ID, and the binlog file and position to start from.

package app

// Config holds the settings for connecting to the upstream MySQL server and
// the binlog coordinates to start replicating from.
//
// Field order matters: callers may build a Config with a positional literal.
type Config struct {
    // Address and credentials of the MySQL server to replicate from.
    Host string
    Port int
    User string
    Pass string

    // ServerId identifies this client to the server; it is sent in both
    // COM_REGISTER_SLAVE and COM_BINLOG_DUMP.
    ServerId int

    // Binlog file name and position at which the dump starts.
    LogFile  string
    Position int
}

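Server module – the core part, which handles the connection, replica registration, command sending and binlog retrieval. Event parsing uses the go-mysql library. Note that this listing is not self-contained: the `PacketIo` type (packet reading and writing) and the authentication part of the handshake are not shown.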

package app

import (
    "bufio"
    "bytes"
    "context"
    "crypto/sha1"
    "encoding/binary"
    "errors"
    "fmt"
    "github.com/siddontang/go-mysql/replication"
    "io"
    "net"
    "os"
    "time"
)

// Protocol constants used when talking to the MySQL server.
const (
    // MinProtocolVersion is the lowest protocol version accepted in the
    // server's initial handshake packet.
    MinProtocolVersion byte = 10

    // Leading byte of the generic server response packets.
    OK_HEADER          byte = 0x00
    ERR_HEADER         byte = 0xff
    EOF_HEADER         byte = 0xfe
    LocalInFile_HEADER byte = 0xfb
)

// MaxPayloadLength is the largest payload one packet can carry: the packet
// header stores the payload length in 3 bytes.
const MaxPayloadLength = 0xffffff

// Server is a minimal replication client. It connects to a MySQL master,
// registers as a replica and prints the binlog events it receives.
type Server struct {
    Cfg *Config         // connection settings and binlog start position
    Ctx context.Context // NOTE(review): not read by any method visible here

    conn         net.Conn  // underlying connection, set by handshake
    io           *PacketIo // packet reader/writer wrapping conn
    registerSucc bool      // set once COM_REGISTER_SLAVE got an OK reply
}

// Run connects, registers and streams binlog events via dump. Quit is always
// invoked on the way out, including when dump panics.
func (s *Server) Run() {
    defer s.Quit()
    s.dump()
}

// dump connects to the master, prepares the replication session and then
// prints binlog events to stdout until the connection can no longer be read.
//
// Steps: handshake, disable binlog checksums, COM_REGISTER_SLAVE,
// COM_BINLOG_DUMP, then a read loop. A handshake error panics.
func (s *Server) dump() {
    if err := s.handshake(); err != nil {
        panic(err)
    }
    s.invalidChecksum()
    fmt.Println("dump ...")
    s.register()
    s.writeDumpCommand()

    parser := replication.NewBinlogParser()
    for {
        data, err := s.io.readPacket()
        if err != nil {
            // A failed read (EOF, closed or broken connection) does not recover
            // by retrying on the same connection; looping here would spin the
            // CPU forever. Stop and let the caller clean up.
            fmt.Println("read binlog packet:", err)
            return
        }
        if len(data) == 0 {
            continue
        }
        if data[0] != OK_HEADER {
            // Not an event packet (e.g. an ERR packet from the server).
            s.io.HandleError(data)
            continue
        }
        // Event packets carry a 0x00 status byte in front of the event.
        e, err := parser.Parse(data[1:])
        if err != nil {
            fmt.Println(err)
            continue
        }
        e.Dump(os.Stdout)
    }
}

// invalidChecksum runs SET @master_binlog_checksum='NONE' on the session so
// the master sends binlog events without checksums, then consumes the reply.
//
// Failures are printed, not returned. If the query could not be written, no
// reply is awaited, because none will come.
func (s *Server) invalidChecksum() {
    sql := `SET @master_binlog_checksum='NONE'`
    if err := s.query(sql); err != nil {
        fmt.Println(err)
        return
    }
    res, err := s.io.readPacket()
    if err != nil {
        fmt.Println(err)
        return
    }
    // Surface a rejected SET instead of silently discarding the ERR packet.
    if len(res) > 0 && res[0] == ERR_HEADER {
        s.io.HandleError(res)
    }
}

// handshake dials the MySQL server, sets up the packet reader/writer and
// validates the server's initial handshake packet.
//
// Only the ERR and protocol-version checks are shown. The rest of the
// handshake parsing is omitted from this listing (presumably including
// authentication; confirm against the full source).
func (s *Server) handshake() error {
    // JoinHostPort brackets IPv6 literals, which "%s:%d" would not.
    addr := net.JoinHostPort(s.Cfg.Host, fmt.Sprint(s.Cfg.Port))
    conn, err := net.DialTimeout("tcp", addr, 10*time.Second)
    if err != nil {
        return fmt.Errorf("dial %s: %w", addr, err)
    }
    // Dialing "tcp" yields a *net.TCPConn.
    tc := conn.(*net.TCPConn)
    tc.SetKeepAlive(true)
    tc.SetNoDelay(true)
    s.conn = tc
    s.io = &PacketIo{}
    s.io.r = bufio.NewReaderSize(s.conn, 16*1024)
    s.io.w = tc

    data, err := s.io.readPacket()
    if err != nil {
        return fmt.Errorf("read handshake packet: %w", err)
    }
    // Guard the data[0] accesses below.
    if len(data) == 0 {
        return errors.New("empty handshake packet")
    }
    if data[0] == ERR_HEADER {
        return errors.New("error packet")
    }
    if data[0] < MinProtocolVersion {
        return fmt.Errorf("version is too lower, current:%d", data[0])
    }
    // ... (handshake parsing omitted for brevity)
    return nil
}

// writeDumpCommand sends COM_BINLOG_DUMP, asking the master to stream binlog
// events from Cfg.LogFile at Cfg.Position, then reads one reply packet.
//
// Payload after the 4-byte header: opcode 0x12, binlog position (4 bytes LE),
// flags (2 bytes, sent as 0), server id (4 bytes LE), binlog file name (rest).
// Errors are printed, not returned.
func (s *Server) writeDumpCommand() {
    s.io.seq = 0
    data := make([]byte, 4+1+4+2+4+len(s.Cfg.LogFile))
    pos := 4
    data[pos] = 18 // COM_BINLOG_DUMP
    pos++
    binary.LittleEndian.PutUint32(data[pos:], uint32(s.Cfg.Position))
    pos += 4
    binary.LittleEndian.PutUint16(data[pos:], 0) // flags
    pos += 2
    binary.LittleEndian.PutUint32(data[pos:], uint32(s.Cfg.ServerId))
    pos += 4
    copy(data[pos:], s.Cfg.LogFile)
    if err := s.io.writePacket(data); err != nil {
        fmt.Println("send dump command:", err)
        return
    }
    // NOTE(review): MySQL answers COM_BINLOG_DUMP with the event stream itself
    // (or an ERR packet) rather than a separate OK packet, so an OK-prefixed
    // packet here is presumably the first event and is consumed unparsed.
    // Confirm against the protocol docs.
    res, err := s.io.readPacket()
    if err != nil {
        fmt.Println("read dump reply:", err)
        return
    }
    switch {
    case len(res) == 0:
        fmt.Println("empty reply to dump command")
    case res[0] == OK_HEADER:
        fmt.Println("send dump command return ok.")
    default:
        s.io.HandleError(res)
    }
}

// register sends COM_REGISTER_SLAVE so the master knows this client as a
// replica. It sets s.registerSucc when the server replies with OK.
//
// Payload after the 4-byte header: opcode 21, server id (4 bytes LE),
// hostname / user / password each as a 1-byte length followed by the bytes,
// port (2 bytes LE), then two 4-byte fields sent as zero (replication rank
// and master id in the MySQL protocol docs). Errors are printed, not returned.
func (s *Server) register() {
    s.io.seq = 0
    // A failed lookup is not fatal: an empty hostname is sent instead.
    hostname, _ := os.Hostname()
    for _, field := range []string{hostname, s.Cfg.User, s.Cfg.Pass} {
        // Each field has a 1-byte length prefix; a longer value would wrap the
        // length byte and corrupt the packet.
        if len(field) > 255 {
            fmt.Println("register: hostname/user/password longer than 255 bytes")
            return
        }
    }
    data := make([]byte, 4+1+4+1+len(hostname)+1+len(s.Cfg.User)+1+len(s.Cfg.Pass)+2+4+4)
    pos := 4
    data[pos] = 21 // COM_REGISTER_SLAVE
    pos++
    binary.LittleEndian.PutUint32(data[pos:], uint32(s.Cfg.ServerId))
    pos += 4
    data[pos] = uint8(len(hostname))
    pos++
    pos += copy(data[pos:], hostname)
    data[pos] = uint8(len(s.Cfg.User))
    pos++
    pos += copy(data[pos:], s.Cfg.User)
    data[pos] = uint8(len(s.Cfg.Pass))
    pos++
    pos += copy(data[pos:], s.Cfg.Pass)
    binary.LittleEndian.PutUint16(data[pos:], uint16(s.Cfg.Port))
    pos += 2
    binary.LittleEndian.PutUint32(data[pos:], 0)
    pos += 4
    binary.LittleEndian.PutUint32(data[pos:], 0)
    if err := s.io.writePacket(data); err != nil {
        fmt.Println("send register command:", err)
        return
    }
    res, err := s.io.readPacket()
    if err != nil {
        fmt.Println("read register reply:", err)
        return
    }
    switch {
    case len(res) == 0:
        fmt.Println("empty reply to register command")
    case res[0] == OK_HEADER:
        fmt.Println("register success.")
        s.registerSucc = true
    default:
        // Hand over the server's reply, not the request that was sent.
        s.io.HandleError(res)
    }
}

// writeCommand sends a command packet whose payload is the single byte
// command (Quit uses 1, COM_QUIT), with the sequence id reset to 0.
// Write errors are discarded: the only visible caller is best-effort shutdown.
func (s *Server) writeCommand(command byte) {
    // Header bytes: payload length 1 (3 bytes LE) and sequence id 0.
    // NOTE(review): writePacket may rewrite these; confirm in PacketIo.
    packet := []byte{0x01, 0x00, 0x00, 0x00, command}
    s.io.seq = 0
    _ = s.io.writePacket(packet)
}

// query sends q as a COM_QUERY packet with the sequence id reset to 0.
// It only writes the packet; reading the reply is left to the caller.
func (s *Server) query(q string) error {
    s.io.seq = 0
    // Four zeroed header bytes, the COM_QUERY opcode (3), then the SQL text.
    packet := make([]byte, 0, 4+1+len(q))
    packet = append(packet, 0, 0, 0, 0, 3)
    packet = append(packet, q...)
    return s.io.writePacket(packet)
}

func (s *Server) Quit() {
    s.writeCommand(1)
    if err := s.conn.Close(); err != nil {
        fmt.Printf("error in close :%v
", err)
    }
}

Main

package main

import (
    "flag"
    "fmt"
    "github.com/igoso/gbinlog/app"
    "os"
    "os/signal"
    "runtime"
    "syscall"
)

// Command-line flags describing the MySQL master to replicate from and the
// server id this client presents.
var (
    myHost   = flag.String("host", "127.0.0.1", "MySQL replication host")
    myPort   = flag.Int("port", 3306, "MySQL replication port")
    myUser   = flag.String("user", "root", "MySQL replication user")
    myPass   = flag.String("pass", "****", "MySQL replication pass")
    serverId = flag.Int("server_id", 1111, "MySQL replication server id")
)

// main starts the replication demo against the server given by the flags,
// beginning at a hard-coded binlog file and position, and runs until it
// receives a termination signal.
func main() {
    sc := make(chan os.Signal, 1)
    // os.Kill (SIGKILL) is not listed: it cannot be caught, so registering it
    // has no effect (staticcheck SA1016).
    signal.Notify(sc, os.Interrupt, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM)
    // Use roughly a quarter of the available CPUs, and at least one.
    runtime.GOMAXPROCS(runtime.NumCPU()/4 + 1)
    flag.Parse()
    // Keyed fields keep this literal correct if Config's fields change order.
    cfg := &app.Config{
        Host:     *myHost,
        Port:     *myPort,
        User:     *myUser,
        Pass:     *myPass,
        ServerId: *serverId,
        LogFile:  "mysql-bin.000032",
        Position: 3070,
    }
    srv := &app.Server{Cfg: cfg}
    go srv.Run()

    n := <-sc
    srv.Quit()
    fmt.Printf("receive signal %v, closing\n", n)
}

go.mod

module github.com/igoso/gbinlog

go 1.15

require (
    github.com/siddontang/go-mysql v1.1.0
)

Other

Note that if you send a QUIT command while a binlog dump is in progress, the MySQL side keeps the connection in the CLOSE_WAIT state until a new connection arrives. You may also see error 1236 reporting a duplicate server_id; this does not affect usage.

The author originally tried to implement a full binlog parser from scratch, but found that the large number of event types makes this a lot of work. The go-mysql package already provides a convenient BinlogSyncer, which handles the replication connection and parses every event type. It can be used instead.

Below is an example using BinlogSyncer:

package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "github.com/pingcap/errors"
    "github.com/siddontang/go-mysql/mysql"
    "github.com/siddontang/go-mysql/replication"
)

// Command-line options for the BinlogSyncer example.
var (
    // Connection to the MySQL server.
    host     = flag.String("host", "127.0.0.1", "MySQL host")
    port     = flag.Int("port", 3306, "MySQL port")
    user     = flag.String("user", "root", "MySQL user, must have replication privilege")
    password = flag.String("password", "****", "MySQL password")
    flavor   = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")

    // Where to start reading and how to consume the stream.
    file       = flag.String("file", "mysql-bin.000032", "Binlog filename")
    pos        = flag.Int("pos", 3070, "Binlog position")
    semiSync   = flag.Bool("semisync", false, "Support semi sync")
    backupPath = flag.String("backup_path", "", "Backup path to store binlog files")
    rawMode    = flag.Bool("raw", false, "Use raw mode")
)

func main() {
    flag.Parse()
    cfg := replication.BinlogSyncerConfig{
        ServerID: 101,
        Flavor:   *flavor,
        Host:    *host,
        Port:    uint16(*port),
        User:    *user,
        Password: *password,
        RawModeEnabled:  *rawMode,
        SemiSyncEnabled: *semiSync,
        UseDecimal: true,
    }
    b := replication.NewBinlogSyncer(cfg)
    startPos := mysql.Position{Name: *file, Pos: uint32(*pos)}
    if len(*backupPath) > 0 {
        if err := b.StartBackup(*backupPath, startPos, 0); err != nil {
            fmt.Printf("Start backup error: %v
", errors.ErrorStack(err))
            return
        }
    } else {
        s, err := b.StartSync(startPos)
        if err != nil {
            fmt.Printf("Start sync error: %v
", errors.ErrorStack(err))
            return
        }
        for {
            e, err := s.GetEvent(context.Background())
            if err != nil {
                events := s.DumpEvents()
                for _, ev := range events {
                    ev.Dump(os.Stdout)
                }
                fmt.Printf("Get event error: %v
", errors.ErrorStack(err))
                return
            }
            e.Dump(os.Stdout)
        }
    }
}

That concludes this article.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Tags: parsing · Go · Binlog · Replication
MaGe Linux Operations
Written by

MaGe Linux Operations

Founded in 2009, MaGe Education is a top Chinese high‑end IT training brand. Its graduates earn 12K+ RMB salaries, and the school has trained tens of thousands of students. It offers high‑pay courses in Linux cloud operations, Python full‑stack, automation, data analysis, AI, and Go high‑concurrency architecture. Thanks to quality courses and a solid reputation, it has talent partnerships with numerous internet firms.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.