4.11. Инструменты асинхронного программирования

В одном из предыдущих параграфов мы изучили Future — основной механизм работы с асинхронными операциями в Dart. Однако у Future есть важная особенность: после запуска его нельзя отменить.

Представьте ситуацию: пользователь открыл экран, приложение начало загружать данные, но пользователь тут же вернулся назад. Загрузка продолжится, хотя результат уже не нужен. А также бывают случаи, когда надо соединить два мира — синхронный и асинхронный — и самостоятельно управлять жизненным циклом Future.

Этот параграф посвящён инструментам, которые помогают решить проблему отмены и дают больше контроля над асинхронными операциями. Мы рассмотрим:

Completer — для ручного управления жизненным циклом Future.
AsyncCache и AsyncMemoize — для оптимизации повторяющихся операций.
Timeout — для ограничения времени выполнения.
CancelToken и CancellableOperation — для отмены операций.

Начнём с ситуаций, когда стандартных возможностей Future недостаточно:

  • Создать Future, результат которого будет предоставлен позже из другого участка кода.

  • Преобразовать API на колбэках, чтобы самостоятельно управлять завершением ожидания асинхронных операций.

  • Инкапсулировать сложную асинхронную логику из нескольких шагов.

  • Кешировать результаты для повышения производительности.

В таких случаях на помощь приходит Completer.

Completer

Completer — это класс, который позволяет вручную создавать объект Future и контролировать его завершение: либо успешно с результатом, либо с ошибкой. Он разделяет процесс создания Future от процесса его завершения. Полезен для оборачивания синхронных API, использующих колбэк в Future, помогает дожидаться завершения работы Stream, когда API требует вернуть именно Future.

Рассмотрим ситуацию, когда мы получаем с сервера поток сообщений и нам необходимо получить первое сообщение или в случае завершения потока вернуть сообщение по умолчанию. В таком случае нам поможет Completer и его возможности по управлению жизненным циклом Future.

1Future<Message> getFirstMessage(Stream<Message> stream) {
2  final completer = Completer<Message>();
3  StreamSubscription? subscription; 
4
5  // Подписываемся на поток (`listen`)
6  subscription = stream.listen(
7        (message) {
8      if (!completer.isCompleted) {
9        completer.complete(message);
10        subscription?.cancel();
11      }
12    },
13    onError: (error) {
14      if (!completer.isCompleted) {
15        completer.completeError(error);
16      }
17    },
18    onDone: () {
19      if (!completer.isCompleted) {
20        completer.complete(defaultMessage);
21      }
22    },
23    cancelOnError: true,
24  );
25
26  return completer.future;
27}

Функция получает поток сообщений в качестве аргумента, создаёт объект Completer и завершает его при наступлении определённых событий. Вызывающая сторона получает Future, который находится внутри Completer и хранит результат. Поскольку Completer можно завершить только один раз, важно проверять, не был ли он уже завершён, чтобы избежать исключения при повторном завершении.

Синхронный Completer

Конструктор Completer.sync() — это специальный конструктор, который создаёт Future, и его выполнение завершается синхронно при вызове методов complete() и completeError(). Это значит, что слушателям, которые ожидают результат Future от этого Completer, он будет доставлен синхронно, в этом же цикле Event Loop, вне зависимости от следующей микротаски или итерации Event Loop.

Рассмотрим его работу на примере двух функций (практический пример будет ниже). Одна использует асинхронный Completer, другая — синхронный. Слушателей Future регистрируем через then(), но помните, что синтаксис async/await делает то же самое; в данном примере then() нагляднее.

В результате мы получаем следующий вывод:

1Асинхронный Completer
2Перед asyncCompleter.complete()
3После asyncCompleter.complete()
4Получено значение: асинхронно, в следующем цикле!
5======
6Синхронный Completer
7Перед syncCompleter.complete(()
8Получено значение: синхронно, в этом цикле
9После syncCompleter.complete(()

Completer.sync() стоит использовать в следующих сценариях:

  • Когда нужно, чтобы обработчики запускались сразу же после завершения Future, без каких-либо задержек. Вы должны быть уверены, что вызывающий код к этому готов и для него это желаемое поведение.

  • Для реализации гарантии последовательности обработки событий — как мы показали в примере выше.

  • Важна максимальная скорость получения результатов функций.

В качестве практического примера разберём механизм кеширования. Если мы хотим, чтобы кешированное значение возвращалось мгновенно из кеша, без прерываний, которые могут возникнуть из-за наличия Future или Microtask неопределённой продолжительности, то нам поможет Completer.sync().

1class SyncCache<T> {
2  final Map<String, T> _cache = {};
3  final Map<String, Completer<T>> _pending = {};
4
5  Future<T> getValue(String key, Future<T> Function() networkRequest) async {
6    if (_cache.containsKey(key)) {
7      final completer = Completer<T>.sync();
8      completer.complete(_cache[key]);
9      return completer.future;
10    }
11
12    if (_pending.containsKey(key)) {
13      return _pending[key]!.future;
14    }
15
16    final completer = Completer<T>();
17    _pending[key] = completer;
18    
19    try {
20      final value = await networkRequest();
21      _cache[key] = value;
22      completer.complete(value);
23    } on Exception catch (error) {
24      completer.completeError(error);
25    } finally {
26      _pending.remove(key);
27    }
28
29    return completer.future;
30  }
31}
32

В этом примере Completer.sync() используется для мгновенного возврата кешированных значений. Это позволяет сохранить асинхронный API. А ещё — избежать создания отдельного синхронного кеша и асинхронного репозитория плюс класса для выбора между двумя способами получения данных.

Completer.sync() и Future.sync() решают разные задачи. И когда мы хотим доставить результат синхронно, то подойдёт только Completer.sync(). При этом Future.sync() лишь произведёт синхронное вычисление, а результат будет доставлен в следующей итерации Event Loop с помощью микрозадачи.

Важно

Completer.sync() доставляет значение синхронно. А Future.sync() — асинхронно, но производит синхронное вычисление.

Ранее мы говорили, что Future.sync() доставляет своё значение асинхронно, чтобы соблюдать контракт доставки результата и гарантирования правильной, асинхронной, последовательности событий, однако Completer.sync() её нарушает, поэтому следует избегать использования этого конструктора и применять его только в исключительных ситуациях.

Когда стоит избегать Completer.sync():

  • В большинстве случаев, когда нет явной необходимости синхронного получения результатов.

  • При большой вложенности кода или при длинной цепочке вызовов.

  • В публичных API библиотек. Так как API остаётся асинхронным, потребители будут ожидать получения результата в следующем цикле Event Loop.

Для удобства сравнили Future.sync() и Completer.sync() в таблице:

Критерий

Future.sync()

Completer.sync()

Способ доставки результата

Асинхронный (результат доставляется в следующей итерации Event Loop с помощью микрозадачи)

Синхронный (результат доставляется сразу в текущем цикле Event Loop)

Цель использования

Синхронное вычисление с асинхронной доставкой результата

Мгновенная доставка результата, когда нужны скорость и последовательность обработки событий

Контракт Future

Соблюдает контракт Future и асинхронную последовательность событий

Нарушает контракт асинхронности Future

Рекомендации по использованию

Подходит для широкого круга задач, где требуется синхронное вычисление с асинхронной доставкой результата

Следует использовать только в исключительных случаях. Например, при реализации механизма кеширования для мгновенного возврата значений. Не рекомендуется использовать в большинстве случаев при большой вложенности кода, длинной цепочке вызовов и в публичных API библиотек

Кому отдаёт управление

Сам вычисляет и отдаёт результат, когда сможет (всегда «позже»)

Управление всегда у разработчика (вы сами завершаете Сompleter)

 

AsyncCache и AsyncMemoize

Это инструменты из пакета async, которые помогают оптимизировать асинхронные операции, избавляют разработчика от написания рутинного кода при работе с Completer и кешировании Future.

  • AsyncCache кеширует результаты асинхронных функций, чтобы избежать повторного выполнения одинаковых запросов. Это полезно, когда функция выполняет дорогостоящие операции, такие как запросы к серверу.

  • AsyncMemoize похож на AsyncCache, но он кеширует не только результаты, но и сами асинхронные вызовы. Это позволяет избежать повторного вызова функции с одинаковыми аргументами.

AsyncCache и AsyncMemoize используют Completer для управления Future и обеспечения корректной работы кеширования. Они позволяют оптимизировать асинхронные операции, уменьшая количество запросов и улучшая производительность приложения.

Пример с кешированием запроса с помощью Future:

1Future<String>? _futureResult;
2
3Future<String> fetchData() async {
4  if (_futureResult != null) {
5    return _futureResult!;
6  }
7  
8  _futureResult = networkRequest().whenComplete(() => _futureResult = null);
9  return _futureResult!;
10}

Можно сократить до двух строк:

1late final AsyncCache<String> _asyncCache = AsyncCache.ephemeral();
2Future<String> fetchData() => _asyncCache.fetch(() => networkRequest());

Конструктор AsyncCache.ephemeral() создаст кеш только на время выполнения Future, переданного в функцию fetch. Это предотвращает дублирование сетевых запросов.

Иногда возникает необходимость выполнить какую-то операцию только один раз и навсегда закешировать результат. В этом поможет AsyncMemoize. Рассмотрим пример создания графа зависимостей, который содержит асинхронные операции.

1class DependencyResolver {
2  final AsyncMemoizer<Graph> _graphMemoizer = AsyncMemoizer<Graph>();
3  
4  Future<Graph> resolveGraph() {
5    return _graphMemoizer.runOnce(() async {
6      final rootNode = await _fetchRootNode();
7      final childNodes = await _fetchChildNodes(rootNode.id);
8      return Graph(root: rootNode, nodes: childNodes);
9    });
10  }
11}

Многократный вызов resolveGraph() не приведёт к созданию нескольких сущностей.

Инструменты для отмены Future

Timeout

С помощью этого метода мы можем ограничить время выполнения Future. Если результат не будет получен за указанное время, то будет выброшено исключение или вызван колбэк onTimeout, где необходимо вернуть результат на этот случай, чтобы Future мог завершиться.

Это метод, который находится в самом классе Future(). Изучите примеры работы timeout из документации. Далее мы рассмотрим реализацию функции timeout на упрощённом коде реализации:

1Future<T> timeout(Duration timeLimit, ...) {
2  _Future<T> _future = new _Future<T>();
3  Timer timer = new Timer(timeLimit, () {
4      _future._completeError(
5          new TimeoutException("Future not completed", timeLimit),
6          StackTrace.empty
7      );
8    });
9  
10  this.then((T v) {
11    if (timer.isActive) {
12      timer.cancel();
13      _future._completeWithValue(v);
14    }
15  }, onError: (Object e, StackTrace s) {
16    if (timer.isActive) {
17      timer.cancel();
18      _future._completeError(e, s);
19    }
20  });
21  return _future;
22}

Как работает код:

  1. Создаётся новый Future и запускается Timer на указанное время, по истечении которого будет выброшено исключение. В случае с переданным колбэком есть отдельная ветка, которая его обрабатывает, но в данном листинге она не представлена — для упрощения понимания.

  2. На существующем Future применяется функция then, которая всегда будет выполнена, когда исходное Future завершится.

  3. Внутри then происходит проверка того, что таймер ещё активен, а значит, время timeout не вышло. И тогда можно отменить таймер и завершить Future полученным результатом. А если время вышло, значит, результат не нужен, ничего не происходит.

Это наглядный пример, как можно использовать механизм отмены таймера для завершения ожидания результата Future.

Минусы отмены с timeout:

  • Не всегда понятно, сколько времени мы готовы ждать результат. Писать случайные таймауты — плохая практика.

  • Код внутри Future, к которому применили timeout, будет исполнен до конца. И если там получится тяжёлое вычисление, то будут зря потрачены ресурсы.

CancelToken

Можно самостоятельно реализовать механизм отмены операций с помощью передачи специального флага внутрь исполняемой функции.

1class CancelToken {
2  bool _isCancelled = false;
3
4  void cancel() {
5    if (!_isCancelled) {
6      _isCancelled = true;
7    }
8  }
9
10  bool get isCancelled => _isCancelled;
11}

Пишем специальный класс, который имеет состояние о том, можно ли продолжать исполнение кода. Далее инстанс этого класса нужно передать в функцию, в которой хочется прервать исполнение.

1Future<int> longRunningOperation(CancelToken cancelToken) async {
2  int result = 0;
3
4  for (int i = 0; i < 10; i++) {
5    // Проверяем, был ли отменён токен
6    if (cancelToken.isCancelled) {
7      throw 'Operation cancelled'; // Нужно выбросить исключение или вернуть специальное значение. Если бы возвращали void, можно было бы обойтись без исключения
8    }
9
10    print('Processing $i');
11    result += i; // Накопление результата
12  }
13
14  return result; // Возвращаем результат
15}

В цикле на каждую итерацию проверяем, не отменили ли выполнение. И если нет, то продолжаем работу. Если внутри этой функции потребуется вызвать другие асинхронные операции, то придётся передавать токен во все асинхронные операции и там его обрабатывать. Подобным образом реализована отмена сетевых запросов в популярной библиотеке для работы с сетью — Dio.

Минусы подхода CancelToken:

  • Возможны утечки памяти при хранении CancelToken.

  • Трудности с отладкой. Бывает сложно определить, кто отменил операцию при хранении CancelToken в разных частях кода.

  • Накладные расходы времени разработчика на протягивание токенов через все функции и написание адаптеров, если в проекте появляются разные реализации CancelToken.

CancellableOperation

CancellableOperation — это обёртка вокруг Future, которая:

  1. Позволяет явно отменить операцию.

  2. Предоставляет информацию о состоянии Future.

  3. Предоставляет механизм создания цепочек операций, которые никогда не будут выполнены, если предыдущая операция была отменена.

  4. Предоставляет механизмы для очистки ресурсов.

  5. Поддерживает композицию операций.

Это более структурированный подход к отмене асинхронных операций в Dart по сравнению с базовым CancelToken. Он предоставляет более полный API для управления жизненным циклом операций. Входит в пакет async. Это предотвращает появление разных реализаций в проекте. API доступен для применения к любому Future. В то же время, как CancelToken, это абстракция, существующая только в Dio, а для применения в других местах нужно писать свою реализацию. Также во время выполнения асинхронных операций добавлять проверки токена.

Ключевое отличие от подхода CancelToken в том, что CancellableOperation не может отменить уже начавшуюся операцию: он лишь сократит время ожидания цепочки, завершая Future досрочно, как timeout, только не по константному времени, а по решению бизнес-логики.

Рассмотрим пример создания CancellableOperation.

1final operation = CancellableOperation.fromFuture(
2  longRunningOperation(), // Код внутри функции будет исполнен в любом случае
3  onCancel: () => print('cancelled, resources released'), // функция, в которой вы можете очистить смежные ресурсы и вернуть значение для функции cancel()
4);
5
6operation.cancel(); // Не остановит longRunningOperation(), но вернёт значение функции onCancel()
7
8final result = await operation.valueOrCancellateion('result if operation was cancelled');

CancelToken предоставляет более ручной механизм отмены и требует написания кода отмены самой операции и всех операций по цепочке. CancellableOperation — это более простой способ отменить цепочку вызовов, он лучше подходит для обёртки существующего кода без его модификации.

Рассмотрим свойства, которые предоставляет класс CancellableOperation:

  • isCanceled → bool // Была ли эта операция отменена до её завершения.

  • isCompleted → bool // Готов ли результат этой операции.

  • value → Future<T> // Результат этой операции, если он не отменён.

Данные свойства предоставляют информацию о состоянии операции. Свойством value лучше никогда не пользоваться. У него сложное поведение, и в случае отмены операции это приведёт к зависанию Future навсегда.

Важно

Ожидание результата от CancellableOperaiton с помощью value не рекомендуется, так как может привести к зависанию ожидания навечно и утечке памяти. Вместо этого используйте метод valueOrCancellation.

Рассмотрим методы, которые предоставляет класс CancellableOperation:

  • asStream();

  • cancel();

  • then();

  • thenOperation();

  • valueOrCancellation();

Разберём их подробнее.

asStream()

1asStream() → Stream<T>

Создаёт Stream, содержащий результат этой операции.

cancel()

1cancel() → Future

Отменяет операцию, возвращает Future, который содержит результат вызова onCancel.

then()

1then<R>(FutureOr<R> onValue(T), {FutureOr<R> onError(Object, StackTrace)?, FutureOr<R> onCancel()?, bool propagateCancel = true}) → CancellableOperation<R>

Создаёт CancellableOperation, которая будет выполнена после завершения родительской операции. Флаг propagateCancel по умолчанию true. Если его изменить на false, то операция не будет отменяться при отмене родительской.
Это бывает необходимо, когда у второй операции несколько слушателей и она должна быть отменена независимо от родительской операции.

thenOperation()

1thenOperation<R>(FutureOr<void> onValue(T, CancellableCompleter<R>), {FutureOr<void> onError(Object, StackTrace, CancellableCompleter<R>)?, FutureOr<void> onCancel(CancellableCompleter<R>)?, bool propagateCancel = true}) → CancellableOperation<R>

Такой же метод, как и then, только в данном случае в колбэк передаётся ещё и CancellableCompleter, с помощью которого мы обязаны сообщить о результате данной операции дочерним операциям. Этот Completer даёт больше гибкости в управлении жизненным циклом Future внутри вызова then.

valueOrCancellation()

1valueOrCancellation([T? cancellationValue]) → Future<T?>

Создаёт Future, содержащий результат цепочки CancellableOperation. В случае отмены операции результат будет null или то, что передано в качестве cancellationValue.
Именно этот метод надо использовать для получения результата от CancellableOperation.

Разберём на примере двух операций — получения локации и выполнения сетевого запроса на основе полученных данных о локации.

1CancellableOperation<Data>? operation;
2
3Future<Data?> fetchData() async {
4  operation?.cancel();
5  
6  operation = CancellableOperation
7      .fromFuture(locationProvider.lastLocation)
8      .then((location) => _networkManager.executeRequest(location));
9
10  return operation?.valueOrCancellation();
11}
12
13void onUserClosedScreen() {
14  operation?.cancel();
15}

Метод onUserClosedScreen будет вызван, когда пользователь закроет экран и эти данные уже будут не нужны. В случае если получение локации затянется, то, когда она станет доступна, сетевой запрос не начнёт исполняться, так как операция отменена. В момент отмены потребители, ожидающие fetchData(), получат null и обработают его.

Не забыть обработать отсутствие результата помогает null-safety. При многократном вызове fetchData() все предыдущие результаты операции и ожидания будут отменяться, что позволяет не накапливать слушателей lastLocation, у которого неопределённое время выполнения.

Игнорирование результата

Код на Dart представляет собой последовательность асинхронных операций, собранных в Event Loop. Какие-то операции зависят друг от друга, какие-то нет. Мы можем отказаться от ожидания результата Future в каком-то месте по разным причинам, хоть делать это не рекомендуется.

В контексте отмены Future про такие операции также важно не забывать, ведь даже если у вас нет кода, который ожидает выполнения Future, они будут находиться в Event Loop до тех пор, пока Future не будет завершён.

Рассмотрим три варианта игнорирования результата.

1void main() {
2  fetchData(); // асинхронная функция вызвана без await
3  print("Продолжаем выполнение");
4}
5
6void anotherMain() {
7  unawaited(fetchData());
8  print("Продолжаем выполнение");
9}
10
11void oneMoreMain() {
12  fetchData().ignore();
13  print("Продолжаем выполнение");
14}

В первом случае невозможно даже понять, был ли вызов синхронной или асинхронной функции.

Во втором — вызов обёрнут unawaited, что явно сообщает нам об игнорировании результата асинхронной функции, и не более.

Оператор .ignore() отличается от unawaited тем, что игнорирует не только результат работы функции, но и ошибки, которые могут возникнуть в ходе выполнения fetchData.

В первых двух случаях исключение, которое возникнет во время выполнения асинхронной функции, попадёт в Unhandled Exceptions зоны, в которой была вызвана функция.

Игнорируя результат любым из способов, важно помнить, что в случае возникновения исключения оно не нарушит работу программы. Однако функция, которую вы вызвали, может быть завершена аварийно, и вы должны быть уверены, что это не повлияет на общее состояние программы. Лучше обработать исключение явно и не использовать игнорирование результата.


Прочитав этот параграф, вы освоили ряд инструментов для работы с асинхронным кодом в Dart и Flutter. Вы научились использовать Completer, чтобы вручную управлять жизненным циклом Future, интегрировать API на основе колбэков и создавать сложную логику управления асинхронными операциями. При этом вы понимаете, в каких редких случаях допустимо применять Completer.sync(), а когда от его использования лучше отказаться.

Вы также освоили инструменты AsyncCache и AsyncMemoize из пакета async, которые помогают оптимизировать асинхронные операции. Теперь вы можете упростить реализацию кеширования, избежать написания шаблонного кода и исключить типичные ошибки. Вы знаете, когда стоит использовать AsyncCache для временного кеширования, а когда лучше применить AsyncMemoize для операций, которые должны выполниться только один раз.

Кроме того, вы научились управлять отменой асинхронных операций. Вы освоили три подхода:

  • научились ограничивать время выполнения операций с помощью timeout и понимать его ограничения;
  • поняли, как реализовать механизм отмены операций с помощью CancelToken, получая полный контроль над отменой, но с необходимостью модифицировать код в цепочке вызовов;
  • познакомились с CancellableOperation — инструментом, который позволяет отменять цепочку вызовов без изменения существующего кода, предоставляет удобный API и поддерживает композицию операций.

Теперь вы умеете осознанно выбирать подходящий инструмент для управления асинхронными операциями, руководствуясь принципом минимальной достаточности. Вы понимаете, когда достаточно использовать обычный Future, когда подойдёт AsyncCache, а когда потребуются более мощные инструменты вроде Completer или CancellableOperation.

Вы получили представление о том, как писать предсказуемый и надёжный асинхронный код: научились избегать ненужных игнорирований результатов операций, обрабатывать возможные ошибки и учитывать особенности поведения используемых инструментов.

Помните, что асинхронный код должен оставаться предсказуемым. Избегайте игнорирования результатов операций без явной необходимости, всегда обрабатывайте возможные ошибки и документируйте нестандартное поведение. Эти инструменты дают большую власть над асинхронными операциями, но с ней приходит и ответственность за правильное их использование.

А в следующем параграфе мы поговорим о смежной теме: разберём, как в Dart устроены параллельные вычисления и изоляты — и как с их помощью улучшить пользовательский опыт.

Чтобы добавить в заметки выделенный текст, нажмите Ctrl + E
Предыдущий параграф4.10. Продвинутая асинхронность: микротаски
Следующий параграф4.12. Параллельные вычисления и изоляты в Dart