В предыдущем параграфе мы разобрали основные концепции, связанные с асинхронностью: как устроен Event Loop, что такое Future и как устроена асинхронная работа.
В этом поговорим про Stream API, которое помогает нам осуществлять реактивное программирование в Dart. Концепция реактивного или декларативного программирования строится на асинхронных последовательностях событий.
Суть концепции можно представить так:
Есть некий генератор асинхронных событий, и есть подписчики, слушающие эти события.
В Dart такой последовательностью называется Stream.
Stream
Создать Stream можно можно несколькими способами:
-
С помощью конструктора.
1final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
Про другие конструкторы можно почитать в документации
-
С помощью асинхронного генератора.
Его принцип работы такой же как и обычного генератора, только он возвращает
Stream, а неIterable1Stream<int> generateStream() async* { 2 int count = 1; 3 while(count <= 5){ 4 yield count++; 5 } 6} -
С помощью
StreamController.На этом задержимся подольше ниже, а пока пример простейшего
Stream, созданногоStreamController:1class StreamProvider { 2 final _controller = StreamController<int>(); 3 Stream get stream => _controller.stream; 4 void startPushingEvents() { 5 for (int i = 0; i < 20; i++) { 6 _controller.sink.add(i); 7 } 8 } 9 void stopPushingEvents() { 10 _controller.close(); 11 } 12}
В примере выше StreamProvider предоставляет stream, который отдает последовательность чисел от 1 до 19. Помимо stream появляется Sink и необходимость закрывать controller. Зачем это необходимо, давайте разберемся ниже.
StreamController
О нем можно думать, как о продвинутом генераторе асинхронных событий. Среди публичных членов интерфейса есть:
sink — объект, принимающий события.
stream — сам объект Stream.
В общем случае работу StreamController можно описать так:
- Кто-то в коде подписывается на объект
stream - Кто-то в коде добавляет события в
sink— ошибки, сами события или другойStream StreamControllerтут же отправляет события подписчикамstream
StreamController выглядит как самый удобный способ работы со Stream API, но его поведение отличается в зависимости от того, какой StreamController вы используете.
Предлагаю рассмотреть следующую схему:
Single-subscription StreamController
Стандартный конструктор создаст StreamController, который отдает Single-subscription Stream. Что это значит:
- У
Streamможет быть только один подписчик. - Все события складываются в буфер, так что подписчик гарантировано получит всю очередь событий, даже если подпишется сильно позже.
StreamControllerдолжен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсовStreamне должен порождать события, пока на него не подпишется кто-то.
Широковещательный StreamController
factory-конструктор StreamController.broadcast() создаст StreamController, который отдает Broadcast Stream. Что это значит:
- У
Streamможет быть больше одного подписчика. - Все события отправляются активным слушателям в момент вызова методов
add,addAllилиaddErrorу объектаSink. - У такого
StreamControllerнет буфера, так что события могут быть пропущены. StreamControllerдолжен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсовStreamне должен порождать события, пока на него не подпишется кто-то.
Давайте рассмотрим конструктор подробнее и увидим несколько коллбеков:
onListen— cрабатывает, когда кто-то подписывается наStreamвпервые.onCancel— cрабатывает, когдаStreamзакрывается и больше его никто не слуашет.onResume— cрабатывает, когдаStreamуходит с паузы.onPause— cрабатывает, когдаStreamвстает на паузу, события добавляются в буфер, но подписчик их не получает.
Последние два есть только у Single-subscription, поскольку буфер событий есть только у него.
По умолчанию каждый из конструкторов создает асинхронный StreamController. Это значит, что каждое событие проходит через EventLoop как microtask, что вызывает небольшую задержку.
У обоих конструкторов есть параметр sync , если он true, то мы получим синхронный StreamController, который стоит использовать с осторожностью.
Пример создания синхронных и асинхронных StreamController
1// Асинхронные контроллеры
2final asyncController = StreamController();
3final asyncBroadcastController = StreamController.broadcast();
4
5// Синхронные контроллеры
6final syncController = StreamController(sync: true);
7final syncBroadcastController = StreamController.broadcast(sync: true);
Давайте разберём подробнее синхронный контроллер.
SynchronousStreamController
Мы сказали, что SynchronousStreamController стоит использовать с осторожностью. Здесь мы поймём, какие преимущества он даёт, и как он может сломать контракт работы со Stream.
Вместо отправки события посредством microtask, которая вызовет задержку, событие моментально отправляется во всех активных слушателей. Так что если вы понимаете, что асинхронный StreamController работает слишком медленно, вы можете посмотреть в сторону синхронной реализации.
А теперь минусы:
- Асинхронный
Streamгарантирует, что подписчики смогут получить событие только, когда будут готовы. В синхронном случае, такой гарантии нет, и подписчик может получить событие до того, как будет готов его обработать.
Представим, что мы делаем иконку корзины, которая подписывается на источник количества товаров. Источник добавит в Stream событие «Теперь в корзине 5 товаров», но иконка ещё не закончила инициализацию и не готова слушать новые события. В синхронном случае, мы потеряем событие и не отобразим счетчик, в асинхронном же Dart гарантирует доставку события.
Streamгарантирует, что события доходят до подписчиков по порядку, и пока предыдущий вызовaddне отработал полностью, следующее событие не будет отправлено. СинхронныйStreamControllerсам не следит за этим, поэтому есть риск нарушить контрактStream.
Все повседневные задачи можно решить с помощью асинхронного StreamController, так что нужды в синхронном нет. Для более глубокого понимания синхронного StreamController советую обратиться к документации.
Отправка событий в StreamController
Выше мы рассмотрели все способы создания двух типов Stream: с одним подписчиком (single-subscription) и широковещательные (broadcast).
Если с созданием Stream через конструктор и через генератор всё ясно (события в момент подписки доходят до обработчика сразу), то для StreamController есть специальный механизм — Sink.
Предназначение Sink — точка входа для данных в потоке.
У Sink есть множество реализаций в Dart, но мы остановимся на StreamSink — класс, который может принимать данные для Stream синхронно и асинхронно (как именно — определяет реализация StreamController).
Теперь рассмотрим доступные методы Sink, для лучшего понимания, советую открыть реализацию этого класса или его сигнатуру в документации.
Доступно всего несколько методов:
Наследование от EventSink<S>:
addEvent(S event)— добавляет новое событие вStream, затемStreamControllerпередает его всем активным подписчикам.addError(Object error)— добавляет ошибку вStream. Отловить ее можно с помощьюhandleError()в месте подписки.
Наследование от StreamConsumer<S>:
addStream(Stream<S> stream)— берет все события изstreamи передает вStreamController. ВозвращаетFuture.
Наследование от Sink:
close— СообщитьSink, что новых событий для этого инстанса больше не будет. Если попытаетесь что-то добавить в закрытыйSink, вы получите ошибку.
И лишь одно свойство:
done— ЭтоFuture, которая выполнится, еслиStreamSinkзакрыт или один из методовEventSinkвыстрелил исключение.
Пока выполняется Future из addStream(), нельзя пользоваться методами EventSink. Иначе вы получите исключение.
Обработка событий Stream
Начать обработку ивентов можно двумя способами:
- Метод
listen() - Асинхронный цикл
for(await for)
Рассмотрим await for.
По сравнению с for in циклом у него три отличия:
- Перед
forнужно добавитьawait - Итерироваться он может только по событиям
Stream - Использовать его можно только в асинхронной функции
Для лучшего понимания давайте решим задачу:
👉 Нам нужно считывать весь контент файла и складывать его в одну строку.
Для этого напишем две функции, которые будут складывать строковый массив в полноценную строку для итерируемых коллекций(Iterable) и для Stream:
1String concatStrings(Iterable<String> collection) {
2 final stringBuffer = StringBuffer();
3 for (final string in collection) {
4 stringBuffer.write(string);
5 }
6 return stringBuffer.toString();
7}
8
9Future<String> conctatStreams(Stream<String> stream) async* {
10 final stringBuffer = StringBuffer();
11 await for (final string in stream) {
12 stringBuffer.write(string);
13 }
14 return stringBuffer.toString();
15}
Так, мы получили два решения одной и той же задачи, но с помощью conctatStreams() мы можем асинхронно обрабатывать контент файла и не блокировать исполнение дальнейшего кода.
Заметим, что события ошибки await for обрабатывать не умеет. В случае оной цикл просто прекратит исполнение.
Теперь рассмотрим метод listen()
Этот метод позволяет создать подписчика у Stream в любом месте.
Напоминаем, что для single-subscription Stream нельзя создать больше одного подписчика. Компилятор за этим не следит, так что вы увидите исключение только во время работы приложения.
Давайте посмотрим на пример кода с вызовом метода listen()
1StreamSubscription<T> createStreamSubcription<T>(Stream<T> stream) {
2 final streamSubscription = stream.listen(
3 _handleStreamEvent,
4 onError: _handleOnError,
5 onDone: _handleOnDone,
6 cancelOnError: true,
7 );
8 return streamSubscription;
9}
10
11void _handleStreamEvent<T>(T event) {
12 print('Got new event $event');
13}
14
15void _handleOnError(Object? error, StackTrace? stackTrace) {
16 print('Got stream error');
17}
18
19void _handleOnDone() {
20 print('Stream subscription closed');
21}
Первое, что мы видим — listen() возвращает объект StreamSubscription. Это и есть подписка. Подписку можно поставить на паузу(pause()), восстановить(resumse()) и закрыть(cancel()).
Если флаг cancelOnError == true, тогда подписка закроется после первого события ошибки.
Будьте внимательны с cancel() — закрытую подписку уже нельзя восстановить, а Stream сам за вас ее не закроет. Если забыть про это, то события могут протечь в неожиданных для вас сценариях.
В момент подписки вы можете определить несколько callback:
onData— будет вызван каждый раз, когдаStreamSubscriptionобрабатывает событиеStream.onError— будет вызван при появлении ошибки вStream. Если не определить, все ошибки будут обработаны какunhandledи отправятся в обработчик текущейзоны(о том что такое зоны мы будем говорить в параграфах ниже)onDone— будет вызван, еслиStreamзакроется и отправит об этом событие. Т.е. завершится без ошибок. Если не определить, ничего не произойдет.
Мы рассмотрели только основные способы обработки событий, в библиотеке dart:async есть еще больше методов-модификаторов, которые строятся на всем вышесказанном. А для более сложных сценариев используйте пакет rxdart.
Обработка ошибок в Stream
Выше мы рассмотрели принцип работы Stream в Dart, вы уже могли заметить, что он сильно отличается от асинхронных функций и Isolate. Так и с ошибками, обычный try/catch не отловит все исключения.
Любая ошибка в потоке Stream приведет к тому, что подписчик не получит событие и не обработает его в onData. А все необработанные исключения считаются unhandled и попадают в обработчик зоны.
Исключение может подстерегать нас в нескольких случаях:
- Один из методов-модификаторов выбрасывает исключение.
- В
Streamпоявилось событие ошибки. Кто-то явно его добавил черезaddError() - Один из коллбеков
StreamSubscriptionвыбрасывает исключение.
Рассмотрим их подробнее
Ошибка от метода-модификатора
В этом случае можно воспользоваться:
try/catchвнутри функции;handleError()ниже поStream;- Коллбек
onErrorвStreamControllerи вStreamSubscription.
Ошибка долетает в порядке выше, т.е. если handleError() поймал где-то ошибку, то StreamSubscrtiption ее не получит.
Ошибка через addError()
В этом случае:
-
handleError()ниже поStream;1Stream.periodic(const Duration(seconds: 1), (count) { 2 if (count == 2) { 3 throw Exception('Exceptional event'); 4 } 5 return count; 6}).take(4).handleError(print).forEach(print); -
Коллбек
onErrorвStreamControllerи вStreamSubscription.
Ошибка от коллбека StreamSubscription
В этом случае только try/catch, в onError исключение не попадет.
Вот мы и познакомились со Stream API и научились писать реактивный код. Советуем пройти квиз ниже, чтобы закрепить знания.
В следующем параграфе мы приступим к изучению виджетов — компонентов, с помощью которых создаётся внешний вид Flutter-приложения.
