5 Canal增量数据同步利器
5.1 Canal介绍
canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。
github地址:https://github.com/alibaba/canal
版本下载地址:https://github.com/alibaba/canal/releases
文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart
Canal应用场景
1.电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎; 2.价格、库存发生变更实时同步到redis; 3.数据库异地备份、数据同步; 4.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
MySQL主从复制原理
1.MySQL master
将数据变更写入二进制日志( binary log
, 其中记录叫做二进制日志事件binary log events
,可以通过 show binlog events
进行查看)
2.MySQL slave
将 master 的 binary log events
拷贝到它的中继日志(relay log
)
3.MySQL slave
重放 relay log
中事件,将数据变更反映它自己的数据
Canal工作原理
1.canal 模拟 MySQL slave
的交互协议,伪装自己为 MySQL slave
,向 MySQL master
发送dump 协议
2.MySQL master
收到 dump 请求,开始推送 binary log
给 slave (即 canal )
3.canal 解析 binary log
对象(原始为 byte 流)
5.2 Canal安装
参考文档:https://github.com/alibaba/canal/wiki/QuickStart
5.2.1 MySQL Bin-log开启
1)MySQL开启bin-log
a.进入mysql容器
docker exec -it -u root mysql /bin/bash
b.开启mysql的binlog
cd /etc/mysql/mysql.conf.d
在mysqld.cnf最下面添加如下配置
# 开启 binlog
log-bin=/var/lib/mysql/mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server-id=12345
c.创建账号并授权
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
d.重启mysql
docker restart mysql
开启bin-log后,我们可以用sql语句查看下:
show variables like '%log_bin%'
效果如下:
5.2.2 Canal安装
1)拉取镜像
docker pull canal/canal-server:v1.1.1
2)安装容器
a.安装canal-server容器
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
b.配置canal-server
修改/home/admin/canal-server/conf/canal.properties
,将它的id属性修改成和mysql数据库中server-id不同的值,如下图:
c.修改/home/admin/canal-server/conf/example/instance.properties
,配置要监听的数据库服务地址和监听数据变化的数据库以及表,修改如下:
指定监听数据库表的配置如下canal.instance.filter.regex
:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
重启canal:
docker restart canal
5.3 Canal微服务
我们搭建一个微服务,用于读取canal监听到的变更日志,微服务名字叫seckill-canal
。该项目我们需要引入canal-spring-boot-autoconfigure
包,并且需要实现EntryHandler<T>
接口,该接口中有3个方法,分别为insert
、update
、delete
,这三个方法用于监听数据增删改变化。
参考地址:https://github.com/NormanGyllenhaal/canal-client
1)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seckill-service</artifactId>
<groupId>com.seckill</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>seckill-canal</artifactId>
<dependencies>
<!--web-->
<dependency>
<groupId>com.seckill</groupId>
<artifactId>seckill-web</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--esAPI-->
<dependency>
<groupId>com.seckill</groupId>
<artifactId>seckill-search-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--goodsAPI-->
<dependency>
<groupId>com.seckill</groupId>
<artifactId>seckill-goods-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--canal-->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-autoconfigure</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<!-- 指定该Main Class为全局的唯一入口 -->
<mainClass>com.seckill.CanalApplication</mainClass>
<layout>ZIP</layout>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中-->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
bootstrap.yml配置
server:
port: 18088
spring:
application:
name: seckill-canal
cloud:
nacos:
config:
file-extension: yaml
server-addr: nacos-server:8848
discovery:
#Nacos的注册地址
server-addr: nacos-server:8848
#超时配置
ribbon:
ReadTimeout: 3000000
#Canal配置
canal:
server: canal-server:11111
destination: example
#日志
logging:
level:
root: error
2)创建com.seckill.handler.SkuHandler
实现EntryHandler
接口,代码如下:
@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {
/***
* 增加数据
* @param sku
*/
@Override
public void insert(Sku sku) {
System.out.println("===========insert:"+sku);
}
/***
* 修改数据
* @param before
* @param after
*/
@Override
public void update(Sku before, Sku after) {
System.out.println("===========update-before:"+before);
System.out.println("===========update-after:"+after);
}
/***
* 删除数据
* @param sku
*/
@Override
public void delete(Sku sku) {
System.out.println("===========delete:"+sku);
}
3)创建启动类
@SpringBootApplication
public class CanalApplication {
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class,args);
}
}
程序启动后,修改tb_sku
数据,可以看到控制会打印修改前后的数据: