Trigger 功能的简单实现
1、注解扫描需要监听的表
/**
 * Marks a class as a trigger listener and declares which table it listens to.
 *
 * <p>Retained at runtime so that the annotation can be read reflectively
 * from the annotated class.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Trigger {
    /**
     * Name of the trigger.
     *
     * @return the trigger name
     */
    String name();

    /**
     * Model class corresponding to the listened table.
     *
     * @return the model class
     */
    Class<?> modelClass();

    /**
     * Name of the table being listened to.
     *
     * @return the table name
     */
    String tableName();
}
/**
 * Insert listener for the {@code area} table.
 */
@Component
@Slf4j
@Trigger(name = "area", tableName = "area", modelClass = Area.class)
public class AreaTrigger implements AddTriggerListener<Area> {

    /**
     * Logs the given row serialized as JSON.
     *
     * @param data the {@link Area} object to log
     */
    @Override
    public void execute(Area data) {
        String json = JSON.toJSONString(data);
        log.info("insert data->" + json);
    }
}
/**
 * Update listener for the {@code book} table.
 */
@Component
@Slf4j
@Trigger(name = "book", tableName = "book", modelClass = Book.class)
public class BookTrigger implements UpdateTriggerListener<Book> {

    /**
     * Logs both given row states serialized as JSON, the "before" state first.
     *
     * @param before the {@link Book} state before the update
     * @param after  the {@link Book} state after the update
     */
    @Override
    public void execute(Book before, Book after) {
        String beforeJson = JSON.toJSONString(before);
        log.info("update data before->" + beforeJson);

        String afterJson = JSON.toJSONString(after);
        log.info("update data after->" + afterJson);
    }
}
/**
 * Delete listener for the {@code product} table.
 */
@Component
@Slf4j
@Trigger(name = "product", tableName = "product", modelClass = Product.class)
public class ProductTrigger implements DeletedTriggerListener<Product> {

    /**
     * Logs the given row serialized as JSON.
     *
     * @param data the {@link Product} object to log
     */
    @Override
    public void execute(Product data) {
        String json = JSON.toJSONString(data);
        log.info("delete data->" + json);
    }
}
/**
 * Common marker type for trigger listeners; it declares no methods of its own.
 */
public interface TriggerListener {
}
/**
 * Trigger listener for added (inserted) rows.
 *
 * @param <T> type of the model object passed to the callback
 */
public interface AddTriggerListener<T> extends TriggerListener {
/**
 * Handles one insert event.
 *
 * @param data the model object for the event
 */
void execute(T data);
}
/**
 * Trigger listener for updated rows.
 *
 * @param <T> type of the model objects passed to the callback
 */
public interface UpdateTriggerListener<T> extends TriggerListener {
/**
 * Handles one update event.
 *
 * @param before the model object as it was before the update
 * @param after the model object as it is after the update
 */
void execute(T before,T after);
}
/**
 * Trigger listener for deleted rows.
 *
 * @param <T> type of the model object passed to the callback
 */
public interface DeletedTriggerListener<T> extends TriggerListener {
/**
 * Handles one delete event.
 *
 * @param data the model object for the event
 */
void execute(T data);
}
2、扫描全部的triggerListener,记录并判断消息由哪个triggerListener处理
/**
 * Registry that selects the trigger listener responsible for a table and event type.
 *
 * <p>On {@link ApplicationReadyEvent} it collects every bean annotated with
 * {@link Trigger}, indexes it by table name and event type, and writes the
 * table names into the Redis set {@code CANAL_TABLE_NAME}.
 *
 * <p>The lookup methods return raw listener types so that callers can pass
 * objects deserialized at runtime to {@code execute(...)}.
 */
@Component
public class TriggerSelect implements ApplicationContextAware {
    private static final String TABLE_KEY = "CANAL_TABLE_NAME";
    private static final String INSERT_SUFFIX = "_INSERT";
    private static final String UPDATE_SUFFIX = "_UPDATE";
    private static final String DELETE_SUFFIX = "_DELETE";

    private ApplicationContext applicationContext;
    private final Map<String, AddTriggerListener> addListenerMap = new ConcurrentHashMap<>();
    private final Map<String, UpdateTriggerListener> updateListenerMap = new ConcurrentHashMap<>();
    private final Map<String, DeletedTriggerListener> deleteListenerMap = new ConcurrentHashMap<>();

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Rebuilds the Redis table-name set and the in-memory listener maps.
     *
     * <p>A bean implementing several listener interfaces is registered once
     * per interface. A later bean for the same table and event type replaces
     * an earlier one.
     */
    @EventListener(classes = {ApplicationReadyEvent.class})
    public void init() {
        redisTemplate.delete(TABLE_KEY);
        Map<String, Object> triggerBeans = applicationContext.getBeansWithAnnotation(Trigger.class);
        for (Map.Entry<String, Object> entry : triggerBeans.entrySet()) {
            Object bean = entry.getValue();
            // Resolve the annotation through the bean factory rather than bean.getClass():
            // @Trigger is not @Inherited, so it is invisible on a proxy subclass and the
            // direct lookup would return null.
            Trigger annotation = applicationContext.findAnnotationOnBean(entry.getKey(), Trigger.class);
            if (annotation == null) {
                continue;
            }
            String tableName = annotation.tableName();
            redisTemplate.opsForSet().add(TABLE_KEY, tableName);
            if (bean instanceof AddTriggerListener) {
                addListenerMap.put(tableName + INSERT_SUFFIX, (AddTriggerListener) bean);
            }
            if (bean instanceof UpdateTriggerListener) {
                updateListenerMap.put(tableName + UPDATE_SUFFIX, (UpdateTriggerListener) bean);
            }
            if (bean instanceof DeletedTriggerListener) {
                deleteListenerMap.put(tableName + DELETE_SUFFIX, (DeletedTriggerListener) bean);
            }
        }
    }

    /**
     * Finds the insert listener registered for a table.
     *
     * @param tableName name of the table
     * @return the registered listener, or {@code null} if there is none
     */
    public AddTriggerListener selectAddListener(String tableName) {
        return addListenerMap.get(tableName + INSERT_SUFFIX);
    }

    /**
     * Finds the update listener registered for a table.
     *
     * @param tableName name of the table
     * @return the registered listener, or {@code null} if there is none
     */
    public UpdateTriggerListener selectUpdateListener(String tableName) {
        return updateListenerMap.get(tableName + UPDATE_SUFFIX);
    }

    /**
     * Finds the delete listener registered for a table.
     *
     * @param tableName name of the table
     * @return the registered listener, or {@code null} if there is none
     */
    public DeletedTriggerListener selectDeleteListener(String tableName) {
        return deleteListenerMap.get(tableName + DELETE_SUFFIX);
    }
}
3、消费 MQ 消息,并分发给相应的 trigger 处理
@Component
@Slf4j
public class MqConsumer {
private static final String GROUP_NAME="DefaultConsumer";
private static final String NAME_ADDRESS="localhost:9876";
private static final String TOPIC_NAME="canal";
private DefaultMQPushConsumer consumer;
@Autowired
private MQListener mqListener;
@PostConstruct
public void init(){
consumer = new DefaultMQPushConsumer(GROUP_NAME);
consumer.setNamesrvAddr(NAME_ADDRESS);
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅主题
try {
consumer.subscribe(TOPIC_NAME, "*");
//注册消息监听器
consumer.registerMessageListener(mqListener);
//启动消费端
consumer.start();
} catch (MQClientException e) {
log.error("mq consumer start failed!",e);
}
}
}
@Component
@Slf4j
public class MQListener implements MessageListenerConcurrently {
@Autowired
private TriggerSelect triggerSelect;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
log.info("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
MqMessage mqMessage = JSON.parseObject(messageBody, MqMessage.class);
if ("INSERT".equals(mqMessage.getEventType())) {
AddTriggerListener addTriggerListener = triggerSelect.selectAddListener(mqMessage.getTableName());
if (Objects.nonNull(addTriggerListener)) {
Trigger annotation = addTriggerListener.getClass().getAnnotation(Trigger.class);
Class aClass = annotation.modelClass();
Object object = JSON.parseObject(mqMessage.getJsonBodyAfter(), aClass);
addTriggerListener.execute(object);
}
} else if ("UPDATE".equals(mqMessage.getEventType())) {
UpdateTriggerListener updateTriggerListener = triggerSelect.selectUpdateListener(mqMessage.getTableName());
if (Objects.nonNull(updateTriggerListener)) {
Trigger annotation = updateTriggerListener.getClass().getAnnotation(Trigger.class);
Class aClass = annotation.modelClass();
Object objectBefore = JSON.parseObject(mqMessage.getJsonBodyBefore(), aClass);
Object objectAfter = JSON.parseObject(mqMessage.getJsonBodyAfter(), aClass);
updateTriggerListener.execute(objectBefore, objectAfter);
}
} else if ("DELETE".equals(mqMessage.getEventType())) {
DeletedTriggerListener deletedTriggerListener = triggerSelect.selectDeleteListener(mqMessage.getTableName());
if (Objects.nonNull(deletedTriggerListener)) {
Trigger annotation = deletedTriggerListener.getClass().getAnnotation(Trigger.class);
Class aClass = annotation.modelClass();
Object object = JSON.parseObject(mqMessage.getJsonBodyBefore(), aClass);
deletedTriggerListener.execute(object);
}
}
} catch (Exception e) {
log.error("consumer message failed->" + JSON.toJSONString(msg));
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
4、运行