import { Observable } from 'rxjs'
import { bufferTime, distinctUntilChanged, filter, map, share, switchMap } from 'rxjs/operators'
-function buildBulkObservable <T extends number | string, R> (options: {
- notifierObservable: Observable<T>
+function buildBulkObservable <P extends number | string, R> (options: {
+ notifierObservable: Observable<P>
time: number
- bulkGet: (params: T[]) => Observable<R>
+ bulkGet: (params: P[]) => Observable<R>
}) {
const { notifierObservable, time, bulkGet } = options
bufferTime(time),
filter(params => params.length !== 0),
map(params => uniq(params)),
- switchMap(params => bulkGet(params)),
+ switchMap(params => {
+ return bulkGet(params)
+ .pipe(map(response => ({ params, response })))
+ }),
share()
)
}
import * as debug from 'debug'
import { Observable, Subject } from 'rxjs'
-import { first, map } from 'rxjs/operators'
+import { filter, first, map } from 'rxjs/operators'
import { Injectable } from '@angular/core'
import { buildBulkObservable } from '@app/helpers'
import { ResultList } from '@shared/models/common'
type BulkObservables <P extends number | string, R> = {
notifier: Subject<P>
- result: Observable<R>
+ result: Observable<{ params: P[], response: R }>
}
@Injectable()
return new Observable<R>(obs => {
observableObject.result
.pipe(
+ filter(result => result.params.includes(param)),
first(),
- map(({ data }) => data),
+ map(result => result.response.data),
map(data => data.find(finder))
)
.subscribe(result => {
return this.searchService.searchVideoPlaylists({ uuids })
}
- private buildBulkObservableObject <T extends number | string, R> (bulkGet: (params: T[]) => Observable<R>) {
- const notifier = new Subject<T>()
+ private buildBulkObservableObject <P extends number | string, R> (bulkGet: (params: P[]) => Observable<R>) {
+ const notifier = new Subject<P>()
return {
notifier,