Trigger功能简单实现上

实现Trigger功能,主要作用是监听数据库表,快速增加自定义逻辑。

前置环境:

mysql binlog开启+数据库表准备

截屏2022-04-12 15.41.02

Canal server启动:

截屏2022-04-12 15.41.44

rocketmq启动+console控制台+topic配置

截屏2022-04-12 15.42.36

服务端:

1、canal消息接收

@Component
@Slf4j
public class CanalComponent {

    private static final String host_name = "localhost";

    private static final int port = 11111;

    private static final String destination = "example";

    private static Integer batchSize = 10;

    private static Long timeout = 1L;

    private TimeUnit unit = TimeUnit.SECONDS;

    private Thread workThread;

    protected volatile boolean flag;

    @Autowired
    private CanalMessageHandler canalMessageHandler;

    @PostConstruct
    public void init() {
        workThread = new Thread(this::process);
        workThread.setName("canal-client-thread");
        workThread.start();
        flag = true;
    }

    @PreDestroy
    public void destroy() {
        flag = false;
        if (workThread != null) {
            workThread.interrupt();
        }
    }

    private void process() {
        CanalConnector connector = null;
        try {
            connector = CanalConnectors.newSingleConnector(new InetSocketAddress(host_name, port), destination, "", "");
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (flag) {
                Message message = connector.getWithoutAck(batchSize, timeout, unit);
                log.info("获取消息 {}", message);
                long batchId = message.getId();
                if (message.getId() != -1 && message.getEntries().size() != 0) {
                    canalMessageHandler.handleMessage(message);
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            log.error("canal client 异常", e);
        } finally {
            if (connector != null) {
                connector.disconnect();
            }
        }
    }


}

2、canal消息处理

@Component
@Slf4j
public class CanalMessageHandler {

    @Autowired
    private MqProducer mqProducer;

    @Autowired
    private FilterTable filterTable;

    public void handleMessage(Message message) {
        try {
            printEntries(message.getEntries());
        } catch (Exception e) {
            log.error("canal consume message failed!", e);
        }
    }

    private void printEntries(List<CanalEntry.Entry> entries) throws Exception {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            log.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            if (!filterTable.isContains(entry.getHeader().getTableName())) {
                continue;
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                MqMessage mqMessage = new MqMessage();
                mqMessage.setTableName(entry.getHeader().getTableName());
                switch (rowChange.getEventType()) {
                    case INSERT:
                        mqMessage.setEventType("INSERT");
                        mqMessage.setJsonBodyAfter(toJsonBody(rowData.getAfterColumnsList()));
                        break;
                    case UPDATE:
                        mqMessage.setEventType("UPDATE");
                        mqMessage.setJsonBodyBefore(toJsonBody(rowData.getBeforeColumnsList()));
                        mqMessage.setJsonBodyAfter(toJsonBody(rowData.getAfterColumnsList()));
                        break;
                    case DELETE:
                        mqMessage.setEventType("DELETE");
                        mqMessage.setJsonBodyBefore(toJsonBody(rowData.getBeforeColumnsList()));
                        break;
                    default:
                        break;
                }
                mqProducer.sendMessage(mqMessage);
            }
        }
    }

    private String toJsonBody(List<CanalEntry.Column> columns) {
        JSONObject object = new JSONObject();
        for (CanalEntry.Column column : columns) {
            object.put(column.getName(), column.getValue());
        }
        return object.toJSONString();
    }

}

3、组装发往mq

@Data
public class MqMessage {

    private String tableName;

    private String eventType;

    private String jsonBodyBefore;

    private String jsonBodyAfter;

}



@Component
@Slf4j
public class MqProducer {

    private static final String GROUP_NAME = "test";

    private static final String NAME_ADDRESS = "localhost:9876";

    private static final String TOPIC_NAME = "canal";

    private DefaultMQProducer defaultMQProducer;

    @PostConstruct
    public void init() {
        defaultMQProducer = new DefaultMQProducer(GROUP_NAME);
        defaultMQProducer.setNamesrvAddr(NAME_ADDRESS);
        try {
            defaultMQProducer.start();
        } catch (MQClientException e) {
            log.error("mq producer start error!", e);
        }
    }

    public boolean sendMessage(MqMessage mqMessage) {
        Message message = new Message(TOPIC_NAME, null, JSON.toJSONBytes(mqMessage));
        try {
            SendResult sendResult = defaultMQProducer.send(message);
            if (Objects.nonNull(sendResult) && SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                return true;
            }
        } catch (Exception e) {
            log.error("send message failed!,info->" + JSON.toJSONString(message));
        }
        return false;
    }

    @PreDestroy
    public void destroy() {
        if (defaultMQProducer != null) {
            defaultMQProducer.shutdown();
        }
    }


}

4、运行,修改数据库表字段

截屏2022-04-12 15.46.25

截屏2022-04-12 15.46.44

results matching ""

    No results matching ""