2.6. Dart: concurrency, изоляты

Во время работы над приложением вам может понадобится решать одну из следующих задач:

  1. Работа с базой данных
  2. Общение с бекендом
  3. Фоновая загрузка данных
  4. Работа с файловой системой
  5. Работа с нативной платформой (батарея телефона, GPS)
  6. Управление состоянием экранов и приложения

Всё перечисленное выше — задачи, которые не выйдет решить эффективно, если писать императивный синхронный код. А любая тяжёлая операция, например, парсинг объемного JSON, приведёт к зависаниям приложения.

В этом параграфе мы разберем инструменты в Dart, которые помогут вам решать такие задачи эффективно.

Но сперва давайте определимся с ключевыми тезисами:

  1. В Dart есть Isolates (изоляты) — это аналоги Threads (потоков).
  2. Весь код на Dart исполняется по умолчанию в рамках одного изолята.
  3. В Dart нет параллелизма в общепринятом понимании.
  4. Внутри каждого изолята есть Event Loop — механизм неблокирующего выполнения кода.
  5. В Dart есть настоящая асинхронность.
  6. В Dart есть способ писать реактивный код — Stream API.

Мы подразумеваем, что вы уже знакомы с такими понятиями как асинхронность, параллелизм и конкурентность. Так что на объяснении этих определений мы не будем останавливаться. А вот их реализацию в Dart мы разберём подробно.

Event Loop

Event Loop — это механизм обработки событий, который позволяет писать неблокирующий код. В этом блоке пойдёт много теории, но ее стоит понять, потому что все инструменты, рассматриваемые в статье, строятся на приницпах работы Event Loop.

Его принцип работы можно представить в виде следующей схемы:

fluttern

Или, если кому-то удобнее читать код, то вот пример:

bool programWorks = false;

void main(){
	startEventLoop();
	// Ваша программа
	// ...
}

void startEventLoop(){
	programWorks = true;
	while(programWorks){
		eventLoopIteration();
	}
}

void stopEventLoop(){
  programWorks = false;
	while(programWorks && !eventLoopQueueIsEmpty()){
		eventLoopIteration();
	}
  disposeProgram();
}

За итерацию EventLoop делает примерно следующее:

void eventLoopIteration() {
  if (microtaskQueue.isNotEmpty()) {
    fetchFirstMicroTaskFromQueue();
    executeThisMicroTask();
    return;
  }

  if (eventQueue.isNotEmpty()) {
    fetchFirstEventFromQueue();
    handleThisEvent();
  }
}
  1. Программа запускается
  2. EventLoop разбирает всю очередь микрозадач (Microtask Queue) — в FIFO-порядке.
  3. Затем переходит к очереди событий (Event Queue) и обрабатывает следующее событие из очереди.
  4. Затем цикл переходит к началу — п.2
  5. Как только очередь событий опустеет, программа может завершать работу

Выше мы узнали два новых понятия: Microtask и Event. Напрямую при написании кода мы с ними не взаимодействуем, так что давайте разберёмся, какие наши действия порождают Event и Microtask.

Microtask

Это очень маленькие и дешёвые по ресурсам операции, которые должны быть выполнены асинхронно и как можно скорее.

Создать Microtask можно с помощью функции scheduleMicrotask(), а использовать при инициализации очень важного кода

class MyVeryImportantManager{

	void init(){
		scheduleMicrotask((){
			// important code
		});
	}
}

Здесь складывается впечатление, что мы можем гибко настраивать приоритет для нужных нам операций. И это верно, публичное API Dart позволяет нам, но не стоит этим злоупотреблять. Если в коде вперемешкуFuture (о нём мы поговорим ниже), и Microtask, он менее безопасен:

  • Нет гарантии, что Future и Microtask всегда отработают в порядке, который вы ожидаете.
  • Весь код, отвечающий за отрисовку кадра — это Events из EventQueue. Длинная очередь Microtask заблокирует обработку EventQueue , а пользователь, в лучшем случае, увидит пропуски кадров, в худшем, зависший UI.

Например, в исходных кодах Flutter не так много применений scheduleMicrotask() , а используются они разработчиками для очень маленьких операций, в критичных для плавного UI местах.

Event

Это какое-то внешнее событие, которое попадает в Event Queue и обрабатывается Event Loop.

fluttern

Например:

  • операции ввода/вывода
  • жесты
  • рисование
  • таймеры
  • ...
  • Future

Напрямую добавляет новые события в Event Queue мы не можем, исключение — Future API. О них мы и поговорим ниже.

Future

Future — это отложенная операция, которая завершится с неким результатом через время.

Это может пригодиться во множестве кейсов: например, нам нужно отправить запрос на сервер, а пока он выполняется, мы хотим выводить время выполнения запроса. Мы попросту не сможем решить эту задачу без неблокирующего кода в языке.

void serverRequest(){
	print('Start loading');
	var seconds = 0;
	Timer? timer;
  Future(getServerData).then((result){
		timer?.cancel();
		print(result);
	});
	// Код ниже будет выполняться уже после старта запроса
	timer = Timer.periodic(Duration(seconds:1), (_) {
		print('Seconds elapsed: ${seconds++}');
	});
}

Future<String> getServerData() => //server request

Future представляет собой обычный класс с набором конструкторов.

Создать его можно с помощью публичного конструктора:

Future((){
	// Some async job
});

А вот функция переданная в конструктор — это и есть событие, которое выполнит Event Loop и вернёт результат или ошибку в обработчик этого Future.

Несколько нюансов:

  • Future API может даже создать микро-задачу, через factory-конструктор Future.microtask() — результат выполнения можно будет обработать как и для любого другого Future.
  • Мы можем отложить выполнение кода внутри Future, на определённый срок с помощью Future.delayed(). Например, если нам нужно реализовать тротлинг операций — то есть выполнять операцию не чаще одного раза за X миллисекунд.

Состояния Future

Выше мы сказали следующее: «Future — это отложенная операция». Значит, мы можем выделить два состояния:

  1. Uncompleted — код внутри Future ещё не начал исполняться или исполняется.
  2. Completed — работа Future завершена со значением или с ошибкой.

Обработка результата Future

Для этого нужно зарегистрировать then-коллбек. Он будет вызван сразу после выполнения функции внутри Future.

void main() {
  Future(() {
    print('Future print');
  }).then((_) => print('Print after future'));
}

Обработка ошибки Future

Если во время работы Future возникнет ошибка, то Event Loop отправит ее в catchError-коллбек:

void main() {
  Future(() => print('Future print'))
      .then((_) => print('Print after future'))
      .catchError((error, stacktrace) => print(error));
}

У then есть тоже опциональный onError-коллбек, его приоритет «выше», чем у catchError:

А еще есть метод onError() под капотом он вызывает catchError()

Отличается тем, что:

  1. Позволяет изменить структуру ошибки
  2. Гарантирует, что в результате ошибка будет такого же типа, что и изначальная

Это может пригодиться, например, при обработке ошибки сетевого запроса.

Давайте представим, что сервер в ответ на запрос присылает различные коды ошибок в теле ответа, помимо стандартных кодов HTTP:

void main() {
  Future(() => performNetworkRequest())
      .onError((error, stackTrace){
        if(error.body.code == 1){
          //Еще один конструктор Future, немедленно отстрелит переданную ошибку через Event Loop
          return Future.error(CodeOneError());
        }
                              
        return error;
      });
}

Асинхронная работа

Выше мы рассмотрели принципы работы EventLoop, очереди событий и Future. Но все это не отвечает на вопрос:

Если Event Loop – это обычный синхронный цикл, очередь событий – это FIFO структура, а Future — интерфейс для создания событий в очереди, то что во всём этом асинхронного?

Ответ — ничего.

Если обратиться к описанию работы EventLoop, можно вообще сделать вывод, что тяжелое событие может заблокировать работу EventLoop и приложение зависнет. Ничего из перечисленного выше не сделает ваш код асинхронным.

Но всё это важно для понимания того, что такое асинхронные функции, и их предназначения.

Далее будет непростая теория, так что давайте остановимся и определимся с терминологией самих разработчиков языка.

Ключевые понятия:

  • Синхронная операция: Некая синхронная работа, которая блокирует исполнение последующего кода пока она не выполнится.
  • Синхронная функция: Функция, которая может выполнять только синхронные операции.
  • Асинхронная операция: Некая работа, которая может позволять другим операциям выполняться после ее старта и вплоть до завершения.
  • Асинхронная функция: Функция, которая выполняет как минимум одну асинхронную операцию и неограниченное кол-во синхронных операций.

Если с первыми двумя все ясно (синхронная операция — просто привычный всем код, синхронная функция — просто любая не асинхронная функция), то с оставшимися уже не все так очевидно.

Разберёмся подробнее.

Асинхронная операция

В блоке про Future мы уже приводили примеры кода, которые позволяют не блокировать исполнение кода.

Это не совсем так:

Как выполнится код Future внутри EventLoop зависит от того, какая функция была передана внутрь Future — синхронная или асинхронная.

Синхронные операции могут заблокировать выполнение дальнейшего кода, потому что Event Loop не пойдет на новый цикл, пока не обработает такую операцию.

Получается, что Future позволяет делать мнимые асинхронные операции, порядок выполнения которых на деле зависит от функции, переданной в конструкторе Future.

Для окончательного понимания давайте рассмотрим следующий код:

print('Before Future');
Future(()=> print('Future')).then((_)=> print('Future then'));
print('After Future');

Порядок вывода будет следующий: 'Before Future' → 'AfterFuture' → 'Future' -> 'Future then'

Это верно, но на деле вызов будет следующим

fluttern

Такой порядок будет верен всегда, так что:

  1. Future не является настоящей асинхронной операцией. И Future , и код внутри сами по себе — синхронные операции.
  2. Future не является легковесной реализацией параллелизма. Если бы это было правдой, мы не смогли бы гарантировать, что ‘After Future’ выведется раньше ‘Future’

Асинхронная функция

Повторим определение:

  • Асинхронная операция: Некая работа, которая может позволять другим операциям выполняться после её старта и вплоть до завершения.
  • Асинхронная функция: Функция, которая выполняет как минимум одну асинхронную операцию и неограниченное кол-во синхронных операций.

Чтобы превратить синхронную функцию в асинхронную достаточно добавить ключевое слово async перед телом функции:

void myAsyncFunction() async {
	print('Some job');
}

Поздравляю, теперь вызов myAsyncFunction() — асинхронная операция.

Пробуем:

Что-то не то, foo() же асинхронная, она не должна была блокировать вызов print('After operation')

На самом деле, функции помеченные async выполняются синхронно, до тех пор, пока не встретят первое применение ключевого слова await

Для полного понимания, давайте расширим понятие асинхронной функции:

Это функция, выполнение которой может быть приостановлено с передачей управления назад в место вызова. Как только блокирующая операция выполнится, выполнение функции продолжится с места, где она была приостановлена.

Ключевое слово await как раз позволяет приостановить исполнение кода.

Несколько важных замечаний:

  1. await можно использовать только внутри функции, помеченной async.
  2. Мы можем дождаться только выполнения Future.
  3. Асинхронная функция может возвращать значения только типа Future.
  4. Возвращаемым значением асинхронной функции может быть void, но в таком случае мы не сможем дождаться ее выполнения

Давайте модифицируем предыдущий пример. Не торопитесь запускать, попробуйте сами предугадать результат:

Вот теперь действительно все сошлось:

  • асинхронная getAsyncJobValue() была вызвана;
  • код начал исполняться синхронно;
  • на 9-й строке await поставило на паузу выполнение getAsyncJobValue(), но не заблокировало выполнение main();
  • как только Future на 9-й строке завершил свою работу, код пошёл дальше.

Прежде чем двинуться дальше, коротко напомним, что мы узнали:

  1. Асинхронной функцию можно сделать с помощью ключевого слова async, но код будет выполняться асинхронно только после await
  2. Future — не операция вовсе, а вот функция внутри него уже может быть синхронной или асинхронной.
  3. Event Loop — это бесконечный цикл обработки событий, он позволяет достичь псевдопаралелизма

Обработка ошибок

Осуществляется с помощью конструкции try/catch. Но есть нюанс:

Если результат асинхронной функции не ожидается, то в случае возникновения ошибки try/catch-блок её не отловит

Почему так — поговорим подробно в advanced секции, а пока — пример.

Как видите, catch-блок функции badAsyncJob() не смог обработать исключение.

Важно следить за ожиданием асинхронных операций, там где хотим отловить ошибки.

Таймеры

Таймер — это класс в Dart, который позволяет c помощью событий Event Loop выполнять какие-то операции через время.

Пример простейших таймеров:

// Через секунду напечатает 'on Tick'
Timer(Duration(seconds: 1), (){
	print('on Tick');
});

// Каждую секунду будет печатать 'on Tick'
Timer.periodic(Duration(seconds: 1), (timer){
	print('on Tick');
}); 

При разговоре про Future, мы упоминали про метод Future.delayed(), который делает то же самое. Но у него есть пара отличий от таймера:

  1. Timer возвращает объект Timer

  2. Future.delayed возвращает Future и позволяет использовать await

  3. Timer можно закрыть до выполнения кода внутри, с Future так не получится.

    final timer = Timer(Duration(seconds: 1), (){
      print('on Tick');
    });
    timer.cancel();//Ничего не выведется
    
  4. Future.delayed использует Timer, см. документацию

Isolate

Выше мы сказали следующее:

В Dart есть Isolates — аналоги потоков или нитей(Threads).

Пришло время объяснить что это значит.

Изолят:

  1. Это способ реализации псевдопараллелизма в Dart.
  2. У него свой Event Loop и участок памяти.
  3. Можно создавать сколько угодно изолятов.

Почему псевдопараллелизм и почему Isolate — это не совсем поток, поговорим в Advanced-главе.

Сейчас же остановимся на их пользе:

Как мы говорили выше в блоке про EventLoop, тяжёлые операции могут наглухо заблокировать работу Dart-кода. С помощью изолята мы можем выполнить какую-то сложную операцию «асинхронно», а затем обработать результат в основном изоляте.

Есть два простых способа создать изолят:

  1. С помощью Isolate.run

    int slowFib(int n) =>
        n <= 1 ? 1 : slowFib(n - 1) + slowFib(n - 2);
    Isolate.run(() => slowFib(40));
    

    Isolate.run ****за нас создаст изолят, выполнит код внутри коллбека, вернет результат и почистит изолят в конце.
    Важно подметить, что точкой входа в изолят может быть только функции верхнего уровня, статические функции или замыкания.
    С замыканиями стоит быть осторожными, они должны использовать только те объекты, доступ к которым может получить сам Isolate.

  2. С помощью функции compute

    Future<bool> isPrime(int value) {
      return compute(_calculate, value);
    }
    bool _calculate(int value) {
      if (value == 1) {
        return false;
      }
      for (int i = 2; i < value; ++i) {
        if (value % i == 0) {
          return false;
        }
      }
      return true;
    }
    

Функция compute за нас создаст изолят, выполнит код внутри коллбека, вернет результат и почистит изолят в конце.

Stream и реактивное программирование

Концепция реактивного или декларативного программирования строится на асинхронных последовательностях событий.

Суть концепции может представлена в виде подобной схемы:

fluttern

Есть некий генератор асинхронных событий, и есть подписчики, слушающие эти события.

В Dart такой последовательностью называется Stream . Создать Stream можно можно несколькими способами:

  1. С помощью конструктора.

    final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
    

    Про другие конструкторы можно почитать в документации

  2. С помощью асинхронного генератора.
    Его принцип работы такой же как и обычного генератора, только он возвращает Stream, а не Iterable

    Stream<int> generateStream() async* {
      int count = 1;
      while(count <= 5){
        yield count++;
      }
    }
    
  3. С помощью StreamController.
    На этом задержимся подольше ниже, а пока пример простейшего Stream, созданного StreamController:

    class StreamProvider {
      final _controller = StreamController<int>();
      Stream get stream => _controller.stream;
      void startPushingEvents() {
        for (int i = 0; i < 20; i++) {
          _controller.sink.add(i);
        }
      }
      void stopPushingEvents() {
        _controller.close();
      }
    }
    

В примере выше StreamProvider предоставляет stream, который отдает последовательность чисел от 1 до 19. Помимо stream появляется Sink и необходимость закрывать controller. Зачем это необходимо, давайте разберемся ниже.

StreamController

О нем можно думать, как о продвинутом генераторе асинхронных событий. Среди публичных членов интерфейса есть:

sink — объект, принимающий события.

stream — сам объект Stream.

В общем случае работу StreamController можно описать так:

  1. Кто-то в коде подписывается на объект stream
  2. Кто-то в коде добавляет события в sink — ошибки, сами события или другой Stream
  3. StreamController тут же отправляет события подписчикам stream

StreamController выглядит как самый удобный способ работы с Stream API, но его поведение отличается в зависимости от того, какой StreamController вы используете.

Предлагаю рассмотреть следующую схему:

fluttern

Single-subscription StreamController

Стандартный конструктор создаст StreamController, который отдает Single-subscription Stream. Что это значит:

  1. У Stream может быть только один подписчик.
  2. Все события складываются в буфер, так что подписчик гарантировано получит всю очередь событий, даже если подпишется сильно позже.
  3. StreamController должен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсов
  4. Stream не должен порождать события, пока на него не подпишется кто-то.

Широковещательный StreamController

factory-конструктор StreamController.broadcast() создаст StreamController, который отдает Broadcast Stream. Что это значит:

  1. У Stream может быть больше одного подписчика.
  2. Все события отправляются активным слушателям в момент вызова методов add, addAll или addError у объекта Sink.
  3. У такого StreamController нет буфера, так что события могут быть пропущены.
  4. StreamController должен быть закрыт, если его никто не слушает, чтобы избежать утечки ресурсов
  5. Stream не должен порождать события, пока на него не подпишется кто-то.

Давайте рассмотрим конструктор подробнее и увидим несколько коллбеков:

  1. onListen - cрабатывает, когда кто-то подписывается на Stream впервые.
  2. onCancel - cрабатывает, когда Stream закрывается и больше его никто не слуашет.
  3. onResume - cрабатывает, когда Stream уходит с паузы.
  4. onPause - cрабатывает, когда Stream встает на паузу, события добавляются в буфер, но подписчик их не получает.

Последние два есть только у Single-subscription, поскольку буфер событий есть только у него.

По умолчанию каждый из конструкторов создает асинхронный StreamController. Это значит, что каждое событие проходит через EventLoop как microtask, что вызывает небольшую задержку.

У обоих конструкторов есть параметр sync , если он true, то мы получим синхронный StreamController, который стоит использовать с осторожностью.

Пример создания синхронных и асинхронных StreamController

// Асинхронные контроллеры
final asyncController = StreamController();
final asyncBroadcastController = StreamController.broadcast();

// Синхронные контроллеры
final syncController = StreamController(sync: true);
final syncBroadcastController = StreamController.broadcast(sync: true);

Давайте разберём подробнее синхронный контроллер.

SynchronousStreamController

Мы сказали, что SynchronousStreamController стоит использовать с осторожностью. Здесь мы поймем какие преимущества он дает и как он может сломать контракт работы с Stream.

Вместо отправки события посредством microtask, которая вызовет задержку, событие моментально отправляется во всех активных слушателей. Так что если вы понимаете, что асинхронный StreamController работает слишком медленно, вы можете посмотреть в сторону синхронной реализации.

А теперь минусы:

  1. Асинхронный Stream гарантирует, что подписчики смогут получить событие только, когда будут готовы. В синхронном случае, такой гарантии нет, и подписчик может получить событие до того, как будет готов его обработать.
    Представим, что мы делаем иконку корзины, которая подписывается на источник количества товаров. Источник добавит в Stream событие «Теперь в корзине 5 товаров», но иконка ещё не закончила инициализацию и не готова слушать новые события. В синхронном случае, мы потеряем событие и не отобразим счетчик, в асинхронном же Dart гарантирует доставку события.

  2. Stream гарантирует, что события доходят до подписчиков по порядку, и пока предыдущий вызов add не отработал полностью, следующее событие не будет отправлено. Синхронный StreamController сам не следит за этим, поэтому есть риск нарушить контракт Stream.

Все повседневные задачи можно решить с помощью асинхронного StreamController, так что нужды в синхронном нет. Для более глубокого понимания синхронного StreamController советую обратиться к документации.

Отправка событий в StreamController

Выше мы рассмотрели все способы создания двух типов Stream: с одним подписчиком (single-subscription) и широковещательные (broadcast).

Если с созданием Stream через конструктор и через генератор всё ясно (события в момент подписки доходят до обработчика сразу), то для StreamController есть специальный механизм — Sink.

Предназначение Sink — точка входа для данных в потоке.

У Sink есть множество реализаций в Dart, но мы остановимся на StreamSink — класс, который может принимать данные для Stream синхронно и асинхронно (как именно — определяет реализация StreamController).

Теперь рассмотрим доступные методы Sink, для лучшего понимания, советую открыть реализацию этого класса или его сигнатуру в документации.

Доступно всего несколько методов:

Наследование от EventSink<S>:

  • addEvent(S event) — добавляет новое событие в Stream, затем StreamController передает его всем активным подписчикам.
  • addError(Object error) — добавляет ошибку в Stream. Отловить ее можно с помощьюhandleError() в месте подписки.

Наследование от StreamConsumer<S>:

  • addStream(Stream<S> stream) — берет все события из stream и передает в StreamController. Возвращает Future.

Наследование от Sink:

  • close — Сообщить Sink, что новых событий для этого инстанса больше не будет. Если попытаетесь что-то добавить в закрытый Sink , вы получите ошибку.

И лишь одно свойство:

  • done — Это Future, которая выполнится, если StreamSink закрыт или один из методов EventSink выстрелил исключение.

Пока выполняется Future из addStream(), нельзя пользоваться методами EventSink. Иначе вы получите исключение.

Обработка событий Stream

Начать обработку ивентов можно двумя способами:

  1. Метод listen()
  2. Асинхронный цикл for (await for)

Рассмотрим await for.

По сравнению с for in циклом у него три отличия:

  1. Перед for нужно добавить await
  2. Итерироваться он может только по событиям Stream
  3. Использовать его можно только в асинхронной функции

Для лучшего понимания давайте решим задачу:

Нам нужно считывать весь контент файла и складывать его в одну строку.

Для этого напишем две функции, которые будут складывать строковый массив в полноценную строку для итерируемых коллекций(Iterable) и для Stream:

String concatStrings(Iterable<String> collection) {
  final stringBuffer = StringBuffer();
  for (final string in collection) {
    stringBuffer.write(string);
  }
  return stringBuffer.toString();
}

Future<String> conctatStreams(Stream<String> stream) async* {
  final stringBuffer = StringBuffer();
  await for (final string in stream) {
    stringBuffer.write(string);
  }
  return stringBuffer.toString();
}

Так, мы получили два решения одной и той же задачи, но с помощью conctatStreams() мы можем асинхронно обрабатывать контент файла и не блокировать исполнение дальнейшего кода.

Заметим, что события ошибки await for обрабатывать не умеет. В случае оной цикл просто прекратит исполнение.

Теперь рассмотрим метод listen()

Этот метод позволяет создать подписчика у Stream в любом месте.

Напоминаем, что для single-subscription Stream нельзя создать больше одного подписчика. Компилятор за этим не следит, так что вы увидите исключение только во время работы приложения.

Давайте посмотрим на пример кода с вызовом метода listen()

StreamSubscription<T> createStreamSubcription<T>(Stream<T> stream) {
  final streamSubscription = stream.listen(
    _handleStreamEvent,
    onError: _handleOnError,
    onDone: _handleOnDone,
    cancelOnError: true,
  );
  return streamSubscription;
}

void _handleStreamEvent<T>(T event) {
  print('Got new event $event');
}

void _handleOnError(Object? error, StackTrace? stackTrace) {
  print('Got stream error');
}

void _handleOnDone() {
  print('Stream subscription closed');
}

Первое, что мы видим — listen() возвращает объект StreamSubscription. Это и есть подписка. Подписку можно поставить на паузу(pause()), восстановить(resumse()) и закрыть(cancel()).

Если флаг cancelOnError == true, тогда подписка закроется после первого события ошибки.

Будьте внимательны с cancel() — закрытую подписку уже нельзя восстановить, а Stream сам за вас ее не закроет. Если забыть про это, то события могут протечь в неожиданных для вас сценариях.

В момент подписки вы можете определить несколько callback:

  1. onData — будет вызван каждый раз, когда StreamSubscription обрабатывает событие Stream.
  2. onError — будет вызван при появлении ошибки в Stream. Если не определить, все ошибки будут обработаны как unhandled и отправятся в обработчик текущей зоны (о том что такое зоны мы будем говорить в параграфах ниже)
  3. onDone — будет вызван, если Stream закроется и отправит об этом событие. Т.е. завершится без ошибок. Если не определить, ничего не произойдет.

Мы рассмотрели только основные способы обработки событий, в библиотеке dart:async есть еще больше методов-модификаторов, которые строятся на всем вышесказанном. А для более сложных сценариев используйте пакет rxdart.

Обработка ошибок в Stream

Выше мы рассмотрели принцип работы Stream в Dart, вы уже могли заметить, что он сильно отличается от асинхронных функций и Isolate. Так и с ошибками, обычный try/catch не отловит все исключения.

Любая ошибка в потоке Stream приведет к тому, что подписчик не получит событие и не обработает его в onData. А все необработанные исключения считаются unhandled и попадают в обработчик зоны.

Исключение может подстерегать нас в нескольких случаях:

  • Один из методов-модификаторов выбрасывает исключение. В этом случае можно воспользоваться:
    1. try/catch внутри функции;
    2. handleError() ниже по Stream;
    3. Коллбек onError в StreamController и в StreamSubscription.

Ошибка долетает в порядке выше, т.е. если handleError() поймал где-то ошибку, то StreamSubscrtiption ее не получит.

  • В Stream появилось событие ошибки. Кто-то явно его добавил через addError() . В этом случае:
  1. handleError() ниже по Stream;

    Stream.periodic(const Duration(seconds: 1), (count) {
      if (count == 2) {
        throw Exception('Exceptional event');
      }
      return count;
    }).take(4).handleError(print).forEach(print);
    
  2. Коллбек onError в StreamController и в StreamSubscription.
    Ошибка долетает в порядке выше, т.е. если handleError() поймал где-то ошибку, то StreamSubscrtiption ее не получит.

    • Один из коллбеков StreamSubscription выбрасывает исключение. В этом случае только try/catch, в onError исключение не попадет.

Заключение

В этом параграфе мы изучили весь доступный инструментарий Dart для написания стабильного декларативного кода:

  1. Знаем зачем нужен и как устроен EventLoop.
  2. Можем отличить Event от Microtask.
  3. Познакомились с Future API — инструментом для написания конкурентного кода.
  4. Знаем что такое Isolate и как им пользоваться.
  5. Познакомились с Stream API — способом писать реактивный код.

В следующем параграфе мы приступим к изучению виджетов — компонентов, с помощью которых создаётся внешний вид Flutter-приложения.

Отмечайте параграфы как прочитанные чтобы видеть свой прогресс обучения

Вступайте в сообщество хендбука

Здесь можно найти единомышленников, экспертов и просто интересных собеседников. А ещё — получить помощь или поделиться знаниями.
Вступить
Сообщить об ошибке
Предыдущий параграф2.5. Dart: ООП
Следующий параграф2.7. Widgets: basics, stless, stful, inherited