`
234390216
  • 浏览: 10195593 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:460915
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1771987
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1395604
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:393950
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:678301
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:529364
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1178868
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:462271
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:150218
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:66924
社区版块
存档分类
最新评论

RocketMQ(03)——通过Tag对消息分类

阅读更多

通过Tag对消息分类

RocketMQ建议一个业务系统只使用一个Topic,不同类型的消息通过tag来区分。tag可以在构造Message的时候指定,下面代码就指定了发送的消息的tag都为tag0。

@Test
public void sendWithTag() throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  for (int i = 0; i < 10; i++) {
    Message message = new Message("topic1", "tag0", ("hello with tag---" + i).getBytes());
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
      System.out.println("消息发送成功:" + sendResult);
    } else {
      System.out.println("消息发送失败:" + sendResult);
    }
  }
  producer.shutdown();
}

也可以通过Message的setTags()进行指定。

@Test
public void sendWithTag() throws Exception {
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(nameServer);
  producer.start();
  for (int i = 0; i < 10; i++) {
    Message message = new Message("topic1", ("hello with tag---" + i).getBytes());
    message.setTags("tag0");
    SendResult sendResult = producer.send(message);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
      System.out.println("消息发送成功:" + sendResult);
    } else {
      System.out.println("消息发送失败:" + sendResult);
    }
  }
  producer.shutdown();
}

虽然参数名叫tags,但是一条消息只能指定一个tag。消费者进行消费的时候也可以指定需要消费的消息对应的tag,比如下面就指定了需要消费的消息对应的Topic是topic1,Tag是tag0。

@Test
public void testConsumeByTag() throws Exception {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
  consumer.setNamesrvAddr(nameServer);
  consumer.subscribe("topic1", "tag0");
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
      AtomicInteger counter = new AtomicInteger();
      msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
}

同一消费者也可以同时订阅同一个Topic的多个Tag,多个Tag之间通过||进行分隔。比如下面代码就同时订阅了tag0、tag1和tag2。

@Test
public void testConsumeByTag() throws Exception {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group2");
  consumer.setNamesrvAddr(nameServer);
  consumer.subscribe("topic1", "tag0||tag1||tag2");
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      System.out.println(Thread.currentThread().getName() + "收到了消息,数量是:" + msgs.size());
      AtomicInteger counter = new AtomicInteger();
      msgs.forEach(msg -> System.out.println(counter.incrementAndGet() + ".消息内容是:" + new String(msg.getBody())));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
}

(注:本文基于RocketMQ4.5.0所写)

0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics