# Reactor

back

Project Reactor(以下简称“Reactor”)与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。

# 添加依赖

back

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.0.RELEASE</version>
</dependency>

1
2
3
4
5
6
7

出于测试的需要,添加如下依赖

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-test -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.3.0.RELEASE</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13-rc-1</version>
    <scope>test</scope>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# Flux与Mono

back

Reactor中的发布者(Publisher)由Flux和Mono两个类定义,它们都提供了丰富的操作符(operator)。
一个Flux对象代表一个包含0..N个元素的响应式序列,
而一个Mono对象代表一个包含零/一个(0..1)元素的结果。

Integer[] array = new Integer[]{1,2,3,4,5,6};
Flux.fromArray(array);
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);
1
2
3
4
5
6
// 只有完成信号的空数据流
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(Optional.empty());
// 只有错误信号的数据流
Flux.error(new Exception("some error"));
Mono.error(new Exception("some error"));
1
2
3
4
5
6
7
8

# 订阅前什么都不会发生

back

Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
System.out.println();
Mono.just(1).subscribe(System.out::println);
1
2
3

输出如下:

123456
1
1
2

Flux和Mono还提供了多个subscribe方法的变体:

// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
 Consumer<? super Throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
 Consumer<? super Throwable> errorConsumer,
 Runnable completeConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
 Consumer<? super Throwable> errorConsumer,
 Runnable completeConsumer,
 Consumer<? super Subscription> subscriptionConsumer);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 体会

  • 相对于传统的基于回调和Future的异步开发方式,响应式编程更加具有可编排性和可读性,配合lambda表达式,代码更加简洁,处理逻辑的表达就像装配“流水线”,适用于对数据流的处理;
  • 在订阅(subscribe)时才触发数据流,这种数据流叫做“冷”数据流,就像插座插上电器才会有电流一样,还有一种数据流不管是否有订阅者订阅它都会一直发出数据,称之为“热”数据流,Reactor中几乎都是“冷”数据流;
  • 调度器对线程管理进行更高层次的抽象,使得我们可以非常容易地切换线程执行环境; 灵活的错误处理机制有利于编写健壮的程序;
  • “回压”机制使得订阅者可以无限接受数据并让它的源头“满负荷”推送所有的数据,也可以通过使用request方法来告知源头它一次最多能够处理 n 个元素,从而将“推送”模式转换为“推送+拉取”混合的模式。