发送消息的三种方式
同步发送
Producer在进行消息发送时可以是阻塞的,也可以是非阻塞的。具体对应到发送方式一共有三种,分别是同步、异步和单向的(ONEWAY)。之前介绍的调用send()
返回SendResult的方法是阻塞的,它一定要等到Broker进行了响应后才会返回,才能继续往下执行。对于下面的代码就是只有第一条消息发送完了后,才能发送第二条消息,接着是第三条。这种阻塞发送的方式也叫同步发送,它的整个响应时间还包括可能的重试时间。其内部会默认进行两次重试,可以通过setRetryTimesWhenSendFailed()
指定同步发送时内部最大的重试次数。
@Test
public void testSyncSend() throws Exception {
//指定Producer的Group为group1
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定需要连接的Name Server
producer.setNamesrvAddr(nameServer);
//发送消息前必须调用start(),其内部会进行一些初始化工作。
producer.start();
for (int i = 0; i < 10; i++) {
//指定消息发送的Topic是topic1。
Message message = new Message("topic1", ("hello" + i).getBytes());
//同步发送,发送成功后才会返回
SendResult sendResult = producer.send(message);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功:" + sendResult);
} else {
System.out.println("消息发送失败:" + sendResult);
}
}
//使用完毕后需要把Producer关闭,以释放相应的资源
producer.shutdown();
}
异步发送
同步发送时调用send()
的线程会阻塞,而异步发送时当前线程是不会阻塞的。发送结果将由一个回调函数进行回调。下面的代码就是异步发送消息的示例,它与同步发送消息的区别是它在发送消息时多传递了一个SendCallback对象,该方法一调用立马返回,而不需要等待Broker的响应返回。消息发送成功或失败后将回调SendCallback对象的对应方法。所以对于下面示例而言,第一条消息Broker还没有确认发送成功时,第二条消息就发送了,第三条消息也是一样。它们真正在Broker发送成功的顺序其实是不确定的。
@Test
public void sendAsync() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
Message message = new Message("topic1", ("send by async, no." + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:" + message);
latch.countDown();
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败");
latch.countDown();
}
});
}
latch.await();
producer.shutdown();
}
通过异步方式发送消息如果失败了,其内部也是会进行重试的,其最大重试次数是通过
setRetryTimesWhenSendAsyncFailed()
指定的,默认也是2次。
ONEWAY
除了同步发送和异步发送外,还有一种发送方式叫ONEWAY,它的发送是单向的,即它不需要等待Broker的响应,只管发送即可,而不论发送成功与失败。通常应用于一些消息不是那么重要,可丢失的场景。它的发送是通过调用sendOneway()
发送的。
@Test
public void sendOneway() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i=0; i<10; i++) {
Message message = new Message("topic1", "tag2", ("message send with oneway, no."+i).getBytes());
producer.sendOneway(message);
}
producer.shutdown();
}
(注:本文是基于Apache RocketMQ4.5.0所写)
相关推荐
大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适合初中阶童鞋作为MQ研究和学习的切入点。因此,本篇主要从一条消息发送为切入点,详细阐述在RocketMQ这款...
一张图进阶 RocketMQ - 消息发送.doc
NULL 博文链接:https://hbxflihua.iteye.com/blog/2370891
伴随着互联网的飞速发展, 特别是在近几年中, 移动... 本文介绍了MQTT 协议与RocketMQ 的这种开源项目的应用, 并通过RocketMQ 与Mosquitto 相结合的方式, 实现了一种基于RocketMQ 的MQTT 消息推送服务器及其分布式部署.
RocketMQ 开发指南,包含基本概念以及生产者和消费者的使用等
所以在实际产环境中,个Topic会设置成多分区的模式,来持多个消费者,参照下图:在互联企业的实际产环境中,Topic数量和分区都会较多,这就要求消息中间件在多T
2 功能强大,覆盖普通消息、定时(延时)消息、事务消息(提交、回滚)等基本场景的发送场景。集群订阅和广播订阅的消费场景。内含多种命令行参数(例如消费位点的调整,消息体大小调整,并发数调整,JVM 参数调优,...
万亿级数据洪峰下的消息引擎——Apache RocketMQ--阿里.pdf
RocketMQ消息丢失解决方案:事务消息.docx
3.1.1消息发送 3.1.2消费消息 3.2 顺序消息 3.3 ⼴播消息 3.4 延时消息 3.4.1 延时消息介绍 3.4.2 RocketMQ中的延迟消息 3.5 批量消息 3.6 过滤消息 3.6.1 TAG模式过滤 3.6.2 SQL表达式过滤 3.6.3 类过滤模式(基于...
该资源为在购买了阿里云中间件产品rocketmq消息队列之后,使用的连接rocketmq的demo工程,该程序以 Java 为例,包括普通消息、事务消息、定时消息的测试代码,以及相关 Spring 的配置示例,同时提供tcp连接的程序。
rocketmq集群可视化界面项目,直接修改配置中的集群ip地址即可~
在Producer方面,分为普通发送者和事务消息发送者,其中普通发送者构建Netty客户端向Broker发送消息,而事务消息发送者还需要构建Netty服务端以供Broker回查本地事务状态。消息存储方面,探讨了Broker如何存储...
RockeMQ 服务器安装包、控制台代码、消息发送接收代码【同步发送 异步发送 单向发送 顺序发送】
在使用RocketMQ时如何确保消息的不丢失和顺序性,这对于维护系统的稳定性和可靠性至关重要。首先,消息丢失问题可以通过多个策略来解决,包括生产者使用事务消息机制,配置RocketMQ同步刷盘和Dledger主从架构,以及...
RocketMQ 的设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储(Broker)、消息消费,整体设计追求简单与性能第一。 1. NameServer 设计及其简单,RocketMQ 摈弃了业界常用的 Zookeeper 充当消息管理的...
prometheus监控rocketmq用到的rocketmq-exporter jar包,官方github.com/apache/rocketmq-exporter,mvn打包
rocketmq消息中间件
rocketmq事务消息, 使用场景, 最终一致性(而不是强一致性)