Executing a Queue of Async Duties Utilizing RxJS

[ad_1]

Photograph by Xiangkun ZHU on Unsplash

I’ve to say the title for this text was fairly onerous to phrase accurately however if you happen to’ve arrived right here after trying to find the important thing phrases then credit score to you. This downside can come up in real-world functions and generally in interview questions. Let’s say you wish to make 10,000 API requests. We will simply construct a queue of these 10,000 requests in RxJS like this:

import request from 'request-promise-native';
import { from } from 'rxjs';

let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
reqs.push(
from(request(`http://instance.com/${i}`))
)
};

Now you could have an array of RxJS observables which have been created from an array of guarantees. It will symbolize the queue of duties. Subsequent, we wish to do the next:

  • Execute all 10,000 API requests within the queue
  • We solely need 3 concurrent requests working at any given time
  • Upon completion of 1 request, we want a brand new request to start out and take its place

After I see one thing like this my first thought is that forkJoin from RxJS can be utilized to make the API calls. However we additionally have to restrict the execution to a most variety of concurrent subscriptions. As an alternative of utilizing forkJoin we use mergeMap and expose the concurrent variety of working duties. This may be illustrated by a generically typed helper operate that may deal with any array of observables and a max parallel executions worth:

import { from, Observable } from "rxjs";
import { final, map, mergeMap, toArray } from "rxjs/operators";

operate maxConcurrent<T>(
observables: Observable<T>[],
concurrent: quantity
): Observable<T[]> {
return from(observables).pipe(
mergeMap((observable, observableIndex) =>
observable.pipe(
final(),
map((worth) => ({ index: observableIndex, worth })
)
),
concurrent
),
toArray(),
map((pairs) => pairs.kind((l, r) => l.index - r.index).map(({ worth }) => worth))
);
}

Our helper operate begins off by changing the array of observables right into a higher-order observable. Then it merges every observable to create a single stream and takes the final worth emitted from the supply upon completion. Then it matches every “final” worth with its index in order that the output order matches the enter order.

? Now you can use an open-source toolchain like Bit to extract this helper operate into its personal module after which you possibly can take a look at, doc, and model this module independently after which reuse it throughout a number of tasks utilizing a easy bit import your.username/helperFunction command.

Study extra right here:

Now to see our helper operate in motion we are able to use defer as an alternative of from to generate the unique queue of observables. The defer operator makes use of a manufacturing facility sample and can solely execute as soon as the promise is subscribed to:

import { defer, from, Observable } from "rxjs";
import { final, map, mergeMap, toArray } from "rxjs/operators";

operate maxConcurrent<T>(
observables: Observable<T>[],
concurrent: quantity
): Observable<T[]> {
return from(observables).pipe(
mergeMap((observable, observableIndex) =>
observable.pipe(
final(),
map((worth) => ({ index: observableIndex, worth })
)
),
concurrent
),
toArray(),
map((pairs) => pairs.kind((l, r) => l.index - r.index).map(({ worth }) => worth))
);
}

const handleRequest = async (id: string) => await fetch(
`http://instance.com/${id}`
);
const ids: string[] = [...Array(10000).keys()].map(toString);
const concurrent: quantity = 3;
const observables = ids.map((id: string) => defer(() => from(handleRequest(id))));

maxConcurrent<Response>(observables, concurrent).subscribe((worth) => {
console.log('VALUE = ', worth);
});

You’ll be able to do this out with an actual API and an actual queue of duties and it’ll at all times maintain a max of three (or nonetheless many you need) duties working at any time throughout the iteration ?

[ad_2]

Leave a Reply

Your email address will not be published. Required fields are marked *