Contents

Creating streams in Dart

由Lasse Nielsen撰写
2013年4月(2018年10月更新)

dart:async库包含对许多Dart API很重要的两种类型: StreamFuture. 如果Future表示单个计算的结果,则流是一系列结果. 您侦听流以获取有关结果(数据和错误)以及流关闭的通知. 您还可以在收听流时暂停播放或在流完成之前停止收听流.

但是本文与使用流无关. 这是关于创建自己的流的. 您可以通过以下几种方式创建流:

  • Transforming existing streams.
  • 通过使用async*函数从头开始创建流.
  • 通过使用StreamController创建流.

本文介绍了每种方法的代码,并提供了有助于您正确实现流的技巧.

有关使用流的帮助,请参见" 异步编程:流" .

Transforming an existing stream

创建流的常见情况是您已经有一个流,并且您想要基于原始流的事件创建一个新的流. 例如,您可能要通过UTF-8解码输入来将字节流转换为字符串流. 最通用的方法是创建一个新流,该流在原始流上等待事件,然后输出新事件. 例:

/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (var chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (var line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

对于许多常见的转换,您可以使用Stream提供的转换方法,例如map()where()expand()take() .

例如,假设您有一个流counterStream ,它counterStream发出一个递增的计数器. 实施方法如下:

var counterStream =
    Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);

要快速查看事件,可以使用如下代码:

counterStream.forEach(print); // Print an integer every second, 15 times.

要转换流事件,可以在侦听流之前在流上调用诸如map()类的转换方法. 该方法返回一个新的流.

// Double the integer in each event.
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

除了map() ,您可以使用任何其他转换方法,例如:

.where((int x) => x.isEven) // Retain only even integer events.
.expand((var x) => [x, x]) // Duplicate each event.
.take(5) // Stop after the first five events.

通常,您只需要一种转换方法. 但是,如果您需要对转换进行更多控制,则可以使用Streamtransform()方法指定StreamTransformer . 平台库为许多常见任务提供了流转换器. 例如,以下代码使用dart:convert库提供的utf8.decoderLineSplitter转换器.

Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines =
    await content.transform(utf8.decoder).transform(LineSplitter()).toList();

Creating a stream from scratch

创建新流的一种方法是使用异步生成器( async* )函数. 在调用该函数时创建该流,并且在侦听该流时该函数的主体开始运行. 函数返回时,流关闭. 在函数返回之前,它可以使用yieldyield*语句在流上发出事件.

这是一个原始示例,该示例定期发出数字:

Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函数返回Stream . 当收听该流时,主体开始运行. 它反复延迟请求的时间间隔,然后产生下一个数字. 如果忽略count参数,则循环上没有停止条件,因此流永远输出越来越大的数字-或直到侦听器取消其订阅为止.

当侦听器取消时(通过在listen()方法返回的StreamSubscription对象上调用cancel() ),则主体下一次到达yield语句时, yield充当return语句. 执行所有封闭的finally块,然后函数退出. 如果函数尝试在退出前产生一个值,则该操作将失败并充当返回值.

当函数最终退出时,由cancel()方法返回的将来完成. 如果函数以错误退出,则将来会以该错误结束; 否则,它以null .

另一个更有用的示例是将期货序列转换为流的函数:

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (var future in futures) {
    var result = await future;
    yield result;
  }
}

此函数要求可迭代的futures有新的futures ,等待该期货,发出结果值,然后循环. 如果将来因错误而完成,那么流将因该错误而完成.

很少有async*函数async*构建流. 它需要从某个地方获取数据,并且通常是某个地方是另一个流. 在某些情况下,例如上述期货序列,数据来自其他异步事件源. 但是,在许多情况下, async*函数过于简单,无法轻松处理多个数据源. 那就是StreamController类的所在.

Using a StreamController

如果流的事件来自程序的不同部分,而不仅仅是来自async函数可以遍历的流或期货,则使用StreamController创建并填充流.

StreamController为您提供了一个新的流,并提供了一种随时随地将事件添加到该流的方法. 流具有处理侦听器和暂停所需的所有逻辑. 您返回流并将控制器留给您自己.

下面的示例(来自stream_controller_bad.dart )显示了一个基本的但有缺陷的用法,该用法使用StreamController来实现前面示例中的timedCounter()函数. 此代码创建要返回的流,然后根据计时器事件(既不是期货也不是流事件)将数据馈入其中.

// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

和以前一样,您可以像这样使用timedCounter()返回的流:

var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

timedCounter()此实现有两个问题:

  • 它在拥有订阅者之前就开始产生事件.
  • 即使订户请求暂停,它也会继续产生事件.

如下一节所示,您可以通过在创建StreamController时指定诸如onListenonPause回调来解决这两个问题.

Waiting for a subscription

通常,流应在开始工作之前等待订阅者. async*函数会自动执行此操作,但是使用StreamController ,您可以完全控制并可以添加事件,即使您不应该这样做也可以. 当流没有订阅者时,其StreamController缓冲事件,如果该流从未获得订阅者,则可能导致内存泄漏.

尝试将使用流的代码更改为以下内容:

void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (int n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

运行此代码后,尽管流正在工作,但在开始的5秒钟内什么都不会打印. 然后,添加侦听器,并一次打印前5个左右的事件,因为它们由StreamController缓冲.

要获得订阅通知,请在创建StreamController时指定一个onListen参数. 当流获得其第一个订阅者时,将调用onListen回调. 如果指定onCancel回调,则在控制器丢失其最后一个订阅者时调用该回调. 在前面的示例中, Timer.periodic()应该移至onListen处理程序,如下一节所示.

Honoring the pause state

当侦听器请求暂停时,避免产生事件. 流订阅被暂停时, async*函数会自动在yield语句处暂停. 另一方面, StreamController在暂停期间缓冲事件. 如果提供事件的代码不考虑暂停,则缓冲区的大小可能会无限期增加. 另外,如果侦听器在暂停后立即停止侦听,则浪费了创建缓冲区所花费的工作.

若要查看没有暂停支持的情况,请尝试将使用流的代码更改为以下内容:

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

暂停五秒钟后,在此期间触发的事件将立即全部收到. 之所以会发生这种情况,是因为流的源不支持暂停,而是会继续向流中添加事件. 因此,流缓冲事件,然后在流变得不暂停时清空其缓冲区.

以下版本的timedCounter() (来自stream_controller.dart )通过在StreamController上使用onListenonPauseonResumeonCancel回调实现暂停.

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  StreamController<int> controller;
  Timer timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

使用上面的listenWithPause()函数运行此代码. 您会看到它在暂停时停止计数,然后恢复正常.

您必须使用所有listeners-的onListenonCancelonPauseonResume -to通知处于暂停状态的变化. 其原因是,如果申购,暂停状态中,同时这两个变化,只有onListenonCancel回调被调用.

Final hints

在不使用async *函数的情况下创建流时,请牢记以下提示:

  • 使用同步控制器时要小心,例如,使用StreamController(sync: true)创建的控制器. 当您在未暂停的同步控制器上发送事件时(例如,使用EventSink定义的add()addError()close()方法),该事件将立即发送到流中的所有侦听器. 在添加了侦听器的代码完全返回之前,决不能调用Stream侦听器,并且在错误的时间使用同步控制器可能会破坏此承诺并导致良好的代码失败. 避免使用同步控制器.

  • 如果使用StreamController ,则在listen调用返回StreamSubscription之前,将调用onListen回调. 不要让onListen回调依赖于已经存在的订阅. 例如,在下面的代码中,在subscription变量具有有效值之前,将触发onListen事件(并调用handler ).

    subscription = stream.listen(handler);
  • 当流的侦听器状态更改时,由StreamController调用StreamController定义的onListenonPauseonResumeonCancel回调,但是在事件触发时或在另一个状态更改处理程序的调用期间都不会调用该方法. 在这些情况下,状态更改回调将延迟到上一个回调完成之前.

  • 不要尝试自己实现Stream接口. 很容易获得事件,回调以及添加和删除侦听器之间的交互,这是非常错误的. 始终使用可能来自StreamController的现有流来实现新流的listen调用.

  • 尽管可以通过扩展Stream类并在顶部实现listen方法和其他功能来创建扩展Stream的类,但通常不建议这样做,因为它引入了用户必须考虑的新类型. 相反,你可以经常做, 一个一类Stream (及以上) -而不是一个流(及以上).

by  ICOPY.SITE