Skip to content

Commit

Permalink
fix(signals): do not listen to observable changes on instance injecto…
Browse files Browse the repository at this point in the history
…r destroy
  • Loading branch information
markostanimirovic committed Sep 28, 2024
1 parent 2deed39 commit b3fcf82
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 73 deletions.
169 changes: 110 additions & 59 deletions modules/signals/rxjs-interop/spec/rx-method.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import {
signal,
} from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { BehaviorSubject, finalize, pipe, Subject, tap } from 'rxjs';
import { rxMethod } from '../src';
import { createLocalService } from '../../spec/helpers';
import { provideRouter } from '@angular/router';
import { provideLocationMocks } from '@angular/common/testing';
import { provideRouter } from '@angular/router';
import { RouterTestingHarness } from '@angular/router/testing';
import { BehaviorSubject, pipe, Subject, tap } from 'rxjs';
import { rxMethod } from '../src';
import { createLocalService } from '../../spec/helpers';

describe('rxMethod', () => {
it('runs with a value', () => {
Expand Down Expand Up @@ -239,43 +239,44 @@ describe('rxMethod', () => {
expect(counter()).toBe(4);
});

it('completes on manual destroy with Signals', () => {
TestBed.runInInjectionContext(() => {
let completed = false;
const counter = signal(1);
const fn = rxMethod<number>(finalize(() => (completed = true)));
TestBed.flushEffects();
fn(counter);
fn.unsubscribe();
expect(completed).toBe(true);
});
});

/**
* This test suite verifies that the internal effect of
* an RxMethod instance is executed with the correct injector
* and is destroyed at the specified time.
*
* Since we cannot directly observe the destruction of the effect from the outside,
* we test it indirectly.
* This test suite verifies that a signal or observable passed to a reactive
* method that is initialized at the ancestor injector level is tracked within
* the correct injection context and untracked at the specified time.
*
* Components use the globalSignal from GlobalService and pass it
* to the `log` method. If the component is destroyed but a subsequent
* Signal change still increases the `globalSignalChangerCounter`,
* it indicates that the internal effect is still active.
* Components use `globalSignal` or `globalObservable` from `GlobalService`
* and pass it to the reactive method. If the component is destroyed but
* signal or observable change still increases the corresponding counter,
* the internal effect or subscription is still active.
*/
describe('Internal effect for Signal tracking', () => {
describe('with instance injector', () => {
@Injectable({ providedIn: 'root' })
class GlobalService {
globalSignal = signal(1);
readonly globalSignal = signal(1);
readonly globalObservable = new BehaviorSubject(1);

globalSignalChangeCounter = 0;
globalObservableChangeCounter = 0;

readonly signalMethod = rxMethod<number>(
tap(() => this.globalSignalChangeCounter++)
);
readonly observableMethod = rxMethod<number>(
tap(() => this.globalObservableChangeCounter++)
);

log = rxMethod<number>(pipe(tap(() => this.globalSignalChangeCounter++)));
incrementSignal(): void {
this.globalSignal.update((value) => value + 1);
}

incrementObservable(): void {
this.globalObservable.next(this.globalObservable.value + 1);
}
}

@Component({
selector: `app-storeless`,
template: ``,
selector: 'app-without-store',
template: '',
standalone: true,
})
class WithoutStoreComponent {}
Expand All @@ -297,111 +298,161 @@ describe('rxMethod', () => {
return TestBed.inject(GlobalService);
}

it('it tracks the Signal when component is active', async () => {
it('tracks a signal until the component is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: ``,
template: '',
standalone: true,
})
class WithStoreComponent {
store = inject(GlobalService);

constructor() {
this.store.log(this.store.globalSignal);
this.store.signalMethod(this.store.globalSignal);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

await RouterTestingHarness.create('/with-store');
expect(globalService.globalSignalChangeCounter).toBe(1);

globalService.globalSignal.update((value) => value + 1);
globalService.incrementSignal();
TestBed.flushEffects();
expect(globalService.globalSignalChangeCounter).toBe(2);

globalService.globalSignal.update((value) => value + 1);
globalService.incrementSignal();
TestBed.flushEffects();
expect(globalService.globalSignalChangeCounter).toBe(3);

await harness.navigateByUrl('/without-store');
globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(3);
});

it('destroys with component injector when rxMethod is in root and RxMethod in component', async () => {
it('tracks an observable until the component is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: ``,
template: '',
standalone: true,
})
class WithStoreComponent {
store = inject(GlobalService);

constructor() {
this.store.log(this.store.globalSignal);
this.store.observableMethod(this.store.globalObservable);
}
}

const globalService = setup(WithStoreComponent);

const harness = await RouterTestingHarness.create('/with-store');

// effect is destroyed → Signal is not tracked anymore
expect(globalService.globalObservableChangeCounter).toBe(1);

globalService.incrementObservable();
expect(globalService.globalObservableChangeCounter).toBe(2);

globalService.incrementObservable();
expect(globalService.globalObservableChangeCounter).toBe(3);

await harness.navigateByUrl('/without-store');
globalService.globalSignal.update((value) => value + 1);
TestBed.flushEffects();
globalService.incrementObservable();

expect(globalService.globalSignalChangeCounter).toBe(1);
expect(globalService.globalObservableChangeCounter).toBe(3);
});

it("falls back to rxMethod's injector when RxMethod's call is outside of injection context", async () => {
it('tracks a signal until the provided injector is destroyed', async () => {
@Component({
selector: `app-store`,
template: ``,
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);
injector = inject(Injector);

ngOnInit() {
this.store.log(this.store.globalSignal);
this.store.signalMethod(this.store.globalSignal, {
injector: this.injector,
});
}
}

const globalService = setup(WithStoreComponent);

const harness = await RouterTestingHarness.create('/with-store');

// Signal is still tracked because RxMethod injector is used
globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(2);

await harness.navigateByUrl('/without-store');
globalService.globalSignal.update((value) => value + 1);
globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(2);
});

it('provides the injector for RxMethod on call', async () => {
it('tracks an observable until the provided injector is destroyed', async () => {
@Component({
selector: `app-store`,
template: ``,
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);
injector = inject(Injector);

ngOnInit() {
this.store.log(this.store.globalSignal, { injector: this.injector });
this.store.observableMethod(this.store.globalObservable, {
injector: this.injector,
});
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(2);

await harness.navigateByUrl('/without-store');
globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(2);
});

it('falls back to source injector when reactive method is called is outside of injection context', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);

ngOnInit() {
this.store.signalMethod(this.store.globalSignal);
this.store.observableMethod(this.store.globalObservable);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

// effect is destroyed → Signal is not tracked anymore
expect(globalService.globalSignalChangeCounter).toBe(1);
expect(globalService.globalObservableChangeCounter).toBe(1);

await harness.navigateByUrl('/without-store');
globalService.globalSignal.update((value) => value + 1);
globalService.incrementSignal();
TestBed.flushEffects();
globalService.incrementObservable();

expect(globalService.globalSignalChangeCounter).toBe(1);
expect(globalService.globalSignalChangeCounter).toBe(2);
expect(globalService.globalObservableChangeCounter).toBe(2);
});
});
});
37 changes: 23 additions & 14 deletions modules/signals/rxjs-interop/src/rx-method.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@ export function rxMethod<Input>(
assertInInjectionContext(rxMethod);
}

const injector = config?.injector ?? inject(Injector);
const destroyRef = injector.get(DestroyRef);
const sourceInjector = config?.injector ?? inject(Injector);
const source$ = new Subject<Input>();

const sourceSub = generator(source$).subscribe();
destroyRef.onDestroy(() => sourceSub.unsubscribe());
sourceInjector.get(DestroyRef).onDestroy(() => sourceSub.unsubscribe());

const rxMethodFn = (
input: Input | Signal<Input> | Observable<Input>,
config?: { injector?: Injector }
) => {
if (isSignal(input)) {
const instanceInjector = config?.injector ?? getCallerInjectorIfAvailable() ?? injector;
if (isStatic(input)) {
source$.next(input);
return { unsubscribe: noop };
}

const instanceInjector =
config?.injector ?? getCallerInjector() ?? sourceInjector;

if (isSignal(input)) {
const watcher = effect(
() => {
const value = input();
Expand All @@ -51,25 +55,30 @@ export function rxMethod<Input>(
return instanceSub;
}

if (isObservable(input)) {
const instanceSub = input.subscribe((value) => source$.next(value));
sourceSub.add(instanceSub);
const instanceSub = input.subscribe((value) => source$.next(value));
sourceSub.add(instanceSub);

return instanceSub;
if (instanceInjector !== sourceInjector) {
instanceInjector
.get(DestroyRef)
.onDestroy(() => instanceSub.unsubscribe());
}

source$.next(input);
return { unsubscribe: noop };
return instanceSub;
};
rxMethodFn.unsubscribe = sourceSub.unsubscribe.bind(sourceSub);

return rxMethodFn;
}

function getCallerInjectorIfAvailable(): Injector | null {
function isStatic<T>(value: T | Signal<T> | Observable<T>): value is T {
return !isSignal(value) && !isObservable(value);
}

function getCallerInjector(): Injector | null {
try {
return inject(Injector);
} catch (e) {
} catch {
return null;
}
}

0 comments on commit b3fcf82

Please sign in to comment.