Skip to content

Commit

Permalink
ksCombineLatest tuple arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
KEIII committed Feb 9, 2024
1 parent 13f9c03 commit 11c0f8e
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 40 deletions.
59 changes: 23 additions & 36 deletions src/creation_operators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,56 +237,43 @@ export const ksPeriodic = (
/**
* When any observable emits a value, emit the last emitted value from each.
*/
export const ksCombineLatest = <A, B>(
stream_a: Stream<A>,
stream_b: Stream<B>,
): Stream<[A, B]> => {
return stream_a.constructor(({ next, complete }) => {
let completed_a = false;
let completed_b = false;
let value_a: Option<A> = none;
let value_b: Option<B> = none;
const a = _unsubscribableObservable(stream_a);
const b = _unsubscribableObservable(stream_b);
export const ksCombineLatest = <A extends unknown[]>(
streams: [...{ [K in keyof A]: Stream<A[K]> }],
): Stream<A> => {
if (streams.length < 1) return ksNever;
return streams[0].constructor(({ next, complete }) => {
const observables = streams.map(_unsubscribableObservable);
const completedObservables = streams.map(() => false);
const valueOptionObservables = streams.map((): Option<unknown> => none);

const unsubscribe = () => {
b.unsubscribe();
a.unsubscribe();
return observables.forEach(observable => observable.unsubscribe());
};

const tryNext = () => {
if (isSome(value_a) && isSome(value_b)) {
return next([value_a.value, value_b.value]);
if (valueOptionObservables.every(isSome)) {
next(valueOptionObservables.map(option => option.value) as A);
}
};

const tryComplete = () => {
if (completed_a && completed_b) {
if (completedObservables.every(completed => completed)) {
complete();
unsubscribe();
}
};

a.subscribe({
next: value => {
value_a = some(value);
tryNext();
},
complete: () => {
completed_a = true;
tryComplete();
},
});

b.subscribe({
next: value => {
value_b = some(value);
tryNext();
},
complete: () => {
completed_b = true;
tryComplete();
},
observables.forEach((observable, index) => {
observable.subscribe({
next: value => {
valueOptionObservables[index] = some(value);
tryNext();
},
complete: () => {
completedObservables[index] = true;
tryComplete();
},
});
});

return { unsubscribe };
Expand Down
4 changes: 2 additions & 2 deletions tests/glitches.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('diamond problem (glitches)', () => {
const firstName = ksBehaviourSubject('John');
const lastName = ksBehaviourSubject('Doe');
const isFirstNameShort = firstName.pipe(ksMap(n => n.length < 10));
const fullName = ksCombineLatest(firstName, lastName).pipe(
const fullName = ksCombineLatest([firstName, lastName]).pipe(
ksMap(([first, last]) => `${first} ${last}`),
);
const displayName = isFirstNameShort.pipe(
Expand Down Expand Up @@ -80,7 +80,7 @@ describe('diamond problem (glitches)', () => {
const a = ksPeriodic(0); // 0-----1-----2-----3-----4------
const b = a.pipe(ksMap(i => alphabet[i])); // a-----b-----c-----d-----e------
const c = a.pipe(ksMap(i => i * i)); // 0-----1-----4-----9-----16-----
const d = ksCombineLatest(b, c)
const d = ksCombineLatest([b, c])
.pipe(ksMap(([_1, _2]) => `${_1}${_2}`))
.pipe(ksTake(alphabet.length));
expect(await stackOut(d)).toEqual([
Expand Down
4 changes: 3 additions & 1 deletion tests/rxjs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ it('should works like RxJS (in some cases ;)', async () => {
takeUntil(timer(ms)),
);

const aa = stackOut(ksMerge(ksCombineLatest(a, a), ksCombineLatest(a, a)));
const aa = stackOut(
ksMerge(ksCombineLatest([a, a]), ksCombineLatest([a, a])),
);
const bb = stackOut<any>(merge(combineLatest([b, b]), combineLatest([b, b])));
const aaa = await aa;
const bbb = await bb;
Expand Down
2 changes: 1 addition & 1 deletion tests/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ describe('ksCombineLatest', () => {
const limit = 100;
const ms = 5;
const s = ksPeriodic(ms).pipe(ksTake(limit));
const a = await stackOut(ksCombineLatest(s, s));
const a = await stackOut(ksCombineLatest([s, s]));
const rxjsStream = timer(0, ms).pipe(take(limit));
const b = await stackOut(combineLatest([rxjsStream, rxjsStream]));
expect(a).toEqual(b);
Expand Down

0 comments on commit 11c0f8e

Please sign in to comment.