Dart Stream 流简介

什么是流,这篇Dart 语言Stream详解文章讲得比较好,可以参考下。

代码来自 猫哥的bloc教程

流可以分为两类:

单订阅流(Single Subscription),这种流最多只能有一个监听器(listener)

多订阅流(Broadcast),这种流可以有多个监听器监听(listener)

延迟间隔

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import 'dart:async';

/// Prints an increasing counter, one value per second.
///
/// The stream built here is never limited or cancelled, so the returned
/// future is not expected to complete on its own.
Future<void> periodic() async {
  // The callback receives the tick index and forwards it unchanged as the
  // event value.
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (val) => val,
  );
  await printStream(stream);
}

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Entry point: prints a start banner, runs the periodic-stream demo and then
/// prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Periodic (fixed interval) stream demo.
  // NOTE(review): periodic() as shown consumes an unbounded stream, so the
  // end banner below may never be reached.
  await periodic();

  print('==== end ====');
}

执行结果

future 数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Future "i": resolves to `100`.
///
/// This is a future value, not a function; the callback passed to the
/// `Future` constructor is scheduled to run asynchronously.
Future<int> funi = Future(() => 100);

/// Turns [funi] into a stream via `Stream.fromFuture` and prints what that
/// stream emits.
Future<void> fromFuture() async {
  final stream = Stream<int>.fromFuture(funi);
  await printStream(stream);
}


/// Entry point: prints a start banner, runs the single-future stream demo and
/// then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Stream backed by a single future.
  await fromFuture();

  print('==== end ====');
}

执行结果

future 多数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Future "i": resolves to `100`.
///
/// This is a future value, not a function; the callback passed to the
/// `Future` constructor is scheduled to run asynchronously.
Future<int> funi = Future(() => 100);

/// Future "ii": resolves to `200`.
///
/// This is a future value, not a function; the callback passed to the
/// `Future` constructor is scheduled to run asynchronously.
Future<int> funii = Future(() => 200);

/// Combines [funi] and [funii] into one stream via `Stream.fromFutures` and
/// prints what that stream emits.
Future<void> fromFutures() async {
  final stream = Stream<int>.fromFutures([
    funi,
    funii,
  ]);

  await printStream(stream);
}


/// Entry point: prints a start banner, runs the multi-future stream demo and
/// then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Stream backed by several futures.
  await fromFutures();

  print('==== end ====');
}

执行结果

Stream 监听 单对单(单订阅)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Subscribes a single listener to a one-second periodic counter and prints
/// every event it receives.
///
/// Returns as soon as the subscription is set up; events keep arriving
/// afterwards. The stream is unbounded, so the subscription is returned to
/// let callers pause or cancel it. Callers that only `await` this function
/// can ignore the result.
Future<StreamSubscription<int>> listen() async {
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (val) => val,
  );

  return stream.listen(
    print,
    onError: print,
    onDone: () => print('done'),
    // Stop listening at the first error instead of carrying on.
    cancelOnError: true,
  );
}

/// Entry point: prints a start banner, starts the single-listener demo and
/// then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Single-subscription stream with one listener.
  await listen();

  print('==== end ====');
}

执行结果

Stream take

通过take方法控制Stream中的元素数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Prints at most the first [max] ticks of a one-second periodic counter,
/// followed by `done`.
///
/// The returned future completes only after the limited stream has finished,
/// so `await take(n)` really waits for the output. If the stream emits an
/// error, the error is printed, listening stops and `done` is not printed.
Future<void> take(int max) async {
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (val) => val,
  ).take(max);

  try {
    await for (final event in stream) {
      print(event);
    }
    print('done');
  } catch (err) {
    // Leaving `await for` through an error cancels the subscription, so no
    // further events are handled after this point.
    print(err);
  }
}

/// Entry point: prints a start banner, runs the `take` demo with a limit of
/// 10 and then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Limit the stream to its first 10 events.
  await take(10);

  print('==== end ====');
}

执行结果

Stream skip

通过skip方法跳过Stream中的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Prints the integers `0 .. max - 1` with the first [skipNum] of them
/// dropped, followed by `done`.
///
/// The returned future completes only after the stream has finished, so
/// `await skip(...)` really waits for the output. If the stream emits an
/// error, the error is printed, listening stops and `done` is not printed.
Future<void> skip(int max, int skipNum) async {
  final stream =
      Stream<int>.fromIterable(Iterable.generate(max)).skip(skipNum);

  try {
    await for (final event in stream) {
      print(event);
    }
    print('done');
  } catch (err) {
    // Leaving `await for` through an error cancels the subscription, so no
    // further events are handled after this point.
    print(err);
  }
}

/// Entry point: prints a start banner, runs the `skip` demo (10 values, first
/// 4 skipped) and then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Generate 10 values and skip the first 4.
  await skip(10, 4);

  print('==== end ====');
}

执行结果

Stream 监听 广播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Converts a one-second periodic counter into a broadcast stream and
/// attaches one listener that prints each event with a `监听1` prefix.
///
/// Returns as soon as the listener is attached; events keep arriving
/// afterwards. The function stays `async` so callers can still `await` it.
/// Only one listener is registered here.
Future<void> boardcast() async {
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (val) => val,
  ).asBroadcastStream();

  stream.listen(
    (event) => print('监听1 $event'),
    onError: (err) => print('监听1 $err'),
    onDone: () => print('监听1 done'),
    // Stop listening at the first error instead of carrying on.
    cancelOnError: true,
  );
}

/// Entry point: prints a start banner, starts the broadcast-listener demo and
/// then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Broadcast stream listener demo.
  await boardcast();

  print('==== end ====');
}

执行结果

StreamController 流控制器 单点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Walks through the lifecycle of a single-subscription [StreamController]:
/// listen, add events, pause, resume, add again and close.
///
/// Each step is followed by a one-second delay so the steps are easy to tell
/// apart in the output. The returned future completes after the controller
/// has been closed.
Future<void> scListen() async {
  // Every lifecycle callback prints its own name, making the moment it fires
  // visible in the output.
  final sc = StreamController<int>(
    onListen: () => print('onListen'),
    onPause: () => print('onPause'),
    onResume: () => print('onResume'),
    onCancel: () => print('onCancel'),
    sync: false, // Deliver events asynchronously.
  );

  // Subscribe; every event is printed.
  final ss = sc.stream.listen(print);

  // Feed events into the stream.
  sc.add(100);
  await Future.delayed(const Duration(seconds: 1));

  sc.add(200);
  await Future.delayed(const Duration(seconds: 1));

  // Pause the subscription.
  ss.pause();
  await Future.delayed(const Duration(seconds: 1));

  // Resume the subscription.
  ss.resume();
  await Future.delayed(const Duration(seconds: 1));

  sc.add(300);
  await Future.delayed(const Duration(seconds: 1));

  // Manual cancellation is intentionally left disabled; the controller is
  // closed below instead.
  // ss.cancel();

  // Important: always close a controller once it is no longer needed.
  // Awaiting the result keeps this function from returning before the close
  // has been processed.
  await sc.close();
}

/// Entry point: prints a start banner, runs the single-subscription
/// StreamController demo and then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Single-subscription StreamController demo.
  await scListen();

  print('==== end ====');
}

执行结果

StreamController 流控制器 广播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Feeds the values 1 to 5 through a broadcast [StreamController] that has
/// two listeners, each of which prints every value it receives.
///
/// The returned future completes after the controller has been closed.
Future<void> scBroadcaset() async {
  final sc = StreamController<int>.broadcast();

  // Two listeners on the same broadcast stream.
  sc.stream.listen(print);
  sc.stream.listen(print);

  // Wait until the whole source has been forwarded. Closing the controller
  // while addStream() is still in progress is an error, so await it rather
  // than sleeping for a fixed time and hoping it has finished.
  await sc.addStream(Stream.fromIterable([1, 2, 3, 4, 5]));

  await sc.close();
}

/// Entry point: prints a start banner, runs the broadcast StreamController
/// demo and then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // Broadcast StreamController demo.
  await scBroadcaset();

  print('==== end ====');
}

执行结果

StreamTransformer 流的转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import 'dart:async';

/// Prints every event emitted by [stream], one per line.
///
/// The returned future completes once [stream] is done. An error emitted by
/// [stream] ends the loop and completes the returned future with that error.
/// Elements may be `null`; they are printed as `null`.
Future<void> printStream(Stream<Object?> stream) async {
  await for (final val in stream) {
    print(val);
  }
}

/// Sends the values 1 to 5 through a [StreamTransformer] that turns each
/// `int` into a labelled string holding the doubled value, and prints the
/// result from two listeners.
///
/// The returned future completes after the controller has been closed.
Future<void> scTransformer() async {
  final sc = StreamController<int>.broadcast();

  final stf = StreamTransformer<int, String>.fromHandlers(
    handleData: (data, sink) {
      sink.add('转换后的数据:${data * 2}');
    },
    handleError: (error, stackTrace, sink) {
      // Forward the stack trace too, so the origin of the error is not lost.
      sink.addError('error: $error', stackTrace);
    },
    handleDone: (sink) {
      sink.close();
    },
  );

  // Two listeners on the transformed stream.
  final stream = sc.stream.transform(stf);
  stream.listen(print);
  stream.listen(print);

  // Wait until the whole source has been forwarded. Closing the controller
  // while addStream() is still in progress is an error, so await it rather
  // than sleeping for a fixed time and hoping it has finished.
  await sc.addStream(Stream<int>.fromIterable([1, 2, 3, 4, 5]));

  await sc.close();
}

/// Entry point: prints a start banner, runs the StreamTransformer demo and
/// then prints an end banner.
Future<void> main(List<String> args) async {
  print('==== start ====');

  // StreamTransformer demo.
  await scTransformer();

  print('==== end ====');
}

执行结果