什么是流,这篇Dart 语言Stream详解文章讲得比较好,可以参考下。
代码来自 猫哥的bloc教程
流可以分为两类:
单订阅流(Single Subscription),这种流最多只能有一个监听器(listener)
多订阅流(Broadcast),这种流可以同时有多个监听器(listener)
延迟间隔
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import 'dart:async';
/// Prints the tick counter of a one-second periodic stream.
///
/// The stream is created with [Stream.periodic] and is never limited, so the
/// returned future only completes if the stream itself ends.
periodic() async {
  final ticks = Stream<int>.periodic(
    Duration(seconds: 1),
    (count) => count,
  );
  await printStream(ticks);
}
/// Prints every event of [stream] and completes once the stream is done.
///
/// A stream error ends the iteration and is reported through the returned
/// future.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Entry point of the periodic-stream demo.
///
/// Prints a start banner, awaits the future returned by [periodic], then
/// prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await periodic();

  print('==== end ====');
}
|
执行结果
future 数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import 'dart:async';
/// Prints each value emitted by [stream], in order, until the stream closes.
///
/// The returned future completes when the stream is done, or with the
/// stream's error if one is emitted.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// A future that completes with `100`; used as the single data source of the
/// `fromFuture` demo.
Future<int> funi = Future<int>(() => 100);
/// Wraps [funi] in a single-event stream and prints what it emits.
fromFuture() async {
  await printStream(Stream<int>.fromFuture(funi));
}
/// Entry point of the `Stream.fromFuture` demo.
///
/// Prints a start banner, awaits [fromFuture], then prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await fromFuture();

  print('==== end ====');
}
|
执行结果
future 多数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import 'dart:async';
/// Prints every event of [stream] and completes when the stream is done.
///
/// If the stream emits an error, printing stops and the returned future
/// completes with that error.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// First data source of the `fromFutures` demo; completes with `100`.
Future<int> funi = Future<int>(() => 100);

/// Second data source of the `fromFutures` demo; completes with `200`.
Future<int> funii = Future<int>(() => 200);
/// Builds one stream out of [funi] and [funii] and prints each result as it
/// is emitted.
fromFutures() async {
  final sources = <Future<int>>[funi, funii];
  await printStream(Stream<int>.fromFutures(sources));
}
/// Entry point of the `Stream.fromFutures` demo.
///
/// Prints a start banner, awaits [fromFutures], then prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await fromFutures();

  print('==== end ====');
}
|
执行结果
Stream 监听 单对单(单订阅)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import 'dart:async';
/// Prints each value emitted by [stream] until the stream is done.
///
/// Not called by the other code in this snippet.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Subscribes a single listener to a one-second periodic stream.
///
/// Data events and errors are printed as-is, and `done` is printed when the
/// stream closes. The subscription cancels itself on the first error. The
/// function does not wait for the subscription: it returns right after the
/// listener is registered.
listen() async {
  final ticks = Stream<int>.periodic(
    Duration(seconds: 1),
    (count) => count,
  );

  ticks.listen(
    print,
    onError: print,
    onDone: () => print('done'),
    cancelOnError: true,
  );
}
/// Entry point of the single-subscription `listen` demo.
///
/// Prints a start banner, awaits the future returned by [listen], then
/// prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await listen();

  print('==== end ====');
}
|
执行结果
Stream take
通过take方法控制Stream中的元素数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import 'dart:async';
/// Prints every event of [stream] and completes once the stream has closed.
///
/// Not called by the other code in this snippet.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Prints the first [max] ticks of a one-second periodic stream, then `done`.
///
/// The returned future completes only after the stream has been fully
/// consumed, so `await take(n)` really waits for the output. The previous
/// version registered a listener and returned immediately, so callers that
/// awaited it resumed before any event was printed.
///
/// If the stream emits an error, the error is printed, the subscription is
/// cancelled and `done` is not printed. This matches the former
/// `cancelOnError: true` listener.
take(int max) async {
  final limited = Stream<int>.periodic(
    Duration(seconds: 1),
    (tick) => tick,
  ).take(max);

  try {
    await for (final event in limited) {
      print(event);
    }
    print('done');
  } catch (err) {
    // Leaving `await for` through an error cancels the subscription.
    print(err);
  }
}
/// Entry point of the `Stream.take` demo.
///
/// Prints a start banner, awaits the future returned by `take(10)`, then
/// prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await take(10);

  print('==== end ====');
}
|
执行结果
Stream skip
通过skip方法跳过Stream中的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import 'dart:async';
/// Prints each value emitted by [stream], completing when the stream is done.
///
/// Not called by the other code in this snippet.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Prints the integers `0..max-1` except for the first [skipNum], then `done`.
///
/// The returned future completes only after the stream has been fully
/// consumed, so `await skip(...)` really waits for the output. The previous
/// version registered a listener and returned immediately, so callers that
/// awaited it resumed before any event was printed.
///
/// If the stream emits an error, the error is printed, the subscription is
/// cancelled and `done` is not printed. This matches the former
/// `cancelOnError: true` listener.
skip(int max, int skipNum) async {
  final remaining =
      Stream<int>.fromIterable(Iterable<int>.generate(max)).skip(skipNum);

  try {
    await for (final event in remaining) {
      print(event);
    }
    print('done');
  } catch (err) {
    // Leaving `await for` through an error cancels the subscription.
    print(err);
  }
}
/// Entry point of the `Stream.skip` demo.
///
/// Prints a start banner, awaits the future returned by `skip(10, 4)`, then
/// prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await skip(10, 4);

  print('==== end ====');
}
|
执行结果
Stream 监听 广播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import 'dart:async';
/// Prints every event of [stream] until the stream closes.
///
/// Not called by the other code in this snippet.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Turns a one-second periodic stream into a broadcast stream and attaches a
/// listener to it.
///
/// The listener prefixes everything it prints with `监听1` and cancels itself
/// on the first error. The function returns right after the listener is
/// registered; it does not wait for any event.
boardcast() async {
  final source = Stream<int>.periodic(
    Duration(seconds: 1),
    (tick) => tick,
  );
  final shared = source.asBroadcastStream();

  shared.listen(
    (value) => print('监听1 $value'),
    onError: (error) => print('监听1 $error'),
    onDone: () => print('监听1 done'),
    cancelOnError: true,
  );
}
/// Entry point of the broadcast-stream demo.
///
/// Prints a start banner, awaits the future returned by [boardcast], then
/// prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await boardcast();

  print('==== end ====');
}
|
执行结果
StreamController 流控制器 单点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import 'dart:async';
/// Prints each value emitted by [stream] and completes when the stream ends.
///
/// Not called by the other code in this snippet.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Walks a single-subscription [StreamController] through its lifecycle.
///
/// The controller logs its `onListen`, `onPause`, `onResume` and `onCancel`
/// callbacks. A listener prints the values 100, 200 and 300, with a pause and
/// resume of the subscription in between, one second apart.
///
/// The controller is now closed with `await`, so the returned future
/// completes only after the close has finished. Previously the future from
/// `close()` was dropped. The controller and subscription are also typed as
/// `int` instead of `dynamic`.
scListen() async {
  const step = Duration(seconds: 1);

  final controller = StreamController<int>(
    onListen: () => print('onListen'),
    onPause: () => print('onPause'),
    onResume: () => print('onResume'),
    onCancel: () => print('onCancel'),
    sync: false,
  );

  final subscription = controller.stream.listen(print);

  controller.add(100);
  await Future<void>.delayed(step);

  controller.add(200);
  await Future<void>.delayed(step);

  subscription.pause();
  await Future<void>.delayed(step);

  subscription.resume();
  await Future<void>.delayed(step);

  controller.add(300);
  await Future<void>.delayed(step);

  // Wait for the close so the caller resumes after shutdown has finished.
  await controller.close();
}
/// Entry point of the single-subscription `StreamController` demo.
///
/// Prints a start banner, awaits [scListen], then prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await scListen();

  print('==== end ====');
}
|
执行结果
StreamController 流控制器 广播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import 'dart:async';
/// Prints every event of [stream] and completes once the stream is done.
///
/// Not called by the other code in this snippet.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Feeds the values 1 to 5 through a broadcast [StreamController] that has
/// two printing listeners, so every value is printed twice.
///
/// The source stream is now forwarded with `await addStream(...)` and the
/// controller is closed with `await close()`. Previously `addStream` was not
/// awaited and a fixed two-second sleep was used to hope it had finished
/// before `close()`. Calling `close()` while `addStream` is still running
/// throws a `StateError`. The sleep is no longer needed and has been removed.
scBroadcaset() async {
  final controller = StreamController<int>.broadcast();

  controller.stream.listen(print);
  controller.stream.listen(print);

  // Completes once the source stream has been fully forwarded.
  await controller.addStream(Stream<int>.fromIterable([1, 2, 3, 4, 5]));

  await controller.close();
}
/// Entry point of the broadcast `StreamController` demo.
///
/// Prints a start banner, awaits [scBroadcaset], then prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await scBroadcaset();

  print('==== end ====');
}
|
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import 'dart:async';
/// Prints each value emitted by [stream] until the stream has closed.
///
/// Not called by the other code in this snippet.
printStream(Stream<Object> stream) async {
  await stream.forEach(print);
}
/// Sends the values 1 to 5 through a broadcast [StreamController] whose
/// stream is transformed from `int` to `String`, then printed by two
/// listeners.
///
/// The transformer doubles each value and prefixes it, forwards errors with
/// an `error: ` prefix, and closes its sink when the source is done.
///
/// Changes from the previous version:
/// - `addStream` and `close` are awaited. Before, a fixed two-second sleep
///   stood between them, and `close()` throws a `StateError` if `addStream`
///   is still running. The sleep has been removed.
/// - The controller, transformer and stream are typed instead of `dynamic`.
/// - The original stack trace is forwarded together with a transformed error.
scTransformer() async {
  final controller = StreamController<int>.broadcast();

  final doubler = StreamTransformer<int, String>.fromHandlers(
    handleData: (data, sink) {
      sink.add('转换后的数据:${data * 2}');
    },
    handleError: (error, stackTrace, sink) {
      sink.addError('error: $error', stackTrace);
    },
    handleDone: (sink) {
      sink.close();
    },
  );

  final transformed = controller.stream.transform(doubler);
  transformed.listen(print);
  transformed.listen(print);

  // Completes once the source stream has been fully forwarded.
  await controller.addStream(Stream<int>.fromIterable([1, 2, 3, 4, 5]));

  await controller.close();
}
/// Entry point of the `StreamTransformer` demo.
///
/// Prints a start banner, awaits [scTransformer], then prints an end banner.
main(List<String> args) async {
  print('==== start ====');

  await scTransformer();

  print('==== end ====');
}
|
执行结果