SpringBoot2.0不容错过的新特性 WebFlux响应式
慕课网学习地址:https://coding.imooc.com/class/209.html
原课程是付费的,但是同事发了我下载离线版,拿来抽空学呗,学无止境。
第一节 课程介绍
学习之路第二节 函数式编程和lambda表达式
- 数组里取最小值
public static void main(String[] args) {
int[] nums = {33,55,-55,90,-666,90};
int min = Integer.MAX_VALUE;
for (int i : nums) {
if(i < min) {
min = i;
}
}
System.out.println(min);
// jdk8
int min2 = IntStream.of(nums).parallel().min().getAsInt();
System.out.println(min2);
}
- 多线程 匿名内部类
public static void main(String[] args) {
Object target = new Runnable() {
@Override
public void run() {
System.out.println("ok");
}
};
new Thread((Runnable) target).start();
// jdk8 lambda
Object target2 = (Runnable)() -> System.out.println("ok");
Runnable target3 = () -> System.out.println("ok");
System.out.println(target2 == target3); // false
new Thread((Runnable) target2).start();
}
- 函数接口以及链式操作
import java.text.DecimalFormat;
import java.util.function.Function;
class MyMoney {
private final int money;
public MyMoney(int money) {
this.money = money;
}
public void printMoney(Function<Integer, String> moneyFormat) {
System.out.println("我的存款:" + moneyFormat.apply(this.money));
}
}
public class MoneyDemo {
public static void main(String[] args) {
MyMoney me = new MyMoney(99999999);
Function<Integer, String> moneyFormat = i -> new DecimalFormat("#,###")
.format(i);
// 函数接口链式操作
me.printMoney(moneyFormat.andThen(s -> "人民币 " + s));
}
}
运行效果
我的存款:人民币 99,999,999
-
函数式接口
常用的lambda接口
public static void main(String[] args) {
// 断言函数接口
IntPredicate predicate = i -> i > 0;
System.out.println(predicate.test(-9));
//建议使用带类型的接口,这样就不用写泛型了
// IntConsumer
// 消费函数接口
Consumer<String> consumer = s -> System.out.println(s);
consumer.accept("输入的数据");
}
- 方法引用
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author marvin.ma
* @create 2018-07-03 18:58
* @desc ${DESCRIPTION}
**/
class Dog {
private String name = "哮天犬";
/**
* 默认10斤狗粮
*/
private int food = 10;
public Dog() {
}
/**
* 带参数的构造函数
*
* @param name
*/
public Dog(String name) {
this.name = name;
}
/**
* 狗叫,静态方法
*
* @param dog
*/
public static void bark(Dog dog) {
System.out.println(dog + "叫了");
}
/**
* 吃狗粮 JDK
*
* 默认会把当前实例传入到非静态方法,参数名为this,位置是第一个;
*
* @param num
* @return 还剩下多少斤
*/
public int eat(int num) {
System.out.println("吃了" + num + "斤狗粮");
this.food -= num;
return this.food;
}
@Override
public String toString() {
return this.name;
}
}
public class MethodRefrenceDemo {
public static void main(String[] args) {
Dog dog = new Dog();
dog.eat(3);
// 方法引用
Consumer<String> consumer = System.out::println;
consumer.accept("接受的数据");
// 静态方法的方法引用
Consumer<Dog> consumer2 = Dog::bark;
consumer2.accept(dog);
// 非静态方法,使用对象实例的方法引用
// Function<Integer, Integer> function = dog::eat;
// UnaryOperator<Integer> function = dog::eat; //入参出参都是Integer,可以用这个
IntUnaryOperator function = dog::eat;
// dog置空,不影响下面的函数执行,因为java 参数是传值
dog = null;
System.out.println("还剩下" + function.applyAsInt(2) + "斤");
//
// // 使用类名来方法引用
// BiFunction<Dog, Integer, Integer> eatFunction = Dog::eat;
// System.out.println("还剩下" + eatFunction.apply(dog, 2) + "斤");
//
// // 构造函数的方法引用
// Supplier<Dog> supplier = Dog::new;
// System.out.println("创建了新对象:" + supplier.get());
//
// 带参数的构造函数的方法引用
Function<String, Dog> function2 = Dog::new;
System.out.println("创建了新对象:" + function2.apply("旺财"));
// 测试java变量是传值还是穿引用
List<String> list = new ArrayList<>();
test(list);
System.err.println(list);
}
private static void test(List<String> list) {
list = null;
}
}
运行输出:
吃了3斤狗粮
接受的数据
哮天犬叫了
吃了2斤狗粮
还剩下5斤
创建了新对象:旺财
- 类型引用
@FunctionalInterface
interface IMath {
int add(int x, int y);
}
@FunctionalInterface
interface IMath2 {
int sub(int x, int y);
}
public class TypeDemo {
public static void main(String[] args) {
// 变量类型定义
IMath lambda = (x, y) -> x + y;
// 数组里
IMath[] lambdas = { (x, y) -> x + y };
// 强转
Object lambda2 = (IMath) (x, y) -> x + y;
// 通过返回类型
IMath createLambda = createLambda();
TypeDemo demo = new TypeDemo();
// 当有二义性的时候,使用强转对应的接口解决
demo.test( (IMath2)(x, y) -> x + y);
}
public void test(IMath math) {
}
public void test(IMath2 math) {
}
public static IMath createLambda() {
return (x, y) -> x + y;
}
}
- 级联表达式和珂里化
级联表达式:有多个箭头的函数
柯里化:把多个参数的函数转换为只有一个参数的函数
高阶函数:就是返回函数的函数
import java.util.function.Function;
/**
* 级联表达式和柯里化
* 柯里化:把多个参数的函数转换为只有一个参数的函数
* 柯里化的目的:函数标准化
* 高阶函数:就是返回函数的函数
*/
public class CurryDemo {
public static void main(String[] args) {
// 实现了x+y的级联表达式
Function<Integer, Function<Integer, Integer>> fun = x -> y -> x
+ y;
System.out.println(fun.apply(2).apply(3));
Function<Integer, Function<Integer, Function<Integer, Integer>>> fun2 = x -> y -> z -> x
+ y + z;
System.out.println(fun2.apply(2).apply(3).apply(4));
int[] nums = { 2, 3, 4 };
Function f = fun2;
for (int i = 0; i < nums.length; i++) {
if (f instanceof Function) {
Object obj = f.apply(nums[i]);
if (obj instanceof Function) {
f = (Function) obj;
} else {
System.out.println("调用结束:结果为" + obj);
}
}
}
}
}
执行结果
5
9
调用结束:结果为9
第三节 Stream流编程
- demo1
import java.util.stream.IntStream;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-03 22:17
* @desc ${DESCRIPTION}
**/
public class StreamDemo1 {
public static void main(String[] args) {
int[] nums = {1, 2, 3};
// 外部迭代
int sum = 0;
for (int i : nums) {
sum += i;
}
System.out.println("结果为:" + sum);
// 使用stream的内部迭代
// map就是中间操作(返回stream的操作)
// sum就是终止操作
System.out.println("结果为:" + IntStream.of(nums).sum());
int sum2 = IntStream.of(nums).map(i -> i*2).sum();
System.out.println("结果为:" + sum2);
System.out.println("惰性求值就是终止没有调用的情况下,中间操作不会执行");
IntStream.of(nums).map(StreamDemo1::doubleNum);
}
public static int doubleNum(int i) {
System.out.println("执行了乘以2");
return i * 2;
}
}
执行结果:
结果为:6
结果为:6
结果为:12
惰性求值就是终止没有调用的情况下,中间操作不会执行
-
demo 2, 流的创建
流的创建方式
public class StreamDemo2 {
public static void main(String[] args) {
List<String> list = new ArrayList<>();
// 从集合创建
list.stream();
list.parallelStream();
// 从数组创建
Arrays.stream(new int[] { 2, 3, 5 });
// 创建数字流
IntStream.of(1, 2, 3);
IntStream.rangeClosed(1, 10);
// 使用random创建一个无限流
new Random().ints().limit(10);
Random random = new Random();
// 自己产生流
Stream.generate(() -> random.nextInt()).limit(20);
}
}
-
demo3,流的中间操作
流的中间操作
import java.util.Random;
import java.util.stream.Stream;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-03 22:39
* @desc 流的中间操作
**/
public class StreamDemo3 {
public static void main(String[] args) {
String str = "my name is 007";
//把每个单词的长度调用出来
Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(s -> s.length()).forEach(System.out::println);
//flatMap 适合A元素下面有B属性,B属性是集合,最终得到所有A元素里面的所有B属性的集合
// intStream/longStream 并不是Stream的子类,所以要进行装箱 boxed
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
.forEach(i -> System.out.println((char)i.intValue()));
//peek 用于debug,是个中间操作,与foreach不同,foreach是终止操作
System.out.println("--------------peek------------");
Stream.of(str.split(" ")).peek(System.out::println)
.forEach(System.out::println);
//limit的使用
new Random().ints().filter(i -> i > 100 && i < 1000).limit(10)
.forEach(System.out::println);
}
}
-
stream的终止操作
stream终止操作
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-06 13:33
* @desc ${DESCRIPTION}
**/
public class StreamDemo4 {
public static void main(String[] args) {
String str = "my name is 007";
//使用并行流
str.chars().parallel().forEach(i -> System.out.print((char)i));
System.out.println();
// 使用 forEachOrdered 保证顺序
str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
System.out.println();
//收集到list/set set时最后使用toSet方法
List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList());
System.out.println(list);
//使用reduce拼接字符串
Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2);
System.out.println(letters.orElse(""));
//带初始值的reduce
String reduce = Stream.of(str.split(" ")).reduce("",
(s1, s2) -> s1 + "|" + s2);
System.out.println(reduce);
//计算所有单词总长度
Integer length = Stream.of(str.split(" ")).map(s -> s.length())
.reduce(0, (s1, s2) -> s1 +s2);
System.out.println(length);
//使用max函数
Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length));
System.out.println(max.get());
//// 使用 findFirst 短路操作
OptionalInt findFirst = new Random().ints().filter(i -> i > 10000).findFirst();
System.out.println(findFirst.getAsInt());
}
}
运行结果:
immnae7y 0 0 s
my name is 007
[my, name, is, 007]>
my|name|is|007
|my|name|is|007
11
name
113660557
- 并行流 & 串行流知识点
1、调用parallel 产生一个并行流.
2、调用sequential 产生串行流.
3、多次调用 parallel / sequential, 以最后一次调用为准.
4、并行流使用的默认线程池名称: ForkJoinPool.commonPool,默认的线程数是 当前机器的cpu个数.
5、使用这个属性可以修改默认的线程数:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
6、一般地,推荐使用自己的线程池, 不使用默认线程池, 防止任务被阻塞。注意在main函数中调用,要让main函数不退出,等会儿再退出。自己新起的线程要用shutdown关闭,不然会一直执行着。
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> IntStream.range(1, 100).parallel()
.peek(StreamDemo5::debug).count());
pool.shutdown();
synchronized (pool) {
try {
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void debug(int i) {
System.out.println(Thread.currentThread().getName() + " debug " + i);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
运行结果
ForkJoinPool-1-worker-25 debug 65
ForkJoinPool-1-worker-18 debug 31
ForkJoinPool-1-worker-11 debug 90
ForkJoinPool-1-worker-4 debug 15
ForkJoinPool-1-worker-19 debug 56
ForkJoinPool-1-worker-15 debug 81
ForkJoinPool-1-worker-8 debug 21
ForkJoinPool-1-worker-26 debug 40
ForkJoinPool-1-worker-29 debug 43
ForkJoinPool-1-worker-12 debug 93
ForkJoinPool-1-worker-5 debug 28
ForkJoinPool-1-worker-30 debug 78
ForkJoinPool-1-worker-22 debug 96
ForkJoinPool-1-worker-23 debug 87
ForkJoinPool-1-worker-1 debug 6
ForkJoinPool-1-worker-9 debug 12
ForkJoinPool-1-worker-16 debug 18
ForkJoinPool-1-worker-2 debug 3
ForkJoinPool-1-worker-27 debug 48
ForkJoinPool-1-worker-20 debug 37
- 收集器
所收集的对象信息
public class Student {
private String name;
private int age;
private Gender gender;
private Grade grade;
public Student(String name, int age, Gender gender, Grade grade) {
super();
this.name = name;
this.age = age;
this.gender = gender;
this.grade = grade;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Gender getGender() {
return gender;
}
public void setGender(Gender gender) {
this.gender = gender;
}
public Grade getGrade() {
return grade;
}
public void setGrade(Grade grade) {
this.grade = grade;
}
@Override
public String toString() {
return "[name=" + name + ", age=" + age + ", gender=" + gender
+ ", grade=" + grade + "]";
}
}
/**
* 性别
*/
enum Gender {
MALE, FEMALE
}
/**
* 班级
*/
enum Grade {
ONE, TWO, THREE, FOUR;
}
测试类:
import org.apache.commons.collections.MapUtils;
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-07 22:37
* @desc 流收集器demo
**/
public class CollectDemo {
public static void main(String[] args) {
// 测试数据
List<Student> students = Arrays.asList(
new Student("小明", 10, Gender.MALE, Grade.ONE),
new Student("大明", 9, Gender.MALE, Grade.THREE),
new Student("小白", 8, Gender.FEMALE, Grade.TWO),
new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
new Student("小红", 7, Gender.FEMALE, Grade.THREE),
new Student("小黄", 13, Gender.MALE, Grade.ONE),
new Student("小青", 13, Gender.FEMALE, Grade.THREE),
new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
new Student("小王", 6, Gender.MALE, Grade.ONE),
new Student("小李", 6, Gender.MALE, Grade.ONE),
new Student("小马", 14, Gender.FEMALE, Grade.FOUR),
new Student("小刘", 13, Gender.MALE, Grade.FOUR));
// 得到所有学生的年龄列表
// 推荐使用方法引用Student::getAge,不要使用s->s.getAge(),就不会多生成一个类似 lambda$0这样的函数
List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toList());
System.out.println("所有学生的年龄:" + ages);
// 统计汇总信息
IntSummaryStatistics agesSummaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
System.out.println("年龄汇总信息:" + agesSummaryStatistics);
//分块
Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
//System.out.println("男女学生列表:" + genders);
MapUtils.verbosePrint(System.out, "男女学生列表", genders);
//分组,比如分块就是分两组,返回boolean
Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade));
MapUtils.verbosePrint(System.out, "学生班级列表", grades);
//得到所有班级学生的个数
Map<Grade, Long> gradesCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
MapUtils.verbosePrint(System.out, "学生班级个数列表", gradesCount);
}
}
运行结果:
Connected to the target VM, address: 'javadebug', transport: 'shared memory'
所有学生的年龄:[10, 9, 8, 13, 7, 13, 13, 9, 6, 6, 14, 13]
年龄汇总信息:IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女学生列表 =
{
false = [[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小红, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE], [name=小紫, age=9, gender=FEMALE, grade=TWO], [name=小马, age=14, gender=FEMALE, grade=FOUR]]
true = [[name=小明, age=10, gender=MALE, grade=ONE], [name=大明, age=9, gender=MALE, grade=THREE], [name=小黄, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE], [name=小刘, age=13, gender=MALE, grade=FOUR]]
}
学生班级列表 =
{
ONE = [[name=小明, age=10, gender=MALE, grade=ONE], [name=小黄, age=13, gender=MALE, grade=ONE], [name=小王, age=6, gender=MALE, grade=ONE], [name=小李, age=6, gender=MALE, grade=ONE]]
FOUR = [[name=小黑, age=13, gender=FEMALE, grade=FOUR], [name=小马, age=14, gender=FEMALE, grade=FOUR], [name=小刘, age=13, gender=MALE, grade=FOUR]]
TWO = [[name=小白, age=8, gender=FEMALE, grade=TWO], [name=小紫, age=9, gender=FEMALE, grade=TWO]]
THREE = [[name=大明, age=9, gender=MALE, grade=THREE], [name=小红, age=7, gender=FEMALE, grade=THREE], [name=小青, age=13, gender=FEMALE, grade=THREE]]
}
Disconnected from the target VM, address: 'javadebug', transport: 'shared memory'
学生班级个数列表 =
{
ONE = 4
FOUR = 3
TWO = 2
THREE = 3
}
- Stream 运行机制
验证:
- 所有操作是链式调用, 一个元素只迭代一次
- 每一个中间操作返回一个新的流. 流里面有一个属性sourceStage
指向同一个 地方,就是Head - Head->nextStage->nextStage->... -> null
- 有状态操作会把无状态操作阶段,单独处理
- 并行环境下, 有状态的中间操作不一定能并行操作
- parallel/ sequetial 这2个操作也是中间操作(也是返回stream)
但是他们不创建流, 他们只修改 Head的并行标志
第三节 reactive stream 响应式流
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-08 0:03
* @desc 响应式流
**/
public class FlowDemo {
public static void main(String[] args) throws InterruptedException {
//1.定义发布者,发布的数据类型是Integer
// 直接使用jdk自带的SubmissionPublisher,它实现了Publisher接口
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
//2. 定义订阅者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系,需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接收一个数据,并处理
System.out.println("接收到的数据:" + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
//或者 已经达到了目标,调用cancel告诉发布者不再接收数据了
//this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
//出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
//我们可以告诉发布者,后面不接收数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
//3. 发布者和订阅者建立订阅关系
publisher.subscribe(subscriber);
//4. 生产数据,并发布 这里忽略数据生产过程
for (int i=0;i<1000;i++) {
System.out.println("生成数据:" + i);
//submit 是个block方法
publisher.submit(i);
}
//5. 结束后 关闭发布者
// 正式环境应该放在finally或者使用try-resource 确保关闭
publisher.close();
//主线程延迟停止, 否则数据没有消费就退出了
Thread.currentThread().join(1000);
//debug的时候,下面这行需要有断点。否则主线程结束无法debug
System.out.println();
}
}
运行效果
生成数据:262
接收到的数据:6
生成数据:263
接收到的数据:7
生成数据:264
接收到的数据:8
生成数据:265
接收到的数据:9
生成数据:266
接收到的数据:10
生成数据:267
接收到的数据:11
生成数据:268
另一个例子
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-08 23:14
* @desc 带 process 的 flow demo
* Processor, 需要继承SubmissionPublisher并实现Processor接口
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
**/
class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);
// 过滤掉小于0的, 然后发布出去
if (item > 0) {
this.submit("转换后的数据:" + item);
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
}
}
public class FlowDemo2 {
public static void main(String[] args) throws Exception {
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();
// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);
// 4. 定义最终订阅者, 消费 String 类型数据
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);
// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);
// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
}
}
运行结果:
处理器接受到数据: -111
处理器接受到数据: 111
处理器处理完了!
接受到数据: 转换后的数据:111
处理完了!