Операторы буфера RxJS: основы работы

По умолчанию наблюдаемые потоки в RxJS не сохраняют значения в буфере или в кэше. Однако в некоторых ситуациях было бы полезно буферизовать некоторые значения (например, это выгодно для обработки в пакетном режиме). RxJS предоставляет 5 операторов для упрощения этой задачи: buffer, bufferCount, bufferTime, bufferToggle и bufferWhen. В этом мануале мы рассмотрим эти операторы буферизации на простых примерах.

Примечание: Обратите внимание, что буферизованные значения выдаются как массивы значений, а не как отдельные значения.

В приведенных ниже примерах мы будем использовать нажатия кнопок в качестве наблюдаемых объектов и сопоставлять клики со случайным числом.

Оператор buffer

Оператор buffer принимает второй наблюдаемый объект в качестве аргумента и будет буферизовать значения из первого наблюдаемого объекта до тех пор, пока второй объект не будет передан. Затем buffer сбрасывается и начинает буферизацию снова, пока второй наблюдаемый объект не будет передан еще раз.

В следующем примере значения, связанные с нажатиями кнопки, буферизуются до тех пор, пока генерируется наблюдаемый объект release$ (когда нажимается кнопка release):

const btn = document.querySelector('.click-me');
const releaseBtn = document.querySelector('.release');

const release$ = Rx.Observable.fromEvent(releaseBtn, 'click');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .buffer(release$)
  .subscribe(random => console.log(random));

Оператор bufferCount

С помощью bufferCount вы можете задать количество значений, которые нужно хранить в буфере, прежде чем они будут переданы.

В этом примере мы указываем, что в буфер нужно поместить 4 значения (4 нажатия кнопки), после чего они будут переданы и записаны в консоль:

const btn = document.querySelector('.click-me');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferCount(4)
  .subscribe(random => console.log(random));

При этом каждые 4 клика в консоль будет записываться массив типа [86, 93, 57, 64].

Оператор bufferTime

bufferTime принимает количество миллисекунд для буферизации значений. По истечении времени буферизованные значения передаются, а буфер запускается снова.

Здесь значения, отображаемые из событий нажатия, буферизуются на 1 секунду, а затем передаются:

const btn = document.querySelector('.click-me');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferTime(1000)
  .take(15)
  .subscribe(random => console.log(random));

Если пользователь 3 раза нажимает на кнопку в течение одного из периодов буферизации (длиной в одну секунду), в консоль будет записан массив вроде [44, 71, 90]. Это может продолжаться вечно, поэтому мы также использовали оператор take, чтобы закрыть поток после того, как были переданы 15 значений массива.

Оператор bufferWhen

bufferWhen похож на buffer, но вместо того, чтобы принимать наблюдаемый объект, он принимает функцию селектора (так называемый закрывающий селектор). Здесь мы будем использовать тот же пример, что и с оператором buffer. Небольшое отличие заключается в том, что мы используем функцию, которая возвращает наблюдаемый объект release$:

const btn = document.querySelector('.click-me');
const releaseBuffer = document.querySelector('.release');

const release$ = Rx.Observable.fromEvent(releaseBuffer, 'click');

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferWhen(() => release$)
  .subscribe(random => console.log(random));

Оператор bufferToggle

bufferToggle похож на buffer, но принимает два аргумента: наблюдаемый объект, чтобы начать буферизацаию, и закрывающий селектор, чтобы остановить ее и выдать значения.

В данном примере нажатия кнопки игнорируются до тех пор, пока передается объект start$. После этого start$ должен передаться еще раз, чтобы начать новый буфер:

const btn = document.querySelector('.click-me');

const startBtn = document.querySelector('.start');
const stopBtn = document.querySelector('.stop');

const start$ = Rx.Observable
  .fromEvent(startBtn, 'click')
  .do(_ => console.log('Start buffering!'));

const stop$ = Rx.Observable
  .fromEvent(stopBtn, 'click')
  .do(_ => console.log('Stop buffering!'));

const myObs1 = Rx.Observable.fromEvent(btn, 'click')
  .map(_ => Math.floor(Math.random() * 100))
  .bufferToggle(start$, () => stop$)
  .subscribe(random => console.log(random));

Примечание: Здесь мы также использовали оператор do с наблюдаемыми объектами start$ и stop$, чтобы записать сообщение в консоль, когда они будут переданы.

Читайте также: Субъекты поведения и воспроизведения RxJS

Tags:

Добавить комментарий