Java 9 Reactive Streams
Java 9 Reactive Streams允许我们实现非阻塞异步流处理。这是将响应式编程模型应用于核心java编程的重要一步。
如果您对响应式编程不熟悉,请阅读Reactive Manifesto并阅读Reactive Streams的简短说明。RxJava和Akka Streams一直是十分优秀的响应流实现库。现在java 9已经通过java.util.concurrent.Flow API 引入了响应流支持。
Reactive Streams是关于流的异步处理,因此应该有一个发布者(Publisher)和一个订阅者(Subscriber)。发布者发布数据流,订阅者使用数据。
有时我们必须在Publisher和Subscriber之间转换数据。处理器(Processor)是位于最终发布者和订阅者之间的实体,用于转换从发布者接收的数据,以便订阅者能理解它。我们可以拥有一系列(chain )处理器。
从上面的图中可以清楚地看出,Processor既可以作为订阅者也可以作为发布者。
Java 9 Flow API实现了Reactive Streams规范。Flow API是Iterator和Observer模式的组合。Iterator在pull模型上工作,用于应用程序从源中拉取项目;而Observer在push模型上工作,并在item从源推送到应用程序时作出反应。
Java 9 Flow API订阅者可以在订阅发布者时请求N个项目。然后将项目从发布者推送到订阅者,直到推送玩所有项目或遇到某些错误。
让我们快速浏览一下Flow API类和接口。
java.util.concurrent.Flow:这是Flow API的主要类。该类封装了Flow API的所有重要接口。这是一个final类,我们不能扩展它。
java.util.concurrent.Flow.Publisher:这是一个功能接口,每个发布者都必须实现它的subscribe方法,并添加相关的订阅者以接收消息。
java.util.concurrent.Flow.Subscriber:每个订阅者都必须实现此接口。订阅者中的方法以严格的顺序进行调用。此接口有四种方法:
onSubscribe:这是订阅者订阅了发布者后接收消息时调用的第一个方法。通常我们调用subscription.request开始从处理器(Processor)接收项目。
onNext:当从发布者收到项目时调用此方法,这是我们实现业务逻辑以处理流,然后从发布者请求更多数据的方法。
onError:当发生不可恢复的错误时调用此方法,我们可以在此方法中执行清理操作,例如关闭数据库连接。
onComplete:这就像finally方法,并且在发布者没有发布其他项目或发布者关闭时调用。我们可以用它来发送流成功处理的通知。
java.util.concurrent.Flow.Subscription:这用于在发布者和订阅者之间创建异步非阻塞链接。订阅者调用其request方法来向发布者请求项目。它还有cancel取消订阅的方法,即关闭发布者和订阅者之间的链接。
java.util.concurrent.Flow.Processor:此接口同时扩展了Publisher和Subscriber接口,用于在发布者和订阅者之间转换消息。
java.util.concurrent.SubmissionPublisher:一个Publisher实现,它将提交的项目异步发送给当前订阅者,直到它关闭为止。它使用Executor框架,我们将在响应流示例中使用该类来添加订阅者,然后向其提交项目。
让我们从一个简单的例子开始,我们将实现Flow API Subscriber接口并使用SubmissionPublisher来创建发布者和发送消息。
假设我们有一个Employee类,用于创建从发布者发送到订阅者的流消息。
packagecom.journaldev.reactive.beans;publicclassEmployee{privateintid;privateString name;publicintgetId() {returnid; }publicvoidsetId(intid) {this.id = id; }publicStringgetName() {returnname; }publicvoidsetName(String name) {this.name = name; }publicEmployee(inti, String s) {this.id = i;this.name = s; }publicEmployee() { }@OverridepublicStringtoString() {return"[id="+id+",name="+name+"]"; }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
我们还有一个实用程序类来为我们的示例创建一个员工列表。
packagecom.journaldev.reactive_streams;importjava.util.ArrayList;importjava.util.List;importcom.journaldev.reactive.beans.Employee;publicclassEmpHelper{publicstaticListgetEmps() { Employee e1 =newEmployee(1,"Pankaj"); Employee e2 =newEmployee(2,"David"); Employee e3 =newEmployee(3,"Lisa"); Employee e4 =newEmployee(4,"Ram"); Employee e5 =newEmployee(5,"Anupam"); List emps =newArrayList<>(); emps.add(e1); emps.add(e2); emps.add(e3); emps.add(e4); emps.add(e5);returnemps; }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Subscriber;importjava.util.concurrent.Flow.Subscription;importcom.journaldev.reactive.beans.Employee;publicclassMySubscriberimplementsSubscriber {privateSubscription subscription;privateintcounter =0;@OverridepublicvoidonSubscribe(Subscription subscription) { System.out.println("Subscribed");this.subscription = subscription;this.subscription.request(1);//requesting data from publisherSystem.out.println("onSubscribe requested 1 item"); }@OverridepublicvoidonNext(Employee item) { System.out.println("Processing Employee "+item); counter++;this.subscription.request(1); }@OverridepublicvoidonError(Throwable e) { System.out.println("Some error happened"); e.printStackTrace(); }@OverridepublicvoidonComplete() { System.out.println("All Processing Done"); }publicintgetCounter() {returncounter; }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Subscription变量以保持引用,以便可以在onNext方法中进行请求。
counter变量以记录处理的项目数,请注意它的值在onNext方法中增加。这将在我们的main方法中用于在结束主线程之前等待执行完成。
在onSubscribe方法中调用订阅请求以开始处理。另请注意,onNext在处理项目后再次调用方法,要求对下一个从发布者发布的项目进行处理。
onError和onComplete在例子中没有太多逻辑,但在实际情况中,它们应该用于在发生错误时执行纠正措施或在处理成功完成时清理资源。
我们将使用SubmissionPublisherPublisher作为示例,让我们看一下响应流实现的测试程序。
packagecom.journaldev.reactive_streams;importjava.util.List;importjava.util.concurrent.SubmissionPublisher;importcom.journaldev.reactive.beans.Employee;publicclassMyReactiveApp{publicstaticvoidmain(String args[])throwsInterruptedException {// Create PublisherSubmissionPublisher publisher =newSubmissionPublisher<>();// Register SubscriberMySubscriber subs =newMySubscriber(); publisher.subscribe(subs); List emps = EmpHelper.getEmps();// Publish itemsSystem.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i));// logic to wait till processing of all messages are overwhile(emps.size() != subs.getCounter()) { Thread.sleep(10); }// close the Publisherpublisher.close(); System.out.println("Exiting the app"); }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
在上述代码中,最重要的部分是发布者subscribe和submit方法的调用。我们应该始终关闭发布者以避免任何内存泄漏。
执行上述程序时,我们将得到以下输出。
SubscribedPublishing ItemstoSubscriberonSubscribe requested1itemProcessing Employee [id=1,name=Pankaj]Processing Employee [id=2,name=David]Processing Employee [id=3,name=Lisa]Processing Employee [id=4,name=Ram]Processing Employee [id=5,name=Anupam]ExitingtheappAll Processing Done
1
2
3
4
5
6
7
8
9
10
处理器用于在发布者和订阅者之间转换消息。假设我们有另一个用户希望处理不同类型的消息。假设这个新的消息类型是Freelancer。
packagecom.journaldev.reactive.beans;publicclassFreelancerextendsEmployee{privateintfid;publicintgetFid() {returnfid; }publicvoidsetFid(intfid) {this.fid = fid; }publicFreelancer(intid,intfid, String name) {super(id, name);this.fid = fid; }@OverridepublicStringtoString() {return"[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]"; }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
我们有一个新订阅者使用Freelancer流数据。
packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Subscriber;importjava.util.concurrent.Flow.Subscription;importcom.journaldev.reactive.beans.Freelancer;publicclassMyFreelancerSubscriberimplementsSubscriber {privateSubscription subscription;privateintcounter =0;@OverridepublicvoidonSubscribe(Subscription subscription) { System.out.println("Subscribed for Freelancer");this.subscription = subscription;this.subscription.request(1);//requesting data from publisherSystem.out.println("onSubscribe requested 1 item for Freelancer"); }@OverridepublicvoidonNext(Freelancer item) { System.out.println("Processing Freelancer "+item); counter++;this.subscription.request(1); }@OverridepublicvoidonError(Throwable e) { System.out.println("Some error happened in MyFreelancerSubscriber"); e.printStackTrace(); }@OverridepublicvoidonComplete() { System.out.println("All Processing Done for MyFreelancerSubscriber"); }publicintgetCounter() {returncounter; }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
代码重要的部分是实现Processor接口。由于我们想要使用它SubmissionPublisher,我们会扩展它并在适合的地方使用它。
packagecom.journaldev.reactive_streams;importjava.util.concurrent.Flow.Processor;importjava.util.concurrent.Flow.Subscription;importjava.util.concurrent.SubmissionPublisher;importjava.util.function.Function;importcom.journaldev.reactive.beans.Employee;importcom.journaldev.reactive.beans.Freelancer;publicclassMyProcessorextendsSubmissionPublisherimplementsProcessor {privateSubscription subscription;privateFunction function;publicMyProcessor(Function function) {super();this.function = function; }@OverridepublicvoidonSubscribe(Subscription subscription) {this.subscription = subscription; subscription.request(1); }@OverridepublicvoidonNext(Employee emp) { submit((Freelancer) function.apply(emp)); subscription.request(1); }@OverridepublicvoidonError(Throwable e) { e.printStackTrace(); }@OverridepublicvoidonComplete() { System.out.println("Done"); }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
Function 将用于将Employee对象转换为Freelancer对象。
我们将传入的Employee消息转换为onNext方法中的Freelancer消息,然后使用SubmissionPublisher submit方法将其发送给订阅者。
由于Processor既是订阅者又是发布者,我们可以在终端发布者和订阅者之间创建一系列处理器。
packagecom.journaldev.reactive_streams;importjava.util.List;importjava.util.concurrent.SubmissionPublisher;importcom.journaldev.reactive.beans.Employee;importcom.journaldev.reactive.beans.Freelancer;publicclassMyReactiveAppWithProcessor{publicstaticvoidmain(String[] args)throwsInterruptedException {// Create End PublisherSubmissionPublisher publisher =newSubmissionPublisher<>();// Create ProcessorMyProcessor transformProcessor =newMyProcessor(s -> {returnnewFreelancer(s.getId(), s.getId() +100, s.getName()); });//Create End SubscriberMyFreelancerSubscriber subs =newMyFreelancerSubscriber();//Create chain of publisher, processor and subscriberpublisher.subscribe(transformProcessor);// publisher to processortransformProcessor.subscribe(subs);// processor to subscriberList emps = EmpHelper.getEmps();// Publish itemsSystem.out.println("Publishing Items to Subscriber"); emps.stream().forEach(i -> publisher.submit(i));// Logic to wait for messages processing to finishwhile(emps.size() != subs.getCounter()) { Thread.sleep(10); }// Closing publisherspublisher.close(); transformProcessor.close(); System.out.println("Exiting the app"); }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
阅读程序中的注释以正确理解它,最重要的变化是发布者 - 处理器 - 订阅者链的创建。执行上述程序时,我们将得到以下输出。
SubscribedforFreelancerPublishing ItemstoSubscriberonSubscribe requested1itemforFreelancerProcessing Freelancer [id=1,name=Pankaj,fid=101]Processing Freelancer [id=2,name=David,fid=102]Processing Freelancer [id=3,name=Lisa,fid=103]Processing Freelancer [id=4,name=Ram,fid=104]Processing Freelancer [id=5,name=Anupam,fid=105]ExitingtheappAll Processing DoneforMyFreelancerSubscriberDone
1
2
3
4
5
6
7
8
9
10
11
我们可以使用Subscription cancel方法停止在订阅者中接收消息。
以下是一个示例代码,其中订阅者只消费3条消息,然后取消订阅。
@OverridepublicvoidonNext(Employee item) { System.out.println("Processing Employee "+item); counter++;if(counter==3) {this.subscription.cancel();return; }this.subscription.request(1);}
1
2
3
4
5
6
7
8
9
10
请注意,在这种情况下,我们在处理所有消息之前停止主线程的逻辑将进入无限循环。我们可以为此场景添加一些额外的逻辑,如果订阅者已停止处理或取消订阅,就使用一些全局变量来标志该状态。
当发布者以比订阅者消费更快的速度生成消息时,会产生背压。Flow API不提供任何关于背压或处理它的信号的机制。但我们可以设计自己的策略来处理它,例如微调用户或降低信息产生率。您可以阅读RxJava deals with Back Pressure。
Java 9 Flow API是响应式编程和创建异步非阻塞应用程序的良好举措。但是,只有在所有系统API都支持它时,才能创建真正的响应式应用程序。
原文地址:Java 9 Reactive Streams written by Pankaj
完整代码:Github