基于Apache Kafka的Stream实现
如果你的应用使用了Apache Kafka,你需要把它和Spring Cloud进行整合。需要在应用中添加如下依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
然后就是Spring Cloud Stream的标准配置了。需要在@Configuration
类上使用@EnableBinding
声明需要应用的Binding。
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
上面代码定义需要使用的Binding是Source和Sink接口中声明的input和output两个Binding。然后可以在application.properties文件中声明这两个Binding对应的destination,它们对应于kafka的Topic。如果指定的Topic还未创建,默认会自动进行创建。
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group
如果你的Kafka服务器不是本机或者监听端口不是默认的9092,则还需要通过spring.cloud.stream.kafka.binder.brokers
指定Kafka的服务地址。
spring.cloud.stream.kafka.binder.brokers=localhost:9092
之后就是照常的使用Spring Cloud Stream的相关API进行操作了。如下是发送消息的示例。
@Component
@Slf4j
public class SourceProducer {
@Autowired
private Source source;
public void sendMessages(String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
log.info("发送了一条消息-{}", msg);
this.source.output().send(message);
}
}
如下是监听消息的示例。
@Component
@Slf4j
public class SinkConsumer {
@StreamListener(Sink.INPUT)
public void inputConsumer(Message<String> message) {
String payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
log.info("从Binding-{}收到信息-{}, headers:{}", Sink.INPUT, payload, headers);
}
}
由于笔者的上一篇文章——Spring Cloud Stream基于RocketMQ的实现已经介绍了Spring Cloud Stream的一些规范,这里就不再赘述了。
从Kafka服务,也就是从Spring Cloud Stream的Binder的角度来讲可以定义的参数可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties
。比较核心的参数有:
- spring.cloud.stream.kafka.binder.brokers:用来指定Kafka服务的地址,可以是host,也可以是host:port格式,如:
spring.cloud.stream.kafka.binder.brokers=localhost,10.10.10.1:9092
。默认是localhost。 - spring.cloud.stream.kafka.binder.defaultBrokerPort:Kafka服务的完整地址应该是host+port,当brokers只定义了host时,将默认取该属性定义的port作为Kafka服务的port,默认是9092。
- spring.cloud.stream.kafka.binder.autoCreateTopics:指定Topic不存在时是否需要自动创建,默认是true。
Spring Cloud Stream有多种不同的实现,比如RocketMQ/Kafka/RabbitMQ。不同的实现者在Producer和Consumer上也可能是有些差别的,或者是有些特性的。整合Spring Cloud Stream后这些特性的属性也是可以进行配置的。可以通过spring.cloud.stream.kafka.bindings.xxx.consumer.yyy
指定名为xxx的这个Consumer角色的yyy属性。可以通过spring.cloud.stream.kafka.bindings.xxx.producer.yyy
指定名为xxx的Producer的yyy属性。Kafka实现的Producer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties
,Consumer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties
。Consumer特性的参数主要有:
- autoRebalanceEnabled:默认为true。当设置为true时,会对分区进行负载均衡,有Consumer加入或退出时会对Topic的分区重新分配。设置为false时,每个Consumer分配的Topic分区是固定的,不会变。
- autoCommitOffset:默认为true。当设置为true时表示消息处理完后会自动提交offset;如果设置为false则会在消息的header中添加一个key为kafka_acknowledgment,value为
org.springframework.kafka.support.Acknowledgment
类型的对象的header,消费者可以在处理消息后从header中获取该对象进行手动响应消息的处理情况。 - startOffset:指定新的消费者组加入的时候起始的消费位置,可选值有earliest和latest。默认是null,相当于earliest。
(注:本文是基于Spring cloud Finchley.SR1所写)
相关推荐
简单介绍了如何在Spring Cloud中使用RabbitMQ和Kafka来完成消息发送与接收
卡夫卡春天云流 Apache Kafka的Spring Cloud Stream展示
Spring Cloud系列教程 Spring Boot Spring Cloud Stream 和 Kafka案例教程 SpringCloud系列教程、SpringBoot、 Stream、Kafka、案例教程
Spring for Apache Kafka API。 Spring for Apache Kafka 开发文档。
先启动消费者(kafka,kafka1) 在启动kafka2 自己安装好kafka 及zookeeper文件,有什么疑问,可以一起交流交流。
主要介绍了spring-cloud-stream结合kafka使用详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
Apache Kafka实战.pdf..
Spring Cloud系列教程 Spring Boot Spring Cloud Stream 和 Kafka案例教程 springcloud生产者与消费者项目实战案例 Spring Cloud 中断路器 Circuit Breaker的应用 配置 Spring Cloud Config Server Spring Cloud ...
12.Spring Cloud中Hystrix的服务降级与异常处理 13.Spring Cloud中Hystrix的请求缓存 14.Spring Cloud中Hystrix的请求合并 15.Spring Cloud中Hystrix仪表盘与Turbine集群监控 16.Spring Cloud中声明式服务调用...
spring-cloud-stream-kafka:Spring Cloud Streams Kafka Avro
spring-boot-starter-kafka示例程序\n 支持springcloud1.5.4,kafka0.8.2.x\n 项目地址:https://github.com/zhyea/spring-boot-starter-kafka
Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。
Apache Kafka is a popular distributed streaming platform that acts as a messaging queue or an enterprise messaging system. It lets you publish and subscribe to a stream of records and process them in ...
This book is here to help you get familiar with Apache Kafka and to solve your challenges related to the consumption of millions of messages in publisher-subscriber architectures. It is aimed at ...
《Spring Boot 整合 Apache Kafka》博文对应的源代码;博文地址:https://blog.csdn.net/shawearn1027/article/details/107067328
KafkaStream之时间窗口WindowBy的详细解释和例子接收,阐述grace方法和suppress方法 的妙用,还有自定义TimeStampExtractor的详细示例
spring cloud stream kafka 消息驱动集成
BUS、Turbine、Zipkin、Cache、Spring Cloud Admin、API Gateway、ELK Spring Cloud Security、 Spring Cloud Stream Component: RoketMQ、Kafka、MongoDB、OSS、Redis、Swagger、Zuul、Label、BASE、Charts、...
spring-cloud
Apache Kafka Apache Kafka Apache Kafka Apache Kafka