常见的反应式操作
转换和过滤反应式流
在数据流经一个流时,我们通常需要过滤掉某些值并对其他的值进行处理。
从反应式类型中过滤数据
数据在从Flux流出时,进行过滤的最基本方法之一是简单地忽略第一批指定数目的数据项。skip操作可以完成这样的工作。
/**
* 针对具有多个数据项的Flux,skip操作将创建一个新的Flux,
* 它会首先跳过指定数量的数据项,然后从源Flux中发布剩余的数据项。
* 在这个场景下,我们有一个具有5个String数据项的Flux。
* 在这个Flux上调用skip(3)方法后会产生一个新的Flux,它会跳过前3个数据项,
* 只发布最后2个数据项。
*/
@Test
public void skipAFew() {
Flux<String> skipFlux = Flux.just("one", "two", "skip a few", "ninety nine", "one hundred").skip(3);
StepVerifier.create(skipFlux)
.expectNext("ninety nine","one hundred")
.verifyComplete();
}
/**
* 但是,你可能并不想跳过特定数量的条目,
* 而是想要在一段时间之内跳过所有的第一批数据。
* 这是skip()操作的另一种形式,将会产生一个新Flux,
* 在发布来自源Flux的数据项之前等待指定的一段时间
* 下面的测试方法使用skip操作创建了一个在发布值之前会等待4秒的Flux。
* 因为Flux是基于一个在发布数据项之间有1秒延迟的Flux创建的(使用了delayElements()操作),
* 所以它只会发布出最后两个数据项
*/
@Test
public void skipAFewSeconds() {
Flux<String> skipFlux = Flux.just("one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1))
.skip(3);
StepVerifier.create(skipFlux)
.expectNext("ninety nine","one hundred")
.verifyComplete();
}
/**
* 我们已经看过skip操作的示例,根
* 据对skip操作的描述来看,take可以认为是与skip相反的操作。
* skip操作会跳过前面几个数据项,而take操作只发布第一批指定数量的数据项,然后将取消订阅
*/
@Test
public void take() {
Flux<String> takeFlux = Flux.just("one", "two", "three", "four", "five")
.take(3);
StepVerifier.create(takeFlux)
.expectNext("one", "two", "three")
.verifyComplete();
}
/**
* 与skip()方法一样,take()方法也有另一种替代形式,
* 基于间隔时间而不是数据项个数。它将接受并发布与源Flux一样多的数据项,
* 直到某段时间结束,之后Flux将会完成
* 下面的测试方法使用take()方法的另一种形式,将会在订阅之后的前3.5秒发布数据条目。
*/
@Test
public void take2() {
Flux<String> takeFlux = Flux.just("one", "two", "three", "four", "five")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
StepVerifier.create(takeFlux)
.expectNext("one", "two", "three")
.verifyComplete();
}
/**
* skip操作和take操作都可以被认为是过滤操作,
* 其过滤条件是基于计数或者持续时间的,
* 而Flux值的更通用过滤则是filter操作
* 我们需要指定一个Predicate,
* 用于决定数据项是否能通过Flux,filter操作允许我们根据任何条件进行选择性地发布。
* 以下操作会过滤掉中间有空格的字符串
*/
@Test
public void filter() {
Flux<String> filterFlux = Flux.just("one", "tw o", "thre e", "fo ur", "five")
.filter(np -> !np.contains(" "));
StepVerifier.create(filterFlux)
.expectNext("one","five")
.verifyComplete();
}
/**
* 我们还可能想要过滤掉已经接收过的数据条目,可以采用distinct操作
* ,形成的Flux将只会发布源Flux中尚未发布过的数据项
* 在下面的测试中,调用distinct()方法产生的Flux只会发布不同的String值
* 虽然"dog"和"bird"从源Flux中都发布了两次,
* 但是在调用distinct()方法产生的结果Flux中,它们只被发布了一次。
*/
@Test
public void distinct() {
Flux<String> distinctFlux = Flux.just("dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
StepVerifier.create(distinctFlux)
.expectNext("dog", "cat", "bird","anteater")
.verifyComplete();
}
映射反应式数据
/**
* 映射反应式数据
* 在Flux或Mono上最常见的操作之一就是将已发布的数据项转换为其他的形式或类型。
* Reactor的反应式类型(Flux和Mono)为此提供了map和flatMap操作。
* map操作会创建一个新的Flux,只是在重新发布它所接收的每个对象之前会执行给定Function指定的转换。
* 在下面的test()方法中,包含代表篮球运动员名字的String值的Flux被转换为一个包含Player对象的新Flux。
* 以lambda形式传递给map()方法的函数会将传入的String值按照空格进行拆分,
* 并使用生成的String数组来创建Player对象。使用just()方法创建的Flux包含了String对象,
* 但是map()方法产生的Flux包含了Player对象。
*/
@Test
public void map() {
Flux<Player> playerFlux = Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr").map(n -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(playerFlux)
.expectNext(new Player("Michael","Jordan"))
.expectNext(new Player("Scottie","Pippen"))
.expectNext(new Player("Steve","Kerr"))
.verifyComplete();
}
/**
* 其中重要的一点是:在每个数据项被源Flux发布时,map操作是同步执行的,
* 如果你想要异步地转换过程,那么你应该考虑使用flatMap操作。
* 对于flatMap操作,我们可能需要一些思考和练习才能完全掌握。
* flatMap并不像map操作那样简单地将一个对象转换到另一个对象,
* 而是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。
* 当与subscribeOn()方法结合使用时,flatMap操作可以释放Reactor反应式的异步能力。
*/
@Test
public void flatMap() {
Flux<Player> playerFlux = Flux.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.flatMap(n -> Mono.just(n)
.map(p -> {
String[] split = n.split("\\s");
return new Player(split[0], split[1]);
}).subscribeOn(Schedulers.parallel()));
List<Player> playerList = Arrays.asList(
new Player("Michael","Jordan"),
new Player("Scottie","Pippen"),
new Player("Steve","Kerr")
);
StepVerifier.create(playerFlux)
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.expectNextMatches(p -> playerList.contains(p))
.verifyComplete();
}
需要注意的是,我们为flatMap()方法指定了一个lambda形式的函数,传入的String将会转换为一个Mono类型的String,然后在这个Mono上通过map()方法将字符串转换为一个Player。
如果到此为止,那么产生的Flux将同样包含Player对象,与使用map()方法的例子相同,顺序同步地生成。但是我们对Mono做的最后一个动作就是调用subscribeOn()方法,它声明每个订阅都应该在并行线程中进行,因此可以异步并行地执行多个String对象的转换操作。
尽管subscribeOn()方法的命名与subscribe()方法类似,但是它们的含义却完全不同。 subscribe()方法是一个动词,订阅并驱动反应式流;而subscribeOn()方法则更具描述性,指定了如何并发地处理订阅。Reactor本身并不强制使用特定的并发模型,通过subscribeOn()方法,我们可以使用Schedulers中的任意一个静态方法来指定并发模型。在这个例子中,我们使用了parallel()方法,使用来自固定线程池(大小与CPU核心数量相同)的工作线程。Schedulers支持多种并发模型
使用flatMap()和subscribeOn()的好处是:我们可以在多个并行线程之间拆分工作,从而增加流的吞吐量。因为工作是并行完成的,无法保证哪项工作首先完成,所以结果Flux中数据项的发布顺序是未知的。
在反应流上缓存数据
/**
* 在反应式流上缓存数据
* 在处理流经Flux的数据时,你可能会发现将数据流拆分为小块会带来一定的收益。
* 我们给定一个包含多个String值的Flux,
* 其中每个值代表一种水果的名称,我们可以创建一个新的包含List集合的Flux,
* 其中每个List只有不超过指定数量的元素
*/
@Test
public void buffer() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry"
);
// Flux<List<String>> buffer = fruitFlux.buffer(3);
// StepVerifier.create(buffer)
// .expectNext(Arrays.asList("apple", "orange", "banana"))
// .expectNext(Arrays.asList("kiwi", "strawberry"))
// .verifyComplete();
Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()).subscribe();
/**
* 如果由于某些原因需要将Flux发布的所有数据项都收集到一个List中,
* 那么可以使用不带参数的buffer()方法
* 这将会产生一个新的Flux。这个Flux将会发布一个List,
* 其中包含源Flux发布的所有数据项。我们可以使用collectList操作实现相同的功能
*/
Flux<List<String>> bufferFlux = fruitFlux.buffer();
}
在这种情况下,String元素的Flux被缓冲到一个新的包含List集合的Flux中,其中每个集合不超过3个条目。因此,发出5个String值的原始Flux将会被转换为一个新的Flux,它会发出两个List集合,其中一个包含3个水果,而另一个包含2个水果。
当组合使用buffer()方法和flatMap()方法时,我们可以对每个List集合进行并行处理。
我们仍然将5个String值的Flux缓冲到一个新的包含List的Flux中,但是这次将flatMap()应用于包含List集合的Flux。这将获取每个List缓冲区,并为其中的元素创建一个新的Flux,然后对其应用map操作。因此,每个List缓冲区都会在各个线程中执行进一步并行处理。
我们观察控制台上打印的日志,为了能够看得清楚,我先删除不重要部分
2021-03-09 17:36:15.149 INFO 23116 --- [ main] reactor.Flux.SubscribeOn.1 : onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
2021-03-09 17:36:15.156 INFO 23116 --- [ main] reactor.Flux.SubscribeOn.1 : request(32)
2021-03-09 17:36:15.161 INFO 23116 --- [ main] reactor.Flux.SubscribeOn.2 : onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
2021-03-09 17:36:15.161 INFO 23116 --- [ main] reactor.Flux.SubscribeOn.2 : request(32)
2021-03-09 17:36:15.165 INFO 23116 --- [ parallel-1] reactor.Flux.SubscribeOn.1 : onNext(APPLE)
2021-03-09 17:36:15.166 INFO 23116 --- [ parallel-2] reactor.Flux.SubscribeOn.2 : onNext(KIWI)
2021-03-09 17:36:15.167 INFO 23116 --- [ parallel-2] reactor.Flux.SubscribeOn.2 : onNext(STRAWBERRY)
2021-03-09 17:36:15.167 INFO 23116 --- [ parallel-1] reactor.Flux.SubscribeOn.1 : onNext(ORANGE)
2021-03-09 17:36:15.168 INFO 23116 --- [ parallel-1] reactor.Flux.SubscribeOn.1 : onNext(BANANA)
2021-03-09 17:36:15.169 INFO 23116 --- [ parallel-2] reactor.Flux.SubscribeOn.2 : onComplete()
2021-03-09 17:36:15.169 INFO 23116 --- [ parallel-1] reactor.Flux.SubscribeOn.1 : onComplete()
如同日志记录所清晰展示的,第一个缓冲区(apple、orange和banana)中的水果在parallel-1线程中处理;与此同时,第二个缓冲区(kiwi和strawberry)中的水果在parallel-2线程中处理。从缓冲区的日志记录交织在一起的事实可以明显地看出,对两个缓冲区的处理是并行执行的。
当有些时候我们需要将Flux发布的数据项都收集到一个List当中时,我们可以试试不带参数的buffer()方法。
这将会产生一个新的Flux。这个Flux将会发布一个List,其中包含源Flux发布的所有数据项。我们可以使用collectList操作实现相同的功能。
/**
* collectList()方法会产生一个发布List的Mono,
* 而不是发布List的Flux。下面的测试方法展示了它的用法
*/
@Test
public void collectList() {
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry"
);
Mono<List<String>> fruitListMono = fruitFlux.collectList();
StepVerifier.create(fruitListMono)
.expectNext(Arrays.asList("apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}
/**
* 一种更加有趣的收集Flux发出的数据项的方法是将它们收集到Map中。
* collectMap操作将会产生一个发布Map的Mono,
* 这个Map中填充了由给定Function计算key值所生成的条目
* 源Flux会发布一些动物的名字。基于这个Flux,我们使用collectMap操作创建了一个发布Map的新Mono,
* 其中key由动物名称的首字母确定,而值则为动物名称本身。
* 如果两个动物名称以相同的字母开头(如elephant和eagle,或者koala和kangaroo),
* 那么流经该流的最后一个条目将会覆盖先前的条目。
*/
@Test
public void collectMap() {
Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMono = animalFlux.collectMap(a -> a.charAt(0));
}
源Flux会发布一些动物的名字。基于这个Flux,我们使用collectMap操作创建了一个发布Map的新Mono,其中key由动物名称的首字母确定,而值则为动物名称本身。如果两个动物名称以相同的字母开头(如elephant和eagle,或者koala和kangaroo),那么流经该流的最后一个条目将会覆盖先前的条目.
在反应式类型上执行逻辑操作
有时候我们想要知道由Mono或者Flux发布的条目是否满足某些条件,那么all()和any()方法可以实现这样的逻辑。
下面两张图可以说明使用all()
和any()
方法的原理
/**
* 在反应式类型上执行逻辑操作
* 有时候我们想要知道由Mono或者Flux发布的条目是否满足某些条件,
* 那么all()和any()方法可以实现这样的逻辑
* 假设我们想知道Flux发布的每个String中是否都包含了字母a或字母k,
* 那么下面的测试将使用all()方法来检查这个条件
*/
@Test
public void all() {
Flux<String> animalFlux = Flux.just("aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(n -> n.contains("a"));
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
//是否有一个有k
Mono<Boolean> hasKMono = animalFlux.any(n -> n.contains("k"));
StepVerifier.create(hasKMono)
.expectNext(true)
.verifyComplete();
}
评论区