Contents

Asynchronous programming: streams

What's the point?

  • 流提供异步的数据序列.
  • 数据序列包括用户生成的事件和从文件读取的数据.
  • 您可以使用Stream API中的await forlisten()处理流.
  • 流提供了一种响应错误的方法.
  • 流有两种:单一订阅或广播.

Dart中的异步编程的特征是FutureStream类.

未来表示无法立即完成的计算. 在普通函数返回结果的地方,异步函数返回Future,最终将包含结果. 将来会告诉您结果准备就绪的时间.

流是一系列异步事件. 这就像一个异步的Iterable,在那里,流告诉您准备就绪时有一个事件,而不是在您请求时获取下一个事件.

Receiving stream events

流可以以多种方式创建,这是另一篇文章的主题,但是它们都可以以相同的方式使用: 异步for循环 (通常仅称为await for )遍历流的事件,如for循环迭代遍历. 例如:

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

此代码仅接收整数事件流中的每个事件,将它们相加,然后返回(和)其和. 循环主体结束时,函数将暂停,直到下一个事件到达或流完成为止.

该函数标记有async关键字,在使用await for循环时需要此关键字.

以下示例通过使用async*函数生成简单的整数流来测试前面的代码:

Error events

当流中没有更多事件时,就完成流,并通知接收事件的代码,就像通知新事件到达一样. 使用await for循环读取事件时,流完成后循环停止.

在某些情况下,流完成之前会发生错误; 可能是网络从远程服务器上获取文件时失败了,或者创建事件的代码存在bug,但是有些人需要了解它.

流还可以传递错误事件,就像传递数据事件一样. 大多数流将在出现第一个错误后停止,但是有可能传递多个错误,并且在发生错误事件后传递更多数据. 在本文档中,我们仅讨论传递最多一个错误的流.

当使用await for读取流时,循环语句会引发错误. 这也结束了循环. 您可以使用try-catch捕获错误. 下面的示例在循环迭代器等于4时引发错误:

Working with streams

Stream类包含许多辅助方法,这些方法可以为您对流执行常见操作,类似于Iterable上的方法. 例如,您可以使用Stream API中的lastWhere()查找流中的最后一个正整数.

Future<int> lastPositive(Stream<int> stream) =>
    stream.lastWhere((x) => x >= 0);

Two kinds of streams

流有两种.

Single subscription streams

最常见的流包含一系列事件,这些事件是较大整体的一部分. 必须以正确的顺序交付事件,并且不要错过任何事件. 这是您在读取文件或接收Web请求时获得的流.

这样的流只能被收听一次. 稍后再次收听可能意味着错过了最初的事件,然后其余部分毫无意义. 当您开始收听时,数据将被提取并以块的形式提供.

Broadcast streams

另一种流是针对可以一次处理的单个消息的. 例如,这种流可用于浏览器中的鼠标事件.

您可以随时开始收听这样的流,并且在收听时会触发事件. 多个收听者可以同时收听,并且您可以在取消上一个订阅之后稍后再次收听.

Methods that process a stream

Stream <T>上的以下方法处理流并返回结果:

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

所有这些函数( drain()pipe()除外drain()Iterable上的类似函数相对应. 通过使用带有await for循环的async函数(或仅使用其他方法之一),可以轻松编写每个async . 例如,一些实现可能是:

Future<bool> contains(Object needle) async {
  await for (var event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (var event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

(实际实现稍微复杂一些,但主要是出于历史原因.)

Methods that modify a stream

Stream上的以下方法基于原始流返回新的流. 每个人都等到有人收听新的视频流,然后再收听原始视频.

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

前面的方法对应于Iterable上的类似方法,该方法将一个Iterable转换为另一个Iterable . 所有这些都可以使用带有等待循环的async函数轻松编写.

Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next) equals]);

asyncExpand()asyncMap()函数类似于expand()map() ,但允许它们的函数参数为异步函数. 在Iterable上不存在distinct()函数,但可能存在.

Stream<T> handleError(Function onError, {bool test(error)});
Stream<T> timeout(Duration timeLimit,
    {void Function(EventSink<T> sink) onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);

最后三个功能比较特殊. 它们涉及等待循环无法完成的错误处理-到达循环的第一个错误将结束循环及其在流上的订阅. 无法从中恢复. 您可以使用handleError()从流中删除错误,然后在等待循环中使用它.

The transform() function

transform()函数不仅用于错误处理. 它是流的更通用的"地图". 正常映射需要为每个传入事件提供一个值. 但是,特别是对于I / O流,可能需要几个传入事件才能产生输出事件. StreamTransformer可以使用它. 例如,像Utf8Decoder这样的解码器就是变压器. 转换器仅需要一个函数bind() ,即可通过async函数轻松实现.

Stream<S> mapLogErrors<S, T>(
  Stream<T> stream,
  S Function(T event) convert,
) async* {
  var streamWithoutErrors = stream.handleError((e) => log(e));
  await for (var event in streamWithoutErrors) {
    yield convert(event);
  }
}

Reading and decoding a file

以下代码读取一个文件,并对流执行两次转换. 它首先转换来自UTF8的数据,然后通过LineSplitter运行它. 打印所有行,除了以#开头的行除外.

import 'dart:convert';
import 'dart:io';

Future<void> main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(LineSplitter());
  await for (var line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

The listen() method

Stream的最终方法是listen() . 这是一种"低级"方法,所有其他流函数都是根据listen()定义的.

StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

要创建新的Stream类型,你可以扩展Stream类并实现listen()方法,所有其他方法Stream调用listen()以工作.

listen()方法允许您开始在流上侦听. 在执行此操作之前,流是一个惰性对象,用于描述您要查看的事件. 当您侦听时,将返回一个StreamSubscription对象,该对象表示活动流产生事件. 这类似于Iterable只是对象集合的方式,但是iterator是进行实际迭代的对象.

流订阅使您可以暂停订阅,在暂停后继续订阅,然后完全取消订阅. 您可以设置每个数据事件或错误事件以及流关闭时要调用的回调.

Other resources

Read the following documentation for more details on using streams and asynchronous programming in Dart.

by  ICOPY.SITE