Trigger功能简单实现下

1、注解扫描需要监听的表

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Trigger {

    /**
     * trigger名称
     *
     * @return
     */
    String name();

    /**
     * 监听表对应的模型对象
     *
     * @return
     */
    Class modelClass();

    /**
     * 监听的表名称
     *
     * @return
     */
    String tableName();

}

//insert表监听
@Component
@Slf4j
@Trigger(name = "area",tableName = "area",modelClass = Area.class)
public class AreaTrigger implements AddTriggerListener<Area> {

    @Override
    public void execute(Area data) {
        log.info("insert data->" + JSON.toJSONString(data));
    }
}

//update表更新
@Component
@Slf4j
@Trigger(name = "book", tableName = "book", modelClass = Book.class)
public class BookTrigger implements UpdateTriggerListener<Book> {

    @Override
    public void execute(Book before, Book after) {
        log.info("update data before->" + JSON.toJSONString(before));
        log.info("update data after->" + JSON.toJSONString(after));
    }
}

//删除表更新
@Component
@Slf4j
@Trigger(name = "product", tableName = "product", modelClass = Product.class)
public class ProductTrigger implements DeletedTriggerListener<Product> {
    @Override
    public void execute(Product data) {
        log.info("delete data->" + JSON.toJSONString(data));
    }
}


public interface TriggerListener {
}

public interface AddTriggerListener<T> extends TriggerListener {

    void execute(T data);

}

public interface UpdateTriggerListener<T> extends TriggerListener {

    void execute(T before,T after);

}

public interface DeletedTriggerListener<T> extends TriggerListener {

    void execute(T data);

}

2、扫描全部的triggerListener,记录并判断消息由哪个triggerListener处理

//选择相应的TriggerListener进行处理
@Component
public class TriggerSelect implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    private ConcurrentHashMap<String, AddTriggerListener> addListenerMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, UpdateTriggerListener> updateListenerMap = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, DeletedTriggerListener> deleteListenerMap = new ConcurrentHashMap<>();

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String TABLE_KEY = "CANAL_TABLE_NAME";

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @EventListener(classes = {ApplicationReadyEvent.class})
    public void init() {
        redisTemplate.delete(TABLE_KEY);
        Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(Trigger.class);
        for (Map.Entry<String, Object> entry : beansWithAnnotation.entrySet()) {
            Object entryValue = entry.getValue();
            Trigger annotation = entryValue.getClass().getAnnotation(Trigger.class);
            redisTemplate.opsForSet().add(TABLE_KEY, annotation.tableName());
            if (entryValue instanceof AddTriggerListener) {
                addListenerMap.put(annotation.tableName() + "_INSERT", (AddTriggerListener) entryValue);
            }
            if (entryValue instanceof UpdateTriggerListener) {
                updateListenerMap.put(annotation.tableName() + "_UPDATE", (UpdateTriggerListener) entryValue);
            }
            if (entryValue instanceof DeletedTriggerListener) {
                deleteListenerMap.put(annotation.tableName() + "_DELETE", (DeletedTriggerListener) entryValue);
            }
        }
    }

    public AddTriggerListener selectAddListener(String tableName) {
        String key = tableName + "_INSERT";
        if (addListenerMap.containsKey(key)) {
            return addListenerMap.get(key);
        }
        return null;
    }

    public UpdateTriggerListener selectUpdateListener(String tableName) {
        String key = tableName + "_UPDATE";
        if (updateListenerMap.containsKey(key)) {
            return updateListenerMap.get(key);
        }
        return null;
    }

    public DeletedTriggerListener selectDeleteListener(String tableName) {
        String key = tableName + "_DELETE";
        if (deleteListenerMap.containsKey(key)) {
            return deleteListenerMap.get(key);
        }
        return null;
    }
}

3、mq消息消费并发往相应的trigger处理

@Component
@Slf4j
public class MqConsumer {

    private static final String GROUP_NAME="DefaultConsumer";

    private static final String NAME_ADDRESS="localhost:9876";

    private static final String TOPIC_NAME="canal";

    private DefaultMQPushConsumer consumer;

    @Autowired
    private MQListener mqListener;

    @PostConstruct
    public void init(){
        consumer = new DefaultMQPushConsumer(GROUP_NAME);
        consumer.setNamesrvAddr(NAME_ADDRESS);
        //从消息队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅主题
        try {
            consumer.subscribe(TOPIC_NAME, "*");
            //注册消息监听器
            consumer.registerMessageListener(mqListener);
            //启动消费端
            consumer.start();
        } catch (MQClientException e) {
            log.error("mq consumer start failed!",e);
        }
    }

}


@Component
@Slf4j
public class MQListener implements MessageListenerConcurrently {

    @Autowired
    private TriggerSelect triggerSelect;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt msg : msgs) {
            try {
                log.info("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
                String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                MqMessage mqMessage = JSON.parseObject(messageBody, MqMessage.class);
                if ("INSERT".equals(mqMessage.getEventType())) {
                    AddTriggerListener addTriggerListener = triggerSelect.selectAddListener(mqMessage.getTableName());
                    if (Objects.nonNull(addTriggerListener)) {
                        Trigger annotation = addTriggerListener.getClass().getAnnotation(Trigger.class);
                        Class aClass = annotation.modelClass();
                        Object object = JSON.parseObject(mqMessage.getJsonBodyAfter(), aClass);
                        addTriggerListener.execute(object);
                    }
                } else if ("UPDATE".equals(mqMessage.getEventType())) {
                    UpdateTriggerListener updateTriggerListener = triggerSelect.selectUpdateListener(mqMessage.getTableName());
                    if (Objects.nonNull(updateTriggerListener)) {
                        Trigger annotation = updateTriggerListener.getClass().getAnnotation(Trigger.class);
                        Class aClass = annotation.modelClass();
                        Object objectBefore = JSON.parseObject(mqMessage.getJsonBodyBefore(), aClass);
                        Object objectAfter = JSON.parseObject(mqMessage.getJsonBodyAfter(), aClass);
                        updateTriggerListener.execute(objectBefore, objectAfter);
                    }
                } else if ("DELETE".equals(mqMessage.getEventType())) {
                    DeletedTriggerListener deletedTriggerListener = triggerSelect.selectDeleteListener(mqMessage.getTableName());
                    if (Objects.nonNull(deletedTriggerListener)) {
                        Trigger annotation = deletedTriggerListener.getClass().getAnnotation(Trigger.class);
                        Class aClass = annotation.modelClass();
                        Object object = JSON.parseObject(mqMessage.getJsonBodyBefore(), aClass);
                        deletedTriggerListener.execute(object);
                    }
                }
            } catch (Exception e) {
                log.error("consumer message failed->" + JSON.toJSONString(msg));
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

4、运行

截屏2022-04-12 15.56.45

results matching ""

    No results matching ""