Spring Cloud Function
Spring Cloud Function专注于提供一些与业务无关的函数功能。它允许用户把java.util.function.Function
、java.util.function.Consumer
和java.util.function.Supplier
类型的bean直接对外发布。
通过Http对外发布
Function、Consumer、Supplier可以直接以Http的方式对外发布,这需要我们添加spring-cloud-starter-function-web
依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>
默认会发布到根路径,资源名称就是对应的bean名称。比如下面的代码中定义了一个名称为uppercase的Function类型的bean,它默认会发布到/uppercase
,对应的请求方式是POST。当我们以POST方式请求/uppercase
时如果请求体中传递的数据是abc,则服务端会响应ABC,因为对应处理的uppercase Function会把接入的参数转为大写后再返回。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
如果我们更习惯基于Reactive编程,则可以定义入参和出参为reactor.core.publisher.Flux
。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.map(value -> value.toUpperCase());
}
Consumer和Supplier也是类似的。Function可以接收一个参数进行处理后再返回一个结果,Consumer是只接收参数进行处理,Supplier是不接收参数但是会返回一个结果。在使用的时候可以根据它们的特性选择使用Function、Consumer还是Supplier。下面的定义了consumer、consumerFlux和supplier三个bean,当POST请求/consumer
时传递的参数会被简单的输出到控制台,POST请求/consumerFlux
时其内部的逻辑是基于Flux编程的,最终也是简单输出;GET请求/supplier
时会返回当前时间的字符串表示。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Consumer<String> consumer() {
return message -> System.out.println("收到消息:" + message);
}
@Bean
public Consumer<Flux<String>> consumerFlux() {
return stringFlux -> {stringFlux.subscribe(str -> System.out.println("receive message : " + str));};
}
@Bean
public Supplier<String> supplier() {
return () -> LocalDateTime.now().toString();
}
}
Function、Consumer和Supplier的输入和输出参数除了可以是字符串以外,还可以是一个对象。当是一个对象时它会被转换为JSON格式,或者从JSON格式转换为对象(入参从JSON转换为对象,出参从对象转换为JSON)。比如下面代码中,当POST请求/functionUser
,传递JSON数据{"id":1,"name":"zhangsan"}
时会被转换为ID为1,name为zhangsan的User对象,经functionUser
对应的Function处理后会把name设置为output:zhangsan
,然后再转换为JSON对象返回给客户端,所以客户端会收到{"id":1,"name":"output:zhangsan"}
。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Function<User, User> functionUser() {
return input -> {
input.setName("output:" + input.getName());
return input;
};
}
@Data
public static class User {
private Long id;
private String name;
}
}
Supplier和Consumer也是类似的。
@Bean
public Consumer<Flux<User>> consumerUser() {
return flux -> flux.subscribe(user -> System.out.println("收到User消息:" + user));
}
@Bean
public Supplier<User> supplierUser() {
return () -> {
User user = new User();
user.setId(System.currentTimeMillis());
user.setName("User-" + System.nanoTime());
return user;
};
}
输入、输出参数还可以是org.springframework.messaging.Message
类型。当入参是Message类型时,Http请求的Header会存放到Message的Header中,比如下面这样可以通过Message.getHeaders()
拿到Http请求的所有Header信息。
@Bean
public Function<Message<String>, Message<String>> functionMessage() {
return message -> {
String payload = "response:" + message.getPayload();
System.out.println("请求的Headers是:" + message.getHeaders());
Map<String, Object> headers = new HashMap<>();
headers.put("responseTime", LocalDateTime.now());
MessageHeaders messageHeaders = new MessageHeaders(headers);
return MessageBuilder.createMessage(payload, messageHeaders);
};
}
当响应的内容也是Message类型时,响应的Message的Header不会作为响应Http的Header,而是会把Message作为一个整体转换为JSON进行返回。比如下面这样:
{
"payload": "response:abc",
"headers": {
"responseTime": "2019-03-07T19:41:18.9",
"id": "4f244b72-b4fe-d679-6c7e-3a23dbfc9b53",
"timestamp": 1551958878900
}
}
Message也可以和复杂对象一起使用,比如下面的代码就定义了Message的payload是User对象,那请求体中的JSON对象会转换为User对象。
@Bean
public Consumer<Flux<Message<User>>> consumerMessage() {
return flux -> flux.subscribe(message -> System.out.println("收到User消息:" + message.getPayload() + ",消息头是:" + message.getHeaders()));
}
通过Stream交互
Spring Cloud Function可以和Spring Cloud Stream一起使用。当只有一个Consumer类型的bean定义时,Consumer的入参会和名为input的Binding绑定,即通过从名为input的Binding接收到的消息会调用Consumer bean进行处理。Spring Cloud Function和Spring Cloud Stream一起使用时需要添加spring-cloud-function-stream
依赖和一个Stream实现,笔者选择的是基于RocketMQ的实现,所以添加了spring-cloud-starter-stream-rocketmq
依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-stream</artifactId>
</dependency>
现假设我们的Spring Cloud Function应用中定义了如下这样一个Consumer类型的bean,我们期望它接收的对象来自于Spring Cloud Stream,更直观的说是来自于名为input的Binding。
@Bean
public Consumer<String> consumer() {
return message -> System.out.println("收到消息:" + message);
}
当我们的Stream的Binder实现采用的RocketMQ,我们可以进行如下定义,它指定了RocketMQ的NameServer的地址,指定input Binding对应的是名为test-topic
的Topic。
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group
所以当我们往RocketMQ的test-topic
中发送消息时对应的消息就会转而交给定义的Consumer bean进行处理。
DefaultMQProducer producer = new DefaultMQProducer("test1");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
for (int i=0; i<10; i++) {
org.apache.rocketmq.common.message.Message message = new Message();
message.setTopic("test-topic");
message.setTags("tag1");
message.setBody(("Hello"+i).getBytes());
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
当我们用上面的代码往test-topic
Topic中发送了10条消息后,我们在控制台可能会看到Consumer输出的如下内容。
收到消息:Hello4
收到消息:Hello3
收到消息:Hello1
收到消息:Hello9
收到消息:Hello2
收到消息:Hello0
收到消息:Hello8
收到消息:Hello6
收到消息:Hello7
收到消息:Hello5
同样,它也可以把参数类型定义为org.springframework.messaging.Message
,payload也可以是复杂对象。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("test1");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
for (int i=0; i<10; i++) {
org.apache.rocketmq.common.message.Message message = new Message();
message.setTopic("test-topic");
message.setTags("tag1");
User user = new User();
user.setId(Long.valueOf(i));
user.setName("User" + i);
message.setBody(JSON.toJSONString(user).getBytes());
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
SpringApplication.run(Application.class, args);
}
@Bean
public Consumer<org.springframework.messaging.Message<User>> consumer() {
return message -> {
System.out.println("收到消息:" + message);
User user = message.getPayload();
System.out.println(user.getName());
};
}
@Data
public static class User {
private Long id;
private String name;
}
}
也可以定义Supplier类型的bean,Supplier类型的bean默认会绑定名为output的Binding。下面的代码就定义了一个Supplier类型的bean,返回的是Flux对象,其每秒会产生一条String类型的消息。
@Bean
public Supplier<Flux<String>> supplier() {
return () -> Flux.interval(Duration.ofSeconds(1)).map(i -> "新消息" + LocalDateTime.now());
}
接着我们可以定义名为output的Binding的destination等信息。比如下面我们定义了其destination为test-topic。
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.group=group1
之前我们定义了Consumer对应的名为input的Binding的destination也为test-topic。所以当应用启动后,Supplier会每秒往test-topic发送一条消息,对应的消息会由Consumer接收到。此时会看到Consumer在控制台打印的如下信息。
收到消息:新消息2019-03-08T21:56:20.577
收到消息:新消息2019-03-08T21:56:21.577
收到消息:新消息2019-03-08T21:56:22.577
收到消息:新消息2019-03-08T21:56:23.576
收到消息:新消息2019-03-08T21:56:24.577
收到消息:新消息2019-03-08T21:56:25.576
完整代码如下:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Consumer<String> consumer() {
return message -> {
System.out.println("收到消息:" + message);
};
}
@Bean
public Supplier<Flux<String>> supplier() {
return () -> Flux.interval(Duration.ofSeconds(1)).map(i -> "新消息" + LocalDateTime.now());
}
}
当定义Function类型的bean时,因为它同时有入参和出参,默认情况下它的入参会绑定名为input的Binding,出参绑定名为output的Binding。当我们定义下面这样一个Function类型的bean时,它先是把接收到的消息打印到控制台,然后加上response:
前缀返回并发送到名为output的Binding。
@Bean
public Function<String, String> function() {
return input -> {
System.out.println("Function接收到消息:" + input);
return "response:" + input;
};
}
当input和output的Binding定义如下时即表示从test-topic接收到消息,加上response:
前缀后又发送到了test-topic1这个Topic上。
spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group
spring.cloud.stream.bindings.output.destination=test-topic1
spring.cloud.stream.bindings.output.group=group1
基于上述配置进行测试的完整代码如下。先是通过sendMessages()
往test-topic中发送了5条消息,它们会被Function bean处理后再发送到test-topic1中。而receiveMessages()
中定义了对test-topic1中消息的监听,它会收到test-topic1中所有的消息。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
sendMessages();
receiveMessages();
SpringApplication.run(Application.class, args);
}
private static void sendMessages() {
DefaultMQProducer producer = new DefaultMQProducer("test1");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
for (int i=0; i<5; i++) {
org.apache.rocketmq.common.message.Message message = new Message();
message.setTopic("test-topic");
message.setTags("tag1");
message.setBody(("消息-" + i).getBytes());
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
private static void receiveMessages() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
try {
consumer.subscribe("test-topic1", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach(msg -> {
System.out.println("消费者收到消息:" + new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
@Bean
public Function<String, String> function() {
return input -> {
System.out.println("Function接收到消息:" + input);
return "response:" + input;
};
}
}
上述代码运行后你会在控制台看到类似如下输出。
Function接收到消息:消息-4
消费者收到消息:response:消息-4
Function接收到消息:消息-2
Function接收到消息:消息-3
Function接收到消息:消息-0
Function接收到消息:消息-1
消费者收到消息:response:消息-2
消费者收到消息:response:消息-0
消费者收到消息:response:消息-3
消费者收到消息:response:消息-1
Supplier默认会绑定名为output的Binding,Consumer默认会绑定名为input的Binding,Function默认会同时绑定input和output这两个Binding。当同时存在多个Consumer或Function类型的bean时,因为它们都与名为input的Binding绑定,当名为input的Binding中有新消息了,Spring Cloud Function默认会抛出异常,因为它不知道该给哪一个Consumer或Function处理。此时可以指定spring.cloud.function.stream.shared=true
,这样多个function类型的bean将共享消息。需要注意的是从RocketMQ的角度来讲,它们都属于一个消费者组,它们在共享消息时每条消息只会被一个消费者消费,比如有消息ABCD,消费者1可能消费了AB,消费者2消费了CD。但是对于Spring Cloud Function Stream来讲,它们对应的都是名为input的Binding,一个Binding只对应一个消费者,它能消费所有的消息,Spring Cloud Function内部在共享消息的时候所有与名为input的Binding绑定的Function或Consumer都将收到所有的消息。
@Bean
public Consumer<String> consumer() {
return message -> {
System.out.println("Consumer收到消息:" + message);
};
}
@Bean
public Function<String, String> function() {
return input -> {
System.out.println("Function接收到消息:" + input);
return "response:" + input;
};
}
当我们定义了上述这样的Function和Consumer类型的bean,又定义了spring.cloud.function.stream.shared=true
,当往名为input的Binding指定的destination发送消息时,我们的控制台会输出类似如下这样的内容。
Function接收到消息:消息-3
Consumer收到消息:消息-3
Function接收到消息:消息-2
Consumer收到消息:消息-2
Function接收到消息:消息-0
Consumer收到消息:消息-0
Function接收到消息:消息-1
Consumer收到消息:消息-1
Function接收到消息:消息-4
Consumer收到消息:消息-4
那如果想精确的把一条消息发送到某个Consumer或Function怎么办呢?Spring Cloud Function Stream允许我们通过头信息stream_routekey
来指定需要发送的Consumer或Function对应的bean名称。比如想要名为abc的Consumer处理,就指定stream_routekey
的值为abc。
Map<String, Object> headers = new HashMap<>();
headers.put("stream_routekey", "consumer");
MessageHeaders messageHeaders = new MessageHeaders(headers);
for (int i=0; i<5; i++) {
Message<String> message = MessageBuilder.createMessage("消息" + i, messageHeaders);
this.sender.send(message);
}
因为笔者使用的基于RocketMQ的实现没有转发最原始的Header到新消息的Header中(会放到RocketMQ的消息的properties中),所以笔者模拟不了指定
stream_routekey
的效果。
参考文档
(注:本文是基于Spring Cloud Finchley.SR1所写)
相关推荐
最简单的 SpringCloud 教程—— 服务的注册与发现(Eureka) 1.创建服务注册中心 (Eureka Server) 2.创建一个服务提供者 (Eureka Client)
SpringCloud——分布式配置中心(Spring Cloud Config)之高可用的分布式配置中心
SpringCloud——消息总线(Bus)之Spring Cloud Bus将分布式系统的节点与轻量级消息代理链接。
SpringCloud——分布式跟踪(Sleuth)之Spring Cloud Sleuth 主要功能就是在分布式系统中提供追踪解决方案。
SpringCloud——断路器(Hystrix)之Ribbon使用断路器和Feign使用断路器
SpringCloud——路由器和过滤器(Zuul)之微服务网关的实现
SpringCloud——服务注册(consul)之Consul通过HTTP API和DNS提供服务发现服务。
简单的spring cloud项目,适合新手学习使用
SpringCloud系列Demo代码,每个子项目都是SpringCloud的一个知识点或者说技能点且都有对应的博客介绍,代码开箱即用适合新手学习或老司机复习。 SpringCloud系列Demo代码,每个子项目都是SpringCloud的一个知识点...
Zookeeper作为注册中心搭建SpringCloud实现服务注册及发现
基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的快递驿站系统源码 基于SpringCloud的...
Spring boot 是 Spring 的一套快速配置脚手架,可以基于spring boot 快速开发单个微服务,Spring Cloud是一个基于Spring Boot实现的云应用开发工具;Spring boot专注于快速、方便集成的单个个体,Spring Cloud是关注...
基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的开源电商源码基于spring cloud 和vue全家桶的...
spring boot , spring cloud alibaba, spring cloub 版本选型
Spring Cloud 提供完全的微服务技术体系,可以帮护开发者快速实现架构升级。SpringCloud 是目前最火热的构建微服务体系的解决方案。
一篇很好的springCloud学习的思维导读,详细的介绍了,springCloud的搭建步骤以及各组件的说明讲解 涵盖 Eureka服务注册与发现 Zookeeper服务注册与发现 Consul服务注册与发现 Ribbon负载均衡服务调用 OpenFeign...
(完整版)基于SpringCloud微服务系统设计方案.pdf(完整版)基于SpringCloud微服务系统设计方案.pdf(完整版)基于SpringCloud微服务系统设计方案.pdf(完整版)基于SpringCloud微服务系统设计方案.pdf(完整版)基于Spring...
Spring Cloud Function 是基于Spring Boot 的函数计算框架,它抽象出所有传输细节和基础架构,允许开发人员保留所有熟悉的工具和流程,并专注于业务逻辑。 批量检测脚本:python Spel_RCE_POC.py url.txt 反弹...
课程的源码是尚硅谷的springcloud教程源码,SpringCloud各种核心组件,到最终的微服务架构总结,帮助大家快速入门、上手并精通微服务框架SpringCloud。 课程中对比了 Dubbo 和 SpringCloud,并深入讲授SpringCloud...
第1章 SpringCloud简介 第2章 SpringCache缓存详细讲解及应用 第3章 Redis高级缓存讲解及应用 第4章 SpringSecurity安全详细讲解及应用 第5章 RabbitMQ高级消息的讲解及应用 第6章 ElasticSearch详细讲解及应用 第7...