Dart基础之Isolate
Dart基础之Isolate
背景
在其他语言中为了高效利用多核CPU,通常使用多线程并行来实现并发执行代码,通过共享数据来保证多线程之间的协同,但这种模式衍生出了很多问题,开辟线程带来资源消耗,数据共享代理死锁问题。
不论是APP还是Web端,CPU大多数时间是处于空闲状态的,一般不需要密集和高并发的处理。Dart作为面向前端开发设计的语言,在并发设计上没有采用多线程方案,而是使用了Isolate(隔离区)这种单线程模型来解决并发任务对于多线程的依赖。
Isolate组成
每个Isolate由以下几部分组成:
(1)Stack用于存放函数调用上下文和调用链路信息。
(2)Heap用于存放对象,堆内存回收管理和java类似。
(3)用于存放异步回调的Queue,分为微事件队列(MicroTaskQueue)和微任务队列(EventQueue)。
(4)以及一个用于处理异步回调的EventLoop。
Isolate运行代码
在dart中编写的代码分为两种类型:
同步代码:即正常编写的代码。
异步代码:一些返回类型为Future和Stream的函数。
由于Isolate是一种单线程模型,代码运行时碰到异步代码会将其丢到Queue中,只按顺序运行同步代码。等同步代码都执行完成后才会按顺序执行异步任务。
下面写demo验证下这个结论,下图的Future.delayed就是异步代码。
Future printc(){
Future.delayed(new Duration(seconds: 2),(){
print("a");// 异步代码的的回调会在所有同步代码执行完毕后才开始执行
});
Future.delayed(new Duration(seconds: 1),(){
print("b");// 异步代码的的回调会在所有同步代码执行完毕后才开始执行
});
print("c");
var start = DateTime.now();
for(int i=0;i<100000;i++){
for(int j=0;j<100000;j++){
i*j+j*i-j*i;
}
}
var end = DateTime.now();
print("同步耗时任务:${end.second-start.second}秒");
print("d");
}
执行结果为:
c
同步耗时任务:11秒
d
b
a
解释下结果,打印a和b的异步任务会插入到Queue中,其回调目前也不会执行。运行同步代码,所以首先打印的是c,之后等同步耗时任务执行完成后再打印d。同步代码执行完成后EventLoop会从Queue中提取异步代码执行,虽然异步任务a在前面,但其延时时间长,所以先打印了b。
同步等待:在调用异步代码时(比如Future)使用await关键字,在方法声明处使用async关键字。
如果就是想先运行异步任务再执行同步代码则可以使用await来运行异步任务并阻塞同步代码。异步任务执行完毕后会继续执行同步代码。
Future printc() async{
await Future.delayed(new Duration(seconds: 2),(){
print("a");// 异步代码的的回调会在所有同步代码执行完毕后才开始执行
});
await Future.delayed(new Duration(seconds: 1),(){
print("b");// 异步代码的的回调会在所有同步代码执行完毕后才开始执行
});
print("c");
var start = DateTime.now();
for(int i=0;i<100000;i++){
for(int j=0;j<10000;j++){
i*j+j*i-j*i;
}
}
var end = DateTime.now();
print("同步耗时任务:${end.difference(start).inSeconds}秒");
print("d");
}
在两个异步任务中添加了await修饰符,则会像同步代码一样按照顺序进行执行。
EventLoop
异步任务会丢到EventQueue由EventLoop来执行。会不断的从时间队列中获取异步任务并运行。
[图片上传失败...(image-892b75-1692082991757)]
下图展示了EventLoop从队列中获取任务并执行的过程。
[图片上传失败...(image-e22d7c-1692082796350)]
图中的MicrotaskQueue是微任务队列,优先级最高,每次循环都会先检查微任务队列,有微任务则优先执行微任务,直到所有微任务都做完后才去执行EventQueue。
Microtask:微任务一般用于执行很短的异步操作,任务量不能太多。 生成微任务的方式有
// 方式一
scheduleMicrotask((){
print("我是一条微任务!");
});
// 方式二
Future.microtask(() => print("我是另一条微任务!"));
Event:事件,主要来自于Future,已经IO、手势、绘制、计时器和与Isloate通信的message等。
此处需要注意Future使用的特殊场景。
Future.delayed(new Duration(seconds: 2),(){
print("a");// 异步代码的的回调会在所有同步代码执行完毕后才开始执行
});
Future.delayed 会在延迟结束后才把异步回调添加到EventQueue尾部,并不是立即执行,不能保证执行时间。
Future(()=>print("zzz"))
.then((value) => print("xxx"))
.then((value) => print("yyy"))
.then((value) => print("www"));
Future.then 会对异步事件补充回调,then不会向EventQueue中添加事件,而是在前面的Future执行完成后立即执行。可以保证多个then内部task的执行顺序。
Future aaa = Future(()=>print("aaa"));
// 此时aaa已经处于完成状态
aaa.then((value) => print("bbb"));
// Future(()=>null)生成的就是一个已经完成的Future
Future(()=>null).then((value) => print("ccc"));
针对已经完成的Future,调用then时并不会立即执行,而是将then中的回调添加到Microtask队列中。
案例分析:
void dartLoopTest() {
Future x0 = Future(() => null);
Future x = Future(() => print('1'));
Future(() => print('2'));
scheduleMicrotask(() => print('3'));
x.then((value) {
print('4');
Future(() => print('5'));
}).then((value) => print('6'));
print('7');
x0.then((zvalue) {
print('8');
scheduleMicrotask(() {
print('9');
});
}).then((value) => print('10'));
}
输出结果为:
7
3
8
10
9
1
4
6
2
5
1.最先执行同步代码print('7');
2.其次判断MicroTask队列,执行scheduleMicrotask(() => print('3'));
3.当前MicroTask队列为空,从Event队列中获取事件,按照代码顺序执行
Future x0 = Future(() => null);
此时没有输出,但x0是完成状态的Future,第一个then的回调会加入到MicroTask队列。
x0.then((zvalue) {
print('8');
scheduleMicrotask(() {
print('9');
});
}).then((value) => print('10'));
然后运行微任务,调用print('8');,执行scheduleMicrotask((){print('9');});向微任务队列添加print('9');的任务。由于当前微任务没有执行完,所以会先调用print('10')。
4.按顺序执行Future x = Future(() => print('1'));,之后x 也处于完成状态,then回调会添加到微任务队列中执行。在微任务中先打印4,再向事件队列添加打印5的事件,最后打印6。
x.then((value) {
print('4');
Future(() => print('5'));
}).then((value) => print('6'));
5.此时事件队列里还有打印2和打印5的事件,按顺序执行输出最后结果。
void testThenAwait(){
Future(()=>print("AAA"))
.then((value) async => await Future(()=>print("BBB")))
.then((value) => print("CCC"));
Future(()=>print("DDD"));
}
当我们在then中使用await时会阻塞当前then向下调用。
输出结果:
AAA
DDD
BBB
CCC
输出分析:
1.首先这个方法中有两个Future,按顺序添加到Event队列中。
2.运行第一个Future,打印AAA,
3.接着同步执行第一个then,向Event队列中添加第三FutureFuture(()=>print("BBB")由于有同步等待,必须等Future(()=>print("BBB")执行完毕后才会继续向下执行。
4.运行Event队列中排在前面的异步事件Future(()=>print("DDD"));,打印DDD
5.运行Event队列中剩余的异步事件Future(()=>print("BBB"),打印BBB
6.接着执行then,打印CCC
创建Isolate
基本方法
Flutter应用中的代码默认都跑在root isloate 中,尽管是单线程但已经足够处理各类异步任务。
当有计算密集型的耗时任务时,就需要创建新的Isolate来进行耗时计算来避免阻塞root isloate。由于不同Isolate之间内存隔离,要通信就得通过 ReceivePort与SendPort 来实现。
使用Isolate.spawn来创建新的Isolate。看下函数签名
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
@Since("2.3") String? debugName});
external修饰说明这个方法在不同的平台有不同的实现,有点类似于java的native方法。
Future<Isolate>说明是异步方法,要想同步代码中拿到创建好的Isolate,调用时得添加同步等待await。
entryPoint定义了所能接受的计算任务的函数签名(空返回值,有且只有一个入参),必须是顶层函数或者静态方法。
T message是在新Isolate中运行计算任务所需的参数,类型必须和entryPoint所能接受的类型一致。
所以创建新的Isolate最主要的就以下两步:
1.使用顶层函数或静态方法定义计算任务
// 耗时计算部分
int fibonacci(int n) {
return n < 2 ? n : fibonacci(n - 2) + fibonacci(n - 1);
}
// 1. 计算任务
void task1(int start) {
DateTime startTime = DateTime.now();
int result = fibonacci(start);
DateTime endTime = DateTime.now();
print("计算耗时:${endTime.difference(startTime)} 结果:${result.toString()}");
}
void main() {
task1(50);
}
// 输出:计算耗时:0:00:48.608656 结果:12586269025
上面的计算要耗时48秒,必须要在新的Isolate中做计算
2.准备入参,调用spawn创建新的Isolate。
void main() async {
Isolate newIsolate = await Isolate.spawn(task1,10,debugName: "isolateDebug");
print("结束!");
}
输出结果:
结束!
在新的Isolate中计算得到的结果没有打印到当前Console中,此时就要使用ReceivePort与SendPort来建立当前Isolate和新Isolate的连接。
单向通信
定义可以接收宿主的SendPort的耗时任务,在计算完成后通过send方法将结果发回宿主Isolate。
/// [hostSendPort] 用于isolate向宿主isolate发送结果
void task2(SendPort hostSendPort){
DateTime startTime = DateTime.now();
int result = fibonacci(47);
DateTime endTime = DateTime.now();
var state = "计算耗时:${endTime.difference(startTime)} 结果:${result
.toString()}";
hostSendPort.send(state);
}
在宿主Isolate中要定义用于接收结果的ReceivePort,ReceivePort也是Stream,可以设置监听来处理收到的结果。
void main() async {
// 定义宿主接受结果的ReceivePort,设置监听。
ReceivePort hostReceivePort = ReceivePort();
hostReceivePort.listen((message) {
print(message);
});
// 定义向hostReceivePort 发送数据的hostSendPort
SendPort hostSendPort = hostReceivePort.sendPort;
// hostSendPort.send("message"); 测试了在当前isolate中发送结果的场景。
Isolate newIsolate =
await Isolate.spawn(task2, hostSendPort);
}
计算结果:
计算耗时:0:00:11.959955 结果:2971215073
双向通信
通过上面的改造,将宿主中的SendPort传递给子Isolate,在计算出结果后发回给宿主Isolate,宿主Isolate通过ReceivePort设置监听来处理结果。就完成了子Isolate向宿主Isolate发送数据。
但宿主Isolate如何向子Isolate发送数据呢?
子Isolate向宿主Isolate发送数据是通过,持有宿主中的SendPort来实现的。那么要实现宿主Isolate向子Isolate发送数据,宿主中也得持有子Isolate中的SendPort才行。最简单的方案就是在子Isolate中创建subSendPort并传递回宿主。
void task3(SendPort hostSendPort) {
///5.创建子Isolate自己的ReceivePort,用于接收宿主传过来的初始化参数
ReceivePort subReceivePort = ReceivePort();
subReceivePort.listen((start) {
if (start is int) {
///9. 收到宿主中初始化参数后进行计算。
DateTime startTime = DateTime.now();
int result = fibonacci(start);
DateTime endTime = DateTime.now();
var state =
"计算耗时:${endTime.difference(startTime)} 结果:${result.toString()}";
///10.计算结束后通过宿主的hostSendPort将结果发出去。
hostSendPort.send(state);
}
});
///6.将子Isolate自身的sendPort发给宿主,用于宿主向子Isolate传递初始化参数。
hostSendPort.send(subReceivePort.sendPort);
}
void main() async {
///1 定义宿主接受结果的port和发送参数的port
ReceivePort hostReceivePort = ReceivePort();
///2 定义子Isolate的SendPort引用
SendPort subSendPort;
///3 定义监听,监听的部分暂时不会运行
hostReceivePort.listen((message) {
if (message is SendPort) {
/// 7.收到子Isolate的SendPort,此时完成了双向通信的配置阶段
subSendPort = message;
/// 8.向子Isolate发送计算任务的初始化数据
subSendPort.send(2);
subSendPort.send(10);
subSendPort.send(20);
subSendPort.send(30);
subSendPort.send(40);
subSendPort.send(48);
} else if (message is String) {
/// 11.打印子Isolate中计算的结果。
print(message);
} else {
print("收到的数据不符合规范");
}
});
///4 定义向hostReceivePort 发送数据的hostSendPort,开始创建
SendPort hostSendPort = hostReceivePort.sendPort;
Isolate newIsolate = await Isolate.spawn(task3, hostSendPort);
}
最终输出结果为:
计算耗时:0:00:00.000000 结果:1
计算耗时:0:00:00.000000 结果:55
计算耗时:0:00:00.000000 结果:6765
计算耗时:0:00:00.002999 结果:832040
计算耗时:0:00:00.393017 结果:102334155
计算耗时:0:00:19.179601 结果:4807526976
上面的代码按顺序看可能有点绕,添加了注释描述了基本的运行顺序。可以看出要完成一个基本的双向通信功能,要写大段的配置的代码,真正属于业务部分的代码就几行,用起来很繁琐。
简化使用
Flutter中对Isolate的使用进行了简化,通过compute方法可以很方便的实现双向通信。
import 'package:flutter/foundation.dart';
int fibonacci(int n) {
return n < 2 ? n : fibonacci(n - 2) + fibonacci(n - 1);
}
void main()async{
var result = await compute( fibonacci,20);
print("计算结果为:$result");
}
输出结果:
I/flutter ( 9899): 计算结果为:6765
————————————————
版权声明:本文为CSDN博主「d0d0bird」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/d0d0bird/article/details/116718976