消息中间件rocketmq(上)

1、定义

消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 。

2、作用

应用解耦 流量削峰 消息分发 异步

3、启动

参考 https://rocketmq.apache.org/docs/quick-start/

启动nameserver broker

截屏2022-04-01 15.51.38

启动 rocketmq console(github下载)

截屏2022-04-01 15.54.23![](

生产者代码

public class Producer {

    public static void main(String[] args) {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer("test");
        defaultMQProducer.setNamesrvAddr("localhost:9876");
        try {
            defaultMQProducer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        int i = 100;
        while (i > 0) {
            String body = System.currentTimeMillis() + ":hello";
            Message message = new Message("test", "mm", body.getBytes());
            try {
                SendResult sendResult = defaultMQProducer.send(message);
                System.out.println(body);
            } catch (Exception e) {
                e.printStackTrace();
            }
            i--;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        defaultMQProducer.shutdown();
    }
}

消费者代码

public class Consumer {

    public static void main(String[] args) {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        //从消息队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //设置广播消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        //订阅主题
        try {
            consumer.subscribe("test", "*");
            //注册消息监听器
            consumer.registerMessageListener(new MQListener());

            //启动消费端
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        System.err.println("Message Consumer Start...");

    }
}

监听器

public class MQListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(msgs)){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        msgs.stream()
                .forEach(msg -> {
                    try {
                        String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println(msg.getMsgId());
                        System.out.println(msg.getKeys());
                        System.out.println(msg.getTags());
                        System.out.println(msg.getTopic());
                        System.out.println(messageBody);
                        System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
                    } catch (Exception e) {
                    }
                });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

消费者

public class MQPullConsumer {

    private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.start();
        consumer.setConsumerPullTimeoutMillis(1000);
        // 从指定topic中拉取所有消息队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test");
        for(MessageQueue mq:mqs){
            try {
                // 获取消息的offset,指定从store中获取
                long offset = consumer.fetchConsumeOffset(mq,true);
                System.out.println("consumer from the queue:"+mq+":"+offset);
                while(true){
                    long start=System.currentTimeMillis();
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null,
                            getMessageQueueOffset(mq), 32);
                    long end=System.currentTimeMillis();
                    System.out.println("end-start:"+(end-start));
                    putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
                    switch(pullResult.getPullStatus()){
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println(m.getMsgId());
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            break;
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        consumer.shutdown();
    }

    // 保存上次消费的消息下标
    private static void putMessageQueueOffset(MessageQueue mq,
                                              long nextBeginOffset) {
        OFFSE_TABLE.put(mq, nextBeginOffset);
    }

    // 获取上次消费的消息的下标
    private static Long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if(offset != null){
            return offset;
        }
        return 0l;
    }


}

4、基本概念

Nameserver:注册中心,负责broker的管理和路由管理。

Broker:消息存储

Producer:生产者

Consumer:消费者

Consumer group:多个消费者组成消费者组

Topic:主题

Message queue :一个topic下划分多个queue,分为读写队列,可以并发发送、消费。

Message:消息

Tag:标签,消息标签,可以用于服务端过滤

5、broker核心配置

#集群名称
brokerClusterName=default-cluster
#broker名称  
brokerName=broker-b        
#0位master,大于0为slave
brokerId=0  
#- ASYNC_MASTER 异步复制Master  SYNC_MASTER 同步双写Master
brokerRole=SYNC_MASTER 
#- ASYNC_FLUSH 异步刷盘  - SYNC_FLUSH 同步刷盘 
flushDiskType=SYNC_FLUSH 
#nameserver地址
namesrvAddr=100.100.100.100:9876;100.100.100.101:9876 
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
defaultTopicQueueNums=4
#Broker 对外服务的监听端口
listenPort=10911    
#删除文件时间点,默认凌晨 4点
deleteWhen=04    
#文件保留时间,默认 48 小时
fileReservedTime=120    
#commitLog每个文件的大小默认1G 
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整  
mapedFileSizeConsumeQueue=300000 
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=75 
#存储路径
storePathRootDir=/data/mq/rocketmq4.3/store 
#commitLog 存储路径 
storePathCommitLog=/data/mq/rocketmq4.3/store/commitlog 
#消费队列存储路径存储路径 
storePathConsumeQueue=/data/mq/rocketmq4.3/store/consumequeue 
#消息索引存储路径
storePathIndex=/data/mq/rocketmq4.3/store/index 
#checkpoint 文件存储路径 
storeCheckpoint=/data/mq/rocketmq4.3/store/checkpoint 
 #abort 文件存储路径 
abortFile=/data/mq/rocketmq4.3/store/abort
#限制的消息大小
maxMessageSize=65536 
#并发send线程数,多线程来发送消息可能会出现broker busy
sendMessageThreadPoolNums=128

6、控制台命令

创建topic、删除topic、创建删除订阅组、更新broker信息、查看列表信息、根据messageid查询

 sh mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t test

博客参考

7、消费者

DefaultMQPushConsumer:系统控制消费,系统收取到消息,自动处理,自动保存offset,加入新的消费者会自动负载均衡。

DefaultMQPullConsumer:手动拉取,手动处理offset,根据从broker返回的消费类型做相应的处理,可以设置长轮询

消息模式

clustering:集群消费模式,消息通过一定负载均衡策略,将消息分发到多个consumer中。

brocasting:将所有消息分发给Consume Group中每个消费者消费。

push方式有很好的实时性,但是主动推送,加大了server的工作,且client状态不可控

pull方式可控,但是间隔时间不好设置,容易出现忙等,也有可能时间太长导致实时性降低

8、生产者

DefaultMQProducer

返回状态(与刷盘策略,主从同步机制有关)

package org.apache.rocketmq.client.producer;

public enum SendStatus {
    /**
     * 消息已经发送成功
     */
    SEND_OK,
    /**
     * 后面这三种情况,如果业务不允许丢消息,需要做相应的补偿,做可靠性的重投
     */
    /**
     * 消息发送成功,但是服务器刷盘的时候超时了,消息已经进入服务器队列,只有服务器宕机,消息才会丢失
     * 会等待下一次刷盘时机再去刷盘,如果服务器宕机,broker挂掉,消息就会丢失,返回此状态需要考虑重发消息
     */
    FLUSH_DISK_TIMEOUT,
    /**
     * 主从同步的时候,同步到slave的时候超时了,消息已经步入到slave,但是消息也是超时了,在slave宕机的时候
     * 消息才会丢失,返回此状态需要考虑重发消息,需要保证可靠性
     */
    FLUSH_SLAVE_TIMEOUT,
    /**
     * 消息已经发送成功,但是此时slave是不可用的
     */
    SLAVE_NOT_AVAILABLE,
}

设置自定义消息发送规则:MessageQueueSelector

支持分布式事务:两阶段提交,那么第二阶段必定涉及到查询消息,修改状态。通过使用offset偏移量查找消息,修改消息状态的。缺点就是导致系统脏页过多。

支持定时消息:setDelayTimeLevel,但是不支持非常精确的定时。

results matching ""

    No results matching ""