反应式代码
在开发应用程序代码时,我们可以编写两种风格的代码,即命令式和反应式。
- 命令式的代码:非常类似于虚拟的报纸订阅方式。它由一组任务组成,每次只运行一项任务,每项任务又都依赖于前面的任务。数据会按批次进行处理,在前一项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下一项处理任务。
- 反应式的代码:非常类似于真实的报纸订阅方式。它定义了一组用来处理数据的任务,但是这些任务可以并行地执行。每项任务处理数据的一部分子集,并将结果交给处理流程中的下一项任务,同时继续处理数据的另一部分子集。
Reactor
Mono是Reactor的两种核心类型之一,另一个类型是Flux。两者都实现了反应式流的Publisher接口。Flux代表具有零个、一个或者多个(可能是无限个)数据项的管道。Mono是一种特殊的反应式类型,针对数据项不超过一个的场景,它进行了优化。
使用反应式流图
反应式流图通常使用弹珠图(marble diagram)表示。
弹珠图的展示形式非常简单,在顶部描述了流经Flux或者Mono的时间线,在中间描述了要执行的操作,在底部描述了结果形成的Flux或者Mono的时间线。我们将会看见,当数据流经原始的Flux时,某些操作会对它进行处理,并产生一个新的Flux,已经处理过的数据将会在新Flux中流动。
描绘Flux基本流程的弹珠图
描绘Mono基本流程的弹珠图
根据对象创建Flux
/**
* 根据对象创建
* 下面的测试方法使用Flux或Mono上的静态just方法来创建一个反应式类型
* 他们的数据会由这些对象来驱动
* 下面方法将从5个String对象中创建一个Flux
*/
@Test
public void createAFlux_just() {
Flux<String> fruitFlux = Flux.just("Apple","Orange","Grape","Banana","Strawberry");
//添加一个订阅者
fruitFlux.subscribe(f -> System.out.println("Here's some fruit: " + f));
//要了解玉东易的数据是否流经了fruitFlux,我们可以编写如下代码
/**
* StepVerifier订阅了fruitFlux,然后断言Flux中的每一个数据项是否与预期的水果名称相匹配。
* 最后,它验证Flux在发布完"Strawberry"之后,整个fruitFlux正常完成
*/
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
根据集合创建
/**
* 根据集合创建
* 要根据数组创建Flux,可以调用Flux上的静态方法fromArray(),并传入一个原数组
*/
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[]{
"Apple","Orange","Grape","Banana","Strawberry"
};
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
/**
* 如果我们需要根据List,Set,或者其他任意Iterable的实现来创建Flux,那么
* 可以使用静态的fromIterable方法
*/
@Test
public void createAFlux_fromIterable() {
ArrayList<String> fruitList = new ArrayList<>();
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
/**
* 或者我们拥有一个Java Stream,并且希望将其用作Flux的源
*/
@Test
public void createAFlux_fromStream() {
Stream<String> stream = Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> stringFlux = Flux.fromStream(stream);
StepVerifier.create(stringFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
生成Flux的数据
/**
* 生成Flux的数据
* 有时候我们根本没有可用的数据,而只是想要一个作为计数器的Flux,他会
* 在每次发送新值时增加1.
* 要创建一个计数器Flux,我们可以使用静态方法range()
* 在这个例子中,我们创建了一个区间Flux,起始值为1,结束值为5
*/
@Test
public void createAFlux_range() {
Flux<Integer> intervalFlux = Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}
/**
* 另一个与range()方法类似的是interval()
* 与range()方法相同的是,interval()方法会创建一个发布值递增的Flux。interval(0的特殊
* 之处在于。我们不是给它设置一个起始值和结束值,而是
* 制定一个应该每个多长时间发出值的间隔时间
* 需要注意的是,通过interval()方法创建的Flux会从0开始发布值
* 并且后续的条目依次增加
* 此外,因为interval()方法没有指定最大值,所以他可能后永远运行
* 我们也可以使用take()方法将结果限制为前5个条目
*/
@Test
public void createAFlux_interval() {
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);
StepVerifier.create(flux)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.expectNext(5L)
.verifyComplete();
}
组合反应式类型
/**
* 组合反应式类型
* 假设我们有两个Flux流,并且需要据此创建一个结果Flux,这个形成的Flux会在任意上游Flux流有数据时产生数据。
* 要将一个Flux与另一个Flux合并,可以使用mergeWith()方法
* 例如,假设有一个值是电视和电影角色名称的Flux,还有另一个值是这些角色喜欢吃的食物的名称的Flux。
* 下面的测试方法展示了如何使用mergeWith()方法合并两个Flux对象:
*/
@Test
public void mergeFluxs() {
Flux<String> characterFlux = Flux.just("zhangsan", "lisi", "wangwu").delayElements(Duration.ofMillis(500));
Flux<String> foodFLux = Flux.just("mifan", "jiaozi", "baozi").delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFLux);
StepVerifier.create(mergedFlux)
.expectNext("zhangsan")
.expectNext("lisi")
.expectNext("wangwu")
.expectNext("mifan")
.expectNext("jiaozi")
.expectNext("baozi")
.verifyComplete();
}
通常,Flux会尽可能快地发布数据。因此,我们在创建的两个Flux流上使用delayElements()方法来减慢它们的速度——每500毫秒发布一个条目。此外,为了使食物Flux在角色名称Flux之后再开始流式传输,我们调用了食物Flux上的delaySubscription()方法,以便它在订阅后再经过250毫秒后才开始发布数据。
在合并了两个Flux对象后,将会创建一个新的合并过后的Flux。当StepVerifier订阅这个合并过后的Flux时,它将依次订阅两个源Flux流并启动数据流。
这个合并过后的Flux数据项发布顺序与源Flux的发布时间一致。因为两个Flux对象都设置为以常规速率进行发布,所以这些值在合并后的Flux中会交错在一起,结果是:一个角色、一个食物、另一个角色、另一个食物,以此类推。如果任何一个Flux的计时发生变化,那么你可能会看到接连发布了两个角色或者两个食物。
/**
* 因为mergeWith()方法不能完美地保证源Flux之间的先后顺序,
* 所以我们可以考虑使用zip()方法。当两个Flux对象压缩在一起的时候,它将会产生一个新的发布元组的Flux,
* 其中每个元组中都包含了来自每个源Flux的数据项。
*/
@Test
public void zipFluxs() {
Flux<String> characterFlux = Flux.just("zhangsan", "lisi", "wangwu");
Flux<String> foodFLux = Flux.just("mifan", "jiaozi", "baozi");
Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFLux);
StepVerifier.create(zippedFlux)
.expectNextMatches(p -> p.getT1().equals("zhangsan") && p.getT2().equals("mifan"))
.expectNextMatches(p -> p.getT1().equals("lisi") && p.getT2().equals("jiaozi"))
.expectNextMatches(p -> p.getT1().equals("wangwu") && p.getT2().equals("baozi"))
.verifyComplete();
}
/**
* 如果你不想使用Tuple2,而想要使用其他类型,
* 就可以为zip()方法提供一个合并函数来生成你想要的任何对象,
* 合并函数会传入这两个数据项
* 例如,下面的测试方法会将角色Flux与食物Flux合并在一起,以便生成一个包含String对象的Flux
*/
@Test
public void zipFluxsToObject() {
Flux<String> characterFlux = Flux.just("zhangsan", "lisi", "wangwu");
Flux<String> foodFLux = Flux.just("mifan", "jiaozi", "baozi");
Flux<String> zippedFlux = Flux.zip(characterFlux, foodFLux, (c, f) -> c + "eats" + f);
StepVerifier.create(zippedFlux)
.expectNext("zhangsan eats mifan")
.expectNext("lisi eats jiaozi")
.expectNext("wangwu eats baozi")
.verifyComplete();
}
选择第一个反应式类型进行发布
/**
* 选择第一个反应式类型进行发布
* 假设我们有两个Flux对象,此时我们不想将它们合并在一起,
* 而是想要创建一个新的Flux,让这个新的Flux从第一个产生值的Flux中发布值。
* first()操作会在两个Flux对象中选择第一个发布值的Flux,并再次发布它的值
* 下面的测试方法创建了一个快速的Flux和一个“缓慢”的Flux(
* 其中“慢”意味着它在被订阅后100毫秒才会发布数据项)。使用first()方法,
* 它将会创建一个新的Flux,这个Flux只会获取第一个源Flux发布的值,并再次发布
* 在这种情况下,因为慢速Flux会在快速Flux开始发布之后的100毫秒才发布值,
* 所以新创建的Flux将会简单地忽略慢的Flux,并仅发布来自快速Flux的值。
*/
@Test
public void firstFlux() {
Flux<String> slowFlux = Flux.just("man1", "man2", "man3").delayElements(Duration.ofMillis(100));
Flux<String> fastFlux = Flux.just("kuai1", "kuai2", "kuai3");
Flux<String> firstFlux = Flux.firstWithSignal(slowFlux, fastFlux);
StepVerifier.create(firstFlux)
.expectNext("kuai1")
.expectNext("kuai2")
.expectNext("kuai3")
.verifyComplete();
}
评论区