1. 创建Flux及Mono
1.1 使用just从现有的已知内容和大小的数据创建Flux或Mono
Flux.just(new Integer[]{1, 2, 3, 4})
//观察者监听被观察者(消费者)
.subscribe(System.out::println);
//使用可变参数创建Flux
Flux.just(1, 2, 3, 4)
.subscribe(System.out::println);
//使用just创建Mono
Mono.just("1s")
.subscribe(System.out::println);
Mono.just(new Integer[]{1, 2, 3, 4})
.subscribe(System.out::println);
1.2 使用fromIterable从可迭代对象中创建Flux
//从可迭代的对象中创建Flux
Flux.fromIterable(Arrays.asList(1,2,3,4))
.subscribe(System.out::println);
ArrayList<Integer> list = Lists.newArrayList(1, 2, 3, 4);
Flux<Integer> flux = Flux.fromIterable(list);
//在创建Flux后追加元素
list.add(5);
flux.subscribe(System.out::println);
1.3 使用fromStream从集合流中创建Flux
Flux.fromStream(Stream.of(1,2,3,4))
.subscribe(System.out::println);
1.4 使用range中创建一个范围内迭代的Flux
Flux.range(0,10)
.subscribe(System.out::println);
1.5 创建定时发送Flux
1.5.1 使用interval创建间隔某一时间异步执行的Flux
//interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列
Flux.interval(Duration.ofMillis(100))
// map可以对数据进行处理
.map(i->"执行内容:"+i)
//限制执行10次
.take(5)
.subscribe(System.out::println);
//避免主线程提前结束
Thread.sleep(1100);
1.5.2 使用delayElements延时发送
Flux.fromIterable(Lists.newArrayList(1,2,3,4))
//延时发送
.delayElements(Duration.ofMillis(100L))
.subscribe(System.out::println);
//避免主线程提前结束
Thread.sleep(1100);
1.6 Flux与Mono之间的相互转换
1.6.1 Flux与Mono之间的相互转换
Flux<Integer> just = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
just.subscribe(System.out::println);
Mono<List<Integer>> mono = just.collectList();
mono.subscribe(System.out::println);
//自定义收集器
Mono<List<Integer>> monoList = just.collect(toList());
monoList.subscribe(System.out::println);
//Mono转仅有一个元素的Flux
Flux<List<Integer>> flux = mono.flux();
flux.subscribe(System.out::println);
//将一个元素的Flux转Mono
Mono<Integer> single = Flux.just(1).single();
single.subscribe(System.out::println);
1.6.2 使用concatWith从多个Mono组合成Flux
Flux<String> flux = Mono.just("1").concatWith(Mono.just("2"));
flux.subscribe(System.out::println);
1.6.3 使用concatWithValues追加Flux
//连接多个Flux
Flux.just("连接")
//连接两个Flux
.concatWith(Flux.just("两个"))
//将元素追加到Flux
.concatWithValues("或追加")
.subscribe(System.out::print);
1.7 动态方法创建 Flux
1.7.1 generate动态创建Flux
generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件,定义如下。
public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)
SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。这里要注意的是 next() 方法只能最多被调用一次。使用 generate() 方法创建 Flux 的示例代码如下。
// 同步动态创建, next() 方法只能最多被调用一次
Flux.generate(sink -> {
sink.next("1");
//第二次会报错:
//java.lang.IllegalStateException: More than one call to onNext
//sink.next("2");
//如果不调用 complete() 方法,那么就会生成一个所有元素均为“1”的无界数据流
sink.complete();
}).subscribe(System.out::println);
如果想要在序列生成过程中引入状态,那么可以使用如下所示的 generate() 方法重载。
generate() 重载方法:
public static <T, S> Flux<T> generate(
Callable<S> stateSupplier,
BiFunction<S, SynchronousSink<T>, S> generator) {
return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}
示例:
Flux.generate(() -> 1, (i, sink) -> {
sink.next(i);
if (i == 5) {
sink.complete();
}
return ++i;
}).subscribe(System.out::println);
这里我们引入了一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。()->1 设置初始态
显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。
1.7.2 create动态创建Flux
create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件,定义如下。
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 create() 方法创建 Flux 的示例代码如下。
Flux.create(sink -> {
for (int i = 0; i < 5; i++) {
sink.next("Tang" + i);
}
sink.complete();
}).subscribe(System.out::println);
运行该程序,我们会在系统控制台上得到从“Tang0”到“Tang4”的 5 个数据。
1.8 Mono 对象创建响应式流
对于 Mono 而言,可以认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。
justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下。
Mono.justOrEmpty(Optional.of("Tang")).subscribe(System.out::println);
另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件,示例代码如下。
Mono.create(sink -> sink.success("Tang")).subscribe(System.out::println);
使用fromCallable动态创建Mono
Mono.fromCallable(() -> {
Thread.sleep(1000);
return "1";
}).subscribe(System.out::println);
2.异常处理
2.1 创建包含异常的Flux和Mono
//直接创建一个包含异常的Flux
Flux.error(new Exception());
//直接创建一个包含异常的Mono
Mono.error(new Exception());
2.2 异常处理
Mono.just("1")
//连接一个包含异常的Mono
.concatWith(Mono.error(new Exception("Exception")))
//异常监听
.doOnError(error -> System.out.println("错误: " + error))
//在发生异常时将其入参传递给订阅者
.onErrorReturn("ErrorReturn")
.subscribe(System.out::println);
2.2.1 调用subscribe可以指定需要处理的消息类型
consumer:正常消费,errorConsumer:异常处理,completeConsumer:消费完成
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)
Flux.just(1,2,3,4)
.concatWith(Mono.error(new Exception("Exception")))
.subscribe(System.out::println,System.err::println,()-> System.out.println("完成"));
2.2.2 通过onErrorResume当发生异常时重新产生新的流
Flux.just(1,2,3,4)
.concatWith(Mono.error(new Exception("Exception")))
.onErrorResume(e -> {
System.out.println(e);
return Flux.just(11,12,13);
})
.subscribe(System.out::println);
2.2.3 retry重试
Flux.just(1,2,3,4)
.concatWith(Mono.error(new Exception("Exception")))
.retry(1)
.subscribe(System.out::println);
3. 常用方法
3.1 merge 、mergeSequential、mergeComparing
3.1.1 merge
merge按照所有流中元素的实际产生序列来合并
Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5),
Flux.interval(Duration.ofMillis(10)).take(3))
.log()
.subscribe();
Thread.sleep(1000);
3.1.2 mergeSequential
mergeSequential按照所有流被订阅的顺序,以流为单位进行合并。
例如: FluxA 和FluxB 只有在A消费完后才会去消费B
Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5),
Flux.interval(Duration.ofMillis(10)).take(3))
.log()
.subscribe();
Thread.sleep(1000);
3.1.3 mergeComparing
消费两个流中较小的那个
prefetch:比较个数
comparator:比较器
sources:数据流
public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Flux.mergeComparing(Flux.just(1,2,9,4,76,6),
Flux.just(2,75,4,3,5,6))
.log()
.subscribe();
Thread.sleep(1000);
3.2 buffer、bufferTimeout、bufferWhile、bufferUntil
把当前流中的元素收集到集合中,并把集合对象作为流中的新元素
3.2.1 buffer: 收集为集合
当maxSize > skip 时 重叠 如3,2 [1,2,3],[3,4,5],[5,6,7],[7,8,9],[9,10]
当maxSize < skip 时 重叠 如3,4 [1,2,3],[5,6,7],[9,10]
当maxSize = skip 时 准确分割 等价于只传maxSize 如3,3 [1,2,3],[4,5,6],[7,8,9],[10]
Flux.range(1, 10)
// .buffer(3,2)
// .buffer(3,4)
// .buffer(3,3)
.buffer(3)
.subscribe(System.out::println);
3.2.2 bufferTimeout: 可以根据maxSize 或 skip 满足其一就可以切割集合
Flux.interval(Duration.ofMillis(100L))
.bufferTimeout(9,Duration.ofMillis(1000L))
.subscribe(System.out::println);
Thread.sleep(10000);
3,2.3 bufferWhile: 则只有当Predicate返回true时才会收集。一旦为false,会立即开始下一次收集。
Flux.range(1, 10)
.bufferWhile(i -> i % 2 == 0)
.subscribe(System.out::println);
3.2.4 bufferUntil: 会一直收集直到Predicate返回true。
Flux.range(1, 10)
.bufferUntil(i -> i % 2 == 0)
.subscribe(System.out::println);
cutBefore true : Predicate返回true放到下一个集合中
Flux.range(1, 10)
.bufferUntil(i -> i % 2 == 0,true)
.subscribe(System.out::println);
3.3 Filter 对流中包含的元素进行过滤
对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素。
Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);
3.4 zipWith 把当前流中的元素与另一个流中的元素按照一对一的方式进行合并
把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。多的元素被舍弃,可以通过BiFunction函数对合并的元素进行处理
Flux.just(1, 2)
.zipWith(Flux.just(3, 4))
.subscribe(System.out::println);
//通过BiFunction函数对合并的元素进行处理
Flux.just(1, 2,3)
.zipWith(Flux.just(4, 5), (s1, s2) -> s1 + "-" + s2)
subscribe(System.out::println);
3.5 take 用来从当前流中提取元素
//提取指定数量的元素
Flux.range(1, 1000)
.take(10)
.subscribe(System.out::println);
//按时间间隔提取元素
Flux.interval(Duration.ofMillis(10))
.take(Duration.ofMillis(100))
.subscribe(System.out::println);
Thread.sleep(1000);
//提取最后N个元素
Flux.range(1, 1000)
.takeLast(10)
.subscribe(System.out::println);
//当Predicate返回true时才进行提取
Flux.range(1, 1000)
.takeWhile(i -> i < 10)
.subscribe(System.out::println);
//提取元素直到Predicate返回true
Flux.range(1, 1000)
.takeUntil(i -> i == 10)
.subscribe(System.out::println);
3.6 reduce和reduceWith 归约操作,可以进行累加累乘等操作
Flux.range(1, 100)
.reduce((x, y) -> x + y)
.subscribe(System.out::println);
//设定默认值
Flux.range(1, 100)
.reduce(100,(x, y) -> x + y)
.subscribe(System.out::println);
//可以设置Supplier初始值
Flux.range(1, 100)
.reduceWith(() -> 100, (x ,y) -> x + y)
.subscribe(System.out::println)
3.7 flatMap和flatMapSequential 把流中的每个元素转换成一个流,再把所有流中的元素进行合并。
//flatMap按实际生产顺序进行合并
Flux.just(5, 10)
.flatMap(x -> Flux.interval(
Duration.ofMillis(x * 10),
Duration.ofMillis(100)).take(x)
)
.subscribe(System.out::println);
Thread.sleep(1000);
//flatMapSequential按订阅顺序进行合并
Flux.just(5, 10)
.flatMapSequential(x -> Flux.interval(
Duration.ofMillis(x * 10),
Duration.ofMillis(100)).take(x)
)
.subscribe(System.out::println);
Thread.sleep(1000);
3.8 concatMap 把流中的每个元素转换成一个流,再把所有流进行合并
concatMap会根据原始流中的元素顺序依次把转换之后的流进行合并,并且concatMap堆转换之后的流的订阅是动态进行的,而flatMapSequential在合并之前就已经订阅了所有的流。
Flux.just(5, 10)
.concatMap(x -> Flux.interval(
Duration.ofMillis(x * 10),
Duration.ofMillis(100)).take(x)
)
.subscribe(System.out::println);
Thread.sleep(1000);
3.9 combineLatest 把所有流中的最新产生的元素合并成一个新的元素
把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。
Flux.combineLatest(Arrays::toString,
Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5),
Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5))
.subscribe(System.out::println);
Thread.sleep(1000);
3.10 使用skip跳过元素
//跳过指定条数
Flux.just(1,2,3,4,5,6,7)
.skip(2)
.subscribe(System.out::println);
//跳过指定时间间隔
Flux.interval(Duration.ofMillis(100))
.skip(Duration.ofMillis(300))
.log()
.subscribe();
Thread.sleep(1000);
3.11 使用distinct去重
Flux.just(1,1,2,2,5,6,7)
.distinct()
.subscribe(System.out::println);
3.12 从Flux获取首元素和尾元素
Flux<Integer> just = Flux.just(1, 2, 3, 4, 5, 6, 7);
Mono<Integer> last = just.last();
Mono<Integer> first = just.next();
3.13 从Flux阻塞式取一个元素
Flux<String> flux = Flux.create(skin -> {
for (int i = 0; i < 2; ++i) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
skin.next("这是第" + i + "个元素");
}
skin.complete();
});
//flux订阅者所有操作都是无副作用的,即不会改变原flux对象数据
//阻塞式订阅,只要有一个元素进入Flux
String first = flux.blockFirst();
//输出: 这是第0个元素
System.out.println(first);
//还是输出: 这是第0个元素
System.out.println(flux.blockFirst());
//输出: 这是第1个元素
System.out.println(flux.blockLast());
//还是输出: 这是第1个元素
System.out.println(flux.blockLast());
3.14 监听:doOnError、doOnComplete、doFinally、doOnSubscribe
Flux.just(1,2,3,4,5,6,7,8,9)
.concatWith(Flux.error(new Exception()))
//错误时执行
.doOnError(e -> System.out.println("报错:" + e))
//完成时执行
.doOnComplete(()-> System.out.println("数据接收完成"))
//最后执行
.doFinally(t-> System.out.println("最后执行信息:" + t))
.subscribe(System.out::println);
//消费者参与前执行的最后一件事,入参为消费者对象(一般用于修改、添加、删除源数据流)
Flux.just(1,2,3,4,5,6,7,8,9)
.log()
.doOnSubscribe(i ->{
System.out.println("先请求2个");
i.request(2);
System.out.println("再请求3个");
i.request(3);
i.cancel();
System.out.println("取消监听");
})
.subscribe(System.out::println);
4. 背压------主动控制订阅量
4.1 原始的Subscriber::onNext
Flux.interval(Duration.ofMillis(10L))
.subscribe(new Subscriber<Long>() {
Subscription subscription;
AtomicInteger count = new AtomicInteger();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
//首先请求5个
subscription.request(5);
count.set(5);
}
@Override
public void onNext(Long aLong) {
System.out.print(" value:" + aLong);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (count.decrementAndGet() <= 0){
System.out.println(" 消费完成,重新请求5个");
subscription.request(5);
count.set(5);
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("全部消费完成");
}
});
Thread.sleep(5000L);
4.2 BaseSubscriber
Flux.range(1,50)
.log()
.subscribe(new BaseSubscriber<Integer>() {
private int count = 0;
private final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(Integer value) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (++count == limit){
request(count);
count = 0;
}
}
});
4.3 limitRate
Flux.interval(Duration.ofMillis(10L))
.take(10)
.log()
.limitRate(4)
.subscribe();
Thread.sleep(1000L);
当已经处理75%的数据量时会重新请求下一批数据