侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 85 篇文章
  • 累计创建 40 个标签
  • 累计收到 0 条评论

反应式代码

qingtian
2021-03-05 / 0 评论 / 0 点赞 / 978 阅读 / 9,474 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2021-03-05,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

反应式代码

在开发应用程序代码时,我们可以编写两种风格的代码,即命令式和反应式。

  • 命令式的代码:非常类似于虚拟的报纸订阅方式。它由一组任务组成,每次只运行一项任务,每项任务又都依赖于前面的任务。数据会按批次进行处理,在前一项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下一项处理任务。
  • 反应式的代码:非常类似于真实的报纸订阅方式。它定义了一组用来处理数据的任务,但是这些任务可以并行地执行。每项任务处理数据的一部分子集,并将结果交给处理流程中的下一项任务,同时继续处理数据的另一部分子集。

Reactor

Mono是Reactor的两种核心类型之一,另一个类型是Flux。两者都实现了反应式流的Publisher接口。Flux代表具有零个、一个或者多个(可能是无限个)数据项的管道。Mono是一种特殊的反应式类型,针对数据项不超过一个的场景,它进行了优化。

使用反应式流图

反应式流图通常使用弹珠图(marble diagram)表示。

弹珠图的展示形式非常简单,在顶部描述了流经Flux或者Mono的时间线,在中间描述了要执行的操作,在底部描述了结果形成的Flux或者Mono的时间线。我们将会看见,当数据流经原始的Flux时,某些操作会对它进行处理,并产生一个新的Flux,已经处理过的数据将会在新Flux中流动。

描绘Flux基本流程的弹珠图

save_share_review_picture_1614908248

描绘Mono基本流程的弹珠图

save_share_review_picture_1614908255

根据对象创建Flux

/**
 * 根据对象创建
 * 下面的测试方法使用Flux或Mono上的静态just方法来创建一个反应式类型
 * 他们的数据会由这些对象来驱动
 * 下面方法将从5个String对象中创建一个Flux
 */
@Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux.just("Apple","Orange","Grape","Banana","Strawberry");

    //添加一个订阅者
    fruitFlux.subscribe(f -> System.out.println("Here's some fruit: " + f));

    //要了解玉东易的数据是否流经了fruitFlux,我们可以编写如下代码
    /**
     * StepVerifier订阅了fruitFlux,然后断言Flux中的每一个数据项是否与预期的水果名称相匹配。
     * 最后,它验证Flux在发布完"Strawberry"之后,整个fruitFlux正常完成
     */
    StepVerifier.create(fruitFlux)
            .expectNext("Apple")
            .expectNext("Orange")
            .expectNext("Grape")
            .expectNext("Banana")
            .expectNext("Strawberry")
            .verifyComplete();



}

根据集合创建

save_share_review_picture_1614909818

/**
     * 根据集合创建
     * 要根据数组创建Flux,可以调用Flux上的静态方法fromArray(),并传入一个原数组
     */
    @Test
    public void createAFlux_fromArray() {

        String[] fruits = new String[]{
                "Apple","Orange","Grape","Banana","Strawberry"
        };


        Flux<String> fruitFlux = Flux.fromArray(fruits);

        StepVerifier.create(fruitFlux)
                .expectNext("Apple")
                .expectNext("Orange")
                .expectNext("Grape")
                .expectNext("Banana")
                .expectNext("Strawberry")
                .verifyComplete();


    }

/**
     * 如果我们需要根据List,Set,或者其他任意Iterable的实现来创建Flux,那么
     * 可以使用静态的fromIterable方法
     */
    @Test
    public void createAFlux_fromIterable() {

        ArrayList<String> fruitList = new ArrayList<>();

        fruitList.add("Apple");
        fruitList.add("Orange");
        fruitList.add("Grape");
        fruitList.add("Banana");
        fruitList.add("Strawberry");



        Flux<String> fruitFlux = Flux.fromIterable(fruitList);

        StepVerifier.create(fruitFlux)
                .expectNext("Apple")
                .expectNext("Orange")
                .expectNext("Grape")
                .expectNext("Banana")
                .expectNext("Strawberry")
                .verifyComplete();

    }


/**
     * 或者我们拥有一个Java Stream,并且希望将其用作Flux的源
     */
    @Test
    public void createAFlux_fromStream() {

        Stream<String> stream = Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");

        Flux<String> stringFlux = Flux.fromStream(stream);
        StepVerifier.create(stringFlux)
                .expectNext("Apple")
                .expectNext("Orange")
                .expectNext("Grape")
                .expectNext("Banana")
                .expectNext("Strawberry")
                .verifyComplete();
    }

生成Flux的数据

save_share_review_picture_1614912060


/**
     * 生成Flux的数据
     * 有时候我们根本没有可用的数据,而只是想要一个作为计数器的Flux,他会
     * 在每次发送新值时增加1.
     * 要创建一个计数器Flux,我们可以使用静态方法range()
     * 在这个例子中,我们创建了一个区间Flux,起始值为1,结束值为5
     */
    @Test
    public void createAFlux_range() {
        Flux<Integer> intervalFlux = Flux.range(1, 5);

        StepVerifier.create(intervalFlux)
                .expectNext(1)
                .expectNext(2)
                .expectNext(3)
                .expectNext(4)
                .expectNext(5)
                .verifyComplete();
    }

save_share_review_picture_1614912066


    /**
     * 另一个与range()方法类似的是interval()
     * 与range()方法相同的是,interval()方法会创建一个发布值递增的Flux。interval(0的特殊
     * 之处在于。我们不是给它设置一个起始值和结束值,而是
     * 制定一个应该每个多长时间发出值的间隔时间
     * 需要注意的是,通过interval()方法创建的Flux会从0开始发布值
     * 并且后续的条目依次增加
     * 此外,因为interval()方法没有指定最大值,所以他可能后永远运行
     * 我们也可以使用take()方法将结果限制为前5个条目
     */
    @Test
    public void createAFlux_interval() {
        Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);

        StepVerifier.create(flux)
                .expectNext(1L)
                .expectNext(2L)
                .expectNext(3L)
                .expectNext(4L)
                .expectNext(5L)
                .verifyComplete();
    }

组合反应式类型

image-20210305121628767

/**
     * 组合反应式类型
     * 假设我们有两个Flux流,并且需要据此创建一个结果Flux,这个形成的Flux会在任意上游Flux流有数据时产生数据。
     * 要将一个Flux与另一个Flux合并,可以使用mergeWith()方法
     * 例如,假设有一个值是电视和电影角色名称的Flux,还有另一个值是这些角色喜欢吃的食物的名称的Flux。
     * 下面的测试方法展示了如何使用mergeWith()方法合并两个Flux对象:
     */
    @Test
    public void mergeFluxs() {
        Flux<String> characterFlux = Flux.just("zhangsan", "lisi", "wangwu").delayElements(Duration.ofMillis(500));
        Flux<String> foodFLux = Flux.just("mifan", "jiaozi", "baozi").delaySubscription(Duration.ofMillis(250))
                .delayElements(Duration.ofMillis(500));
        Flux<String> mergedFlux = characterFlux.mergeWith(foodFLux);
        StepVerifier.create(mergedFlux)
                .expectNext("zhangsan")
                .expectNext("lisi")
                .expectNext("wangwu")
                .expectNext("mifan")
                .expectNext("jiaozi")
                .expectNext("baozi")
                .verifyComplete();

    }

通常,Flux会尽可能快地发布数据。因此,我们在创建的两个Flux流上使用delayElements()方法来减慢它们的速度——每500毫秒发布一个条目。此外,为了使食物Flux在角色名称Flux之后再开始流式传输,我们调用了食物Flux上的delaySubscription()方法,以便它在订阅后再经过250毫秒后才开始发布数据。

在合并了两个Flux对象后,将会创建一个新的合并过后的Flux。当StepVerifier订阅这个合并过后的Flux时,它将依次订阅两个源Flux流并启动数据流。

这个合并过后的Flux数据项发布顺序与源Flux的发布时间一致。因为两个Flux对象都设置为以常规速率进行发布,所以这些值在合并后的Flux中会交错在一起,结果是:一个角色、一个食物、另一个角色、另一个食物,以此类推。如果任何一个Flux的计时发生变化,那么你可能会看到接连发布了两个角色或者两个食物。

image-20210305121830770

/**
     * 因为mergeWith()方法不能完美地保证源Flux之间的先后顺序,
     * 所以我们可以考虑使用zip()方法。当两个Flux对象压缩在一起的时候,它将会产生一个新的发布元组的Flux,
     * 其中每个元组中都包含了来自每个源Flux的数据项。
     */
    @Test
    public void zipFluxs() {

        Flux<String> characterFlux = Flux.just("zhangsan", "lisi", "wangwu");
        Flux<String> foodFLux = Flux.just("mifan", "jiaozi", "baozi");

        Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFLux);
        StepVerifier.create(zippedFlux)
                .expectNextMatches(p -> p.getT1().equals("zhangsan") && p.getT2().equals("mifan"))
                .expectNextMatches(p -> p.getT1().equals("lisi") && p.getT2().equals("jiaozi"))
                .expectNextMatches(p -> p.getT1().equals("wangwu") && p.getT2().equals("baozi"))
                .verifyComplete();


    }

image-20210305121918518

/**
     * 如果你不想使用Tuple2,而想要使用其他类型,
     * 就可以为zip()方法提供一个合并函数来生成你想要的任何对象,
     * 合并函数会传入这两个数据项
     * 例如,下面的测试方法会将角色Flux与食物Flux合并在一起,以便生成一个包含String对象的Flux
     */
    @Test
    public void zipFluxsToObject() {

        Flux<String> characterFlux = Flux.just("zhangsan", "lisi", "wangwu");
        Flux<String> foodFLux = Flux.just("mifan", "jiaozi", "baozi");

        Flux<String> zippedFlux = Flux.zip(characterFlux, foodFLux, (c, f) -> c + "eats" + f);
        StepVerifier.create(zippedFlux)
                .expectNext("zhangsan eats mifan")
                .expectNext("lisi eats jiaozi")
                .expectNext("wangwu eats baozi")
                .verifyComplete();
    }

选择第一个反应式类型进行发布

image-20210305122352951

/**
     * 选择第一个反应式类型进行发布
     * 假设我们有两个Flux对象,此时我们不想将它们合并在一起,
     * 而是想要创建一个新的Flux,让这个新的Flux从第一个产生值的Flux中发布值。
     * first()操作会在两个Flux对象中选择第一个发布值的Flux,并再次发布它的值
     * 下面的测试方法创建了一个快速的Flux和一个“缓慢”的Flux(
     * 其中“慢”意味着它在被订阅后100毫秒才会发布数据项)。使用first()方法,
     * 它将会创建一个新的Flux,这个Flux只会获取第一个源Flux发布的值,并再次发布
     * 在这种情况下,因为慢速Flux会在快速Flux开始发布之后的100毫秒才发布值,
     * 所以新创建的Flux将会简单地忽略慢的Flux,并仅发布来自快速Flux的值。
     */
    @Test
    public void firstFlux() {

        Flux<String> slowFlux = Flux.just("man1", "man2", "man3").delayElements(Duration.ofMillis(100));
        Flux<String> fastFlux = Flux.just("kuai1", "kuai2", "kuai3");

        Flux<String> firstFlux = Flux.firstWithSignal(slowFlux, fastFlux);
        StepVerifier.create(firstFlux)
                .expectNext("kuai1")
                .expectNext("kuai2")
                .expectNext("kuai3")
                .verifyComplete();
    }
0

评论区