Dart Isolate入门
Dart多线程 Isolate 入门
什么是Isolate
Isolate就是Dart中的多线程,之所以不称为Thread,而是称为Isolate,是因为它与传统线程的概念有所区别,见如下表格
特性 | Isolate | Thread |
---|---|---|
内存共享 | 不共享内存 | 可以共享内存 |
通信方式 | ReceivePort和SendPort | 共享变量或synchronized关键字 |
数据竞争 | 无数据竞争 | 可能有数据竞争 |
开销 | 开销较大 | 相对轻量 |
任务调度 | Dart运行时管理,适合CPU密集型任务 | 由操作系统管理 |
Isolate的创建之所以比Thread开销要大,是因为Isolate在创建时需要创建更多的资源,例如事件循环队列、微任务队列,独立的内存堆以及独立的垃圾回收器,并且每个 Isolate 需要单独初始化 Dart 运行时环境,包括 Dart 代码执行上下文,而线程只需要共享进程的执行上下文。
使用场景:需要耗时计算时
为什么使用Isolate?
你可能会问,我直接用 await/async 异步编程也可以做到类似的功能啊,为什么偏要用Isolate呢?
Dart的异步编程是用事件循环和微任务队列实现的,本质上这些耗时任务也是和主任务(可能是UI任务)运行在同一个线程上的,一旦异步任务多了起来,多多少少会使事件循环的时间变长,造成卡顿。
而每个Isolate都有自己独立的事件循环和微任务队列,也就是说每个Isolate都可以有自己独立的异步任务,不会消耗其他Isolate的资源。
Isolate使用
创建Isolate
创建一个Isolate使用Isolate.spawn
函数(还有其他方法,只不过该方法更加常用),注意该函数是一个异步函数:
1 | void task(String info) { |
需要说明的是,此处的异步并不是指我们可以通过await
关键字来等待新创建的Isolate执行完毕,而是“创建Isolate”这个过程本身是异步的,新的Isolate创建好后它自己会自动执行。
那么await Isolate.spawn()
和不加await
有什么区别呢?答案是,如果在新的Isolate创建完毕之前,如果主Isolate结束了,那么这个新的Isolate永远不会被执行,这是因为它还没出生就已经被杀死了。
Isolate相当于是守护线程,主Isolate结束后,不管是否还有其他Isolate在运行,Dart都会结束程序。
Isolate间通信
两个Isolate之间使用ReceivePort
和SendPort
进行通信,注意Isolate之间不能通过共享内存来通信,这也是Isolate区别于传统线程的一点。
ReceivePort
对象在创建时会同时创建一个SendPort
对象,通过ReceivePort
对象的sendPort
属性取得:
1 | ReceivePort receivePort = ReceivePort(); |
使用与ReceivePort
同时创建的SendPort
对象即可向该ReceivePort
发送消息
1 | sendPort.send(message); // message 可以是任意类型 |
一般来说,在实际的使用中,我们会把与ReceivePort
配套创建的SendPort
通过Isolate.spawn
函数在创建新Isolate时传递给新Isolate,以便进行消息的发送和接收:
1 | ReceivePort receivePort = ReceivePort(); |
那么主Isolate如何接收消息呢?
这时候就需要使用ReceivePort
了,方法很多,这里一一介绍
ReceivePort.listen
该方法会持续监听新Isolate传来的消息,如果不主动结束监听,程序一直不会结束:
1 | import 'dart:isolate'; |
输出内容:
1 | 等待计算结果... |
想要结束程序,我们就必须关闭ReceivePort
,在listen
中添加以下代码:
1 | receivePort.listen((message) { |
close
并不是ReceivePort
中定义的方法,而是继承自其父类Stream
,没错,ReceivePort
本质上就是Stream
,所以如果你尝试在close
后再次订阅是会报错的
await for循环
使用Dart的await for
可以实现对ReceivePort
的持续监听,但是注意,该语法同await
关键字一样,只能在async
函数中使用
1 | await for(var data in receivePort) { |
同样的,要结束监听,需要在await for
循环体中调用receivePort.close()
方法,否则会一直阻塞在这里。
但是我们也可以选择只接收一定数量的消息而不调用close
方法,在接收到的消息数量达到我们设定的量后就会自动退出await for
:
1 | await for(var data in receivePort.take(2)) { // 只接收前两个消息 |
能使用await for
循环是因为,我们前面说过,ReceivePort
是Stream
的子类。
ReceivePort.first
first
是一个getter,它的返回值是一个Future
,表示ReceivePort
接收到的第一个数据,多用于只需要接收一次数据的场景,我们可以使用then
或者await
关键字来取得其中的数据:
1 | final data = await receivePort.first; |
使用first
时,我们不必手动close
,first
方法内部会为我们处理好。
双向通信
前面我们介绍的通信方式都是单向的,主Isolate除了在创建时向新Isolate传入了一个参数,后面就无法向新Isolate传递消息了,只能新Isolate向主Isolate传递消息,要使它俩可以双向通信,我们不仅需要把主Isolate的SendPort传递给新Isolate,还需要把新Isolate的SendPort传给主Isolate,以便主Isolate能够向新Isolate传递消息。
请看下面的代码:
1 | import 'dart:isolate'; |
运行这段代码,你会发现程序直接结束了,没有任何输出。
这是因为,当Dart发现主Isolate的事件循环没有任务后,便会直接结束整个程序,而不会管其他Isolate的死活。为了看到输出,我们可以在Isolate的最后加一句
1 | await Future.delayed(Duration(seconds: 1)); |
让主Isolate等待1s再退出
但是这样的解决方法明显不够优雅,还有什么其他办法吗?
你可能会这样写:
1 | import 'dart:isolate'; |
这样会报:
1 | Unhandled exception: |
异常说:Stream has already been listened to.
我们都知道冷Stream只能被订阅一次(此处的receivePort就是一个冷Stream),但是是在哪儿订阅的呢?其实我们在第一次使用receivePort.first
时就已经订阅了一次了,first
方法内部会为我们自动订阅和取消订阅。
眼下看来,我们就只能通过await for
循环来订阅消息并手动close掉receivePort了,具体代码这里就不再写了。
但其实还有更加优雅的解决方案,即可以封装一个专门用于消息发送和回复的方法:
1 | import 'dart:isolate'; |
使用示例:
1 | import 'dart:isolate'; |
输出:
1 | 新Isolate收到消息: Hi! 来自主Isolate的消息 |
Message.send
方法相当于是一个发送消息的代理,我们传入要发送的目标SendPort和消息本身,它会为我们把消息发送过去,同时它还会为我们创建一个新的ReceivePort,这个新的ReceivePort对于Message.send
的调用方是透明的,调用方只需要关心如何获取这个新ReceivePort中的数据就可以了,而新的ReceivePort伴生的SendPort则会被命名为reply
传递给目标Isolate,以便目标Isolate回复消息。
使用Message.send 方法的好处就是,我们可以把发送消息和接收回复写到同一行:
1 | final data = await Message.send(target, "Hi! 来自主Isolate的消息"); |