Trigger功能简单实现(上)
实现Trigger功能,主要作用是监听数据库表,快速增加自定义逻辑。
前置环境:
mysql binlog开启+数据库表准备
Canal server启动
rocketmq启动+console控制台+topic配置
服务端:
1、canal消息接收
/**
 * Spring component that runs a single background thread which connects to a
 * Canal server, pulls batches of binlog entries and forwards every non-empty
 * batch to {@link CanalMessageHandler}.
 *
 * <p>The worker is started in {@link #init()} and asked to stop in
 * {@link #destroy()} by clearing {@link #flag} and interrupting the thread.
 */
@Component
@Slf4j
public class CanalComponent {
    private static final String host_name = "localhost";
    private static final int port = 11111;
    private static final String destination = "example";
    private static Integer batchSize = 10;
    private static Long timeout = 1L;
    private TimeUnit unit = TimeUnit.SECONDS;
    private Thread workThread;
    /** Loop condition of the worker thread; {@code false} asks it to stop. */
    protected volatile boolean flag;
    @Autowired
    private CanalMessageHandler canalMessageHandler;

    /**
     * Creates and starts the worker thread.
     */
    @PostConstruct
    public void init() {
        // The flag must be raised BEFORE the thread starts; otherwise the
        // worker may evaluate `while (flag)` while it is still false and
        // terminate immediately without ever polling Canal.
        flag = true;
        workThread = new Thread(this::process);
        workThread.setName("canal-client-thread");
        workThread.start();
    }

    /**
     * Signals the worker loop to stop and interrupts the worker thread.
     */
    @PreDestroy
    public void destroy() {
        flag = false;
        if (workThread != null) {
            workThread.interrupt();
        }
    }

    /**
     * Worker loop: connects, subscribes to all schemas/tables, rolls back to
     * the last acknowledged position, then repeatedly fetches a batch, hands
     * non-empty batches to the handler and acknowledges the batch id.
     * The connector is always disconnected when the loop ends.
     */
    private void process() {
        CanalConnector connector = null;
        try {
            connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(host_name, port), destination, "", "");
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (flag) {
                Message message = connector.getWithoutAck(batchSize, timeout, unit);
                log.info("获取消息 {}", message);
                long batchId = message.getId();
                if (batchId != -1 && !message.getEntries().isEmpty()) {
                    canalMessageHandler.handleMessage(message);
                }
                connector.ack(batchId);
            }
        } catch (Exception e) {
            if (flag) {
                log.error("canal client 异常", e);
            } else {
                // destroy() cleared the flag and interrupted this thread, so a
                // failure here is the expected way out of a blocking call.
                log.info("canal client stopped: {}", e.toString());
            }
        } finally {
            if (connector != null) {
                connector.disconnect();
            }
        }
    }
}
2、canal消息处理
/**
 * Converts Canal row-change entries into {@link MqMessage} objects and sends
 * them through {@link MqProducer}.
 *
 * <p>Only ROWDATA entries whose table is accepted by {@link FilterTable} are
 * forwarded, and only for INSERT, UPDATE and DELETE events.
 */
@Component
@Slf4j
public class CanalMessageHandler {
    @Autowired
    private MqProducer mqProducer;
    @Autowired
    private FilterTable filterTable;

    /**
     * Processes every entry of one Canal batch. Any exception is logged and
     * not rethrown, so the caller continues with the next batch.
     *
     * @param message the Canal batch to process
     */
    public void handleMessage(Message message) {
        try {
            printEntries(message.getEntries());
        } catch (Exception e) {
            log.error("canal consume message failed!", e);
        }
    }

    /**
     * Logs each ROWDATA entry and publishes one MQ message per changed row of
     * the tables accepted by the filter.
     *
     * @param entries entries of one Canal batch
     * @throws Exception if an entry's store value cannot be parsed
     */
    private void printEntries(List<CanalEntry.Entry> entries) throws Exception {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            CanalEntry.Header header = entry.getHeader();
            String tableName = header.getTableName();
            log.info("================> binlog[{}:{}] , name[{},{}] , eventType : {}",
                    header.getLogfileName(), header.getLogfileOffset(),
                    header.getSchemaName(), tableName, eventType);
            if (!filterTable.isContains(tableName)) {
                continue;
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                MqMessage mqMessage = new MqMessage();
                mqMessage.setTableName(tableName);
                if (!fillEvent(mqMessage, eventType, rowData)) {
                    // Previously an unhandled event type still produced a
                    // message with a null event type and null bodies.
                    continue;
                }
                // The producer reports failure via its return value; surface it
                // here so a dropped row change is visible in the logs.
                if (!mqProducer.sendMessage(mqMessage)) {
                    log.warn("mq message not sent, table={}, eventType={}", tableName, eventType);
                }
            }
        }
    }

    /**
     * Fills the event type and before/after JSON bodies of {@code mqMessage}.
     *
     * @return {@code true} for INSERT, UPDATE and DELETE; {@code false} for
     *         any other event type, in which case nothing is set
     */
    private boolean fillEvent(MqMessage mqMessage, CanalEntry.EventType eventType,
                              CanalEntry.RowData rowData) {
        switch (eventType) {
            case INSERT:
                mqMessage.setEventType("INSERT");
                mqMessage.setJsonBodyAfter(toJsonBody(rowData.getAfterColumnsList()));
                return true;
            case UPDATE:
                mqMessage.setEventType("UPDATE");
                mqMessage.setJsonBodyBefore(toJsonBody(rowData.getBeforeColumnsList()));
                mqMessage.setJsonBodyAfter(toJsonBody(rowData.getAfterColumnsList()));
                return true;
            case DELETE:
                mqMessage.setEventType("DELETE");
                mqMessage.setJsonBodyBefore(toJsonBody(rowData.getBeforeColumnsList()));
                return true;
            default:
                return false;
        }
    }

    /**
     * Serializes a column list to a JSON object string keyed by column name,
     * with each column's value as the JSON value.
     */
    private String toJsonBody(List<CanalEntry.Column> columns) {
        JSONObject object = new JSONObject();
        for (CanalEntry.Column column : columns) {
            object.put(column.getName(), column.getValue());
        }
        return object.toJSONString();
    }
}
3、组装发往mq
/**
 * Payload published to the message queue for one changed row.
 */
@Data
public class MqMessage {
    /** Name of the table the row change belongs to. */
    private String tableName;
    /** Event type string; the handler sets "INSERT", "UPDATE" or "DELETE". */
    private String eventType;
    /** JSON object string of the row's columns before the change (set for UPDATE and DELETE). */
    private String jsonBodyBefore;
    /** JSON object string of the row's columns after the change (set for INSERT and UPDATE). */
    private String jsonBodyAfter;
}
@Component
@Slf4j
public class MqProducer {
private static final String GROUP_NAME = "test";
private static final String NAME_ADDRESS = "localhost:9876";
private static final String TOPIC_NAME = "canal";
private DefaultMQProducer defaultMQProducer;
@PostConstruct
public void init() {
defaultMQProducer = new DefaultMQProducer(GROUP_NAME);
defaultMQProducer.setNamesrvAddr(NAME_ADDRESS);
try {
defaultMQProducer.start();
} catch (MQClientException e) {
log.error("mq producer start error!", e);
}
}
public boolean sendMessage(MqMessage mqMessage) {
Message message = new Message(TOPIC_NAME, null, JSON.toJSONBytes(mqMessage));
try {
SendResult sendResult = defaultMQProducer.send(message);
if (Objects.nonNull(sendResult) && SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
return true;
}
} catch (Exception e) {
log.error("send message failed!,info->" + JSON.toJSONString(message));
}
return false;
}
@PreDestroy
public void destroy() {
if (defaultMQProducer != null) {
defaultMQProducer.shutdown();
}
}
}
4、运行,修改数据库表字段