通过Tag对消息分类
RocketMQ建议一个业务系统只使用一个Topic,不同类型的消息通过tag来区分。tag可以在构造Message的时候指定,下面代码就指定了发送的消息的tag都为tag0。
@Test
public void sendWithTag() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", "tag0", ("hello with tag---" + i).getBytes());
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
producer.shutdown();
}
也可以通过Message的setTags()
进行指定。
@Test
public void sendWithTag() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", ("hello with tag---" + i).getBytes());
message.setTags("tag0");
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
producer.shutdown();
}
虽然参数名叫tags,但是一条消息只能指定一个tag。消费者进行消费的时候也可以指定需要消费的消息对应的tag,比如下面就指定了需要消费的消息对应的Topic是topic1,Tag是tag0。
@Test
public void testConsumeByTag() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic1", "tag0");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
AtomicInteger counter = new AtomicInteger();
msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
}
同一消费者也可以同时订阅同一个Topic的多个Tag,多个Tag之间通过||
进行分隔。比如下面代码就同时订阅了tag0、tag1和tag2。
@Test
public void testConsumeByTag() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("topic1", "tag0||tag1||tag2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
AtomicInteger counter = new AtomicInteger();
msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
TimeUnit.SECONDS.sleep(120);
}
(注:本文基于RocketMQ4.5.0所写)
相关推荐
3.4.2 RocketMQ中的延迟消息 3.5 批量消息 3.6 过滤消息 3.6.1 TAG模式过滤 3.6.2 SQL表达式过滤 3.6.3 类过滤模式(基于4.2.0版本) 3.7 事务消息 3.7.1 概念介绍 3.7.2 分布式事务消息的优势 3.7.3 典型场景 3.7.4...
IOS应用源码——3D TAG 云.zip
JSP源码——Noka tag 软件标签 v3.0_noka3.9.zip
JSP实例开发源码——Noka tag 软件标签.zip
安卓Android源码——Tag.rar
顺序队列用tag标记来判断front==rear相等时队列时空还是满。
RocketMQ是一个分布式消息和流平台,它被设计为简单和可靠的消息中间件。 以下是RocketMQ的一些关键特性和概述: 1.分布式:RocketMQ集群能够水平扩展,支持高并发和高容错。 2.消息队列:RocketMQ能够保证消息的顺序和...
RocketMQ概念 组成: producer:生产者,消息发送者 producer group:生产者组,由多个生产者组成, ...Broker:代理服务器,负责消息的存储,...Tag:将消息在Topic的基础上再次分类,消费者可以进一步细化自己想要的消息
Apache Alibaba RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点: • 支持严格的消息顺序 • 支持 Topic 与 Queue 两种模式 • 亿级消息堆积能力 • 比较友好的分布式特性 • 同时支持 Push 与 Pull ...
3.4.2 RocketMQ中的延迟消息 3.5 批量消息 3.6 过滤消息 3.6.1 TAG模式过滤 3.6.2 SQL表达式过滤 3.6.3 类过滤模式(基于4.2.0版本) 3.7 事务消息 3.7.1 概念介绍 3.7.2 分布式事务消息的优势 3.7.3 典型场景 3.7.4...
3.8.过滤消息tag 4.rocketmq集群 4.1.集群分类 4.2.搭建2主2从集群 4.3.启动集群 4.4.rocketmq控制台 5.rocketmq高级 5.1.持久化 5.2.刷盘机制 5.3.高可用与主从复制 5.4.负载均衡 5.5.消息重试与死信队列 5.6.消息...
网页模板——jQuery 标签云显示插件 Tag Cloud
JSTL教程,jstl技术本身是一个标签库,用来实现jsp页面的显示逻辑。本文档对其核心标签库的使用进行了介绍,可供大家学习使用。
Mp3tag是一个功能强大且易于使用的工具来编辑音频文件的元数据。 它支持批量标签ID3v1,ID3v2.3,id3v2.4,iTunes MP4、WMA编辑,Vorbis评论和元标记多个文件一次涵盖多种音频格式。 此外,它支持在线的数据库查询,...
rocketmq-flume Source&Sink该项目用于与之间的消息接收和投递。首先请确定您已经对和有了基本的了解确保本地maven库中已经存在,或者下载RocketMQ源码自行编译在rocketmq-flume项目根目录执行mvn clean install ...
帝国CMS 7.2 7.5 TAG插件 TAG高级管理工具tags插件SEO利器 一、TAG展示新增功能 TAG伪静态可设置以TAG拼音或TAGID形式,更利于SEO 自带TAG首页功能,方便展示...修复帝国原始TAG系统对存在未审核信息时分页错误的bug
你可以通过本项目轻松的集成Rocketmq到你的SpringBoot项目中。 本项目主要包含以下特性 同步发送消息 异步发送消息 广播发送消息 有序发送和消费消息 发送延时消息 消息tag和key支持 自动序列化和反序列化...
NULL 博文链接:https://kenshin54.iteye.com/blog/858156
Apache RocketMQ 是一个分布式消息传递和流平台,具有低延迟,高性能和可靠性,万亿级...各种消息过滤器机制,例如SQL和Tag 用于隔离测试和云隔离群集的Docker映像 功能丰富的管理仪表板,用于配置,指标和监视 认