基于 canal 的 Mysql 与 Elasticsearch 实时同步的 javaweb 服务。
canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件。canal传送门
暴露Http接口(接口定义见wiki),待调用后开启后台线程,通过主键分批同步指定数据库中数据到Elasticsearch
读取数据库会加读锁
主键必须为数字类型
- 首先会根据所给的数据库主键字段,拿到最大的主键数字max_id;
- 设pk=min_id(默认是数据库中的主键最小值);
- 加读锁🔐,从数据库中取出pk —— pk+stepSize 大小的数据(默认500)的数据;
- 插入到Elasticsearch中;
- 释放读锁🔐,pk累加stepSize,循环3.操作,直到pk>max_id
循环监听canal通过binlog同步过来的event事件,区别增删改进行与之对应的Elasticsearch的操作。
目前只解析了 insert、update、delete,其它数据库操作会被忽略
Mysql字段类型 | Elasticsearch类型 |
char | {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} |
text | {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} |
blob | {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} |
int | {"type": "long"} |
date | {"type": "date"} |
time | {"type": "date"} |
float | {"type": "float"} |
double | {"type": "float"} |
decimal | {"type": "float"} |
其它 | {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}} |
- Mysql的binlog格式必须为ROW
- 由于使用binlog进行增量同步,和数据库主从类似,不可避免的会有一定的主从延迟,延迟时间取决于机房网络、机器负载、数据量大小等
- Elasticsearch支持的版本为5.x
- 增量同步只监听了 INSERT、UPDATE、DELETE,其它如建表、删表等尚未支持
- 建议Elasticsearch的mapping手动来创建,因为默认的创建方式不能保证满足业务需求
如果有不合理的地方,还请不吝赐教。
- QQ:760823254
- 邮件:[email protected]
支持记得star✨