发送顺序消息
如果你的业务上对消息的发送和消费顺序有较高的需求,那么在发送消息的时候你需要把它们放到同一个消息队列中,因为只有同一个队列的消息才能确保消费的顺序性。下面代码我们在发送消息的时候,调用的是需要传递MessageQueueSelector的send()
,该方法还可以传递一个额外的参数,其对应MessageQueueSelector的select()
的最后一个参数。下面代码中我们一共发送了10条消息,从1开始算顺序为奇数的都放到第一个队列中,顺序为偶数的都放第二个队列中。所以最终第一个队列放了顺序号为1/3/5/7/9的消息,第二个队列中放了顺序号为2/4/6/8/10的消息。
@Test
public void testOrderSend() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(this.nameServer);
producer.start();
for (int i=0; i<10; i++) {
Message message = new Message("topic1", "tag3", (System.currentTimeMillis() + "---" + System.nanoTime() + "hello ordered message " + i).getBytes());
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int index = (int) arg;
//奇数放一个队列,偶数放一个队列
return mqs.get(index % mqs.size() % 2);
}
}, i);
Assert.assertTrue(sendResult.getSendStatus() == SendStatus.SEND_OK);
}
producer.shutdown();
}
在消费的时候如果需要确保队列中的消息是按照顺序消费的,注册消息监听器时不能再选择并发消费的MessageListenerConcurrently,而需要选择按顺序消费的MessageListenerOrderly。按顺序消费时每个线程会锁定当前队列,只有一条消息消费完了才会释放锁,这样确保同一队列同时只能有一个线程消费一条消息。而并发消费时是不会一直锁队列的。有序消费时同一个队列里面的消息会按照顺序进行消费,但是它们可能被不同的线程消费。如消息的顺序是1/2/3/4/5/6,则按照顺序消费可以保证消息的消费顺序一定是1/2/3/4/5/6,但是消费它们的线程有可能是线程6/5/4/3/2/1。如果要保证有序的消费是在同一个线程完成的,则消费者线程只能有一个,可以通过setConsumeThreadMax()
定义消费线程的最大数,可以通过setConsumeThreadMin
设置消费者线程的最小数。下面的代码中就定义了按照顺序进行消息的消费。
@Test
public void testOrderConsume() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
consumer.setNamesrvAddr(this.nameServer);
consumer.subscribe("topic1", "tag3");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//默认值,订阅以前的消息将被忽略
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println(Thread.currentThread().getName() + "消费消息:" + new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
consumer.shutdown();
}
顺序消息消费时返回的ConsumeOrderlyStatus只能是SUCCESS和SUSPEND_CURRENT_QUEUE_A_MOMENT。SUCCESS表示消费成功,可以继续消费下一条,而SUSPEND_CURRENT_QUEUE_A_MOMENT表示消费失败,需要等待一下再继续消费。它不能像并发消费那样跳过消费失败的消息,因为那样就破坏了消息消费的顺序性。
(注:本文是基于RocketMQ4.5.0所写)
相关推荐
一张图进阶 RocketMQ - 消息发送.doc
本文来自于jianshu,文章介绍了RocketMQ网络架构图以及RocketMQ发送普通消息的全流程解读等相关内容。摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢?大道至简,消息队列可以简单概括...
所以在实际产环境中,个Topic会设置成多分区的模式,来持多个消费者,参照下图:在互联企业的实际产环境中,Topic数量和分区都会较多,这就要求消息中间件在多T
RocketMQ 开发指南,包含基本概念以及生产者和消费者的使用等
伴随着互联网的飞速发展, 特别是在近几年中, 移动... 本文介绍了MQTT 协议与RocketMQ 的这种开源项目的应用, 并通过RocketMQ 与Mosquitto 相结合的方式, 实现了一种基于RocketMQ 的MQTT 消息推送服务器及其分布式部署.
2.消息队列:RocketMQ能够保证消息的顺序和高效传递。 3.持久化存储:消息存储在磁盘上,可以进行持久化。 4.高可用:RocketMQ支持主从和HA(高可用)机制。 5.低延迟:RocketMQ设计为低延迟,适合需要实时消息系统。 6.支持...
3.1.1消息发送 3.1.2消费消息 3.2 顺序消息 3.3 ⼴播消息 3.4 延时消息 3.4.1 延时消息介绍 3.4.2 RocketMQ中的延迟消息 3.5 批量消息 3.6 过滤消息 3.6.1 TAG模式过滤 3.6.2 SQL表达式过滤 3.6.3 类过滤模式(基于...
rocketmq集群可视化界面项目,直接修改配置中的集群ip地址即可~
其次,保证消息顺序是通过将相关联的消息放入同一个MessageQueue实现的。此外,本文还涉及了处理消息积压的策略,这在面对大量未处理消息时尤为关键。最后,本文还介绍了RocketMQ的消息轨迹功能,这有助于更好地跟踪...
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
RockeMQ 服务器安装包、控制台代码、消息发送接收代码【同步发送 异步发送 单向发送 顺序发送】
Apache RocketMQ高性能云原生消息队列架构.pdf
2 功能强大,覆盖普通消息、定时(延时)消息、事务消息(提交、回滚)等基本场景的发送场景。集群订阅和广播订阅的消费场景。内含多种命令行参数(例如消费位点的调整,消息体大小调整,并发数调整,JVM 参数调优,...
RocketMQ 的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。 1. NameServer 设计及其简单,RocketMQ 摈弃了业界常用的 Zookeeper 充当消息管理的...
万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf
RocketMQ消息丢失解决方案:事务消息.docx
《RocketMQ高级原理:深入剖析消息系统的核心机制》的关键内容,涵盖RocketMQ的基础组件、消息存储机制、刷盘方式、主从复制、负载均衡、消息重试、死信队列和幂等性等核心概念。RocketMQ作为一款高性能、高可用的...
主要介绍了RocketMq事务消息发送代码流程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
NULL 博文链接:https://hbxflihua.iteye.com/blog/2370891