Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch client updates #1448

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion apps/client/src/common/utils/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const connectSocket = () => {
}

socketSendJson('set-client-type', 'ontime');
socketSendJson('set-client-use-patch', 'ontime');
setOnlineStatus(true);
};

Expand All @@ -64,7 +65,6 @@ export const connectSocket = () => {
websocket.onmessage = (event) => {
try {
const data = JSON.parse(event.data);

const { type, payload } = data;

if (!type) {
Expand Down Expand Up @@ -139,6 +139,12 @@ export const connectSocket = () => {
updateDevTools(serverPayload);
break;
}
case 'ontime-patch': {
patchRuntime(payload);
cpvalente marked this conversation as resolved.
Show resolved Hide resolved
updateDevTools(payload);
break;
}
//TODO: remove all other types as they are now patched
case 'ontime-clock': {
patchRuntimeProperty('clock', payload);
updateDevTools({ clock: payload });
Expand Down
57 changes: 56 additions & 1 deletion apps/server/src/adapters/WebsocketAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Payload: adds necessary payload for the request to be completed
*/

import { Client, LogOrigin } from 'ontime-types';
import { Client, LogOrigin, RuntimeStore } from 'ontime-types';

import { WebSocket, WebSocketServer } from 'ws';
import type { Server } from 'http';
Expand All @@ -36,6 +36,10 @@ export class SocketServer implements IAdapter {
private lastConnection: Date | null = null;
private shouldShowWelcome = true;

//should we be tacking versions
alex-Arc marked this conversation as resolved.
Show resolved Hide resolved
private readonly patchClients: Map<string, WebSocket>;
private readonly keyClients: Map<string, WebSocket>;

constructor() {
if (instance) {
throw new Error('There can be only one');
Expand All @@ -44,6 +48,8 @@ export class SocketServer implements IAdapter {
// eslint-disable-next-line @typescript-eslint/no-this-alias -- this logic is used to ensure singleton
instance = this;
this.clients = new Map<string, Client>();
this.keyClients = new Map<string, WebSocket>();
this.patchClients = new Map<string, WebSocket>();
this.wss = null;
}

Expand All @@ -61,6 +67,8 @@ export class SocketServer implements IAdapter {
path: '',
});

this.keyClients.set(clientId, ws);

this.lastConnection = new Date();
logger.info(LogOrigin.Client, `${this.clients.size} Connections with new: ${clientId}`);

Expand Down Expand Up @@ -92,6 +100,9 @@ export class SocketServer implements IAdapter {

ws.on('close', () => {
this.clients.delete(clientId);
this.patchClients.delete(clientId);
this.keyClients.delete(clientId);

logger.info(LogOrigin.Client, `${this.clients.size} Connections with disconnected: ${clientId}`);
this.sendClientList();
});
Expand Down Expand Up @@ -175,6 +186,18 @@ export class SocketServer implements IAdapter {
return;
}

if (type === 'set-client-use-patch') {
this.keyClients.delete(clientId);
this.patchClients.set(clientId, ws);
return;
}

if (type === 'set-client-use-key') {
this.patchClients.delete(clientId);
this.keyClients.set(clientId, ws);
return;
}

// Protocol specific stuff handled above
try {
const reply = dispatchFromAdapter(type, payload, 'ws');
Expand Down Expand Up @@ -257,6 +280,38 @@ export class SocketServer implements IAdapter {
}
}

public sendRuntimeStoreUpdate(keysToUpdate: (keyof RuntimeStore)[], store: Partial<RuntimeStore>) {
// create a patch object with all the keys marked for updating
const patch = {};
keysToUpdate.map((key) => {
Object.assign(patch, { [key]: store[key] });
});

//convert to JSON once and reuse
const stringifiedPatch = JSON.stringify({ type: 'ontime-patch', payload: patch });

// for each client that have subscribed to the patch method send the patch object
this.patchClients.forEach((ws) => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think this is necessary?

I understand the motivation behind it, but I wonder if trying to keep backwards compatibility makes our code harder and more bug prone

I am ok with this as long as we are not trading the above for the compatibility

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would have to do a breaking version change right?
I would love to keep it simple, but I have no idea what things that are dependent one us?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we continue sending the messages in serial?
the logic of the store should accumulate changes in the same event loop iteration, maybe the small difference in milliseconds on the message being received and parsed will not be visible?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm yeah it's worth a try

if (ws.readyState === WebSocket.OPEN) {
ws.send(stringifiedPatch);
}
});

// all the old type clients are sent the normal way
while (keysToUpdate.length) {
// for each key in the list
const key = keysToUpdate.pop();
// create a reuseble JSON object
const stringifiedMessage = JSON.stringify({ type: `ontime-${key}`, payload: store[key] });
//and send it to all client then hanve not switch to the patch method
this.keyClients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(stringifiedMessage);
}
});
}
}

shutdown() {
this.wss?.close();
}
Expand Down
13 changes: 8 additions & 5 deletions apps/server/src/services/runtime-service/RuntimeService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ function broadcastResult(_target: any, _propertyKey: string, descriptor: Propert
// we do the comparison by explicitly for each property
// to apply custom logic for different datasets

const shouldForceTimerUpdate = getForceUpdate(RuntimeService.previousTimerUpdate, state.clock);
const shouldForceTimerUpdate = getForceUpdate(RuntimeService.previousTimerUpdate, state.clock); //FIXME: pause causess an avlache of update if timer is negative
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?? lets chat

const shouldUpdateTimer =
shouldForceTimerUpdate || getShouldTimerUpdate(RuntimeService.previousTimerValue, state.timer.current);

Expand All @@ -696,12 +696,16 @@ function broadcastResult(_target: any, _propertyKey: string, descriptor: Propert
if (hasImmediateChanges || (shouldUpdateTimer && !deepEqual(RuntimeService.previousState?.timer, state.timer))) {
RuntimeService.previousTimerUpdate = state.clock;
RuntimeService.previousTimerValue = state.timer.current;
RuntimeService.previousClockUpdate = state.clock;
eventStore.set('clock', state.clock);
eventStore.set('timer', state.timer);
RuntimeService.previousState.timer = { ...state.timer };
}

if (hasChangedPlayback || (shouldUpdateTimer && !deepEqual(RuntimeService.previousState?.runtime, state.runtime))) {
eventStore.set('runtime', state.runtime);
RuntimeService.previousClockUpdate = state.clock;
eventStore.set('clock', state.clock);
RuntimeService.previousState.runtime = { ...state.runtime };
}

Expand All @@ -711,15 +715,14 @@ function broadcastResult(_target: any, _propertyKey: string, descriptor: Propert
updateEventIfChanged('eventNext', state);
updateEventIfChanged('publicEventNext', state);

let syncBlockStartAt = false;

if (!deepEqual(RuntimeService?.previousState.currentBlock, state.currentBlock)) {
eventStore.set('currentBlock', state.currentBlock);
RuntimeService.previousState.currentBlock = { ...state.currentBlock };
syncBlockStartAt = true;
RuntimeService.previousClockUpdate = state.clock;
eventStore.set('clock', state.clock);
}

const shouldUpdateClock = syncBlockStartAt || getShouldClockUpdate(RuntimeService.previousClockUpdate, state.clock);
const shouldUpdateClock = getShouldClockUpdate(RuntimeService.previousClockUpdate, state.clock);

if (shouldUpdateClock) {
RuntimeService.previousClockUpdate = state.clock;
Expand Down
23 changes: 13 additions & 10 deletions apps/server/src/stores/EventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ export type StoreGetter = <T extends keyof RuntimeStore>(key: T) => Partial<Runt

let store: Partial<RuntimeStore> = {};

const changedKeys = new Array<keyof RuntimeStore>();
let isUpdatePending: NodeJS.Immediate | null = null;
cpvalente marked this conversation as resolved.
Show resolved Hide resolved
/**
* A runtime store that broadcasts its payload
* - init: allows for adding an initial payload to the store
Expand All @@ -23,16 +25,17 @@ export const eventStore = {
},
set<T extends keyof RuntimeStore>(key: T, value: RuntimeStore[T]) {
store[key] = value;
socket.sendAsJson({
type: `ontime-${key}`,
payload: value,
});
},
batchSet(values: Partial<RuntimeStore>) {
Object.entries(values).forEach(([key, value]) => {
store[key] = value;
});
this.broadcast();

// check if the key is already marked for and update otherwise push it onto the update array
if (!changedKeys.includes(key)) changedKeys.push(key);

//if there is already and update pending we don't need to schedule another one
if (!isUpdatePending) {
isUpdatePending = setImmediate(() => {
socket.sendRuntimeStoreUpdate(changedKeys, store);
isUpdatePending = null;
});
}
},
poll() {
return store;
Expand Down
Loading