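Quick start: listening to the MySQL binlog with Debezium in Java

Debezium is an open-source distributed platform for change data capture (CDC): it continuously captures the changes made to external database systems and streams them in real time.

Through CDC, Debezium turns an external database into a stream of real-time events, capturing and recording the row-level changes that applications make to it.

Debezium is usually deployed on a Kafka Connect cluster to get a high level of fault tolerance and reliability. For simpler applications, the embedded Debezium engine is enough.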

Environment

  • Debezium:1.9.0.Final
  • MySQL:8.0.29
  • JDK:17.0.2

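Enabling the binlog

Runtime settings

-- log_bin is read-only and cannot be changed with SET GLOBAL; it is set in the configuration file (MySQL 8.0 enables it by default)
SET GLOBAL binlog_format = 'ROW';  -- Debezium requires ROW; applies to new sessions only and is lost on restart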

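To enable the binlog permanently, edit the MySQL configuration file (usually my.cnf, mysqld.cnf, or my.ini) and restart the MySQL service. Add or modify the following lines in the [mysqld] section: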

[mysqld]
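# Enable binary logging
log_bin = mysql-bin
# Binary log format; ROW is the safer choice for replication and data recovery
binlog_format = ROW
# Server ID used for replication; it must be unique for every server in the topology
server-id = 1
  • log_bin sets the base name of the binlog files.
  • binlog_format should be ROW, which records the change to every row and suits almost every replication scenario.
  • server-id must be a unique integer. It identifies each MySQL server instance in a replication setup.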

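After editing the configuration file, restart the MySQL service for the settings to take effect. Typical commands:

sudo service mysql restart  # on Linux systems using the service command
sudo systemctl restart mysqld  # on systems using systemd

On Windows, restart the MySQL service from the Services manager.

After the restart, log in to the MySQL server and check with SQL whether the binlog is enabled: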

SHOW VARIABLES LIKE 'log_bin';
  • If the Value column of the result shows ON, the binlog is enabled.
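
The same check can also be run from application code before the engine starts. The following is a minimal sketch, not part of the original setup: the class name BinlogSettingsCheck and both method names are hypothetical, and it assumes a JDBC connection to the MySQL server is already available. It reads log_bin, binlog_format, and server_id and reports anything that would prevent row-level capture.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: verifies the binlog settings described above.
final class BinlogSettingsCheck {

    /** Reads the binlog-related server variables over an existing JDBC connection. */
    static Map<String, String> readVariables(Connection conn) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')";
        try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                // Column 1 is Variable_name, column 2 is Value
                vars.put(rs.getString(1).toLowerCase(Locale.ROOT), rs.getString(2));
            }
        }
        return vars;
    }

    /** Returns a list of problems; an empty list means the settings look usable. */
    static List<String> findProblems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            problems.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        String serverId = vars.get("server_id");
        if (serverId == null || "0".equals(serverId.trim())) {
            problems.add("server_id is not set");
        }
        return problems;
    }
}
```

A caller would pass the result of readVariables(connection) to findProblems and refuse to start the listener if the returned list is not empty.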

Configuring Debezium

Debezium dependencies

Make sure the Debezium version is compatible with the MySQL driver version you use.

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.9.0.Final</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.9.0.Final</version>
</dependency>

YAML configuration

debezium:
  enable: true
  datasource:
    hostname: localhost
    port: 3306
    user: root
    password: 123456
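    # Tables to capture, as a comma-separated list of database.table names
    tableWhitelist: test.test
    # Local file that stores the read progress (offsets)
    storageFile: F:/debezium/test/offsets/offset.dat
    historyFile: F:/debezium/test/history/custom-file-db-history.dat
    flushInterval: 10000
    # Server ID Debezium uses when it connects as a replica; choose any value, but it must not be shared with another Debezium instance
    serverId: 9111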
    serverName: name-1

Debezium configuration class

import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.relational.history.FileDatabaseHistory;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.File;
import java.io.IOException;


@ConditionalOnProperty(name ="debezium.enable", havingValue = "true")
@Configuration
@ConfigurationProperties(prefix ="debezium.datasource")
@Data
@Slf4j
public class DebeziumConfig {

    private String hostname;
    private String port;
    private String user;
    private String password;
    private String tableWhitelist;
    private String storageFile;
    private String historyFile;
    private Long flushInterval;
    private String serverId;
    private String serverName;

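    // Named debeziumConfiguration so it does not clash with the debeziumConfig bean of this class
    // and matches the @ConditionalOnBean check in the listener
    @Bean
    public io.debezium.config.Configuration debeziumConfiguration() throws Exception {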
        checkFile();
        return io.debezium.config.Configuration.create()
                .with("name", "mysql_connector")
                .with("database.connectionTimeZone", "Asia/Shanghai")
                .with("database.character.encoding", "utf8")
                .with("decimal.handling.mode", "string")
                // Connector class; MySQL in this case
                .with("connector.class", MySqlConnector.class)

                .with("database.hostname", hostname)
                .with("database.port", port)
                .with("database.user", user)
                .with("database.password", password)
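                // Databases to capture; with only this option set, every table in the listed databases is captured
//                .with("database.include.list", "test")
                // table.include.list replaces the deprecated table.whitelist name
                .with("table.include.list", tableWhitelist)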
                .with("database.server.id", serverId)
                .with("database.server.name", serverName)

                // To keep offsets in Kafka instead:
                //.with("bootstrap.servers", "kafka-node:9092")
                //.with("offset.storage", KafkaOffsetBackingStore.class.getCanonicalName())
                //.with("offset.storage.topic", "debezium.offset.storage")
                //.with("offset.storage.partitions", "1")
                //.with("offset.storage.replication.factor", "1")

                // Keep offsets in a local file
                .with("offset.storage", FileOffsetBackingStore.class)
                // Local file that stores the read progress
                .with("offset.storage.file.filename", storageFile)
                // How often the read progress is flushed; the default is 1 minute.
                // Delivery is at-least-once rather than exactly-once, so events can be replayed:
                // if the application dies before the latest offset reaches the file,
                // the binlog is read again from the last flushed offset after the restart.
                .with("offset.flush.interval.ms", flushInterval)

                .with("database.history", FileDatabaseHistory.class.getName())
                .with("database.history.file.filename", historyFile)
                // Snapshot mode: capture only the table schemas at startup, then stream new changes
                .with("snapshot.mode", "schema_only")


                .build();
    }

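    private void checkFile() throws IOException {
        // Make sure the offset file and its parent directory exist before the engine starts.
        // getParentFile() avoids the index error that substring/lastIndexOf("/") hits on a path without "/".
        File file = new File(storageFile);
        File dirFile = file.getAbsoluteFile().getParentFile();
        if (dirFile != null && !dirFile.exists() && !dirFile.mkdirs()) {
            log.error("Failed to create directory {}", dirFile);
        }
        if (!file.exists() && !file.createNewFile()) {
            log.error("Failed to create offset file {}", file);
        }
    }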
}
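
Because offsets are flushed only periodically, events after the last flushed offset are delivered again after a crash, so the consumer has to tolerate replays. One way to do that is to compare the binlog position of each event with the last position the downstream system has durably applied. The sketch below is an illustration under stated assumptions, not part of the original setup: the class BinlogPositionGuard and its methods are hypothetical, the position is taken from the file, pos, and row fields of the event's source block, binlog file names are assumed to end in a numeric sequence (for example mysql-bin.000012), and the caller is assumed to store the last applied position together with its own data so it survives a restart.

```java
// Hypothetical helper: skips change events that were already applied before a restart.
final class BinlogPositionGuard {

    private String lastFile; // null means nothing has been applied yet
    private long lastPos;
    private int lastRow;

    /** Starts from the last position the downstream system has durably applied. */
    BinlogPositionGuard(String lastFile, long lastPos, int lastRow) {
        this.lastFile = lastFile;
        this.lastPos = lastPos;
        this.lastRow = lastRow;
    }

    /** Returns true when (file, pos, row) lies strictly after the last applied position. */
    synchronized boolean isNew(String file, long pos, int row) {
        if (lastFile == null) {
            return true;
        }
        int byFile = compareFiles(file, lastFile);
        if (byFile != 0) {
            return byFile > 0;
        }
        if (pos != lastPos) {
            return pos > lastPos;
        }
        return row > lastRow;
    }

    /** Records the position once the downstream write has succeeded. */
    synchronized void markApplied(String file, long pos, int row) {
        this.lastFile = file;
        this.lastPos = pos;
        this.lastRow = row;
    }

    /** Compares binlog file names by the numeric sequence after the last dot. */
    static int compareFiles(String a, String b) {
        return Long.compare(sequence(a), sequence(b));
    }

    private static long sequence(String file) {
        return Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
    }
}
```

In the listener, the handler would call isNew before processing an event and markApplied after the write succeeds. A simpler alternative, where the target allows it, is to make the write itself idempotent, for example an upsert keyed by the primary key.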

Listener service

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.debezium.config.Configuration;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;


@ConditionalOnBean(name = "debeziumConfiguration")
@Component
@Slf4j
public class MysqlBinlogListener {

    @Resource
    private Executor taskExecutor;

    private final List<DebeziumEngine<ChangeEvent<String, String>>> engineList = new ArrayList<>();

    private MysqlBinlogListener (Configuration configuration) {
        this.engineList.add(DebeziumEngine.create(Json.class)
                .using(configuration.asProperties())
                .notifying(record -> receiveChangeEvent(record.value()))
                .build());
    }

    private void receiveChangeEvent(String value) {
        if (StringUtils.hasLength(value)) {
            JSONObject json = JSON.parseObject(value);
            JSONObject payload = getPayload(json);

            String op = payload.getString("op");
            if (StringUtils.hasLength(op)) {
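                // An exception propagating out of the handler would stop the engine from consuming further events,
                // so both the conversion and the processing are wrapped in the try block
                try {
                    // Convert the captured binlog event into the custom format
                    ChangeData changeData = getChangeData(payload);
                    // Process the captured change
                    System.out.println(JSON.toJSONString(changeData));
                } catch (Exception e) {
                    log.error("Failed to process binlog event, original data: " + payload, e);
                }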
            }
        }
    }

    @PostConstruct
    private void start() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            taskExecutor.execute(engine);
        }
    }

    @PreDestroy
    private void stop() {
        for (DebeziumEngine<ChangeEvent<String, String>> engine : engineList) {
            if (engine != null) {
                try {
                    engine.close();
                } catch (IOException e) {
                    log.error("Failed to close the Debezium engine", e);
                }
            }
        }
    }


    public static JSONObject getPayload(JSONObject value) {
        return value.getJSONObject("payload");
    }

    public static ChangeData getChangeData(JSONObject payload) {
        JSONObject source = payload.getJSONObject("source");
        return ChangeData.builder()
                .op(payload.getString("op"))
                .table(source.getString("table"))
                .after(payload.getJSONObject("after"))
                .source(source)
                .before(payload.getJSONObject("before"))
                .build();
    }

    @Data
    @Builder
    public static class ChangeData {
        /**
         * Row data after the change
         */
        private JSONObject after;
        private JSONObject source;
        /**
         * Row data before the change
         */
        private JSONObject before;
        /**
         * Name of the changed table
         */
        private String table;
        /**
         * Operation type, see the Envelope.Operation enum
         */
        private String op;
    }

}
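
The op field arrives as a one-letter code, and which of before and after is populated depends on that code. As a small illustration, the enum below maps the four row-level codes to readable names. ChangeOperation and its methods are hypothetical names introduced for this sketch and are not part of the Debezium API; codes outside these four are returned as an empty Optional.

```java
import java.util.Optional;

// Hypothetical enum: maps the one-letter "op" code of a change event to a readable operation.
enum ChangeOperation {
    READ("r", false, true),    // row read during a snapshot: only "after" is set
    CREATE("c", false, true),  // insert: only "after" is set
    UPDATE("u", true, true),   // update: both "before" and "after" are set
    DELETE("d", true, false);  // delete: only "before" is set

    private final String code;
    private final boolean hasBefore;
    private final boolean hasAfter;

    ChangeOperation(String code, boolean hasBefore, boolean hasAfter) {
        this.code = code;
        this.hasBefore = hasBefore;
        this.hasAfter = hasAfter;
    }

    boolean hasBefore() {
        return hasBefore;
    }

    boolean hasAfter() {
        return hasAfter;
    }

    /** Looks up the operation for a code such as "c"; empty for null or unknown codes. */
    static Optional<ChangeOperation> fromCode(String code) {
        for (ChangeOperation op : values()) {
            if (op.code.equals(code)) {
                return Optional.of(op);
            }
        }
        return Optional.empty();
    }
}
```

In receiveChangeEvent, ChangeOperation.fromCode(payload.getString("op")) could replace the plain string check, which lets the handler branch on the operation and read only the side of the row that is present.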

Related documentation

Official documentation: https://debezium.io/documentation/reference/1.9/index.html

Debezium engine: https://debezium.io/documentation/reference/1.9/development/engine.html

Reference article:

Running the MySQL connector with Debezium Engine: https://instaer.me/article/aLkJvj