Canal
# 概述
Canal 是个开源的消息队列系统 , 基于 数据库增量 日志解析 进行同步 , 提供增量 增量数据 订阅/消费 . 用于 异步消息传递 和 事件驱动 应用程序
GitHub : https://github.com/alibaba/canal (opens new window)
Canal核心 :
- 数据同步
- 分布式架构
- 多协议支持
- 权限管理
# 运作原理
Canal 同步原理 , 基于主从同步实现 , 运作原理 :
- Canal 提供 MySQL 二进制文件(binary_log) , 用于记录 增 删 改 命令的事件
- master 将 增 删 改 事件 写入 binary_log , 写入前会根据 Canal 配置过滤规则 , 过滤binary_log
- slave 异步线程不断读取 master 写入的 binary_log 拷贝到 中继日志(realy_log)
- slave 根据 realy_log 中的事件 , 将数据再次加载一次 , 到达数据同步目的
Canal_slave
你可以把它看做是 伪装成MySQL的部分 , 去监听MySQL
# 安装
以下安装基于 MySQL 数据库 , 实现同步
准备 :
- Java环境
- Canal安装包 , 并上传主机
- MySQL
MySQL授权用户
# 创建用户 canal
create user canal@'%' IDENTIFIED by 'canal';
# 授权用户
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
# 刷新生效
FLUSH PRIVILEGES;
提示
账号密码都是 canal , 后续自行更改即可!
Canal的 conf配置
- 进入 conf目录 , 拷贝一份
example
文件夹 , 并更改为你实例的名称 - 进入 刚刚拷贝的文件夹 , 配置
instance.properties
文件
# sql IP:Port
canal.instance.master.address=127.0.0.1:3306
# sql账号
canal.instance.dbUsername=canal
# sql密码
canal.instance.dbPassword=canal
# sql字符集
canal.instance.connectionCharset=UTF-8
# 启动同步日志
canal.instance.tsdb.enable=true
# sql库名
canal.instance.defaultDatabaseName=<库名>
# 监听 指定表名
canal.instance.filter.regex=<库名>\\..*
监听表名支持写法 :
Perl正则表达式 , 多个正则需要 ,
分割 , 转义符需要 \\
格式 | 说明 |
---|---|
.* / .*\\.. | 所有库表 |
canal\\..* | canal库下的 所有表 |
canal\\.canal.* | canal库下的 以canal开头的表 |
canal.test1 | canal库的test1表 |
启动验证
进入bin执行以下命令 :
# 启动
sh startup.sh
# 创建实例
sh canal.sh start
# 查看日志 (但SQL 增 改 时..)
sh ./bin/logview.sh -f -c ../conf/example/instance.properties
# 后面加 -s 选项 代表 包含删除操作
sh ./bin/logview.sh -f -c ../conf/example/instance.properties -s
# docker
MySQL配置
MySQL根据目录下的 /conf/my.cnf
配置
# 配置文件路径
log-bin= /var/lib/mysql/mysql-bin
# 指定数据库
binlog-do-db=<你的数据库>
验证配置生效
# shell 重启容器
docker restart mysql
# 进入sql指定以下命令
show master status;
docker容器 通信
容器与容器之间的独立的 , 因此需要建立网络通信
创建网络
docker network create <网络名称>
MySQL连接网络
docker network connect <网络名称> <容器name/id>
# MySQL容器连接
docker network connect <网络名称> <容器name/id>
Canal配置
载入Canal镜像 , 从外部下载的
docker load -i <文件名>.tar
运行 Canal容器
docker run -p 11111:11111 --name canal \
-e canal.destinations=<库名> \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=<库名>\\..* \
--network <网络名称> \
-d canal/canal-server:v1.1.5
验证
进入容器日志查看
# 进入容器
docker exec -it canal bash
# 查看日志
tail -f canal-server/logs/canal/canal.log
# 查看实例日志
tail -f canal-server/logs/sans/sans.log
# SprinBoot监听同步
引入依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
配置
canal:
destination: <库名> # canal的集群名字,要与安装canal时设置的名称一致
server: <IP>:11111 # canal服务地址
修改实体类
通过 @Id/@Column /@Transient/@TableField 等注解完善 对库表的映射
在逆向工程生成中 , 可以生成这些注解 , 如果如果没有需要手动配置
监听器类
通过实现 EntryHandler<T>
接口 编写监听器 , 里面包含有 增 删 改 触发的方法 , 我们需要重写他们
应用前我们需要注意 :
- 监听类需要
@CanalTable("<表名>")
注解 指定监听的表 EntryHandler<T>
接口 泛型 需要些表中对应的 实体类
import com.heima.item.pojo.Item;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import javax.annotation.Resource;
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
// 封装的 RedisTeamp操作类(并非重点)
@Resource
private RedisHandler redisHandler;
// 监控消息 增 删 改
@Override
public void insert(Item item) {
String key = "item:" + item.getId();
redisHandler.save(key, item);
}
/**
* 可根据 之后/之前 的数据差异进行控制更新缓存
* @param before 变化前对象
* @param after 变化后对象
*/
@Override
public void update(Item before, Item after) {
String key = "item:" + after.getId();
redisHandler.save(key, after);
}
@Override
public void delete(Item item) {
String key = "item:" + item.getId();
redisHandler.deleteByKey(key);
}
}
运行测试
- 查看Redis数据
- 更改MySQL数据
- 查看Redis数据
可以发现 Reids 和 MySQL数据是同步的
目前学习 Canal 简单应用 , 发现这个的确挺好用的 , 后续学到其他知识点尝试拓展下~