Real-time MySQL Binlog Capture with Oracle GoldenGate and Kafka Integration

This article provides a step‑by‑step guide on configuring MySQL binlog, installing and deploying Oracle GoldenGate, extracting changes, converting them to JSON, and streaming the data into Kafka for real‑time processing, complete with code snippets and verification procedures.

Big Data Technology & Architecture

For real‑time processing of MySQL data streams, this tutorial explains how to monitor MySQL binlog using Oracle GoldenGate (OGG) and forward the captured changes to Kafka in JSON format, enabling downstream Flink or other stream processors to consume the data.

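1. Design Flowchart

The pipeline runs in one direction: MySQL binlog → OGG extract (ext1) → local trail → OGG pump (pump1) → remote trail on the target host → OGG replicat (rep2) with the Kafka handler → Kafka topic → Flink or another consumer.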

2. MySQL Binlog Configuration

Edit /etc/my.cnf to enable row‑based binlog and set replication parameters:

[root@localhost etc]# vim /etc/my.cnf
[mysqld]
# other configs omitted
lower_case_table_names=1
## Replication
server_id                       =2020041006     # unique
log_bin                         =mysql-bin-1   # unique
relay_log_recovery              =1
binlog_format                   =row   # must be row
master_info_repository          =TABLE
relay_log_info_repository       =TABLE
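# The semi-sync variables below exist only when the semisync plugins are loaded; remove them otherwise
#rpl_semi_sync_master_enabled    =1
rpl_semi_sync_master_timeout    =1000
rpl_semi_sync_slave_enabled     =1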
binlog-do-db                    =dsout    # database to capture
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES

After editing, restart MySQL and verify the configuration.
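
Before restarting, the settings OGG depends on can be checked mechanically. The following is a minimal sketch, not part of the original setup: `check_binlog_settings` is a hypothetical helper that parses my.cnf text with Python's standard `configparser` and reports whether `binlog_format`, `log_bin`, and `server_id` are set as the section above requires.

```python
import configparser


def check_binlog_settings(cnf_text):
    """Return a list of problems found in the [mysqld] section of a my.cnf string."""
    parser = configparser.ConfigParser(
        allow_no_value=True,              # my.cnf permits bare flags such as "log_bin"
        inline_comment_prefixes=("#",),   # strip trailing "# ..." comments
        strict=False,                     # tolerate repeated options
        interpolation=None,               # treat "%" literally
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]

    # MySQL treats "-" and "_" in option names as equivalent, so normalize them.
    opts = {
        name.replace("-", "_"): (value or "").strip()
        for name, value in parser.items("mysqld")
    }

    problems = []
    if opts.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format must be row")
    if "log_bin" not in opts:
        problems.append("log_bin is not set")
    if not opts.get("server_id"):
        problems.append("server_id is not set")
    return problems


if __name__ == "__main__":
    # Illustrative fragment modeled on the configuration above.
    sample = """
[mysqld]
server_id                       =2020041006     # unique
log_bin                         =mysql-bin-1   # unique
binlog_format                   =row   # must be row
binlog-do-db                    =dsout
"""
    print(check_binlog_settings(sample))
```

An empty list means the three settings are present. This only inspects the file; once MySQL has restarted, the running values still need to be confirmed on the server itself.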

3. Download and Install OGG

Download OGG for MySQL from Oracle's site, extract it, and set ownership:

mkdir -p /opt/module/ogg/oggservice
tar -xvf ggs_Linux_x64_MySQL_64bit.tar -C /opt/module/ogg/oggservice/
chown -R root:root /opt/module/ogg/oggservice/   # set owner

Start the OGG command line interface:

cd /opt/module/ogg/oggservice/
./ggsci

4. Source System Configuration (Extract & Pump)

GGSCI (localhost.localdomain) 1> create subdirs   # create directories
GGSCI (localhost.localdomain) 2> dblogin sourcedb <database>@<host>:3306,userid <user>,password <pwd>   # login
GGSCI (localhost.localdomain) 3> edit params mgr
port 7015
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

GGSCI (localhost.localdomain) 4> edit params ext1   # extract process
EXTRACT ext1
setenv (MYSQL_HOME="/usr/local/mysql")
dboptions host 192.168.x.xx, connectionport 3306
tranlogoptions altlogdest /usr/local/mysql/data/mysql-bin-1.index
SOURCEDB <database>@<host>:3306,userid <user>,password <pwd>
EXTTRAIL ./dirdat/et
dynamicresolution
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
table dsout.employees;
table dsout.departments;

GGSCI (localhost.localdomain) 5> edit params pump1   # pump process
EXTRACT pump1
SOURCEDB <database>@<host>:3306,userid <user>,password <pwd>
RMTHOST <target_ip>, MGRPORT 2021
RMTTRAIL ./dirdat/rt
table dsout.*;    -- tables to push

# Register processes
GGSCI (localhost.localdomain DBLOGIN as dsout) 8> add extract ext1, tranlog,begin now
GGSCI (localhost.localdomain DBLOGIN as dsout) 9> add exttrail ./dirdat/et, extract ext1
GGSCI (localhost.localdomain DBLOGIN as dsout) 10> add extract pump1, exttrailsource ./dirdat/et
GGSCI (localhost.localdomain DBLOGIN as dsout) 11> add rmttrail ./dirdat/rt,extract pump1

# Defgen configuration
GGSCI (localhost.localdomain) 6> edit param defgen
defsfile ./dirdef/defgen.def
sourcedb <database>@<host>:3306,userid <user>,password <pwd>
table dsout.*;

# Run defgen to generate ./dirdef/defgen.def from defgen.prm
[mysql@localhost oggformysql]$ ./defgen paramfile ./dirprm/defgen.prm

5. Target System – Kafka Integration

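Install OGG for Big Data on the target host and configure the replicat process. The replicat reads ./dirdef/defgen.def through its sourcedefs parameter, so copy that file from the source host into the target's dirdef directory once the subdirectories have been created. Start by unpacking the distribution: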

mkdir -p /opt/module/ogg/oggservice
tar -xvf OGG_BigData_Linux_x64_19.1.0.0.1.tar -C /opt/module/ogg/oggservice/
chown -R root:root /opt/module/ogg/oggservice/

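Add the directory containing libjvm.so to LD_LIBRARY_PATH so that OGG can load the JVM (the path below is specific to this host's OpenJDK build):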

find / -name libjvm.so
vim ~/.bash_profile
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b03-1.el7.x86_64/jre/lib/amd64/server/
source ~/.bash_profile

Configure manager and replicat:

cd /opt/module/ogg/oggservice/
./ggsci
GGSCI (cdh102) 1> create subdirs
GGSCI (cdh102) 1> edit param mgr
PORT 2021
ACCESSRULE, PROG *, IPADDR *, ALLOW
GGSCI (cdh102) 2> edit param rep2
replicat rep2
sourcedefs ./dirdef/defgen.def
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafkaxd.props
MAP dsout.*, TARGET dsout.*;
GGSCI (cdh102) 5> add replicat rep2, exttrail ./dirdat/rt

6. Kafka Handler Configuration (kafkaxd.props)

# kafkaxd.props content
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=xindai_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=xindai-topic
gg.handler.kafkahandler.SchemaTopicName=xindai-topic
gg.handler.kafkahandler.BlockingSend=false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format=json
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

7. Kafka Producer Properties (xindai_kafka_producer.properties)

bootstrap.servers=cdh101:9092,cdh102:9092,cdh103:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=16384
linger.ms=0
# The converter settings below are Kafka Connect options; a plain Kafka producer does not use them
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

8. Start Processes

# Target side
GGSCI (gpdata) 6> start mgr
GGSCI (gpdata) 7> start rep2
# Source side
GGSCI (localhost.localdomain) 1> start mgr
GGSCI (localhost.localdomain) 2> start ext1
GGSCI (localhost.localdomain) 3> start pump1   # start target mgr first
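
After starting the processes, GGSCI's `info all` command lists each one with its status. As a rough sketch for scripting that check, the hypothetical helpers below parse such a listing and return the processes that are not RUNNING. The sample text is illustrative, not captured from a real run, and the column layout is assumed to follow the usual `Program / Status / Group / Lag at Chkpt / Time Since Chkpt` header.

```python
def parse_info_all(output):
    """Parse the text printed by GGSCI `info all` into a list of dicts."""
    processes = []
    for line in output.splitlines():
        fields = line.split()
        # Process rows start with the program type; skip headers and blank lines.
        if len(fields) < 2 or fields[0] not in ("MANAGER", "EXTRACT", "REPLICAT"):
            continue
        processes.append({
            "program": fields[0],
            "status": fields[1],
            # The manager row carries no group name.
            "group": fields[2] if len(fields) > 2 else None,
        })
    return processes


def not_running(output):
    """Return the group (or program) names whose status is not RUNNING."""
    return [
        p["group"] or p["program"]
        for p in parse_info_all(output)
        if p["status"] != "RUNNING"
    ]


if __name__ == "__main__":
    # Illustrative listing only; real output depends on the installation.
    sample = """
Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING
EXTRACT     RUNNING     EXT1        00:00:00      00:00:05
EXTRACT     ABENDED     PUMP1       00:00:00      00:10:42
"""
    print(not_running(sample))
```

A pump that shows ABENDED right after startup often means the target manager was not yet reachable, which is why the target side is started first.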

9. Verification

Consume messages from Kafka:

[root@cdh102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic xindai-topic --from-beginning

Run DML against the source MySQL tables. Two inserts are shown here; updates and deletes are captured the same way:

INSERT INTO employees VALUES('101','changyin',6666.66,'2020-05-05 16:12:20','syy01');
INSERT INTO employees VALUES('102','siling',1234.12,'2020-05-05 16:12:20','syy01');

Observe the JSON messages appear in the Kafka consumer output, confirming that OGG successfully captured MySQL binlog changes and streamed them to Kafka.
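
To illustrate what a downstream consumer might do with these messages, here is a minimal sketch that applies operation records to an in-memory snapshot. It assumes the JSON formatter's usual operation layout (`table`, `op_type` of `I`/`U`/`D`, and `before`/`after` row images). The column names `id` and `name`, the sample values, and the `apply_change` helper are hypothetical, since the original does not show the table schema or the message text.

```python
import json


def apply_change(snapshot, message, key_column):
    """Apply one JSON operation record to an in-memory snapshot.

    snapshot maps table name -> {primary key value -> row dict}.
    """
    record = json.loads(message)
    rows = snapshot.setdefault(record["table"], {})
    op = record["op_type"]
    if op == "I":
        rows[record["after"][key_column]] = record["after"]
    elif op == "U":
        # With GETUPDATEBEFORES the old image is present; drop the old key
        # first in case the primary key itself was changed.
        before = record.get("before", {})
        rows.pop(before.get(key_column), None)
        rows[record["after"][key_column]] = record["after"]
    elif op == "D":
        rows.pop(record["before"][key_column], None)
    return snapshot


if __name__ == "__main__":
    # Hypothetical messages; real column names come from the source table.
    messages = [
        json.dumps({"table": "dsout.employees", "op_type": "I",
                    "after": {"id": "101", "name": "changyin"}}),
        json.dumps({"table": "dsout.employees", "op_type": "U",
                    "before": {"id": "101", "name": "changyin"},
                    "after": {"id": "101", "name": "changyin2"}}),
    ]
    snapshot = {}
    for message in messages:
        apply_change(snapshot, message, key_column="id")
    print(json.dumps(snapshot, indent=2))
```

Because the extract is configured with GETUPDATEBEFORES and NOCOMPRESSUPDATES, update records should carry full before and after images, which is what lets the sketch replace a row without merging columns.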

With this setup, real‑time data from MySQL can be ingested into downstream big‑data platforms such as Flink for further processing.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
