消息的群集消费和广播消费
RocketMQ的消费者进行消息消费时有两种消费方式,群集消费和广播消费。默认是群集消费。
群集消费
Consumer都有一个Group,当两个Consumer实例属于同一个Group时,它们会共享消息队列中的消息消费位移,即同一条消息只会由一个消费者实例消费。实际上一个队列只会分配给一个消费者实例,那么属于该队列中的消息就只能被一个消费者实例消费了。当一个消费者实例挂了后,会重新为消费者实例分配队列,这样原本分配给挂了的那个实例的队列中又会分配给其它消费者进行消费。RocketMQ限制了同一个JVM中不允许有相同Group名称的Consumer实例存在,所以同一Group的多个Consumer往往是部署在不同机器上的,通常是同一程序部署了多份。假设现在有名为group1的Consumer在Machine1上部署了一份,在Machine2上部署了一份,它们订阅的消息队列中有消息1-10共10条消息,那么可能1-5条消息由Machine1上的消费者消费,6-10条消息由Machine2上的消费者消费。如果在Machine1上还部署了一个名为group2的消费者,则该消费者可以从消息队列中消费1-10条消息,因为它跟group1是不同的Group,不同Group的消费者对同一队列的消费是完全独立的。比如下面的代码对应的就是群集消费。
@Test
public void testConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic1", "tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
}
广播消费
消费者的消费模式是广播消费时,每一个消费者实例对消息队列中消息的消费是独立的,而不管它们的Group是什么。假设现在有名为group1的Consumer在Machine1上部署了一份,在Machine2上部署了一份,它们订阅的消息队列中有消息1-10共10条消息,那么可能1-10条消息由Machine1上的消费者消费,它们也会由Machine2上的消费者消费。可以通过消费者的setMessageModel(MessageModel.BROADCASTING)
指定消息的消费模式为广播消费,它的默认值是CLUSTERING。
@Test
public void testBroadcastConsume() throws Exception {
String topic = "topic1";
String tag = "tag1";
String consumerGroup = "consumer-group1";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(this.nameServer);
//广播方式,同一消息可以被所有的消费者消费。
consumer.setMessageModel(MessageModel.BROADCASTING);
try {
consumer.subscribe(topic, tag);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println("消费了消息——" + msgs.get(0).getMsgId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
(注:本文是基于RocketMQ4.5.0所写)
相关推荐
。
。
Windows Server 2003服务器群集创建和配置指南 服务器群集是一组协同工作并运行Microsoft群集服务(Microsoft Cluster Service,MSCS)的独立服务器。服务器群集为资源和应用程序提供高可用性、故障恢复、可伸缩性...
rocketmq-spring-boot-starter 阿里云RocketMQSpring图书版支持功能: 发送普通消息的三种模式:同步,异步和单向 订阅消息群集,广播 发送和接收顺序消息 交易讯息 延迟讯息 接收和接收定时消息定时消息和延迟消息...
适用于Prometheus的Apache RocketMQ导出器。 目录 制片人 消费群体 消费者 格拉法纳仪表板 快速开始 兼容性 支持Apache RocketMQ版本4.3.2(及更高版本)。 配置 可以使用不同的属性来配置该映像,有关配置示例,...
使用并发模式的消息(广播/群集) 消费有序消息 使用标签或sql92表达式过滤消息 支持消息跟踪 支持身份验证和授权 支持请求-回复消息交换模式 以推/拉模式使用消息 先决条件 JDK 1.8及更高版本 3.0及更高...
Apache RocketMQ是一个统一的消息传递引擎以及轻量级的数据处理平台。 AWS上的RocketMQ解决方案使客户能够在AWS云中快速部署RocketMQ集群。 基本群集设置(例如EC2实例类型)也可以在部署期间进行配置。 快速入门...
故障切换、故障切换群集、Microsoft 群集、Microsoft 群集服务的设置
Windows2003服务器群集创建和配置,绝对值啊!
Win2003服务器群集创建和配置指南,可以看看
Windows Server 2003 服务器群集创建和配置指南
RocketMQ运算符目录创建RocketMQ集群验证数据存储验证主机路径存储验证NFS存储水平刻度名称服务器群集规模经纪人集群规模乱序消息场景中的高级中间人主题转移清洁环境发展先决条件建造操作员代理和名称服务器映像 ...
会计师事务所选择缘何“群集”——内聚性效应还是结构等价性效应?.pdf
1.2.1群集组件和要求 1.2.2操作系统 1.3系统要求 1.3.1群集节点 1.3.2群集存储设备 1.4您可能需要的其它说明文件 二、准备系统以进行群集 2.1群集配置概览 2.2安装概览 2.3选择域模式 2.4配置群集节点中的...
群集配置, Windows server 2003 群集向导未完成的“失败”群集 ,安装 win2003群集
--基于windows server 2019操作系统,进行故障转移群集 (MSCS)安装 --每个操作步骤均详细进行图文讲解。 --本文档共107页 我们可以将多台服务器组成一个 故障转移群集 (failover clustercluster),这 些服务器...
服务器群集为资源和应用程序提供高可用性、故障恢复、可伸缩性和可管理性。 服务器群集允许客户端在出现故障和计划中的暂停时,依然能够访问应用程序和资源。如果群集中的某一台服务器由于故障或维护需要而无法使用...