Uno dei tratti distintivi delle applicazioni complesse è l’acquisizione di dati da numerose fonti. Possono essere fornitori di dati esterni, applicazioni o un database locale. RxJS fornisce una varietà di operatori per combinarli in un osservabile in base all’ordine, al tempo e/o alla struttura dei valori emessi. In questo articolo impareremo a sottoscrivere più flussi contemporaneamente utilizzando due dei più popolari operatori di combinazione RxJS: combinareUltime e forkUnisciti.
RxJS combineUltimo operatore
Uno dei motivi più comuni per combinare gli osservabili è eseguire un qualche tipo di calcolo o determinazione utilizzando i dati di ciascuno. Il combinareUltime L’operatore emette un elemento ogni volta che una delle osservabili di origine emette un valore, ma solo quando ciascuna delle osservabili di origine ha emesso almeno un valore. Pertanto, ogni volta che uno degli osservabili sorgente emette un valore, combinareUltime:
- combina gli elementi emessi più di recente da ciascuna delle altre sorgenti osservabili e
- se hai fornito una funzione di progetto, emette il valore restituito da quella funzione.
Ecco il combinareUltime firma:
combineLatest(observables: ...Observable [, project: function]): Observable
Il seguente frammento di codice mostra uno scenario di utilizzo tipico di combinareUltime utilizzo. Noterai che il il peso e altezza le osservabili sono passate a combinareUltime come un array (di array). Il secondo parametro (opzionale) è la funzione di progetto che calcola il BMI utilizzando i valori emessi dal il peso e altezza osservabili. Infine, i risultati del calcolo vengono restituiti come un nuovo osservabile:
import { combineLatest, of } from 'rxjs'; const weight = of(70, 72, 76, 79, 75); const height = of(1.76, 1.77, 1.78); const bmi = combineLatest([weight, height], (w, h) => { console.log('project values: w = ', w, ', h=", h); return w / (h * h); }); bmi.subscribe(res => console.log("BMI is ' + res)); // Output to console is: // project values: w = 75, h = 1.76 // BMI is 24.212293388429753 // project values: w = 75, h = 1.77 // BMI is 23.93948099205209 // project values: w = 75, h = 1.78 // BMI is 23.671253629592222
Il console.log() l’output della funzione progetto mostra che l’ultimo valore di peso emesso viene utilizzato in tutti i calcoli. È combinato con ciascuno dei altezza valori per produrre i risultati dell’IMC. Questo accade perché il il peso i valori osservabili vengono emessi immediatamente senza ritardo. Quindi:
se un osservabile emette valori prima degli altri, allora quei valori vengono persi.
Il numero di valori emessi da combinareUltime è determinato dal flusso che emette il minor numero di valori, in questo caso, altezza.
RxJS forkJoin Operator
Piace combinareUltime, forkUnisciti accetta anche una serie di osservabili. Tuttavia, emette un array di valori nello stesso identico ordine dell’array passato. L’osservabile restituito emetterà gli ultimi valori emessi da ogni flusso di input.
La firma di forkUnisciti è quasi identico a quello di combinareUltime:
forkJoin(...args [, selector : function]): Observable
Se ripetiamo il nostro esempio precedente, sostituendo forkUnisciti per combinareUltime, possiamo vedere che viene prodotto un solo valore di BMI:
const bmi = forkJoin([weight, height], (w, h) => { console.log('selector values: w =', w, ', h=", h); return w / (h * h); }); bmi.subscribe(res => console.log("BMI is ' + res)); // Output to console is: // selector values: w = 75, h = 1.78 // BMI is 3.671253629592222
In che modo gli operatori combinati sanno quando tutti gli osservabili sono stati completati
Per emettere gli ultimi valori emessi da ogni flusso di input, forkUnisciti deve sapere quando tutti i flussi sono stati completati. Questo pone la domanda, come fa a sapere quando un flusso ha finito di emettere valori. La risposta sta nella classe Subscription, che implementa l’interfaccia Observer. Nel recente RxJS Observables Primer in Angular articolo, abbiamo appreso del prossimo(), errore(), e completare() gestori dell’interfaccia Observer. Gli Operatori Combinati attendono il completare() segnale per combinare i flussi.
Il di() la funzione si occupa della chiamata completare() per noi, ma è nostra responsabilità quando creiamo i nostri osservabili. Possiamo vedere che entrambe le osservabili sotto chiamano completare() al termine dell’emissione dei valori:
const weight = new Observable<number>((subscriber: Subscriber<number>) => { subscriber.next(70); subscriber.next(72); setTimeout(() => { subscriber.next(76); subscriber.next(79); subscriber.next(75); subscriber.complete(); }, 1200); }); const height = new Observable<number>(( subscriber: Subscriber<number>) => { subscriber.next(1.76); subscriber.next(1.77); setTimeout(() => { subscriber.next(1.78); subscriber.complete(); }, 1000); }); const bmi = forkJoin([weight2, height2], (w, h) => { console.log('selector values: w =', w, ', h=", h); return w / (h * h); }); bmi.subscribe(res => console.log("BMI is ' + res)); // Output to console is: // selector values: w = 75, h = 1.78 // BMI is 3.671253629592222
Di nuovo, forkUnisciti restituisce 3.671253629592222.
Tutti i frammenti di codice di cui sopra sono inclusi nel demo di stackblitz.com.
Conclusione
In questo articolo, abbiamo imparato a sottoscrivere più flussi contemporaneamente utilizzando due dei più popolari operatori di combinazione RxJS: combinareUltime e forkUnisciti. Non è proprio quello che stavi cercando? Non temere, ci sono più operatori combinati da dove provengono. Esploreremo alcuni di questi nelle prossime settimane.