Flutter异步编程之 Future/Isolate
前言
Flutter是用Dart实现的,在Dart中没有线程和进程的概念,我们编程使用多线程一般实现两种场景,一种是异步执行,一种是并行执行。那么如何在Flutter上实现异步编程呢。本篇文章将主要讨论以下问题:
1、Dart如何实现异步编程?
2、Event Loops是什么?
3、Isolate是什么呢?
4、如何实现Isolate?
5、Isolate底层原理是什么?
同步和异步
我们在写Dart代码的时候,就只有两种代码,
同步代码:就是一行行写下来的代码
异步代码:就是以Future等修饰的代码
并不是指的我们平常异步,这两种代码的区别只有一个:代码运行的顺序是不同的,先运行同步代码,再运行异步代码,顺序执行。
而异步代码是运行在Event loop里的,Event loops就是事件循环机制。
Event loop运行示意图
这个很好理解,事件events加到Event queue里,Event loop循环从Event queue里取Event执行。
Event loop 流程图
从这里看到,启动app(start app)后:
先查看MicroTask queue是不是空的,不是的话,先运行microtask一个microtask运行完后,会看有没有下一个microtask,直到Microtask queue空了之后,才会去运行Event queue在Evnet queue取出一个event task运行完后,又会跑到第一步,去运行microtask
这里多了两个名词:MicroTask和Event,这代表了两个不同的异步task
而且可以看出:
如果想让任务能够尽快执行,就用MicroTask。
MicroTask
是dart:async提供的异步方法,主要实现在schedule_microtask.dart中,使用方式
scheduleMicrotask((){
// ...code goes here...
});
或者
new Future.microtask((){
// ...code goes here...
});
Event
Event我们就很熟悉的,就是Future修饰的异步方法,使用方式
new Future(() {
// ...code goes here...
});
对 Future 的理解
1、Future对象(futures)表示异步操作的结果,进程或者IO会延迟完成
2、可以在async函数中使用await来挂起执行,返回一个Future对象
3、在async函数中使用try-catch来捕获异常(或者使用catchError())
4、await只能在async中使用
Future示例代码
// Future示例代码:
void futureTest() {
print("future start");
Future.wait([
// 2秒后返回结果
Future.delayed(new Duration(seconds: 2), () {
print("hello");
return "hello";
}),
// 4秒后返回结果
Future.delayed(new Duration(seconds: 4), () {
print("world");
return " world";
}),
// 4秒后返回结果
Future.delayed(new Duration(seconds: 6), () {
print("!");
return " !";
})
]).then((results) {
// 上面的两个任务执行完毕后进入
print("future finish");
}).catchError((e) {
// 执行失败会走到这里
print(e);
}).whenComplete(() {
// 无论成功或失败都会走到这里
});
}
/// 打印结果
future start
hello
world
!
future finish
Event loop示例代码,根据Event loop的执行流程,请问如下代码打印顺序是什么样的?
void eventLoopTest() {
print('eventLoopTest #1 of 2');
scheduleMicrotask(() => print('microtask #1 of 3'));
//使用delay方式,是将此task放到queue的尾部,
//若前面有耗时操作,不一定能准时执行
new Future.delayed(
new Duration(seconds: 1), () => print('future #1 (delayed)'));
//使用then,是表示在此task执行后立刻执行
new Future(() => print('future #2 of 4'))
.then((_) => print('future #2a'))
.then((_) {
print('future #2b');
scheduleMicrotask(() => print('microtask #0 (from future #2b)'));
}).then((_) => print('future #2c'));
scheduleMicrotask(() => print('microtask #2 of 3'));
new Future(() => print('future #3 of 4'))
.then((_) => new Future(() => print('future #3a (a new future)')))
.then((_) => print('future #3b'));
new Future(() => print('future #4 of 4')).then((_) {
new Future(() => print('future #4a'));
}).then((_) => print('future #4b'));
scheduleMicrotask(() => print('microtask #3 of 3'));
print('eventLoopTest #2 of 2');
}
//打印结果
isolateTest #1 of 2
isolateTest #2 of 2
microtask #1 of 3
microtask #2 of 3
microtask #3 of 3
future #2 of 4
future #2a
future #2b
future #2c
microtask #0 (from future #2b)
future #3 of 4
future #4 of 4
future #4b
future #3a (a new future)
future #3b
future #4a
future #1 (delayed)
关键点
1、Future.delayed需要延迟执行的,是在延迟时间到了之后才将此task加到event queue的队尾,所以万一前面有很耗时的任务,那么你的延迟task不一定能准时运行。
2、Future.then每次都会返回一个Future,默认是其本身。如果在then中函数也返回一个新的Future,则新Future会重新加入到event queue中等待执行
3、一个event task运行完后,会先去查看Micro queue里有没有可以执行的micro task。没有的话,在执行下一个event task
我们知道 Dart 是单线程异步编程模型 ,Future解决了异步执行的问题。但是并行执行怎么处理呢?
Flutter引擎架构
Flutter体系图
我们只关注线程相关信息
1、Framework:我们直接接触的层级
2、Engine:Dart Isolate Setup, 创建Isolate,类似于DartVM中的线程,他的架构就是一个循环:event loops。但这一层并不创建及管理线程,它要求Embeder提供四个Task Runner,类似于线程,并不是真正的线程。
3、Embedder:Thread Setup,真正的线程创建及管理者,Embeder指的是将引擎移植到平台的中间层代码。
Task runners
Task runner
Embedder将自己管理的线程作为 task runner 提供给Flutter 引擎。
主要的 task runner 有:
Platform Task Runner
UI Task Runner
GPU Task Runner
IO Task Runner
Isolate
Dart是一个单线程语言,它的"线程"概念被称为 Isolate,中文意思是隔离。
-
特点:
1 、它与我们之前理解的 Thread 概念有所不同,各个 isolate 之间是无法共享内存空间。
2、Isolate是完全是独立的执行线,每个都有自己的 event loop。只能通过 Port 传递消息,所以它的资源开销低于线程。
3、Dart中的线程可以理解为微线程。
4、Future实现异步串行多个任务;Isolate可以实现异步并行多个任务 -
作用:
Flutter的代码都是默认跑在root isolate上的,将非常耗时的任务添加到event loop后,会拖慢整个事件循环的处理,甚至是阻塞。可见基于Event loop的异步模型仍然是有很大缺点的,这时候我们就需要Isolate。 -
使用场景
Dart中使用多线程计算的时候,在创建Isolate以及线程间数据传递中耗时要超过单线程,每当我们创建出来一个新的 Isolate 至少需要 2mb 左右的空间甚至更多,因此Isolate有合适的使用场景,不建议滥用Isolate。那么应该在什么时候使用Future,什么时候使用Isolate呢?
一个最简单的判断方法是根据某些任务的平均时间来选择:
方法执行在几毫秒或十几毫秒左右的,应使用Future
如果一个任务需要几百毫秒或之上的,则建议创建单独的Isolate
一些常见的可以参考的场景
JSON 解码
数据加密
图像处理
网络请求:加载资源、图片
如何使用Isolate
Isolate由一对Port分别由用于接收消息的ReceivePort对象,和用于发送消息的SendPort对象构成。其中SendPort对象不用单独创建,它已经包含在ReceivePort对象之中。需要注意,一对Port对象只能单向发消息,这就如同一根自来水管,ReceivePort和SendPort分别位于水管的两头,水流只能从SendPort这头流向ReceivePort这头。因此,两个Isolate之间的消息通信肯定是需要两根这样的水管的,这就需要两对Port对象。
1、Dart中创建
我们可以通过 Isolate.spawn 创建一个 isolate。
static Future<Isolate> spawn<T>(void entryPoint(T message),T message);
当我们调用 Isolate.spawn 的时候,它将会返回一个对 isolate 的引用的 Future。我们可以通过这个 isolate 来控制创建出的 Isolate,例如pause、resume、kill 等等。
- entryPoint:这里传入我们想要在其他 isolate 中执行的方法,入参是一个任意类型的 message。entryPoint 只能是顶层方法或静态方法,且返回值为 void。
- message:创建 Isolate 第一个调用方法的入参,可以是任意值。
但是在此之前我们必须要创建两个 isolate 之间沟通的桥梁。
import 'dart:isolate';
import 'dart:io';
void main() {
print("main isolate start");
create_isolate();
print("main isolate end");
}
// 创建一个新的 isolate
create_isolate() async{
ReceivePort rp = new ReceivePort();
SendPort port1 = rp.sendPort;
Isolate newIsolate = await Isolate.spawn(doWork, port1);
SendPort port2;
rp.listen((message){
print("main isolate message: $message");
if (message[0] == 0){
port2 = message[1];
}else{
port2?.send([1,"这条信息是 main isolate 发送的"]);
}
});
}
// 处理耗时任务
static void doWork(SendPort port1){
print("new isolate start");
ReceivePort rp2 = new ReceivePort();
SendPort port2 = rp2.sendPort;
rp2.listen((message){
print("doWork message: $message");
});
// 将新isolate中创建的SendPort发送到主isolate中用于通信
port1.send([0, port2]);
// 模拟耗时5秒
sleep(Duration(seconds:5));
port1.send([1, "doWork 任务完成"]);
print("new isolate end");
}
//运行结果
main isolate start
main isolate end
new isolate start
main isolate message: [0, SendPort]
new isolate end
main isolate message: [1, doWork 任务完成]
doWork message: [1, 这条信息是 main isolate 发送的]
运行后都会创建一个是新Isolate的微进程,新的Isolate和主Isolate都双向绑定了消息通信的通道,即使新的Isolate中的任务完成了,它的微进程也不会立刻退出,因此,当使用完自己创建的Isolate后,最好调用newIsolate.kill(priority: Isolate.immediate);将Isolate立即杀死。
2、Flutter中创建
如果想在Flutter中创建Isolate,则有更简便的API,这是由Flutter官方进一步封装ReceivePort而提供的更简洁API。使用compute函数来创建新的Isolate并执行耗时任务。
import 'package:flutter/foundation.dart';
import 'dart:io';
// 创建一个新的Isolate,在其中运行任务doWork
create_new_task() async{
var str = "New Task";
var result = await compute(doWork, str);
print(result);
}
static String doWork(String value){
print("new isolate doWork start");
// 模拟耗时5秒
sleep(Duration(seconds:5));
print("new isolate doWork end");
return "complete:$value";
}
compute函数有两个必须的参数,第一个是待执行的函数,这个函数必须是一个顶级函数或静态方法,不能是类的实例方法,第二个参数为动态的消息类型,可以是被运行函数的参数。
需要注意,使用compute应导入'package:flutter/foundation.dart'包。
实现线程管理器
- 线程管理器ThreadManagment
import 'dart:isolate';
typedef LikeCallback = void Function(Object value);
class ThreadManagement {
//entryPoint 必须是静态方法
static Future<Map> runTask (void entryPoint(SendPort message), LikeCallback(Object value),{Object parameter})async{
final response = ReceivePort();
Isolate d = await Isolate.spawn(entryPoint, response.sendPort);
// 调用sendReceive自定义方法
if(parameter!=null){
SendPort sendPort = await response.first;
ReceivePort receivePort = ReceivePort();
sendPort.send([parameter, receivePort.sendPort]);
receivePort.listen((value){
receivePort.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":receivePort,
};
}else{
response.listen((value){
response.close();
d.kill();
LikeCallback(value);
});
return {
'isolate': d,
"receivePort":response,
};
}
}
}
- 耗时的任务
// 无参数的任务
static void getNoParamTask(SendPort port) async {
var c = await Future.delayed(Duration(seconds: 1), () {
return "banner data";
});
port.send(c);
}
// 需要参数的任务
static getParamsTask(SendPort port) async {
ReceivePort receivePort = ReceivePort();
port.send(receivePort.sendPort);
// 监听外界调用
await for (var msg in receivePort) {
Map requestURL =msg[0];
SendPort callbackPort =msg[1];
receivePort.close();
var res = await Future.delayed(Duration(seconds: 1), () {
var requestUrl = requestURL["type"];
var after = requestURL["after"];
return "url = $requestUrl, after = $after";
});
callbackPort.send(res);
}
}
- 执行耗时任务
// 调用无参数的任务
ThreadManagement.runTask(API.getNoParamTask, (value){
if(value != null){
//业务逻辑
print(value);
}
});
//调用有参数的任务
ThreadManagement.runTask(API.getParamsTask, (value){
if(value != null){
//业务逻辑
print(value);
}
}, parameter: {
"type":"hot",
"after":"1"
});
线程池
如何减少 isolate 创建所带来的消耗。我们可以创建一个线程池,初始化到那里。当我们需要使用的时候再拿来用就好了。
LoadBalancer是 dart team 已经为我们写好一个非常实用的 package。
我们现在 pubspec.yaml中添加 isolate 的依赖。
isolate: ^2.0.3
我们可以通过 LoadBalancer 创建出指定个数的 isolate。
Future<LoadBalancer> loadBalancer = LoadBalancer.create(2, IsolateRunner.spawn);
这段代码将会创建出一个 isolate 线程池,并自动实现了负载均衡。
下面我们再来看看应该如何使用 LoadBalancer 中的 isolate。
void testBalancer() async {
final lb = await loadBalancer;
int res = await lb.run(doWork, 110);
print(res);
}
int doWork(int value) {
// 模拟耗时5秒
print("new isolate doWork start");
sleep(Duration(seconds: 5));
return value;
}
//打印数据
new isolate doWork start
110
我们关注的只有
Future<R> run<R, P>(FutureOr<R> function(P argument), argument,)
方法。我们还是需要传入一个 function 在某个 isolate 中运行,并传入其参数 argument。run 方法将会返回我们执行方法的返回值。
整体和 compute 使用上差不多,但是当我们多次使用额外的 isolate 的时候,不再需要重复创建了。
并且 LoadBalancer 还支持 runMultiple,可以让一个方法在多线程中执行。
LoadBalancer 经过测试,它会在第一次使用其 isolate 的时候初始化线程池。
当应用打开后,即使我们在顶层函数中调用了 LoadBalancer.create,但是还是只会有一个 Isolate。
当我们调用 run 方法时,才真正创建出了实际的 isolate。
参考文章:
Dart 函数、箭头函数、匿名函数、立即执行函数及闭包
Flutter中的异步编程——Future
Flutter进阶Future异步详解
Flutter异步编程
深入了解Flutter的isolate
Flutter/Dart中的异步编程之Isolate
isolate