`
234390216
  • 浏览: 10195675 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:460930
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1772003
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1395627
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:393961
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:678309
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:529373
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1178882
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:462312
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:150225
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:66926
社区版块
存档分类
最新评论

RocketMQ(05)——消息的群集消费和广播消费

阅读更多

消息的群集消费和广播消费

RocketMQ的消费者进行消息消费时有两种消费方式,群集消费和广播消费。默认是群集消费。

群集消费

Consumer都有一个Group,当两个Consumer实例属于同一个Group时,它们会共享消息队列中的消息消费位移,即同一条消息只会由一个消费者实例消费。实际上一个队列只会分配给一个消费者实例,那么属于该队列中的消息就只能被一个消费者实例消费了。当一个消费者实例挂了后,会重新为消费者实例分配队列,这样原本分配给挂了的那个实例的队列中又会分配给其它消费者进行消费。RocketMQ限制了同一个JVM中不允许有相同Group名称的Consumer实例存在,所以同一Group的多个Consumer往往是部署在不同机器上的,通常是同一程序部署了多份。假设现在有名为group1的Consumer在Machine1上部署了一份,在Machine2上部署了一份,它们订阅的消息队列中有消息1-10共10条消息,那么可能1-5条消息由Machine1上的消费者消费,6-10条消息由Machine2上的消费者消费。如果在Machine1上还部署了一个名为group2的消费者,则该消费者可以从消息队列中消费1-10条消息,因为它跟group1是不同的Group,不同Group的消费者对同一队列的消费是完全独立的。比如下面的代码对应的就是群集消费。

@Test
public void testConsumer() throws Exception {
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
  consumer.setNamesrvAddr(nameServer);
  consumer.subscribe("topic1", "tag1");
  consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      System.out.println(Thread.currentThread().getName() + "收到了消息" + msgs);
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
  });
  consumer.start();
  TimeUnit.SECONDS.sleep(120);
}

广播消费

消费者的消费模式是广播消费时,每一个消费者实例对消息队列中消息的消费是独立的,而不管它们的Group是什么。假设现在有名为group1的Consumer在Machine1上部署了一份,在Machine2上部署了一份,它们订阅的消息队列中有消息1-10共10条消息,那么可能1-10条消息由Machine1上的消费者消费,它们也会由Machine2上的消费者消费。可以通过消费者的setMessageModel(MessageModel.BROADCASTING)指定消息的消费模式为广播消费,它的默认值是CLUSTERING。

@Test
public void testBroadcastConsume() throws Exception {
  String topic = "topic1";
  String tag = "tag1";
  String consumerGroup = "consumer-group1";
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
  consumer.setNamesrvAddr(this.nameServer);
  //广播方式,同一消息可以被所有的消费者消费。
  consumer.setMessageModel(MessageModel.BROADCASTING);
  try {
    consumer.subscribe(topic, tag);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
      System.out.println("消费了消息——" + msgs.get(0).getMsgId());
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
  } catch (MQClientException e) {
    e.printStackTrace();
  }
  TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

(注:本文是基于RocketMQ4.5.0所写)

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics