增量数据同步-canal

1、定义

基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

2、原理

canal的工作原理就是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。 3、环境搭建

mysql搭建+binlog开启

docker run --name canal-trigger -p 11111:11111 -d canal/canal-server

4、参数

canal可以监听多个mysql,配置文件主要分为主配置文件canal.properties以及各个目标下的instance.properties

# 数据库地址
canal.instance.master.address=host.docker.internal:3307
# binlog日志名称
canal.instance.master.journal.name=
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=

# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

5、客户端代码

public class CanalTest {

    public static void main(String[] args) throws Exception {

        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            //监听的表,    格式为数据库.表名,数据库.表名
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(1); // 获取指定数量的数据
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                // System.out.println(message.getEntries());
                printEntries(message.getEntries());
                connector.ack(batchId);// 提交确认,消费成功,通知server删除数据
                // connector.rollback(batchId);// 处理失败, 回滚数据,后续重新获取数据
            }
        }catch (Exception e){

        }finally {
            connector.disconnect();
        }
    }

    private static void printEntries(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                switch (rowChange.getEventType()) {
                    case INSERT:
                        System.out.println("INSERT ");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case UPDATE:
                        System.out.println("UPDATE ");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case DELETE:
                        System.out.println("DELETE ");
                        printColumns(rowData.getBeforeColumnsList());
                        break;

                    default:
                        break;
                }
            }
        }
    }

    private static void printColumns(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}

主要流程是:

启动->轮询获取消息-处理消息->ack确认

6、springboot集成

maven配置

        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>

application.yml配置

canal:
  server: 127.0.0.1:11111
  destination: example

handler

//表名
@CanalTable(value = "book")
@Slf4j
@Component
public class BookHandler implements EntryHandler<Book> {
    @Override
    public void insert(Book book) {
        log.info("------------insert book start--------");
    }

    @Override
    public void update(Book before, Book after) {
        log.info("------------update book start--------");
    }

    @Override
    public void delete(Book book) {
        log.info("------------delete book start--------");
    }
}

7、canal-admin以及canal-adapter

博客

results matching ""

    No results matching ""