本节目标
- Stream 创建
- StreamController 控制
- StreamSubscription 订阅
- StreamTransformer 转换
- Sink、StreamSink、EventSink 修改数据
视频
代码
https://github.com/ducafecat/flutter-bloc-learn/tree/master/stream
正文
核心类
名称 |
说明 |
Stream |
事件流或者管道 |
StreamController |
事件管理者 |
StreamSubscription |
管理事件订阅,如 cacenl、pause |
StreamSink |
流 Sink 入口,提供如 add、addStream 等 |
EventSink |
事件 Sink 入口 |
StreamTransformer |
流转换 |
准备函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| printStream(Stream<Object> stream) async { await for (var val in stream) { print(val); } }
Future<int> funi = Future(() { return 100; });
Future<int> funii = Future(() { return 200; });
|
stream 创建
延迟间隔
1 2 3 4 5 6 7 8 9 10 11 12
| periodic() async { Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (val) => val); await printStream(stream); }
>> 1 2 3 4 5 6
|
futrue 数据源
1 2 3 4 5 6 7 8
| fromFuture() async { Stream<int> stream = Stream<int>.fromFuture(funi); await printStream(stream); }
>>
100
|
futrues 多数据源
1 2 3 4 5 6 7 8 9 10 11 12
| fromFutures() async { Stream<int> stream = Stream<int>.fromFutures([ funi, funii, ]); await printStream(stream); }
>>
100 200
|
stream 监听
单对单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| listen() async { Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (val) => val); stream.listen( (event) { print(event); }, onError: (err) { print(err); }, onDone: () {}, cancelOnError: true, ); }
>>
1 2 3 4 5 6
|
广播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| boardcast() async { Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (val) => val) .asBroadcastStream(); stream.listen((event) { print(event); }); stream.listen((event) { print(event); }); }
>>
1 1 2 2 3 3 4 4 5 5
|
操作 task skip
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| opt() async { Stream<int> stream = Stream<int>.fromIterable([1, 2, 3, 4, 5]); stream = stream.take(3);
stream.listen((event) { print(event); }); }
>> take(3) 1 2 3
>> skip(2) 3 4 5
|
StreamController 流控制类
单点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| scListen() async { StreamController sc = StreamController( onListen: () => print("onListen"), onPause: () => print("onPause"), onResume: () => print("onResume"), onCancel: () => print("onCancel"), sync: false);
StreamSubscription ss = sc.stream.listen(((event) { print(event); }));
sc.add(100);
ss.pause(); ss.resume(); ss.cancel();
sc.close(); }
>>
onListen onPause onCancel
|
广播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| scBroadcast() async { StreamController sc = StreamController.broadcast();
StreamSubscription ss1 = sc.stream.listen(print); StreamSubscription ss2 = sc.stream.listen(print);
sc.addStream(Stream.fromIterable([1, 2, 3, 4, 5]));
await Future.delayed(Duration(seconds: 2)); sc.close(); }
>>
1 1 2 2 3 3 4 4 5 5
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| scTransformer() async { StreamController sc = StreamController<int>.broadcast();
StreamTransformer stf = StreamTransformer<int, double>.fromHandlers( handleData: (int data, EventSink sink) { sink.add((data * 2).toDouble()); }, handleError: (error, stacktrace, sink) { sink.addError('wrong: $error'); }, handleDone: (sink) { sink.close(); }, );
Stream stream = sc.stream.transform(stf); stream.listen(print); stream.listen(print);
sc.addStream(Stream<int>.fromIterable([1, 2, 3, 4, 5]));
await Future.delayed(Duration(seconds: 2)); sc.close(); }
>> 2 2 4 4 6 6 8 8 10 10
|
执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| main(List<String> args) async { print('--- start ---');
print('--- end ---'); }
|
© 猫哥
https://ducafecat.tech
https://ducafecat.gitee.io
邮箱 ducafecat@gmail.com / 微信 ducafecat / 留言板 disqus