В предыдущем параграфе мы разобрали основные концепции, связанные с асинхронностью: как устроен Event Loop
, что такое Future
и как устроена асинхронная работа.
В этом поговорим про Stream API, которое помогает нам осуществлять реактивное программирование в Dart. Концепция реактивного или декларативного программирования строится на асинхронных последовательностях событий.
Суть концепции можно представить так:
Есть некий генератор асинхронных событий, и есть подписчики, слушающие эти события.
В Dart такой последовательностью называется Stream
.
Stream
Создать Stream
можно можно несколькими способами:
-
С помощью конструктора.
1final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
Про другие конструкторы можно почитать в документации
-
С помощью асинхронного генератора.
Его принцип работы такой же как и обычного генератора, только он возвращает
Stream
, а неIterable
1Stream<int> generateStream() async* { 2 int count = 1; 3 while(count <= 5){ 4 yield count++; 5 } 6}
-
С помощью
StreamController
.На этом задержимся подольше ниже, а пока пример простейшего
Stream
, созданногоStreamController
:1class StreamProvider { 2 final _controller = StreamController<int>(); 3 Stream get stream => _controller.stream; 4 void startPushingEvents() { 5 for (int i = 0; i < 20; i++) { 6 _controller.sink.add(i); 7 } 8 } 9 void stopPushingEvents() { 10 _controller.close(); 11 } 12}
В примере выше StreamProvider
предоставляет stream
, который отдает последовательность чисел от 1 до 19. Помимо stream
появляется Sink
и необходимость закрывать controller
. Зачем это необходимо, давайте разберемся ниже.
StreamController
О нем можно думать, как о продвинутом генераторе асинхронных событий. Среди публичных членов интерфейса есть:
sink
— объект, принимающий события.
stream
— сам объект Stream
.
В общем случае работу StreamController
можно описать так:
- Кто-то в коде подписывается на объект
stream
- Кто-то в коде добавляет события в
sink
— ошибки, сами события или другойStream
StreamController
тут же отправляет события подписчикамstream
StreamController
выглядит как самый удобный способ работы со Stream API
, но его поведение отличается в зависимости от того, какой StreamController
вы используете.
Предлагаю рассмотреть следующую схему:
Single-subscription StreamController
Стандартный конструктор создаст StreamController
, который отдает Single-subscription Stream
. Что это значит:
- У
Stream
может быть только один подписчик. - Все события складываются в буфер, так что подписчик гарантировано получит всю очередь событий, даже если подпишется сильно позже.
StreamController
должен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсовStream
не должен порождать события, пока на него не подпишется кто-то.
Широковещательный StreamController
factory
-конструктор StreamController.broadcast()
создаст StreamController
, который отдает Broadcast Stream
. Что это значит:
- У
Stream
может быть больше одного подписчика. - Все события отправляются активным слушателям в момент вызова методов
add
,addAll
илиaddError
у объектаSink
. - У такого
StreamController
нет буфера, так что события могут быть пропущены. StreamController
должен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсовStream
не должен порождать события, пока на него не подпишется кто-то.
Давайте рассмотрим конструктор подробнее и увидим несколько коллбеков:
onListen
— cрабатывает, когда кто-то подписывается наStream
впервые.onCancel
— cрабатывает, когдаStream
закрывается и больше его никто не слуашет.onResume
— cрабатывает, когдаStream
уходит с паузы.onPause
— cрабатывает, когдаStream
встает на паузу, события добавляются в буфер, но подписчик их не получает.
Последние два есть только у Single-subscription
, поскольку буфер событий есть только у него.
По умолчанию каждый из конструкторов создает асинхронный StreamController
. Это значит, что каждое событие проходит через EventLoop
как microtask
, что вызывает небольшую задержку.
У обоих конструкторов есть параметр sync
, если он true
, то мы получим синхронный StreamController
, который стоит использовать с осторожностью.
Пример создания синхронных и асинхронных StreamController
1// Асинхронные контроллеры
2final asyncController = StreamController();
3final asyncBroadcastController = StreamController.broadcast();
4
5// Синхронные контроллеры
6final syncController = StreamController(sync: true);
7final syncBroadcastController = StreamController.broadcast(sync: true);
Давайте разберём подробнее синхронный контроллер.
SynchronousStreamController
Мы сказали, что SynchronousStreamController
стоит использовать с осторожностью. Здесь мы поймём, какие преимущества он даёт, и как он может сломать контракт работы со Stream
.
Вместо отправки события посредством microtask
, которая вызовет задержку, событие моментально отправляется во всех активных слушателей. Так что если вы понимаете, что асинхронный StreamController
работает слишком медленно, вы можете посмотреть в сторону синхронной реализации.
А теперь минусы:
- Асинхронный
Stream
гарантирует, что подписчики смогут получить событие только, когда будут готовы. В синхронном случае, такой гарантии нет, и подписчик может получить событие до того, как будет готов его обработать.
Представим, что мы делаем иконку корзины, которая подписывается на источник количества товаров. Источник добавит в Stream
событие «Теперь в корзине 5 товаров», но иконка ещё не закончила инициализацию и не готова слушать новые события. В синхронном случае, мы потеряем событие и не отобразим счетчик, в асинхронном же Dart гарантирует доставку события.
Stream
гарантирует, что события доходят до подписчиков по порядку, и пока предыдущий вызовadd
не отработал полностью, следующее событие не будет отправлено. СинхронныйStreamController
сам не следит за этим, поэтому есть риск нарушить контрактStream
.
Все повседневные задачи можно решить с помощью асинхронного StreamController
, так что нужды в синхронном нет. Для более глубокого понимания синхронного StreamController
советую обратиться к документации.
Отправка событий в StreamController
Выше мы рассмотрели все способы создания двух типов Stream
: с одним подписчиком (single-subscription
) и широковещательные (broadcast
).
Если с созданием Stream
через конструктор и через генератор всё ясно (события в момент подписки доходят до обработчика сразу), то для StreamController
есть специальный механизм — Sink
.
Предназначение Sink
— точка входа для данных в потоке.
У Sink
есть множество реализаций в Dart, но мы остановимся на StreamSink
— класс, который может принимать данные для Stream синхронно и асинхронно (как именно — определяет реализация StreamController
).
Теперь рассмотрим доступные методы Sink
, для лучшего понимания, советую открыть реализацию этого класса или его сигнатуру в документации.
Доступно всего несколько методов:
Наследование от EventSink<S>
:
addEvent(S event)
— добавляет новое событие вStream
, затемStreamController
передает его всем активным подписчикам.addError(Object error)
— добавляет ошибку вStream
. Отловить ее можно с помощьюhandleError()
в месте подписки.
Наследование от StreamConsumer<S>
:
addStream(Stream<S> stream)
— берет все события изstream
и передает вStreamController
. ВозвращаетFuture
.
Наследование от Sink
:
close
— СообщитьSink
, что новых событий для этого инстанса больше не будет. Если попытаетесь что-то добавить в закрытыйSink
, вы получите ошибку.
И лишь одно свойство:
done
— ЭтоFuture
, которая выполнится, еслиStreamSink
закрыт или один из методовEventSink
выстрелил исключение.
Пока выполняется Future
из addStream()
, нельзя пользоваться методами EventSink
. Иначе вы получите исключение.
Обработка событий Stream
Начать обработку ивентов можно двумя способами:
- Метод
listen()
- Асинхронный цикл
for
(await for
)
Рассмотрим await for
.
По сравнению с for in
циклом у него три отличия:
- Перед
for
нужно добавитьawait
- Итерироваться он может только по событиям
Stream
- Использовать его можно только в асинхронной функции
Для лучшего понимания давайте решим задачу:
👉 Нам нужно считывать весь контент файла и складывать его в одну строку.
Для этого напишем две функции, которые будут складывать строковый массив в полноценную строку для итерируемых коллекций(Iterable
) и для Stream
:
1String concatStrings(Iterable<String> collection) {
2 final stringBuffer = StringBuffer();
3 for (final string in collection) {
4 stringBuffer.write(string);
5 }
6 return stringBuffer.toString();
7}
8
9Future<String> conctatStreams(Stream<String> stream) async* {
10 final stringBuffer = StringBuffer();
11 await for (final string in stream) {
12 stringBuffer.write(string);
13 }
14 return stringBuffer.toString();
15}
Так, мы получили два решения одной и той же задачи, но с помощью conctatStreams()
мы можем асинхронно обрабатывать контент файла и не блокировать исполнение дальнейшего кода.
Заметим, что события ошибки await for
обрабатывать не умеет. В случае оной цикл просто прекратит исполнение.
Теперь рассмотрим метод listen()
Этот метод позволяет создать подписчика у Stream в любом месте.
Напоминаем, что для single-subscription Stream
нельзя создать больше одного подписчика. Компилятор за этим не следит, так что вы увидите исключение только во время работы приложения.
Давайте посмотрим на пример кода с вызовом метода listen()
1StreamSubscription<T> createStreamSubcription<T>(Stream<T> stream) {
2 final streamSubscription = stream.listen(
3 _handleStreamEvent,
4 onError: _handleOnError,
5 onDone: _handleOnDone,
6 cancelOnError: true,
7 );
8 return streamSubscription;
9}
10
11void _handleStreamEvent<T>(T event) {
12 print('Got new event $event');
13}
14
15void _handleOnError(Object? error, StackTrace? stackTrace) {
16 print('Got stream error');
17}
18
19void _handleOnDone() {
20 print('Stream subscription closed');
21}
Первое, что мы видим — listen()
возвращает объект StreamSubscription
. Это и есть подписка. Подписку можно поставить на паузу(pause()
), восстановить(resumse()
) и закрыть(cancel()
).
Если флаг cancelOnError == true
, тогда подписка закроется после первого события ошибки.
Будьте внимательны с cancel()
— закрытую подписку уже нельзя восстановить, а Stream
сам за вас ее не закроет. Если забыть про это, то события могут протечь в неожиданных для вас сценариях.
В момент подписки вы можете определить несколько callback:
onData
— будет вызван каждый раз, когдаStreamSubscription
обрабатывает событиеStream
.onError
— будет вызван при появлении ошибки вStream
. Если не определить, все ошибки будут обработаны какunhandled
и отправятся в обработчик текущейзоны
(о том что такое зоны мы будем говорить в параграфах ниже)onDone
— будет вызван, еслиStream
закроется и отправит об этом событие. Т.е. завершится без ошибок. Если не определить, ничего не произойдет.
Мы рассмотрели только основные способы обработки событий, в библиотеке dart:async есть еще больше методов-модификаторов, которые строятся на всем вышесказанном. А для более сложных сценариев используйте пакет rxdart.
Обработка ошибок в Stream
Выше мы рассмотрели принцип работы Stream
в Dart, вы уже могли заметить, что он сильно отличается от асинхронных функций и Isolate
. Так и с ошибками, обычный try/catch
не отловит все исключения.
Любая ошибка в потоке Stream
приведет к тому, что подписчик не получит событие и не обработает его в onData
. А все необработанные исключения считаются unhandled и попадают в обработчик зоны
.
Исключение может подстерегать нас в нескольких случаях:
- Один из методов-модификаторов выбрасывает исключение.
- В
Stream
появилось событие ошибки. Кто-то явно его добавил черезaddError()
- Один из коллбеков
StreamSubscription
выбрасывает исключение.
Рассмотрим их подробнее
Ошибка от метода-модификатора
В этом случае можно воспользоваться:
try/catch
внутри функции;handleError()
ниже поStream
;- Коллбек
onError
вStreamController
и вStreamSubscription
.
Ошибка долетает в порядке выше, т.е. если handleError()
поймал где-то ошибку, то StreamSubscrtiption
ее не получит.
Ошибка через addError()
В этом случае:
-
handleError()
ниже поStream
;1Stream.periodic(const Duration(seconds: 1), (count) { 2 if (count == 2) { 3 throw Exception('Exceptional event'); 4 } 5 return count; 6}).take(4).handleError(print).forEach(print);
-
Коллбек
onError
вStreamController
и вStreamSubscription
.
Ошибка от коллбека StreamSubscription
В этом случае только try/catch
, в onError
исключение не попадет.
Вот мы и познакомились со Stream API
и научились писать реактивный код. Советуем пройти квиз ниже, чтобы закрепить знания.
В следующем параграфе мы приступим к изучению виджетов — компонентов, с помощью которых создаётся внешний вид Flutter-приложения.