2.8. Dart: Stream API и реактивное программирование

В предыдущем параграфе мы разобрали основные концепции, связанные с асинхронностью: как устроен Event Loop, что такое Future и как устроена асинхронная работа.

В этом поговорим про Stream API, которое помогает нам осуществлять реактивное программирование в Dart. Концепция реактивного или декларативного программирования строится на асинхронных последовательностях событий.

Суть концепции можно представить так:

fluttern

Есть некий генератор асинхронных событий, и есть подписчики, слушающие эти события.

В Dart такой последовательностью называется Stream.

Stream

Создать Stream можно можно несколькими способами:

  1. С помощью конструктора.

    1final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
    

Про другие конструкторы можно почитать в документации

  1. С помощью асинхронного генератора.

    Его принцип работы такой же как и обычного генератора, только он возвращает Stream, а не Iterable

    1Stream<int> generateStream() async* {
    2  int count = 1;
    3  while(count <= 5){
    4    yield count++;
    5  }
    6}
    
  2. С помощью StreamController.

    На этом задержимся подольше ниже, а пока пример простейшего Stream, созданного StreamController:

    1class StreamProvider {
    2  final _controller = StreamController<int>();
    3  Stream get stream => _controller.stream;
    4  void startPushingEvents() {
    5    for (int i = 0; i < 20; i++) {
    6      _controller.sink.add(i);
    7    }
    8  }
    9  void stopPushingEvents() {
    10    _controller.close();
    11  }
    12}
    

В примере выше StreamProvider предоставляет stream, который отдает последовательность чисел от 1 до 19. Помимо stream появляется Sink и необходимость закрывать controller. Зачем это необходимо, давайте разберемся ниже.

StreamController

О нем можно думать, как о продвинутом генераторе асинхронных событий. Среди публичных членов интерфейса есть:

sink — объект, принимающий события.

stream — сам объект Stream.

В общем случае работу StreamController можно описать так:

  1. Кто-то в коде подписывается на объект stream
  2. Кто-то в коде добавляет события в sink — ошибки, сами события или другой Stream
  3. StreamController тут же отправляет события подписчикам stream

StreamController выглядит как самый удобный способ работы со Stream API, но его поведение отличается в зависимости от того, какой StreamController вы используете.

Предлагаю рассмотреть следующую схему:

fluttern

Single-subscription StreamController

Стандартный конструктор создаст StreamController, который отдает Single-subscription Stream. Что это значит:

  1. У Stream может быть только один подписчик.
  2. Все события складываются в буфер, так что подписчик гарантировано получит всю очередь событий, даже если подпишется сильно позже.
  3. StreamController должен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсов
  4. Stream не должен порождать события, пока на него не подпишется кто-то.

Широковещательный StreamController

factory-конструктор StreamController.broadcast() создаст StreamController, который отдает Broadcast Stream. Что это значит:

  1. У Stream может быть больше одного подписчика.
  2. Все события отправляются активным слушателям в момент вызова методов addaddAll или addError у объекта Sink.
  3. У такого StreamController нет буфера, так что события могут быть пропущены.
  4. StreamController должен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсов
  5. Stream не должен порождать события, пока на него не подпишется кто-то.

Давайте рассмотрим конструктор подробнее и увидим несколько коллбеков:

  1. onListen — cрабатывает, когда кто-то подписывается на Stream впервые.
  2. onCancel — cрабатывает, когда Stream закрывается и больше его никто не слуашет.
  3. onResume — cрабатывает, когда Stream уходит с паузы.
  4. onPause — cрабатывает, когда Stream встает на паузу, события добавляются в буфер, но подписчик их не получает.

Последние два есть только у Single-subscription, поскольку буфер событий есть только у него.

По умолчанию каждый из конструкторов создает асинхронный StreamController. Это значит, что каждое событие проходит через EventLoop как microtask, что вызывает небольшую задержку.

У обоих конструкторов есть параметр sync , если он true, то мы получим синхронный StreamController, который стоит использовать с осторожностью.

Пример создания синхронных и асинхронных StreamController

1// Асинхронные контроллеры
2final asyncController = StreamController();
3final asyncBroadcastController = StreamController.broadcast();
4
5// Синхронные контроллеры
6final syncController = StreamController(sync: true);
7final syncBroadcastController = StreamController.broadcast(sync: true);

Давайте разберём подробнее синхронный контроллер.

SynchronousStreamController

Мы сказали, что SynchronousStreamController стоит использовать с осторожностью. Здесь мы поймём, какие преимущества он даёт, и как он может сломать контракт работы со Stream.

Вместо отправки события посредством microtask, которая вызовет задержку, событие моментально отправляется во всех активных слушателей. Так что если вы понимаете, что асинхронный StreamController работает слишком медленно, вы можете посмотреть в сторону синхронной реализации.

А теперь минусы:

  1. Асинхронный Stream гарантирует, что подписчики смогут получить событие только, когда будут готовы. В синхронном случае, такой гарантии нет, и подписчик может получить событие до того, как будет готов его обработать.

Представим, что мы делаем иконку корзины, которая подписывается на источник количества товаров. Источник добавит в Stream событие «Теперь в корзине 5 товаров», но иконка ещё не закончила инициализацию и не готова слушать новые события. В синхронном случае, мы потеряем событие и не отобразим счетчик, в асинхронном же Dart гарантирует доставку события.

  1. Stream гарантирует, что события доходят до подписчиков по порядку, и пока предыдущий вызов add не отработал полностью, следующее событие не будет отправлено. Синхронный StreamController сам не следит за этим, поэтому есть риск нарушить контракт Stream.

Все повседневные задачи можно решить с помощью асинхронного StreamController, так что нужды в синхронном нет. Для более глубокого понимания синхронного StreamController советую обратиться к документации.

Отправка событий в StreamController

Выше мы рассмотрели все способы создания двух типов Stream: с одним подписчиком (single-subscription) и широковещательные (broadcast).

Если с созданием Stream через конструктор и через генератор всё ясно (события в момент подписки доходят до обработчика сразу), то для StreamController есть специальный механизм — Sink.

Предназначение Sink — точка входа для данных в потоке.

У Sink есть множество реализаций в Dart, но мы остановимся на StreamSink — класс, который может принимать данные для Stream синхронно и асинхронно (как именно — определяет реализация StreamController).

Теперь рассмотрим доступные методы Sink, для лучшего понимания, советую открыть реализацию этого класса или его сигнатуру в документации.

Доступно всего несколько методов:

Наследование от EventSink<S>:

  • addEvent(S event) — добавляет новое событие в Stream, затем StreamController передает его всем активным подписчикам.
  • addError(Object error) — добавляет ошибку в Stream. Отловить ее можно с помощьюhandleError() в месте подписки.

Наследование от StreamConsumer<S>:

  • addStream(Stream<S> stream) — берет все события из stream и передает в StreamController. Возвращает Future.

Наследование от Sink:

  • close — Сообщить Sink, что новых событий для этого инстанса больше не будет. Если попытаетесь что-то добавить в закрытый Sink , вы получите ошибку.

И лишь одно свойство:

  • done — Это Future, которая выполнится, если StreamSink закрыт или один из методов EventSink выстрелил исключение.

Пока выполняется Future из addStream(), нельзя пользоваться методами EventSink. Иначе вы получите исключение.

Обработка событий Stream

Начать обработку ивентов можно двумя способами:

  1. Метод listen()
  2. Асинхронный цикл for (await for)

Рассмотрим await for.

По сравнению с for in циклом у него три отличия:

  1. Перед for нужно добавить await
  2. Итерироваться он может только по событиям Stream
  3. Использовать его можно только в асинхронной функции

Для лучшего понимания давайте решим задачу:

👉 Нам нужно считывать весь контент файла и складывать его в одну строку.

Для этого напишем две функции, которые будут складывать строковый массив в полноценную строку для итерируемых коллекций(Iterable) и для Stream:

1String concatStrings(Iterable<String> collection) {
2  final stringBuffer = StringBuffer();
3  for (final string in collection) {
4    stringBuffer.write(string);
5  }
6  return stringBuffer.toString();
7}
8
9Future<String> conctatStreams(Stream<String> stream) async* {
10  final stringBuffer = StringBuffer();
11  await for (final string in stream) {
12    stringBuffer.write(string);
13  }
14  return stringBuffer.toString();
15}

Так, мы получили два решения одной и той же задачи, но с помощью conctatStreams() мы можем асинхронно обрабатывать контент файла и не блокировать исполнение дальнейшего кода.

Заметим, что события ошибки await for обрабатывать не умеет. В случае оной цикл просто прекратит исполнение.

Теперь рассмотрим метод listen()

Этот метод позволяет создать подписчика у Stream в любом месте.

Напоминаем, что для single-subscription Stream нельзя создать больше одного подписчика. Компилятор за этим не следит, так что вы увидите исключение только во время работы приложения.

Давайте посмотрим на пример кода с вызовом метода listen()

1StreamSubscription<T> createStreamSubcription<T>(Stream<T> stream) {
2  final streamSubscription = stream.listen(
3    _handleStreamEvent,
4    onError: _handleOnError,
5    onDone: _handleOnDone,
6    cancelOnError: true,
7  );
8  return streamSubscription;
9}
10
11void _handleStreamEvent<T>(T event) {
12  print('Got new event $event');
13}
14
15void _handleOnError(Object? error, StackTrace? stackTrace) {
16  print('Got stream error');
17}
18
19void _handleOnDone() {
20  print('Stream subscription closed');
21}

Первое, что мы видим — listen() возвращает объект StreamSubscription. Это и есть подписка. Подписку можно поставить на паузу(pause()), восстановить(resumse()) и закрыть(cancel()).

Если флаг cancelOnError == true, тогда подписка закроется после первого события ошибки.

Будьте внимательны с cancel() — закрытую подписку уже нельзя восстановить, а Stream сам за вас ее не закроет. Если забыть про это, то события могут протечь в неожиданных для вас сценариях.

В момент подписки вы можете определить несколько callback:

  1. onData — будет вызван каждый раз, когда StreamSubscription обрабатывает событие Stream.
  2. onError — будет вызван при появлении ошибки в Stream. Если не определить, все ошибки будут обработаны как unhandled и отправятся в обработчик текущей зоны (о том что такое зоны мы будем говорить в параграфах ниже)
  3. onDone — будет вызван, если Stream закроется и отправит об этом событие. Т.е. завершится без ошибок. Если не определить, ничего не произойдет.

Мы рассмотрели только основные способы обработки событий, в библиотеке dart:async есть еще больше методов-модификаторов, которые строятся на всем вышесказанном. А для более сложных сценариев используйте пакет rxdart.

Обработка ошибок в Stream

Выше мы рассмотрели принцип работы Stream в Dart, вы уже могли заметить, что он сильно отличается от асинхронных функций и Isolate. Так и с ошибками, обычный try/catch не отловит все исключения.

Любая ошибка в потоке Stream приведет к тому, что подписчик не получит событие и не обработает его в onData. А все необработанные исключения считаются unhandled и попадают в обработчик зоны.

Исключение может подстерегать нас в нескольких случаях:

  • Один из методов-модификаторов выбрасывает исключение.
  • В Stream появилось событие ошибки. Кто-то явно его добавил через addError()
  • Один из коллбеков StreamSubscription выбрасывает исключение.

Рассмотрим их подробнее

Ошибка от метода-модификатора

В этом случае можно воспользоваться:

  1. try/catch внутри функции;
  2. handleError() ниже по Stream;
  3. Коллбек onError в StreamController и в StreamSubscription.

Ошибка долетает в порядке выше, т.е. если handleError() поймал где-то ошибку, то StreamSubscrtiption ее не получит.

Ошибка через addError()

В этом случае:

  1. handleError() ниже по Stream;

    1Stream.periodic(const Duration(seconds: 1), (count) {
    2  if (count == 2) {
    3    throw Exception('Exceptional event');
    4  }
    5  return count;
    6}).take(4).handleError(print).forEach(print);
    
  2. Коллбек onError в StreamController и в StreamSubscription.

Ошибка от коллбека StreamSubscription

В этом случае только try/catch, в onError исключение не попадет.


Вот мы и познакомились со Stream API и научились писать реактивный код. Советуем пройти квиз ниже, чтобы закрепить знания.

В следующем параграфе мы приступим к изучению виджетов — компонентов, с помощью которых создаётся внешний вид Flutter-приложения.

Чтобы добавить в заметки выделенный текст, нажмите Ctrl + E

Отмечайте параграфы как прочитанные, чтобы видеть свой прогресс обучения

Вступайте в сообщество хендбука

Здесь можно найти единомышленников, экспертов и просто интересных собеседников. А ещё — получить помощь или поделиться знаниями.
Вступить
Сообщить об ошибке
Предыдущий параграф2.7. Dart: асинхронность
Следующий параграф2.9. Виджеты: основы