消息中间件rocketmq(上)
1、定义
消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 。
2、作用
应用解耦 流量削峰 消息分发 异步
3、启动
参考 https://rocketmq.apache.org/docs/quick-start/
启动nameserver broker
启动 rocketmq console(github下载)
![](
生产者代码
public class Producer {
public static void main(String[] args) {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("test");
defaultMQProducer.setNamesrvAddr("localhost:9876");
try {
defaultMQProducer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
int i = 100;
while (i > 0) {
String body = System.currentTimeMillis() + ":hello";
Message message = new Message("test", "mm", body.getBytes());
try {
SendResult sendResult = defaultMQProducer.send(message);
System.out.println(body);
} catch (Exception e) {
e.printStackTrace();
}
i--;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
defaultMQProducer.shutdown();
}
}
消费者代码
public class Consumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
consumer.setNamesrvAddr("localhost:9876");
//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅主题
try {
consumer.subscribe("test", "*");
//注册消息监听器
consumer.registerMessageListener(new MQListener());
//启动消费端
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
System.err.println("Message Consumer Start...");
}
}
监听器
public class MQListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(msgs)){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
msgs.stream()
.forEach(msg -> {
try {
String messageBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println(msg.getMsgId());
System.out.println(msg.getKeys());
System.out.println(msg.getTags());
System.out.println(msg.getTopic());
System.out.println(messageBody);
System.err.println("Message Consumer: Handle New Message: messageId: " + msg.getMsgId() + ",topic: " + msg.getTopic() + ",tags: " + msg.getTags());
} catch (Exception e) {
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
消费者
public class MQPullConsumer {
private static final Map<MessageQueue,Long> OFFSE_TABLE = new HashMap<MessageQueue,Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.start();
consumer.setConsumerPullTimeoutMillis(1000);
// 从指定topic中拉取所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test");
for(MessageQueue mq:mqs){
try {
// 获取消息的offset,指定从store中获取
long offset = consumer.fetchConsumeOffset(mq,true);
System.out.println("consumer from the queue:"+mq+":"+offset);
while(true){
long start=System.currentTimeMillis();
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffset(mq), 32);
long end=System.currentTimeMillis();
System.out.println("end-start:"+(end-start));
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()){
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(m.getMsgId());
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
consumer.shutdown();
}
// 保存上次消费的消息下标
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
// 获取上次消费的消息的下标
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if(offset != null){
return offset;
}
return 0l;
}
}
4、基本概念
Nameserver:注册中心,负责broker的管理和路由管理。
Broker:消息存储
Producer:生产者
Consumer:消费者
Consumer group:多个消费者组成消费者组
Topic:主题
Message queue :一个topic下划分多个queue,分为读写队列,可以并发发送、消费。
Message:消息
Tag:标签,消息标签,可以用于服务端过滤
5、broker核心配置
#集群名称
brokerClusterName=default-cluster
#broker名称
brokerName=broker-b
#0位master,大于0为slave
brokerId=0
#- ASYNC_MASTER 异步复制Master SYNC_MASTER 同步双写Master
brokerRole=SYNC_MASTER
#- ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#nameserver地址
namesrvAddr=100.100.100.100:9876;100.100.100.101:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=75
#存储路径
storePathRootDir=/data/mq/rocketmq4.3/store
#commitLog 存储路径
storePathCommitLog=/data/mq/rocketmq4.3/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/data/mq/rocketmq4.3/store/consumequeue
#消息索引存储路径
storePathIndex=/data/mq/rocketmq4.3/store/index
#checkpoint 文件存储路径
storeCheckpoint=/data/mq/rocketmq4.3/store/checkpoint
#abort 文件存储路径
abortFile=/data/mq/rocketmq4.3/store/abort
#限制的消息大小
maxMessageSize=65536
#并发send线程数,多线程来发送消息可能会出现broker busy
sendMessageThreadPoolNums=128
6、控制台命令
创建topic、删除topic、创建删除订阅组、更新broker信息、查看列表信息、根据messageid查询
sh mqadmin updateTopic -n 127.0.0.1:9876 -b 127.0.0.1:10911 -t test
7、消费者
DefaultMQPushConsumer:系统控制消费,系统收取到消息,自动处理,自动保存offset,加入新的消费者会自动负载均衡。
DefaultMQPullConsumer:手动拉取,手动处理offset,根据从broker返回的消费类型做相应的处理,可以设置长轮询
消息模式
clustering:集群消费模式,消息通过一定负载均衡策略,将消息分发到多个consumer中。
brocasting:将所有消息分发给Consume Group中每个消费者消费。
push方式有很好的实时性,但是主动推送,加大了server的工作,且client状态不可控
pull方式可控,但是间隔时间不好设置,容易出现忙等,也有可能时间太长导致实时性降低
8、生产者
DefaultMQProducer
返回状态(与刷盘策略,主从同步机制有关)
package org.apache.rocketmq.client.producer;
public enum SendStatus {
/**
* 消息已经发送成功
*/
SEND_OK,
/**
* 后面这三种情况,如果业务不允许丢消息,需要做相应的补偿,做可靠性的重投
*/
/**
* 消息发送成功,但是服务器刷盘的时候超时了,消息已经进入服务器队列,只有服务器宕机,消息才会丢失
* 会等待下一次刷盘时机再去刷盘,如果服务器宕机,broker挂掉,消息就会丢失,返回此状态需要考虑重发消息
*/
FLUSH_DISK_TIMEOUT,
/**
* 主从同步的时候,同步到slave的时候超时了,消息已经步入到slave,但是消息也是超时了,在slave宕机的时候
* 消息才会丢失,返回此状态需要考虑重发消息,需要保证可靠性
*/
FLUSH_SLAVE_TIMEOUT,
/**
* 消息已经发送成功,但是此时slave是不可用的
*/
SLAVE_NOT_AVAILABLE,
}
设置自定义消息发送规则:MessageQueueSelector
支持分布式事务:两阶段提交,那么第二阶段必定涉及到查询消息,修改状态。通过使用offset偏移量查找消息,修改消息状态的。缺点就是导致系统脏页过多。
支持定时消息:setDelayTimeLevel,但是不支持非常精确的定时。