Quickly Listening to the MySQL binlog from Java with Debezium
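Debezium is an open-source distributed platform for change data capture (CDC): it continuously captures changes made to external database systems and streams them in real time.
Through CDC, Debezium turns an external database into a real-time event stream, so the row-level changes that applications make to that database can be captured and recorded.
Debezium is usually run on a Kafka Connect cluster to get a high level of fault tolerance and reliability. For ordinary applications, the embedded Debezium engine is sufficient.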
Environment
- Debezium: 1.9.0.Final
- MySQL: 8.0.29
- JDK: 17.0.2
Enabling the binlog
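Temporary change (lasts until the next restart)
-- log_bin is a read-only system variable and cannot be changed with SET GLOBAL.
-- It is ON by default in MySQL 8.0; if it is OFF, enable it in the configuration file as described next.
SET GLOBAL binlog_format = 'ROW'; -- Debezium needs row-based logging to capture row-level changes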
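To enable the binlog permanently, edit the MySQL configuration file (usually my.cnf, mysqld.cnf, or my.ini) and then restart the MySQL service. Add or modify the following lines in the [mysqld] section of the file: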
[mysqld]
# Enable binary logging
log_bin = mysql-bin
# Binary log format; ROW is safer for replication and data recovery
binlog_format = ROW
# Server ID, used for replication; it must be unique for every server
server-id = 1
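log_bin specifies the base name of the binlog files. binlog_format should be set to ROW, which records the change to every row and suits almost all replication scenarios. server-id must be a unique integer; it identifies each MySQL server instance and is used in replication setups.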
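After editing the configuration file, restart the MySQL service so that the settings take effect. The restart command is usually one of:
sudo service mysql restart # on Linux systems
sudo systemctl restart mysqld # on systems that use systemd
On Windows, restart the MySQL service through the service manager.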
After the restart, log in to the MySQL server and check with SQL whether the binlog is enabled:
SHOW VARIABLES LIKE 'log_bin';
- If the Value column of the result shows ON, the binlog has been enabled successfully.
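The same check can be automated at application startup. The following is a minimal sketch, not part of the original setup: `BinlogSettingsCheck` and `findProblems` are hypothetical names, and the map is assumed to have been filled by the caller from `SHOW VARIABLES` (for example over JDBC). It only inspects the three settings discussed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks the MySQL variables that row-level CDC depends on. */
final class BinlogSettingsCheck {

    private BinlogSettingsCheck() {
    }

    /**
     * @param variables variable name to value, e.g. collected from SHOW VARIABLES
     * @return human-readable problems; an empty list means the settings look usable
     */
    static List<String> findProblems(Map<String, String> variables) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(variables.get("log_bin"))) {
            problems.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(variables.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        String serverId = variables.get("server_id");
        if (serverId == null || "0".equals(serverId.trim())) {
            problems.add("server_id is not set");
        }
        return problems;
    }
}
```

Failing fast on a non-empty result gives a clearer error than letting the connector fail later while it starts.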
Configuring Debezium
Debezium dependencies
Note: make sure the Debezium version and the MySQL driver version are compatible.
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.9.0.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.9.0.Final</version>
</dependency>
yml configuration
debezium:
  enable: true
  datasource:
    hostname: localhost
    port: 3306
    user: root
    password: 123456
    # Whitelist of tables to monitor, format: database.table,database.table
    tableWhitelist: test.test
    # Local file that stores the read progress (offsets)
    storageFile: F:/debezium/test/offsets/offset.dat
    historyFile: F:/debezium/test/history/custom-file-db-history.dat
    flushInterval: 10000
    # Server ID of the Debezium service, which poses as a replica; any value works, but it must be unique across Debezium services
    serverId: 9111
    serverName: name-1
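The table list is a single comma-separated string, so a typo such as a missing database name is easy to overlook. Below is a small sketch of a startup check; `TableListParser` is a hypothetical helper, and it assumes every entry is a plain `database.table` name. Debezium itself also accepts regular expressions in this list, which this strict check does not try to interpret.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits and validates a "db.table,db.table" list of plain names. */
final class TableListParser {

    private TableListParser() {
    }

    static List<String> parse(String tableList) {
        List<String> tables = new ArrayList<>();
        for (String entry : tableList.split(",")) {
            String name = entry.trim();
            if (name.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            int dot = name.indexOf('.');
            // Require a non-empty database part and a non-empty table part
            if (dot <= 0 || dot == name.length() - 1) {
                throw new IllegalArgumentException("Expected database.table but got: " + name);
            }
            tables.add(name);
        }
        return tables;
    }
}
```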
Debezium configuration class
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;
import java.io.IOException;
@ConditionalOnProperty(name = "debezium.enable", havingValue = "true")
@Configuration
@ConfigurationProperties(prefix = "debezium.datasource")
@Data
@Slf4j
public class DebeziumConfig {

    private String hostname;
    private String port;
    private String user;
    private String password;
    private String tableWhitelist;
    private String storageFile;
    private String historyFile;
    private Long flushInterval;
    private String serverId;
    private String serverName;

    @Bean
    public io.debezium.config.Configuration debeziumConfig() throws Exception {
        checkFile();
        return io.debezium.config.Configuration.create()
                .with("name", "mysql_connector")
                .with("database.connectionTimeZone", "Asia/Shanghai")
                .with("database.character.encoding", "utf8")
                .with("decimal.handling.mode", "string")
                // Connector for the monitored database type, MySQL here
                .with("connector.class", MySqlConnector.class)
                .with("database.hostname", hostname)
                .with("database.port", port)
                .with("database.user", user)
                .with("database.password", password)
                // Alternative to the table list: capture every table in the listed databases
                // (this option was formerly named database.whitelist)
                // .with("database.include.list", "test")
                // Tables to capture (this option was formerly named table.whitelist)
                .with("table.include.list", tableWhitelist)
                .with("database.server.id", serverId)
                .with("database.server.name", serverName)
                // Option 1: store offsets in Kafka
                // .with("bootstrap.servers", "kafka-node:9092")
                // .with("offset.storage", KafkaOffsetBackingStore.class.getCanonicalName())
                // .with("offset.storage.topic", "debezium.offset.storage")
                // .with("offset.storage.partitions", "1")
                // .with("offset.storage.replication.factor", "1")
                // Option 2: store offsets in a local file
                .with("offset.storage", FileOffsetBackingStore.class)
                // Local file that stores the read progress
                .with("offset.storage.file.filename", storageFile)
                // How often the read progress is flushed to storage; the default is 1 minute.
                // Delivery is at least once rather than exactly once, so events may be read more than once:
                // if the web container dies before the latest progress is flushed to the file,
                // the binlog is read again from the last saved position after the restart.
                .with("offset.flush.interval.ms", flushInterval)
                .with("database.history", FileDatabaseHistory.class.getName())
                .with("database.history.file.filename", historyFile)
                // Snapshot mode: schema_only captures the table structure but no existing rows
                .with("snapshot.mode", "schema_only")
                .build();
    }

    /** Creates the offset file and its parent directory if they do not exist yet. */
    private void checkFile() throws IOException {
        File file = new File(storageFile);
        // getParentFile() handles both "/" and "\" separators, unlike a manual substring
        File dirFile = file.getAbsoluteFile().getParentFile();
        if (dirFile != null && !dirFile.exists() && !dirFile.mkdirs()) {
            log.error("Failed to create directory {}", dirFile);
        }
        if (!file.exists() && !file.createNewFile()) {
            log.error("Failed to create offset file {}", file);
        }
    }
}
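Because delivery is at least once, the event handler has to tolerate replays after a crash. One possible approach, sketched below under stated assumptions, is to remember the binlog position of the last handled event and skip anything at or before it. `BinlogPosition` and `ReplayFilter` are hypothetical names. The sketch assumes the `file`, `pos`, and `row` fields of the MySQL `source` block are available, that binlog file names share one base name so they sort as strings, and that the caller persists the last handled position together with its own side effects.

```java
import java.util.Comparator;

/** Hypothetical value type for the file/pos/row fields of a MySQL change event's source block. */
record BinlogPosition(String file, long pos, int row) implements Comparable<BinlogPosition> {

    // Order by binlog file name, then byte position, then row index within the event
    private static final Comparator<BinlogPosition> ORDER =
            Comparator.comparing(BinlogPosition::file)
                    .thenComparingLong(BinlogPosition::pos)
                    .thenComparingInt(BinlogPosition::row);

    @Override
    public int compareTo(BinlogPosition other) {
        return ORDER.compare(this, other);
    }
}

/** Skips events at or before the last position that was already handled. */
final class ReplayFilter {

    private BinlogPosition lastHandled; // null means nothing has been handled yet

    /** @param lastHandled position restored from the caller's own storage, or null */
    ReplayFilter(BinlogPosition lastHandled) {
        this.lastHandled = lastHandled;
    }

    /** Returns true if the event is new and records it as handled; false for a replayed event. */
    synchronized boolean accept(BinlogPosition position) {
        if (lastHandled != null && position.compareTo(lastHandled) <= 0) {
            return false;
        }
        lastHandled = position;
        return true;
    }
}
```

An alternative that needs no position tracking is to make the handler itself idempotent, for example by writing with upserts keyed on the row's primary key.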
Listener service
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.debezium.config.Configuration;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Enabled by the same switch as DebeziumConfig, so the Configuration bean is present whenever this listener is
@ConditionalOnProperty(name = "debezium.enable", havingValue = "true")
@Component
@Slf4j
public class MysqlBinlogListener {

    @Resource
    private Executor taskExecutor;

    private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();

    public MysqlBinlogListener(Configuration configuration) {
        this.engineList.add(DebeziumEngine.create(Json.class)
                .using(configuration.asProperties())
                .notifying(record -> receiveChangeEvent(record.value()))
                .build());
    }

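    private void receiveChangeEvent(String value) {
        // Events without a value (such as tombstones) carry nothing to process
        if (!StringUtils.hasLength(value)) {
            return;
        }
        JSONObject payload = getPayload(JSON.parseObject(value));
        if (payload == null) {
            return;
        }
        String op = payload.getString("op");
        // Events without an op field are not row changes and are ignored
        if (!StringUtils.hasLength(op)) {
            return;
        }
        // Convert the captured binlog data into the custom format
        ChangeData changeData = getChangeData(payload);
        // An exception thrown from here would break the processing of later events
        try {
            // Handle the captured data
            System.out.println(JSON.toJSONString(changeData));
        } catch (Exception e) {
            log.error("Failed to handle binlog event, data: " + changeData, e);
        }
    }
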
    @PostConstruct
    private void start() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            taskExecutor.execute(engine);
        }
    }

    @PreDestroy
    private void stop() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            if (engine != null) {
                try {
                    engine.close();
                } catch (IOException e) {
                    log.error("Failed to close Debezium engine", e);
                }
            }
        }
    }

    public static JSONObject getPayload(JSONObject value) {
        return value.getJSONObject("payload");
    }

    public static ChangeData getChangeData(JSONObject payload) {
        JSONObject source = payload.getJSONObject("source");
        return ChangeData.builder()
                .op(payload.getString("op"))
                .table(source.getString("table"))
                .after(payload.getJSONObject("after"))
                .source(source)
                .before(payload.getJSONObject("before"))
                .build();
    }

    @Data
    @Builder
    public static class ChangeData {
        /**
         * Row data after the change
         */
        private JSONObject after;
        private JSONObject source;
        /**
         * Row data before the change
         */
        private JSONObject before;
        /**
         * Name of the changed table
         */
        private String table;
        /**
         * Operation type, see the enum Envelope.Operation
         */
        private String op;
    }
}
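The `op` field arrives as a one-letter code, and which of `before` and `after` is populated depends on it. The enum below is a small sketch for dispatching on that code without pulling Debezium classes into business code. `ChangeType` is a hypothetical name, not the library's `Envelope.Operation`; it covers only the codes `c`, `u`, `d`, and `r`, and any other code is reported as absent.

```java
import java.util.Optional;

/** Hypothetical mirror of the common Debezium operation codes. */
enum ChangeType {
    CREATE("c", false, true),  // insert: only the new row is present
    UPDATE("u", true, true),   // update: old and new row are present
    DELETE("d", true, false),  // delete: only the old row is present
    READ("r", false, true);    // snapshot read: current row state

    private final String code;
    private final boolean hasBefore;
    private final boolean hasAfter;

    ChangeType(String code, boolean hasBefore, boolean hasAfter) {
        this.code = code;
        this.hasBefore = hasBefore;
        this.hasAfter = hasAfter;
    }

    boolean hasBefore() {
        return hasBefore;
    }

    boolean hasAfter() {
        return hasAfter;
    }

    /** Returns the matching type, or an empty Optional for a code this sketch does not cover. */
    static Optional<ChangeType> fromCode(String code) {
        for (ChangeType type : values()) {
            if (type.code.equals(code)) {
                return Optional.of(type);
            }
        }
        return Optional.empty();
    }
}
```

In the listener, `ChangeType.fromCode(changeData.getOp())` could then drive a `switch` that reads `before` for deletes and `after` for inserts and updates.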
Related documentation
Official documentation: https://debezium.io/documentation/reference/1.9/index.html
Debezium engine: https://debezium.io/documentation/reference/1.9/development/engine.html
Reference article:
Running the MySQL connector with Debezium Engine: https://instaer.me/article/aLkJvj