Incremental data synchronization - canal
1、Definition
canal parses the MySQL binlog (the database's incremental log) and, on top of that, provides incremental data subscription and consumption.
2、How it works
canal disguises itself as a MySQL slave: it implements the MySQL slave interaction protocol and sends a dump request to the MySQL master. On receiving the dump request from canal, the master starts pushing binary logs to it; canal then parses the binary log and delivers the changes to a destination such as MySQL, Kafka, or Elasticsearch.
3、Environment setup
Set up MySQL with binlog enabled (see the sketch below), then start canal-server via Docker:
docker run --name canal-trigger -p 11111:11111 -d canal/canal-server
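On the MySQL side, binlog must be enabled in ROW format and canal needs an account with replication privileges. A minimal sketch following canal's quick-start defaults (server_id is an arbitrary unique value; the account matches the dbUsername/dbPassword used below):

# my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

-- executed on MySQL
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;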
4、Parameters
canal can listen to multiple MySQL instances. Configuration is split between the main file canal.properties and one instance.properties per destination. The key instance.properties entries:
# MySQL master address
canal.instance.master.address=host.docker.internal:3307
# binlog file name to start from
canal.instance.master.journal.name=
# binlog offset to start from when connecting to the master
canal.instance.master.position=
# binlog timestamp to start from when connecting to the master
canal.instance.master.timestamp=
# GTID to start from (when GTID mode is enabled)
canal.instance.master.gtid=
# username/password
# account authorized on the MySQL server
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# character set
canal.instance.connectionCharset = UTF-8
# use Druid to decrypt the database password
canal.instance.enableDruid=false
# table filter regex: .*\\..* listens to all tables; specific tables can also be listed, separated by commas
canal.instance.filter.regex=.*\\..*
# blacklist of tables to exclude from parsing, separated by commas
canal.instance.filter.black.regex=
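The destination name that clients subscribe to is declared in the main canal.properties. A minimal sketch of the entries relevant here:

# port the canal server listens on
canal.port = 11111
# comma-separated destinations; each one has its own instance.properties directory
canal.destinations = example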
5、Client code
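The example below uses the official canal client library. A minimal sketch of the Maven dependency (the version shown is an assumption; use the release matching your canal-server):

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>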
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalTest {

    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            // tables to listen to, in the form schema.table,schema.table
            connector.subscribe(".*\\..*");
            // roll back to the last unacknowledged position before polling
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(1); // fetch up to the given number of entries
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000); // nothing to consume yet, back off briefly
                    continue;
                }
                printEntries(message.getEntries());
                connector.ack(batchId); // confirm success so the server can discard the batch
                // connector.rollback(batchId); // on failure, roll back so the batch is re-delivered
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntries(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            // skip transaction begin/end entries; only row data matters here
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                switch (eventType) {
                    case INSERT:
                        System.out.println("INSERT");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case UPDATE:
                        System.out.println("UPDATE");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case DELETE:
                        System.out.println("DELETE");
                        printColumns(rowData.getBeforeColumnsList());
                        break;
                    default:
                        break;
                }
            }
        }
    }

    private static void printColumns(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
The main flow is:
connect -> poll for messages -> process the messages -> ack
6、Spring Boot integration
Maven configuration:
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
application.yml configuration:
canal:
  server: 127.0.0.1:11111
  destination: example
Handler:

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

// value = the table whose changes this handler receives
@CanalTable(value = "book")
@Slf4j
@Component
public class BookHandler implements EntryHandler<Book> {

    @Override
    public void insert(Book book) {
        log.info("------------insert book start--------");
    }

    @Override
    public void update(Book before, Book after) {
        log.info("------------update book start--------");
    }

    @Override
    public void delete(Book book) {
        log.info("------------delete book start--------");
    }
}
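Book is assumed to be a plain POJO whose field names mirror the columns of the book table; the starter populates it from the binlog row data. A minimal sketch (the column names are assumptions):

// assumed entity: field names match the `book` table's columns
public class Book {
    private Long id;
    private String name;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}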
7、canal-admin and canal-adapter
canal-admin provides a web console for managing canal server configuration and instances; canal-adapter consumes the parsed changes and writes them on to targets such as a relational database or Elasticsearch.