Dart Isolate入门

Dart多线程 Isolate 入门

什么是Isolate

Isolate就是Dart中的多线程,之所以不称为Thread,而是称为Isolate,是因为它与传统线程的概念有所区别,见如下表格

特性 Isolate Thread
内存共享 不共享内存 可以共享内存
通信方式 ReceivePort和SendPort 共享变量或synchronized关键字
数据竞争 无数据竞争 可能有数据竞争
开销 开销较大 相对轻量
任务调度 Dart运行时管理,适合CPU密集型任务 由操作系统管理

Isolate的创建之所以比Thread开销要大,是因为Isolate在创建时需要创建更多的资源,例如事件循环队列、微任务队列,独立的内存堆以及独立的垃圾回收器,并且每个 Isolate 需要单独初始化 Dart 运行时环境,包括 Dart 代码执行上下文,而线程只需要共享进程的执行上下文。

使用场景:需要耗时计算时

为什么使用Isolate?

你可能会问,我直接用 await/async 异步编程也可以做到类似的功能啊,为什么偏要用Isolate呢?

Dart的异步编程是用事件循环和微任务队列实现的,本质上这些耗时任务也是和主任务(可能是UI任务)运行在同一个线程上的,一旦异步任务多了起来,多多少少会使事件循环的时间变长,造成卡顿。

而每个Isolate都有自己独立的事件循环和微任务队列,也就是说每个Isolate都可以有自己独立的异步任务,不会消耗其他Isolate的资源。

Isolate使用

创建Isolate

创建一个Isolate使用Isolate.spawn 函数(还有其他方法,只不过该方法更加常用),注意该函数是一个异步函数:

1
2
3
4
5
6
7
8
void task(String info) {
print(info);
}

void main() async {
var receivePort = ReceivePort();
await Isolate.spawn(task, "Hello");
}

需要说明的是,此处的异步并不是指我们可以通过await关键字来等待新创建的Isolate执行完毕,而是“创建Isolate”这个过程本身是异步的,新的Isolate创建好后它自己会自动执行。

那么await Isolate.spawn() 和不加await有什么区别呢?答案是,如果在新的Isolate创建完毕之前,如果主Isolate结束了,那么这个新的Isolate永远不会被执行,这是因为它还没出生就已经被杀死了。

Isolate相当于是守护线程,主Isolate结束后,不管是否还有其他Isolate在运行,Dart都会结束程序。

Isolate间通信

两个Isolate之间使用ReceivePortSendPort 进行通信,注意Isolate之间不能通过共享内存来通信,这也是Isolate区别于传统线程的一点。

ReceivePort对象在创建时会同时创建一个SendPort对象,通过ReceivePort对象的sendPort属性取得:

1
2
ReceivePort receivePort = ReceivePort();
SendPort sendPort = receivePort.sendPort;

使用与ReceivePort同时创建的SendPort对象即可向该ReceivePort发送消息

1
sendPort.send(message); // message 可以是任意类型

一般来说,在实际的使用中,我们会把与ReceivePort配套创建的SendPort通过Isolate.spawn 函数在创建新Isolate时传递给新Isolate,以便进行消息的发送和接收:

1
2
3
4
5
6
7
ReceivePort receivePort = ReceivePort();
SendPort sendPort = receivePort.sendPort;
Isolate.spawn(newIsolate, sendPort);

void newIsolate(SendPort port) {
port.send(message); // 发送消息给创建它的Isolate
}

那么主Isolate如何接收消息呢?

这时候就需要使用ReceivePort了,方法很多,这里一一介绍

ReceivePort.listen

该方法会持续监听新Isolate传来的消息,如果不主动结束监听,程序一直不会结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import 'dart:isolate';

void main() async {
ReceivePort receivePort = ReceivePort();
SendPort sendPort = receivePort.sendPort;
await Isolate.spawn(newIsolate, sendPort);

receivePort.listen((message) {
print("计算结果: " + message.toString());
},);

print("等待计算结果...");
}

// 进行循环
void newIsolate(SendPort port) async {
int sum = 0;
for(int i = 0; i < 2000000000; i++) {
// do nothing
sum += i;
}

port.send(sum);
}

输出内容:

1
2
3
4
等待计算结果...
计算结果: 1999999999000000000

// 然后程序就会停在这儿,不会结束

想要结束程序,我们就必须关闭ReceivePort,在listen中添加以下代码:

1
2
3
4
receivePort.listen((message) {
print("计算结果: " + message.toString());
receivePort.close();
},);

close 并不是ReceivePort中定义的方法,而是继承自其父类Stream,没错,ReceivePort本质上就是Stream,所以如果你尝试在close后再次订阅是会报错的

await for循环

使用Dart的await for 可以实现对ReceivePort的持续监听,但是注意,该语法同await 关键字一样,只能在async 函数中使用

1
2
3
await for(var data in receivePort) {
// ...
}

同样的,要结束监听,需要在await for循环体中调用receivePort.close() 方法,否则会一直阻塞在这里。

但是我们也可以选择只接收一定数量的消息而不调用close方法,在接收到的消息数量达到我们设定的量后就会自动退出await for

1
2
3
await for(var data in receivePort.take(2)) { // 只接收前两个消息
// ...
}

能使用await for循环是因为,我们前面说过,ReceivePortStream的子类。

ReceivePort.first

first是一个getter,它的返回值是一个Future,表示ReceivePort接收到的第一个数据,多用于只需要接收一次数据的场景,我们可以使用then或者await关键字来取得其中的数据:

1
2
final data = await receivePort.first;
print("The data is $data");

使用first时,我们不必手动closefirst方法内部会为我们处理好。

双向通信

前面我们介绍的通信方式都是单向的,主Isolate除了在创建时向新Isolate传入了一个参数,后面就无法向新Isolate传递消息了,只能新Isolate向主Isolate传递消息,要使它俩可以双向通信,我们不仅需要把主Isolate的SendPort传递给新Isolate,还需要把新Isolate的SendPort传给主Isolate,以便主Isolate能够向新Isolate传递消息。

请看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import 'dart:isolate';

void main() async {
ReceivePort receivePort = ReceivePort();
SendPort sendPort = receivePort.sendPort;
await Isolate.spawn(newIsolate, sendPort);
// 接收新Isolate的SendPort
SendPort target = await receivePort.first;
// 向新Isolate发送信息
target.send("Hello! 一条来自主Isolate的消息");
}

// 进行循
void newIsolate(SendPort port) async {
ReceivePort receivePort = ReceivePort();
SendPort sendPort = receivePort.sendPort;
// 将新Isolate的SendPort传递给主Isolate
port.send(sendPort);
// 接收主Isolate的消息
String data = await receivePort.first;
print("新Isolate收到消息: $data");
}

运行这段代码,你会发现程序直接结束了,没有任何输出。

这是因为,当Dart发现主Isolate的事件循环没有任务后,便会直接结束整个程序,而不会管其他Isolate的死活。为了看到输出,我们可以在Isolate的最后加一句

1
await Future.delayed(Duration(seconds: 1));

让主Isolate等待1s再退出

但是这样的解决方法明显不够优雅,还有什么其他办法吗?

你可能会这样写:

1
2
3
4
5
6
7
8
9
10
11
12
import 'dart:isolate';

void main() async {
// ... 相同部分省略
print(await receivePort.first); // 在此处等待新Isolate的回复
}

// 进行循
void newIsolate(SendPort port) async {
// ... 相同部分省略
port.send("From new Isolate");
}

这样会报:

1
2
3
4
5
6
7
8
9
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _StreamController._subscribe (dart:async/stream_controller.dart:686:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:837:19)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:497:9)
#3 _ReceivePortImpl.listen (dart:isolate-patch/isolate_patch.dart:87:31)
#4 Stream.first (dart:async/stream.dart:1581:14)
#5 main (file:///Users/graftcopolymer/playground/isolate_play/main.dart:12:27)
<asynchronous suspension>

异常说:Stream has already been listened to.

我们都知道冷Stream只能被订阅一次(此处的receivePort就是一个冷Stream),但是是在哪儿订阅的呢?其实我们在第一次使用receivePort.first 时就已经订阅了一次了,first方法内部会为我们自动订阅和取消订阅。

眼下看来,我们就只能通过await for 循环来订阅消息并手动close掉receivePort了,具体代码这里就不再写了。

但其实还有更加优雅的解决方案,即可以封装一个专门用于消息发送和回复的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import 'dart:isolate';

/// Isolate消息对象
class Message<T>{
T? data;
/// use this to reply a message to the sender
SendPort reply;
Message(this.data, this.reply);

static Future send<T>(SendPort sendPort, T msg) async {
ReceivePort response = ReceivePort();
sendPort.send(Message<T>(msg, response.sendPort));
return response.first;
}
}

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import 'dart:isolate';

import 'message.dart';

void main() async {
ReceivePort receivePort = ReceivePort();
SendPort sendPort = receivePort.sendPort;
final isolate = await Isolate.spawn(newIsolate, sendPort);
// 接收新Isolate的SendPort
SendPort target = await receivePort.first;
// 向新Isolate发送信息
final data = await Message.send(target, "Hi! 来自主Isolate的消息");
print("主Isolate收到回复: $data");
}

// 进行循环
void newIsolate(SendPort port) async {
ReceivePort receivePort = ReceivePort();
SendPort sendPort = receivePort.sendPort;
// 将新Isolate的SendPort传递给主Isolate
port.send(sendPort);
// 接收主Isolate的消息
Message<String> message = await receivePort.first;
print("新Isolate收到消息: ${message.data}");
// 使用 message 对象中的reply对象回复主线程
message.reply.send("来自新Isolate的消息");
}

输出:

1
2
新Isolate收到消息: Hi! 来自主Isolate的消息
主Isolate收到回复: 来自新Isolate的消息

Message.send方法相当于是一个发送消息的代理,我们传入要发送的目标SendPort和消息本身,它会为我们把消息发送过去,同时它还会为我们创建一个新的ReceivePort,这个新的ReceivePort对于Message.send 的调用方是透明的,调用方只需要关心如何获取这个新ReceivePort中的数据就可以了,而新的ReceivePort伴生的SendPort则会被命名为reply 传递给目标Isolate,以便目标Isolate回复消息。

使用Message.send 方法的好处就是,我们可以把发送消息和接收回复写到同一行:

1
final data = await Message.send(target, "Hi! 来自主Isolate的消息");