Хабрахабр

Асинхронное программирование в JavaScript (Callback, Promise, RxJs )

На связи Омельницкий Сергей. Всем привет. Сегодня я бы хотел законспектировать этот материал. Не так давно я вел стрим по реактивному программированию, где рассказывал про асинхронность в JavaScript.

Итак, давайте начнем с определений: что такое стек и очередь? Но перед тем как начать основной материал нам нужно сделать вводную.

Стек — это коллекция, элементы которой получают по принципу «последний вошел, первый вышел» LIFO

Очередь — это коллекция, элементы которой получают по принципу («первый вошел, первый вышел» FIFO

Окей, продолжим.

Это значит, что он имеется только один поток выполнения и один стек, в который помещаются функции в очередь на выполнение. JavaScript — это однопоточный язык программирования. Следовательно в один момент времени JavaScript может выполнить только одну операцию, другие операции при этом будут ждать своей очереди в стеке, пока их не вызовут.

Если мы переходим в функцию, мы помещаем запись о ней в верхнюю часть стека. Стек вызовов — это структура данных, которая, упрощенно говоря, записывает сведения о месте в программе, где мы находимся. Это — всё, что умеет стек. Когда мы из функции возвращаемся, мы вытаскиваем из стека самый верхний элемент и оказываемся там, откуда вызывали эту функцию. Как тогда работает асинхронность в JavasScript? А теперь крайне интересный вопрос.

Функции из этой очереди выполнятся по порядку только после того, как стек будет полностью очищен. На самом деле помимо стека в браузерах присутствует особая очередь для работы с так называемым WebAPI. Если в стеке в данный момент находится хотя бы один элемент, то они в стек попасть не могут. Только после этого они помещаются из очереди в стек на выполнение. Как раз именно из-за этого вызов функций по таймаута часто бывает не точным по времени, так как функция не может попасть из очереди в стек, пока он заполнен.

Также рассмотрим посмотрим, что при этом происходит в системе. Рассмотрим следующий пример и займёмся его пошаговым «выполнением».

console.log('Hi);
setTimeout(function cb1() { console.log('cb1');
}, 5000);
console.log('Bye');

Консоль браузера чиста, стек вызовов пуст. 1) Пока ничего не происходит.

2) Потом команда console.log('Hi') добавляется в стек вызовов.

3) И она выполняется

4) Затем console.log('Hi') удаляется из стека вызовов.

Она добавляется в стек вызовов. 5) Теперь переходим к команде setTimeout(function cb1() ).

Браузер создаёт таймер, являющийся частью Web API. 6) Команда setTimeout(function cb1() {… }) выполняется. Он будет выполнять обратный отсчёт времени.

7) Команда setTimeout(function cb1() {… }) завершила работу и удаляется из стека вызовов.

8) Команда console.log('Bye') добавляется в стек вызовов.

9) Команда console.log('Bye') выполняется.

10) Команда console.log('Bye') удаляется из стека вызовов.

11) После того, как пройдут, как минимум, 5000 мс., таймер завершает работу и помещает коллбэк cb1 в очередь коллбэков.

12) Цикл событий берёт c функцию cb1 из очереди коллбэков и помещает её в стек вызовов.

13) Функция cb1 выполняется и добавляет console.log('cb1') в стек вызовов.

14) Команда console.log('cb1') выполняется.

15) Команда console.log('cb1') удаляется из стека вызовов.

16) Функция cb1 удаляется из стека вызовов.

Взглянем на пример в динамике:

Теперь давайте кратко поговорим об эволюции асинхронного кода. Ну вот мы и рассмотрели как в JavaScript реализована асинхронность.

Эволюция асинхронного кода.

a(function (resultsFromA) { b(resultsFromA, function (resultsFromB) { c(resultsFromB, function (resultsFromC) { d(resultsFromC, function (resultsFromD) { e(resultsFromD, function (resultsFromE) { f(resultsFromE, function (resultsFromF) { console.log(resultsFromF); }) }) }) }) })
});

Они могут быть переданы как любая другая переменная другим функциям. Асинхронное программирование, каким мы его знаем в JavaScript, может быть реализовано только функциями. И это прикольно, весело и задорно, пока не превращается в грусть, тоску и печаль. Так родились коллбэки. Да все просто: Почему?

  • С ростом сложности кода, проект быстро превращается в малопонятные многократно вложенные блоки — «callback hell».
  • Обработку ошибок можно легко упустить.
  • Нельзя возвращать выражения с return.

С появлением Promise обстановка стала чуть лучше.

new Promise(function(resolve, reject) { setTimeout(() => resolve(1), 2000); }).then((result) => { alert(result); return result + 2; }).then((result) => { throw new Error('FAILED HERE'); alert(result); return result + 2; }).then((result) => { alert(result); return result + 2; }).catch((e) => { console.log('error: ', e);
});

  • Появились цепочки промисов, что улучшило читаемость кода
  • Появился отдельный метод перехвата ошибок
  • Появилась возможность параллельного выполнения с помощью Promise.all
  • Вложенную асинхронность мы можем решить с помощью async/await

К примеру промис, без танцев с бубном, нельзя отменить, а что самое главное — работает с одним значением. Но у промиса есть свои ограничения.

Устали? Ну вот мы и плавно подошли к реактивному программированию. А я продолжу. Ну благо дело можно можно пойти заварить чаек, обмозговать и вернуться читать далее.

Основы реактивного программирования

Давайте более детально разберем что такое поток данных. Реактивное программирование — парадигма программирования, ориентированная на потоки данных и распространение изменений.

// Получаем ссылку на элемент
const input = ducument.querySelector('input'); const eventsArray = []; // Пушим каждое событие в массив eventsArray
input.addEventListener('keyup', event => eventsArray.push(event)
);

Мы создаем массив, и на каждый keyup события input мы будем сохранять событие в нашем массиве. Представим, что у нас есть поле ввода.  индекс более поздних событий больше, чем индекс более ранних. При этом хотелось бы отметить, что наш массив отсортирован по времени  т.е. Для того чтоб этот массив можно было смело назвать потоком он должен уметь каким-то образом сообщать подписчикам, что в него поступили новые данные. Такой массив представляет собой упрощенную модель потока данных, но это еще не поток. Таким образом мы подошли к определению потока.

Поток данных

const { interval1 } = Rx;
const { take } = RxOperators; interval(1000).pipe( take(4)
)

А теперь представьте как удобно становится писать код, в котором на одно действие потребуется вызывать несколько событий в разных участках кода. Поток — это массив данных, отсортированных по времени, который может сообщать о том, что данные изменились. И это умеет делать библиотека RxJs. Мы просто делаем подписку на поток и он нам сам сообщит когда произойдут изменения.

Библиотека предоставляет основной тип Observable, несколько вспомогательных типов (Observer, Schedulers, Subjects) и операторы работы с событиями как с коллекциями (map, filter, reduce, every и подобные из JavaScript Array). RxJS — это библиотека для работы с асинхронными и основанными на событиях программами с использованием наблюдаемых последовательностей.

Давайте разберемся с основными понятиями этой библиотеки.

Observable, Observer, Producer

Этот класс содержит в себе основную часть реализации RxJs. Observable — первый базовый тип, который мы рассмотрим. Он связан с наблюдаемым потоком, на который можно как подписаться с помощью метода subscribe.

Источником значений для Observer называется Producer. В Observable реализуется вспомогательный механизм для создания обновлений, так называемый Observer. Так что можно сказать, что observable является проводником между Producer и Observer. Это может быть массив, итератор, web socket, какое-то событие и т.п.

Observable обрабатывает три вида событий Observer:

  • next – новые данные
  • error – ошибку, если последовательность завершилась по причине исключительной ситуации. это событие так же предполагает завершение последовательности.
  • complete — сигнал о завершении последовательности. Это означает, что новых данных больше не будет

Посмотрим демо:

мы получим 4 и завершим наш поток. В начале мы обработаем значения 1, 2, 3, а спустя 1 сек.

Мысли в слух

😀 И тут я понял, что рассказывать было интересней чем писать об этом.

Subscription

Так же мы можем сгруппировать подписки с помощью метода add. Когда мы делаем подписку на поток, мы создаем новый класс subscription, который дает нам возможность отменить подписку с помощью метода unsubscribe. Методы add и remove на вход принимают другую подписку. Ну и логично, что мы можем разгруппировать потоки с помощью remove. Идем дальше. Хотелось бы отметить, что когда мы делаем отписку, то мы отписываемся от всех дочерних подписок как будто бы и у них вызывали метод unsubscribe.

Виды потоков

В какой момент времени ты пришел, с того момента и начал просмотр. Если приводить аналогию, то я бы представил горячий поток как фильм в кинотеатре. поддержку. Холодный поток я бы сравнил со звонком в тех. Любой позвонивший слушает запись автоответчика от начала до конца, но ты можешь бросить трубку с помощью unsubscribe.

Возникает вопрос — где использовать)) Приведу пример из практики. Хотелось бы отметить, что существуют еще так называемые warm потоки ( такое определение я встречал крайне редко и только на зарубежных сообществах ) — это поток, который трансформируется из холодного потока в горячий.

Он активно использует rxjs. Я работаю с ангуляром. Если я использую этот пайп несколько раз, то, возвращаясь к определению холодного потока, каждый pipe будет запрашивать данные с сервера, что мягко говоря странно. Для получения данных на сервер я ожидаю холодный поток и этот поток использую в шаблоне с помощью asyncPipe. А если я преобразую холодный поток в теплый, то запрос произойдет единожды.

Вообще понимание вида потоков достаточно сложна для начинающих, но важна.

Operators

return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`) .pipe( tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))), map(({ data }: TradeCompanyList) => data) );

Они помогают контролировать события, протекающие в Observable. Расширить возможность работы с потоками нам предоставляют операторы. Мы рассмотрим парочку наиболее популярных, а подробнее с операторами можно ознакомиться по ссылкам в полезной информации.

Operators — of

Начнем со вспомогательного оператора of. Он создает Observable на основе простого значения.

Operators — filter

Если оператор возвращает истину, то пропускает далее. Оператор фильтрации filter, как можно понять по названию, фильтрует сигнал потока.

Operators — take

take — Принимает значение кол-ва эмитов, после которого завершает поток.

Operators — debounceTime

debounceTime — отбрасывает испускаемые значения, которые попадают в указанный промежуток времени между выходными данными — по прошествии временного интервала эмитит последнее значение.

const { Observable } = Rx;
const { debounceTime, take } = RxOperators; Observable.create((observer) => { let i = 1; observer.next(i++); // Испускаем значение раз в 1000мс setInterval(() => { observer.next(i++) }, 1000); // Испускаем значение раз в 1500мс setInterval(() => { observer.next(i++) }, 1500);
}).pipe( debounceTime(700), // Ожидаем 700мс значения прежде чем обработать take(3)
);

Operators — takeWhile

Эмитит значения, пока takeWhile не вернет false, после чего отпишется от потока.

const { Observable } = Rx;
const { debounceTime, takeWhile } = RxOperators; Observable.create((observer) => { let i = 1; observer.next(i++); // Испускаем значение раз в 1000мс setInterval(() => { observer.next(i++) }, 1000);
}).pipe( takeWhile( producer => producer < 5 )
);

Operators — combineLatest

Он объединяет несколько потоков в один. Комбинированный оператор combineLatest чем-то похож на promise.all. Далее, после любо любого эмита из объединённых потоков он будет отдавать новые значения. После того как каждый поток сделает хотя бы один эмит, мы получаем последние значения от каждого в виде массива.

const { combineLatest, Observable } = Rx;
const { take } = RxOperators; const observer_1 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 1000мс setInterval(() => { observer.next('a: ' + i++); }, 1000);
}); const observer_2 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 750мс setInterval(() => { observer.next('b: ' + i++); }, 750);
}); combineLatest(observer_1, observer_2).pipe(take(5));

Operators — zip

Если значение не придет из какого-либо потока, то группа не будет сформирована. Zip — ждет значение из каждого потока и формирует массив на основе этих значений.

const { zip, Observable } = Rx;
const { take } = RxOperators; const observer_1 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 1000мс setInterval(() => { observer.next('a: ' + i++); }, 1000);
}); const observer_2 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 750 setInterval(() => { observer.next('b: ' + i++); }, 750);
}); const observer_3 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 500 setInterval(() => { observer.next('c: ' + i++); }, 500);
}); zip(observer_1, observer_2, observer_3).pipe(take(5));

Operators — forkJoin

forkJoin также объединяет потоки, но он емитнит значение только когда все потоки будут завершены (complete).

const { forkJoin, Observable } = Rx;
const { take } = RxOperators; const observer_1 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 1000мс setInterval(() => { observer.next('a: ' + i++); }, 1000);
}).pipe(take(3)); const observer_2 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 750 setInterval(() => { observer.next('b: ' + i++); }, 750);
}).pipe(take(5)); const observer_3 = Observable.create((observer) => { let i = 1; // Испускаем значение раз в 500 setInterval(() => { observer.next('c: ' + i++); }, 500);
}).pipe(take(4)); forkJoin(observer_1, observer_2, observer_3);

Operators — map

Оператор трансформации map преобразует значение эмита в новое.

const { Observable } = Rx;
const { take, map } = RxOperators; Observable.create((observer) => { let i = 1; // Испускаем значение раз в 1000мс setInterval(() => { observer.next(i++); }, 1000);
}).pipe( map(x => x * 10), take(3)
);

Operators – share, tap

Оператор tap - позволяет делать side эффекты, то есть какие-либо действия, которые не влияют на последовательность.

Утилитный оператор share способен из холодного потока сделать горячим.

Перейдем к Subject. С операторами закончили.

Мысли в слух

Утомили меня эти примеры 😀 И тут я пошел чаек пить.

Семейство subject-ов

Эти классы являются неким гибридом, которые выступают одновременно в роли observable и observer. Семейство subject-ов являются ярким примером горячих потоков. Если говорить по основным методам, то это: Так как subject является горячим потоком, то от него необходимо отписываться.

  • next – передача новых данных в поток
  • error – ошибка и завершение потока
  • complete – завершение потока
  • subscribe – подписаться на поток
  • unsubscribe – отписаться от потока
  • asObservable – трансформируем в наблюдателя
  • toPromise – трансформирует в промис

Выделяют 4 5 типов subject-ов.

Мысли в слух

Как говорится век живи век учись. На стриме говорил 4, а оказалось они еще один добавили.

Создается без параметров. Простой Subject new Subject()– самый простой вид subject-ов. Передает значения пришедшие только после подписки.

На вход принимает значение по умолчанию. BehaviorSubject new BehaviorSubject( defaultData<T> ) – на мой взгляд самый распространённый вид subject-ов. Данный класс имеет так же полезный метод value, который возвращает текущее значение потока. Всегда сохраняет данные последнего эмита, которые передает при подписке.

ReplaySubject new ReplaySubject(bufferSize?: number, windowTime?: number) — На вход опционально может принять первым аргументом размер буфера значений, которые он будет в себе хранить, а вторым время в течении которого нам нужны изменения.

Будет возвращено только последнее значение потока. AsyncSubject new AsyncSubject() — при подписке ничего не происходит, и значение будет возвращено только при complete.

Кто знает что он делает пишите, дополним. WebSocketSubject new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) — О нем документация молчит и я сам его в первый раз вижу.

Ну вот мы и рассмотрели все, что я хотел сегодня рассказать. Фуф. Самостоятельно ознакомиться со списком литературы можно во вкладке полезная информация. Надеюсь данная информация была полезной.

Полезная информация

Теги
Показать больше

Похожие статьи

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *

Кнопка «Наверх»
Закрыть