Flutter异步编程之 Future/Isolate
前言
Flutter是用Dart实现的,在Dart中没有线程和进程的概念,我们编程使用多线程一般实现两种场景,一种是异步执行,一种是并行执行。那么如何在Flutter上实现异步编程呢。本篇文章将主要讨论以下问题:
1、Dart如何实现异步编程?
2、Event Loops是什么?
3、Isolate是什么呢?
4、如何实现Isolate?
5、Isolate底层原理是什么?
同步和异步
我们在写Dart代码的时候,就只有两种代码,
同步代码:就是一行行写下来的代码
异步代码:就是以Future等修饰的代码
并不是指的我们平常异步,这两种代码的区别只有一个:代码运行的顺序是不同的,先运行同步代码,再运行异步代码,顺序执行。
而异步代码是运行在Event loop里的,Event loops就是事件循环机制。
Event loop运行示意图这个很好理解,事件events加到Event queue里,Event loop循环从Event queue里取Event执行。
Event loop 流程图从这里看到,启动app(start app)后:
先查看MicroTask queue是不是空的,不是的话,先运行microtask
一个microtask运行完后,会看有没有下一个microtask,直到Microtask queue空了之后,才会去运行Event queue
在Evnet queue取出一个event task运行完后,又会跑到第一步,去运行microtask
这里多了两个名词:MicroTask和Event,这代表了两个不同的异步task
而且可以看出:
如果想让任务能够尽快执行,就用MicroTask。
MicroTask
是dart:async
提供的异步方法,主要实现在schedule_microtask.dart
中,使用方式
scheduleMicrotask((){
// ...code goes here...
});
或者
new Future.microtask((){
// ...code goes here...
});
Event
Event我们就很熟悉的,就是Future修饰的异步方法,使用方式
new Future(() {
// ...code goes here...
});
对 Future 的理解
1、Future对象(futures)表示异步操作的结果,进程或者IO会延迟完成
2、可以在async函数中使用await来挂起执行,返回一个Future对象
3、在async函数中使用try-catch来捕获异常(或者使用catchError())
4、await只能在async中使用
Future示例代码
// Future示例代码:
void futureTest() {
print("future start");
Future.wait([
// 2秒后返回结果
Future.delayed(new Duration(seconds: 2), () {
print("hello");
return "hello";
}),
// 4秒后返回结果
Future.delayed(new Duration(seconds: 4), () {
print("world");
return " world";
}),
// 4秒后返回结果
Future.delayed(new Duration(seconds: 6), () {
print("!");
return " !";
})
]).then((results) {
// 上面的两个任务执行完毕后进入
print("future finish");
}).catchError((e) {
// 执行失败会走到这里
print(e);
}).whenComplete(() {
// 无论成功或失败都会走到这里
});
}
/// 打印结果
future start
hello
world
!
future finish
Event loop示例代码,根据Event loop的执行流程,请问如下代码打印顺序是什么样的?
void eventLoopTest() {
print('eventLoopTest #1 of 2');
scheduleMicrotask(() => print('microtask #1 of 3'));
//使用delay方式,是将此task放到queue的尾部,
//若前面有耗时操作,不一定能准时执行
new Future.delayed(
new Duration(seconds: 1), () => print('future #1 (delayed)'));
//使用then,是表示在此task执行后立刻执行
new Future(() => print('future #2 of 4'))
.then((_) => print('future #2a'))
.then((_) {
print('future #2b');
scheduleMicrotask(() => print('microtask #0 (from future #2b)'));
}).then((_) => print('future #2c'));
scheduleMicrotask(() => print('microtask #2 of 3'));
new Future(() => print('future #3 of 4'))
.then((_) => new Future(() => print('future #3a (a new future)')))
.then((_) => print('future #3b'));
new Future(() => print('future #4 of 4')).then((_) {
new Future(() => print('future #4a'));
}).then((_) => print('future #4b'));
scheduleMicrotask(() => print('microtask #3 of 3'));
print('eventLoopTest #2 of 2');
}
//打印结果
isolateTest #1 of 2
isolateTest #2 of 2
microtask #1 of 3
microtask #2 of 3
microtask #3 of 3
future #2 of 4
future #2a
future #2b
future #2c
microtask #0 (from future #2b)
future #3 of 4
future #4 of 4
future #4b
future #3a (a new future)
future #3b
future #4a
future #1 (delayed)
关键点
1、Future.delayed需要延迟执行的,是在延迟时间到了之后才将此task加到event queue的队尾,所以万一前面有很耗时的任务,那么你的延迟task不一定能准时运行。
2、Future.then每次都会返回一个Future,默认是其本身。如果在then中函数也返回一个新的Future,则新Future会重新加入到event queue中等待执行
3、一个event task运行完后,会先去查看Micro queue里有没有可以执行的micro task。没有的话,在执行下一个event task
我们知道 Dart 是单线程异步编程模型 ,Future解决了异步执行的问题。但是并行执行怎么处理呢?
Flutter引擎架构
Flutter体系图我们只关注线程相关信息
1、Framework:我们直接接触的层级
2、Engine:Dart Isolate Setup, 创建Isolate,类似于DartVM中的线程,他的架构就是一个循环:event loops。但这一层并不创建及管理线程,它要求Embeder提供四个Task Runner,类似于线程,并不是真正的线程。
3、Embedder:Thread Setup,真正的线程创建及管理者,Embeder指的是将引擎移植到平台的中间层代码。
Task runners
Task runnerEmbedder将自己管理的线程作为 task runner 提供给Flutter 引擎。
主要的 task runner 有:
Platform Task Runner
UI Task Runner
GPU Task Runner
IO Task Runner
Isolate
Dart是一个单线程语言,它的"线程"概念被称为 Isolate
,中文意思是隔离。
-
特点:
1 、它与我们之前理解的 Thread 概念有所不同,各个 isolate 之间是无法共享内存空间。
2、Isolate是完全是独立的执行线,每个都有自己的 event loop。只能通过 Port 传递消息,所以它的资源开销低于线程。
3、Dart中的线程可以理解为微线程。
4、Future实现异步串行多个任务;Isolate可以实现异步并行多个任务
-
作用:
Flutter的代码都是默认跑在root isolate上的,将非常耗时的任务添加到event loop后,会拖慢整个事件循环的处理,甚至是阻塞。可见基于Event loop的异步模型仍然是有很大缺点的,这时候我们就需要Isolate。
-
使用场景
Dart中使用多线程计算的时候,在创建Isolate以及线程间数据传递中耗时要超过单线程,每当我们创建出来一个新的 Isolate 至少需要 2mb 左右的空间甚至更多,因此Isolate有合适的使用场景,不建议滥用Isolate。那么应该在什么时候使用Future,什么时候使用Isolate呢?
一个最简单的判断方法是根据某些任务的平均时间来选择:
方法执行在几毫秒或十几毫秒左右的,应使用Future
如果一个任务需要几百毫秒或之上的,则建议创建单独的Isolate
一些常见的可以参考的场景
JSON 解码
数据加密
图像处理
网络请求:加载资源、图片
如何使用Isolate
Isolate由一对Port分别由用于接收消息的ReceivePort
对象,和用于发送消息的SendPort
对象构成。其中SendPort
对象不用单独创建,它已经包含在ReceivePort
对象之中。需要注意,一对Port对象只能单向发消息,这就如同一根自来水管,ReceivePort
和SendPort
分别位于水管的两头,水流只能从SendPort
这头流向ReceivePort
这头。因此,两个Isolate之间的消息通信肯定是需要两根这样的水管的,这就需要两对Port对象。
1、Dart中创建
我们可以通过 Isolate.spawn
创建一个 isolate。
static Future<Isolate> spawn<T>(void entryPoint(T message),T message);
当我们调用 Isolate.spawn
的时候,它将会返回一个对 isolate 的引用的 Future。我们可以通过这个 isolate 来控制创建出的 Isolate,例如pause、resume、kill
等等。
- entryPoint:这里传入我们想要在其他 isolate 中执行的方法,入参是一个任意类型的 message。entryPoint 只能是顶层方法或静态方法,且返回值为 void。
- message:创建 Isolate 第一个调用方法的入参,可以是任意值。
但是在此之前我们必须要创建两个 isolate 之间沟通的桥梁。
import 'dart:isolate';
import 'dart:io';
void main() {
print("main isolate start");
create_isolate();
print("main isolate end");
}
// 创建一个新的 isolate
create_isolate() async{
ReceivePort rp = new ReceivePort();
SendPort port1 = rp.sendPort;
Isolate newIsolate = await Isolate.spawn(doWork, port1);
SendPort port2;
rp.listen((message){
print("main isolate message: $message");
if (message[0] == 0){
port2 = message[1];
}else{
port2?.send([1,"这条信息是 main isolate 发送的"]);
}
});
}
// 处理耗时任务
static void doWork(SendPort port1){
print("new isolate start");
ReceivePort rp2 = new ReceivePort();
SendPort port2 = rp2.sendPort;
rp2.listen((message){
print("doWork message: $message");
});
// 将新isolate中创建的SendPort发送到主isolate中用于通信
port1.send([0, port2]);
// 模拟耗时5秒
sleep(Duration(seconds:5));
port1.send([1, "doWork 任务完成"]);
print("new isolate end");
}
//运行结果
main isolate start
main isolate end
new isolate start
main isolate message: [0, SendPort]
new isolate end
main isolate message: [1, doWork 任务完成]
doWork message: [1, 这条信息是 main isolate 发送的]
运行后都会创建一个是新Isolate的微进程,新的Isolate和主Isolate都双向绑定了消息通信的通道,即使新的Isolate中的任务完成了,它的微进程也不会立刻退出,因此,当使用完自己创建的Isolate后,最好调用newIsolate.kill(priority: Isolate.immediate);
将Isolate立即杀死。
2、Flutter中创建
如果想在Flutter中创建Isolate,则有更简便的API,这是由Flutter官方进一步封装ReceivePort
而提供的更简洁API。使用compute
函数来创建新的Isolate并执行耗时任务。
import 'package:flutter/foundation.dart';
import 'dart:io';
// 创建一个新的Isolate,在其中运行任务doWork
create_new_task() async{
var str = "New Task";
var result = await compute(doWork, str);
print(result);
}
static String doWork(String value){
print("new isolate doWork start");
// 模拟耗时5秒
sleep(Duration(seconds:5));
print("new isolate doWork end");
return "complete:$value";
}
compute
函数有两个必须的参数,第一个是待执行的函数,这个函数必须是一个顶级函数或静态方法,不能是类的实例方法,第二个参数为动态的消息类型,可以是被运行函数的参数。
需要注意,使用compute
应导入'package:flutter/foundation.dart'
包。
实现线程管理器
- 线程管理器ThreadManagment
import 'dart:isolate';
typedef LikeCallback = void Function(Object value);
class ThreadManagement {
//entryPoint 必须是静态方法
static Future<Map> runTask (void entryPoint(SendPort message), LikeCallback(Object value),{Object parameter})async{
final response = ReceivePort();
Isolate d = await Isolate.spawn(entryPoint, response.sendPort);
// 调用sendReceive自定义方法
if(parameter!=null){
SendPort sendPort = await response.first;
ReceivePort receivePort = ReceivePort();
sendPort.send([parameter, receivePort.sendPort]);
receivePort.listen((value){
receivePort.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":receivePort,
};
}else{
response.listen((value){
response.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":response,
};
}
}
}
- 耗时的任务
// 无参数的任务
static void getNoParamTask(SendPort port) async {
var c = await Future.delayed(Duration(seconds: 1), () {
return "banner data";
});
port.send(c);
}
// 需要参数的任务
static getParamsTask(SendPort port) async {
ReceivePort receivePort = ReceivePort();
port.send(receivePort.sendPort);
// 监听外界调用
await for (var msg in receivePort) {
Map requestURL =msg[0];
SendPort callbackPort =msg[1];
receivePort.close();
var res = await Future.delayed(Duration(seconds: 1), () {
var requestUrl = requestURL["type"];
var after = requestURL["after"];
return "url = $requestUrl, after = $after";
});
callbackPort.send(res);
}
}
- 执行耗时任务
// 调用无参数的任务
ThreadManagement.runTask(API.getNoParamTask, (value){
if(value != null){
//业务逻辑
print(value);
}
});
//调用有参数的任务
ThreadManagement.runTask(API.getParamsTask, (value){
if(value != null){
//业务逻辑
print(value);
}
}, parameter: {
"type":"hot",
"after":"1"
});
线程池
如何减少 isolate 创建所带来的消耗。我们可以创建一个线程池,初始化到那里。当我们需要使用的时候再拿来用就好了。
LoadBalancer
是 dart team 已经为我们写好一个非常实用的 package。
我们现在 pubspec.yaml
中添加 isolate 的依赖。
isolate: ^2.0.3
我们可以通过 LoadBalancer
创建出指定个数的 isolate。
Future<LoadBalancer> loadBalancer = LoadBalancer.create(2, IsolateRunner.spawn);
这段代码将会创建出一个 isolate 线程池,并自动实现了负载均衡。
下面我们再来看看应该如何使用 LoadBalancer
中的 isolate。
void testBalancer() async {
final lb = await loadBalancer;
int res = await lb.run(doWork, 110);
print(res);
}
int doWork(int value) {
// 模拟耗时5秒
print("new isolate doWork start");
sleep(Duration(seconds: 5));
return value;
}
//打印数据
new isolate doWork start
110
我们关注的只有
Future<R> run<R, P>(FutureOr<R> function(P argument), argument,)
方法。我们还是需要传入一个 function 在某个 isolate 中运行,并传入其参数 argument
。run 方法将会返回我们执行方法的返回值。
整体和 compute
使用上差不多,但是当我们多次使用额外的 isolate 的时候,不再需要重复创建了。
并且 LoadBalancer
还支持 runMultiple
,可以让一个方法在多线程中执行。
LoadBalancer
经过测试,它会在第一次使用其 isolate 的时候初始化线程池。
当应用打开后,即使我们在顶层函数中调用了 LoadBalancer.create
,但是还是只会有一个 Isolate。
当我们调用 run 方法时,才真正创建出了实际的 isolate。
参考文章:
Dart 函数、箭头函数、匿名函数、立即执行函数及闭包
Flutter中的异步编程——Future
Flutter进阶Future异步详解
Flutter异步编程
深入了解Flutter的isolate
Flutter/Dart中的异步编程之Isolate
isolate