监听 MySQL binlog 实现数据变化后的实时通知

Hello 大家好,我是阿粉。不知道大家在日常的工作中有没有遇到这样的场景,很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。

一般遇到这种情况下,在实时性要求不高的场景我们有两种处理模式,一种是写任务定时推送数据同步到缓存中,另一个是下游服务定时自动拉取。这两种模式都依赖服务自己的定时周期时间,很多时候不好设定具体要多久执行一次,定时时间太短在数据没有变化的时候会有很多无效的操作,如果定时时间太长可能很多时候数据的延迟会比较大,某些时候影响也不好。

那有没有一种比较好的方式可以解决这个问题呢?答案当然是肯定的。今天就给大家介绍一下 Canal,基于 MySQLbin log 日志来实时监听数据变化。

什么是 Canal

官方的解释是:canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

通过官方的解释我们看到,是针对 MySQL 数据库增量日志解析的,MySQL 的日志是通过 bin log 的形式存储的二进制文件,提供数据订阅和消费就是说提供对二进制文件数据的监听。当日志数据发生变化的时候就会被监听到,从而程序就可以实时获取到有变化的数据。拿到变化的数据后就可以更新进缓存,ES 或发送到消息队列中通知下游服务了。

原理

上面介绍了 Canal 的基本概念,现在我们看看 Canal是怎么实现的,我们都知道 MySQL 是支持主从同步的,而且 Slave 也是通过 bin log 日志的形式同步 master 实例数据的。所以 Canal 就巧妙的运用了这个原理,把自己模拟成一个 Slave,给 MySQLmaster 发送 dump 协议,当 master 接受到 dump 协议的时候就以为 Canal 是一个 Slave 就会推送 bin logCanal

使用方式

开启 MySQL 的binlog

MySQL 的安装阿粉这里就不演示了,网上的文章一大把,大家可以自己去研究安装,要是 macOS 的话,终端里面输入brew install mysql 坐等搞定。

安装完成过后我们看下是否开启了 bin log ,如果没有开启则修改 my.cnf 增加 log-bin=mysql-bin 即可开启。输入命令mysql> show variables like 'log_bin'; 从图中我们可以看到阿粉这里是开启了 bin log 日志的。

接下来我们创建一个 canal 的账号,用于 canal 使用。我们创建一个 canal 的账号,同时密码也是 canal

1
2
3
4
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

安装 Canal

这里我们安装 1.1.5 的版本,可以直接 wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz 也可以在 GitHub 上面直接下载。

下载完解压后目录如图,我们需要修改配置文件,将账号密码以及 bin log 文件名配上

配置完成过后,通过 bin 目录下的脚本进行启动,并且通过日志我们可以看到启动成功。

服务端启动成功后,我们就需要使用客户端去获取数据了,这里我们可以参考 CanalGitHub 官网中提供的 example 样例去进行模拟。

这里有个坑大家要注意下,如果 MySQL 的版本是 8.0 以下应该没有这个问题,如果是 8.0 版本的,我们通过查看tail -f example.log 日志会发现如下异常Caused by: java.io.IOException: caching_sha2_password Auth failed

阿粉这里就遇到了,经过在官方 GitHub 上面的 issue 中,如果搜索到相关的错误信息 https://github.com/alibaba/canal/issues/1700,里面有大佬给了解决方案,在 MySQL 中执行如下命令即可解决

1
2
3
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;

如果没有遇到这个问题的小伙伴就可以直接忽略,接下来我们通过官方源码中的 example 示例来测试功能。把源码下载下来后找到com.alibaba.otter.canal.example.SimpleCanalClientTest 类,正常来说不需要修改什么内容,如果密码有变化的话这里可以调整,然后直接运行 main 函数即可。这个时候 MySQL,Canal,以及我们的测试类都已经启动了,下面通过执行 SQL 来创建数据库和表以及插入相应的数据,观察控制台的输出情况。

数据变更

创建数据库

1
2
3
4
5
6
mysql> create database canal_test;
Query OK, 1 row affected (0.01 sec)
mysql> use canal_test;
Database changed
mysql> show tables;
Empty set (0.00 sec)

我们通过语句create database canal_test; 创建了数据库过后,可以看到控制有如下输出,已经监听到了 bin log 的变化了。

创建测试表

再执行如下语句创建数据表

1
2
3
4
5
6
7
8
9
CREATE TABLE `example` 
(
    `id` INT(11) NOT NULL
    ,`username` VARCHAR(32) DEFAULT NULL COMMENT '用户名称'
    ,` age` INT(11) DEFAULT 0 COMMENT '用户年龄'
    ,` sex` INT(11) DEFAULT 0 COMMENT '用户性别 0 男 1 女'
    ,PRIMARY KEY (`id`)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

可以看到成功的监听到了数据表的创建,接下来我们再试试插入数据和更新数据

## 插入语句
INSERT INTO example VALUES(1,'张三', 18,0),(2,'李四', 19,0),(3,'王五', 20,1);

## 更新语句
update example set username = '张小三' where id = 1;

从上图中我们可以看到插入的数据以及更新的数据都被实时的监听到了。监听到数据过后,我们就可以根据事件类型以及相应的库和表名来进行过滤操作了。对了,我们可以通过配置 filter 来过滤需要监听的数据库和数据表或者字段,这个都是可以实现的,避免无用的数据变更带来的影响。

对于访问 GitHub 很慢的小伙伴,阿粉已经帮大家把 Canal 的压缩包下载好了,公众号回复【canal】即可获取网盘地址。

总结

今天的文章给大家分享了 Canal 的使用,感兴趣的小伙伴可以自己去试试,如果需要的话,可以在项目中用起来,会事半功倍。

Java Geek Tech wechat
欢迎订阅 Java 极客技术,这里分享关于 Java 的一切。