flutter相关

Flutter异步编程之 Future/Isolate

2021-01-27  本文已影响0人  哈啰于先生

前言

Flutter是用Dart实现的,在Dart中没有线程和进程的概念,我们编程使用多线程一般实现两种场景,一种是异步执行,一种是并行执行。那么如何在Flutter上实现异步编程呢。本篇文章将主要讨论以下问题:

1、Dart如何实现异步编程?
2、Event Loops是什么?
3、Isolate是什么呢?
4、如何实现Isolate?
5、Isolate底层原理是什么?

同步和异步

我们在写Dart代码的时候,就只有两种代码,

同步代码:就是一行行写下来的代码
异步代码:就是以Future等修饰的代码

并不是指的我们平常异步,这两种代码的区别只有一个:代码运行的顺序是不同的,先运行同步代码,再运行异步代码,顺序执行。

而异步代码是运行在Event loop里的,Event loops就是事件循环机制。

Event loop运行示意图

这个很好理解,事件events加到Event queue里,Event loop循环从Event queue里取Event执行。

Event loop 流程图

从这里看到,启动app(start app)后:

这里多了两个名词:MicroTask和Event,这代表了两个不同的异步task
而且可以看出:

如果想让任务能够尽快执行,就用MicroTask。

MicroTask

dart:async提供的异步方法,主要实现在schedule_microtask.dart中,使用方式

scheduleMicrotask((){
  // ...code goes here...
}); 

或者

new Future.microtask((){
    // ...code goes here...
});

Event

Event我们就很熟悉的,就是Future修饰的异步方法,使用方式

new Future(() {
  // ...code goes here...
});

对 Future 的理解
1、Future对象(futures)表示异步操作的结果,进程或者IO会延迟完成
2、可以在async函数中使用await来挂起执行,返回一个Future对象
3、在async函数中使用try-catch来捕获异常(或者使用catchError())
4、await只能在async中使用

Future示例代码

// Future示例代码:

void futureTest() {
  print("future start");
  Future.wait([
// 2秒后返回结果
    Future.delayed(new Duration(seconds: 2), () {
      print("hello");
      return "hello";
    }),
// 4秒后返回结果
    Future.delayed(new Duration(seconds: 4), () {
      print("world");
      return " world";
    }),
    // 4秒后返回结果
    Future.delayed(new Duration(seconds: 6), () {
      print("!");
      return " !";
    })
  ]).then((results) {
// 上面的两个任务执行完毕后进入
    print("future finish");
  }).catchError((e) {
// 执行失败会走到这里
    print(e);
  }).whenComplete(() {
// 无论成功或失败都会走到这里
  });
}

/// 打印结果
future start
hello
world
!
future finish

Event loop示例代码,根据Event loop的执行流程,请问如下代码打印顺序是什么样的?

void eventLoopTest() {
  print('eventLoopTest #1 of 2');
  scheduleMicrotask(() => print('microtask #1 of 3'));
  //使用delay方式,是将此task放到queue的尾部,
  //若前面有耗时操作,不一定能准时执行
  new Future.delayed(
      new Duration(seconds: 1), () => print('future #1 (delayed)'));
  //使用then,是表示在此task执行后立刻执行
  new Future(() => print('future #2 of 4'))
      .then((_) => print('future #2a'))
      .then((_) {
    print('future #2b');
    scheduleMicrotask(() => print('microtask #0 (from future #2b)'));
  }).then((_) => print('future #2c'));

  scheduleMicrotask(() => print('microtask #2 of 3'));

  new Future(() => print('future #3 of 4'))
      .then((_) => new Future(() => print('future #3a (a new future)')))
      .then((_) => print('future #3b'));

  new Future(() => print('future #4 of 4')).then((_) {
    new Future(() => print('future #4a'));
  }).then((_) => print('future #4b'));

  scheduleMicrotask(() => print('microtask #3 of 3'));
  print('eventLoopTest #2 of 2');
}

//打印结果
isolateTest #1 of 2
isolateTest #2 of 2
microtask #1 of 3
microtask #2 of 3
microtask #3 of 3
future #2 of 4
future #2a
future #2b
future #2c
microtask #0 (from future #2b)
future #3 of 4
future #4 of 4
future #4b
future #3a (a new future)
future #3b
future #4a
future #1 (delayed)

关键点
1、Future.delayed需要延迟执行的,是在延迟时间到了之后才将此task加到event queue的队尾,所以万一前面有很耗时的任务,那么你的延迟task不一定能准时运行。
2、Future.then每次都会返回一个Future,默认是其本身。如果在then中函数也返回一个新的Future,则新Future会重新加入到event queue中等待执行
3、一个event task运行完后,会先去查看Micro queue里有没有可以执行的micro task。没有的话,在执行下一个event task

我们知道 Dart 是单线程异步编程模型 ,Future解决了异步执行的问题。但是并行执行怎么处理呢?

Flutter引擎架构

Flutter体系图

我们只关注线程相关信息
1、Framework:我们直接接触的层级
2、Engine:Dart Isolate Setup, 创建Isolate,类似于DartVM中的线程,他的架构就是一个循环:event loops。但这一层并不创建及管理线程,它要求Embeder提供四个Task Runner,类似于线程,并不是真正的线程。
3、Embedder:Thread Setup,真正的线程创建及管理者,Embeder指的是将引擎移植到平台的中间层代码。

Task runners

Task runner

Embedder将自己管理的线程作为 task runner 提供给Flutter 引擎。

主要的 task runner 有:
Platform Task Runner
UI Task Runner
GPU Task Runner
IO Task Runner

Isolate

Dart是一个单线程语言,它的"线程"概念被称为 Isolate,中文意思是隔离。

Dart中使用多线程计算的时候,在创建Isolate以及线程间数据传递中耗时要超过单线程,每当我们创建出来一个新的 Isolate 至少需要 2mb 左右的空间甚至更多,因此Isolate有合适的使用场景,不建议滥用Isolate。那么应该在什么时候使用Future,什么时候使用Isolate呢?
一个最简单的判断方法是根据某些任务的平均时间来选择:
方法执行在几毫秒或十几毫秒左右的,应使用Future
如果一个任务需要几百毫秒或之上的,则建议创建单独的Isolate
一些常见的可以参考的场景
JSON 解码
数据加密
图像处理
网络请求:加载资源、图片

如何使用Isolate

Isolate由一对Port分别由用于接收消息的ReceivePort对象,和用于发送消息的SendPort对象构成。其中SendPort对象不用单独创建,它已经包含在ReceivePort对象之中。需要注意,一对Port对象只能单向发消息,这就如同一根自来水管,ReceivePortSendPort分别位于水管的两头,水流只能从SendPort这头流向ReceivePort这头。因此,两个Isolate之间的消息通信肯定是需要两根这样的水管的,这就需要两对Port对象。

1、Dart中创建
我们可以通过 Isolate.spawn 创建一个 isolate。

static Future<Isolate> spawn<T>(void entryPoint(T message),T message);

当我们调用 Isolate.spawn 的时候,它将会返回一个对 isolate 的引用的 Future。我们可以通过这个 isolate 来控制创建出的 Isolate,例如pause、resume、kill 等等。

但是在此之前我们必须要创建两个 isolate 之间沟通的桥梁。

import 'dart:isolate';
import  'dart:io';

void main() {
  print("main isolate start");
  create_isolate();
  print("main isolate end");
}

// 创建一个新的 isolate
create_isolate() async{
  ReceivePort rp = new ReceivePort();
  SendPort port1 = rp.sendPort;

  Isolate newIsolate = await Isolate.spawn(doWork, port1);

  SendPort port2;
  rp.listen((message){
    print("main isolate message: $message");
    if (message[0] == 0){
      port2 = message[1];
    }else{
      port2?.send([1,"这条信息是 main isolate 发送的"]);
    }
  });
}

// 处理耗时任务
static void doWork(SendPort port1){
  print("new isolate start");
  ReceivePort rp2 = new ReceivePort();
  SendPort port2 = rp2.sendPort;

  rp2.listen((message){
    print("doWork message: $message");
  });

  // 将新isolate中创建的SendPort发送到主isolate中用于通信
  port1.send([0, port2]);
  // 模拟耗时5秒
  sleep(Duration(seconds:5));
  port1.send([1, "doWork 任务完成"]);

  print("new isolate end");
}

//运行结果
main isolate start
main isolate end
new isolate start
main isolate message: [0, SendPort]
new isolate end
main isolate message: [1, doWork 任务完成]
doWork message: [1, 这条信息是 main isolate 发送的]

运行后都会创建一个是新Isolate的微进程,新的Isolate和主Isolate都双向绑定了消息通信的通道,即使新的Isolate中的任务完成了,它的微进程也不会立刻退出,因此,当使用完自己创建的Isolate后,最好调用newIsolate.kill(priority: Isolate.immediate);将Isolate立即杀死。

2、Flutter中创建
如果想在Flutter中创建Isolate,则有更简便的API,这是由Flutter官方进一步封装ReceivePort而提供的更简洁API。使用compute函数来创建新的Isolate并执行耗时任务。

import 'package:flutter/foundation.dart';
import 'dart:io';

// 创建一个新的Isolate,在其中运行任务doWork
create_new_task() async{
  var str = "New Task";
  var result = await compute(doWork, str);
  print(result);
}

static String doWork(String value){
  print("new isolate doWork start");
  // 模拟耗时5秒
  sleep(Duration(seconds:5));
  print("new isolate doWork end");
  return "complete:$value";
}

compute函数有两个必须的参数,第一个是待执行的函数,这个函数必须是一个顶级函数或静态方法,不能是类的实例方法,第二个参数为动态的消息类型,可以是被运行函数的参数。
需要注意,使用compute应导入'package:flutter/foundation.dart'包。

实现线程管理器

import 'dart:isolate';
typedef LikeCallback = void Function(Object value);

class  ThreadManagement  {
  //entryPoint 必须是静态方法
  static Future<Map>  runTask (void entryPoint(SendPort message), LikeCallback(Object value),{Object parameter})async{
    final response = ReceivePort();
    Isolate  d =  await Isolate.spawn(entryPoint, response.sendPort);
    // 调用sendReceive自定义方法
    if(parameter!=null){
      SendPort sendPort = await response.first;
      ReceivePort receivePort = ReceivePort();
      sendPort.send([parameter, receivePort.sendPort]);
      receivePort.listen((value){
        receivePort.close();
        d.kill();
        LikeCallback(value);
      });
      return {
        'isolate': d,
        "receivePort":receivePort,
      };
    }else{
      response.listen((value){
        response.close();
        d.kill();
        LikeCallback(value);
      });
      return {
        'isolate': d,
        "receivePort":response,
      };
    }
  }
}
// 无参数的任务
static void getNoParamTask(SendPort port) async {
    var c = await Future.delayed(Duration(seconds: 1), () {
      return "banner data";
    });
    port.send(c);
  }

// 需要参数的任务
static getParamsTask(SendPort port) async {
    ReceivePort receivePort = ReceivePort();
    port.send(receivePort.sendPort);
    // 监听外界调用
    await for (var msg in receivePort) {
      Map requestURL =msg[0];
      SendPort callbackPort =msg[1];
      receivePort.close();
      var res = await Future.delayed(Duration(seconds: 1), () {
        var requestUrl = requestURL["type"];
        var after = requestURL["after"];
        return "url = $requestUrl, after = $after";
      });
      callbackPort.send(res);
   }
}
// 调用无参数的任务
ThreadManagement.runTask(API.getNoParamTask, (value){
     if(value != null){
        //业务逻辑
        print(value);
    }
});

//调用有参数的任务
ThreadManagement.runTask(API.getParamsTask,  (value){
    if(value != null){
        //业务逻辑
       print(value);
   }
}, parameter: {
    "type":"hot",
    "after":"1"
});

线程池

如何减少 isolate 创建所带来的消耗。我们可以创建一个线程池,初始化到那里。当我们需要使用的时候再拿来用就好了。
LoadBalancer是 dart team 已经为我们写好一个非常实用的 package。
我们现在 pubspec.yaml中添加 isolate 的依赖。

isolate: ^2.0.3

我们可以通过 LoadBalancer 创建出指定个数的 isolate。

Future<LoadBalancer> loadBalancer = LoadBalancer.create(2, IsolateRunner.spawn);

这段代码将会创建出一个 isolate 线程池,并自动实现了负载均衡。
下面我们再来看看应该如何使用 LoadBalancer 中的 isolate。

void testBalancer() async {
   final lb = await loadBalancer;
   int res = await lb.run(doWork, 110);
   print(res);
}

int doWork(int value) {
// 模拟耗时5秒
  print("new isolate doWork start");
  sleep(Duration(seconds: 5));
  return value;
}

//打印数据
new isolate doWork start
110

我们关注的只有

Future<R> run<R, P>(FutureOr<R> function(P argument), argument,)

方法。我们还是需要传入一个 function 在某个 isolate 中运行,并传入其参数 argument。run 方法将会返回我们执行方法的返回值。
整体和 compute 使用上差不多,但是当我们多次使用额外的 isolate 的时候,不再需要重复创建了。
并且 LoadBalancer 还支持 runMultiple,可以让一个方法在多线程中执行。
LoadBalancer 经过测试,它会在第一次使用其 isolate 的时候初始化线程池。
当应用打开后,即使我们在顶层函数中调用了 LoadBalancer.create,但是还是只会有一个 Isolate。
当我们调用 run 方法时,才真正创建出了实际的 isolate。

参考文章:
Dart 函数、箭头函数、匿名函数、立即执行函数及闭包
Flutter中的异步编程——Future
Flutter进阶Future异步详解
Flutter异步编程
深入了解Flutter的isolate
Flutter/Dart中的异步编程之Isolate
isolate

上一篇下一篇

猜你喜欢

热点阅读