Skip to content

Commit

Permalink
[sync] BusStats and some multi-way tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesgpearce committed Apr 8, 2024
1 parent b41731a commit 5be6d8e
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 33 deletions.
73 changes: 42 additions & 31 deletions src/persisters/persister-sync.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
Bus,
BusStats,
Receive,
Send,
SyncPersister,
Expand All @@ -15,15 +16,16 @@ import {
TablesDelta,
ValuesStamp,
} from '../types/mergeable-store';
import {Id, IdOrNull, Ids} from '../types/common';
import {IdMap, mapGet, mapNew, mapSet} from '../common/map';
import {collDel, collForEach} from '../common/coll';
import {
DEBUG,
ifNotUndefined,
isUndefined,
promiseAll,
promiseNew,
} from '../common/other';
import {Id, IdOrNull, Ids} from '../types/common';
import {IdMap, mapGet, mapNew, mapSet} from '../common/map';
import {collDel, collForEach, collSize} from '../common/coll';
import {EMPTY_STRING} from '../common/strings';
import {PersisterListener} from '../types/persisters';
import {createCustomPersister} from '../persisters';
Expand Down Expand Up @@ -231,33 +233,42 @@ export const createSyncPersister = ((
}) as typeof createSyncPersisterDecl;

export const createLocalBus = (() => {
let sends = 0;
let receives = 0;
const stores: IdMap<Receive> = mapNew();
const join = (storeId: Id, receive: Receive): [Send, () => void] => {
mapSet(stores, storeId, receive);
const send = (
requestId: IdOrNull,
toStoreId: IdOrNull,
message: string,
payload: any,
args?: Ids,
): void =>
isUndefined(toStoreId)
? collForEach(stores, (receive, otherStoreId) =>
otherStoreId != storeId
? receive(requestId, storeId, message, payload, args)
: 0,
)
: mapGet(stores, toStoreId)?.(
requestId,
storeId,
message,
payload,
args,
);
const leave = (): void => {
collDel(stores, storeId);
};
return [send, leave];
};
return [join];
return [
(storeId: Id, receive: Receive): [Send, () => void] => {
mapSet(stores, storeId, receive);
const send = (
requestId: IdOrNull,
toStoreId: IdOrNull,
message: string,
payload: any,
args?: Ids,
): void => {
if (DEBUG) {
sends++;
receives += isUndefined(toStoreId) ? collSize(stores) - 1 : 1;
}
isUndefined(toStoreId)
? collForEach(stores, (receive, otherStoreId) =>
otherStoreId != storeId
? receive(requestId, storeId, message, payload, args)
: 0,
)
: mapGet(stores, toStoreId)?.(
requestId,
storeId,
message,
payload,
args,
);
};
const leave = (): void => {
collDel(stores, storeId);
};
return [send, leave];
},
(): BusStats => (DEBUG ? {sends, receives} : {}),
];
}) as typeof createLocalBusDecl;
5 changes: 5 additions & 0 deletions src/types/docs/persisters/persister-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
*
*/
/// Bus
/**
* The BusStats type
*
*/
/// BusStats
/**
* The SyncPersister interface is a minor extension to the Persister interface.
*
Expand Down
4 changes: 4 additions & 0 deletions src/types/persisters/persister-sync.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ export type Send = (
args?: Ids,
) => void;

/// BusStats
export type BusStats = {sends?: number; receives?: number};

/// Bus
export type Bus = [
join: (storeId: Id, receive: Receive) => [send: Send, leave: () => void],
getStats: () => BusStats,
];

/// SyncPersister
Expand Down
4 changes: 4 additions & 0 deletions src/types/with-schemas/persisters/persister-sync.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ export type Send = (
args?: Ids,
) => void;

/// BusStats
export type BusStats = {sends?: number; receives?: number};

/// Bus
export type Bus = [
join: (storeId: Id, receive: Receive) => [send: Send, leave: () => void],
getStats: () => BusStats,
];

/// SyncPersister
Expand Down
76 changes: 74 additions & 2 deletions test/unit/persisters/persister-sync.test.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
/* eslint-disable jest/no-conditional-expect */

import {Content, MergeableStore, createMergeableStore} from 'tinybase/debug';
import {
Bus,
SyncPersister,
createLocalBus,
createSyncPersister,
} from 'tinybase/debug/persisters/persister-sync';
import {Content, MergeableStore, createMergeableStore} from 'tinybase/debug';
import {pause} from '../common/other';
import {resetHlc} from '../common/mergeable';

beforeEach(() => {
resetHlc();
});

let bus: Bus;
let store1: MergeableStore;
let store2: MergeableStore;
let persister1: SyncPersister;
Expand All @@ -33,7 +35,7 @@ const expectEachToHaveContent = async (
};

beforeEach(() => {
const bus = createLocalBus();
bus = createLocalBus();
store1 = createMergeableStore('s1');
store2 = createMergeableStore('s2');
persister1 = createSyncPersister(store1, bus, 0.001);
Expand Down Expand Up @@ -362,3 +364,73 @@ describe('Bidirectional', () => {
await expectEachToHaveContent([{}, {v1: 2}]);
});
});

describe('bus getStats', () => {
test('2 stores', async () => {
await persister1.startSync();
await persister2.startSync();
await pause(2, true);

store1.setTable('t1', {r1: {c1: 1}});
await pause(2, true);
store2.setTable('t2', {r2: {c2: 2}});
await pause(2, true);

const [, getBusStats] = bus;
expect(getBusStats()).toEqual({sends: 25, receives: 25});

expect(store1.getContent()).toEqual(store2.getContent());
});

test('3 stores', async () => {
const store3 = createMergeableStore('s3');
const persister3 = createSyncPersister(store3, bus, 0.001);

await persister1.startSync();
await persister2.startSync();
await persister3.startSync();
await pause(2, true);

store1.setTable('t1', {r1: {c1: 1}});
await pause(2, true);
store2.setTable('t2', {r2: {c2: 2}});
await pause(2, true);
store3.setTable('t3', {r3: {c3: 3}});
await pause(2, true);

const [, getBusStats] = bus;
expect(getBusStats()).toEqual({sends: 66, receives: 75});

expect(store1.getContent()).toEqual(store2.getContent());
expect(store2.getContent()).toEqual(store3.getContent());
});

test('4 stores', async () => {
const store3 = createMergeableStore('s3');
const persister3 = createSyncPersister(store3, bus, 0.001);
const store4 = createMergeableStore('s4');
const persister4 = createSyncPersister(store4, bus, 0.001);

await persister1.startSync();
await persister2.startSync();
await persister3.startSync();
await persister4.startSync();
await pause(2, true);

store1.setTable('t1', {r1: {c1: 1}});
await pause(2, true);
store2.setTable('t2', {r2: {c2: 2}});
await pause(2, true);
store3.setTable('t3', {r3: {c3: 3}});
await pause(2, true);
store4.setTable('t4', {r4: {c4: 4}});
await pause(2, true);

const [, getBusStats] = bus;
expect(getBusStats()).toEqual({sends: 126, receives: 150});

expect(store1.getContent()).toEqual(store2.getContent());
expect(store2.getContent()).toEqual(store3.getContent());
expect(store3.getContent()).toEqual(store4.getContent());
});
});

0 comments on commit 5be6d8e

Please sign in to comment.