Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cleanup mechanism for old db entries #104

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions flottform/forms/src/flottform-channel-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ import {

export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
private flottformApi: string | URL;
private baseApi: string;
private endpointId: string = '';
private hostKey: string = '';
private createClientUrl: (params: { endpointId: string }) => Promise<string>;
private rtcConfiguration: RTCConfiguration;
private pollTimeForIceInMs: number;
private logger: Logger;

private keepConnectionAliveIntervalId: NodeJS.Timeout | undefined;
private state: FlottformState | 'disconnected' = 'new';
private channelNumber: number = 0;
private openPeerConnection: RTCPeerConnection | null = null;
Expand All @@ -35,6 +39,11 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
}) {
super();
this.flottformApi = flottformApi;
this.baseApi = (
this.flottformApi instanceof URL ? this.flottformApi : new URL(this.flottformApi)
)
.toString()
.replace(/\/$/, '');
this.createClientUrl = createClientUrl;
this.rtcConfiguration = DEFAULT_WEBRTC_CONFIG;
this.pollTimeForIceInMs = pollTimeForIceInMs;
Expand All @@ -55,14 +64,9 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
if (this.openPeerConnection) {
this.close();
}
const baseApi = (
this.flottformApi instanceof URL ? this.flottformApi : new URL(this.flottformApi)
)
.toString()
.replace(/\/$/, '');

try {
this.rtcConfiguration.iceServers = await this.fetchIceServers(baseApi);
this.rtcConfiguration.iceServers = await this.fetchIceServers();
} catch (error) {
// Use the default configuration as a fallback
this.logger.error(error);
Expand All @@ -74,11 +78,13 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
const session = await this.openPeerConnection.createOffer();
await this.openPeerConnection.setLocalDescription(session);

const { endpointId, hostKey } = await this.createEndpoint(baseApi, session);
const { endpointId, hostKey } = await this.createEndpoint(this.baseApi, session);
this.hostKey = hostKey;
this.endpointId = endpointId;
this.logger.log('Created endpoint', { endpointId, hostKey });

const getEndpointInfoUrl = `${baseApi}/${endpointId}`;
const putHostInfoUrl = `${baseApi}/${endpointId}/host`;
const getEndpointInfoUrl = `${this.baseApi}/${endpointId}`;
const putHostInfoUrl = `${this.baseApi}/${endpointId}/host`;

const hostIceCandidates = new Set<RTCIceCandidateInit>();
await this.putHostInfo(putHostInfoUrl, hostKey, hostIceCandidates, session);
Expand All @@ -103,6 +109,20 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
this.openPeerConnection = null;
}
this.changeState('disconnected');
// Stop heartbeat function.
clearInterval(this.keepConnectionAliveIntervalId);
// Cleanup old entries.
this.deleteEndpoint(this.baseApi, this.endpointId, this.hostKey);
};

private keepConnectionAlive = async (getEndpointInfoUrl: string) => {
this.keepConnectionAliveIntervalId = setInterval(
async () => {
// Make a GET request to refresh the connection
await retrieveEndpointInfo(getEndpointInfoUrl);
},
5 * 60 * 1000
);
};

private setupDataChannelListener = () => {
Expand Down Expand Up @@ -170,12 +190,18 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
this.logger.info(`onconnectionstatechange - ${this.openPeerConnection!.connectionState}`);
if (this.openPeerConnection!.connectionState === 'connected') {
this.stopPollingForConnection();
// Start the heartbeat process
this.keepConnectionAlive(getEndpointInfoUrl);
}
if (this.openPeerConnection!.connectionState === 'disconnected') {
this.startPollingForConnection(getEndpointInfoUrl);
// Stop the hearbeat process
clearInterval(this.keepConnectionAliveIntervalId);
}
if (this.openPeerConnection!.connectionState === 'failed') {
this.stopPollingForConnection();
// Stop the hearbeat process
clearInterval(this.keepConnectionAliveIntervalId);
this.changeState('error', { message: 'connection-failed' });
}
};
Expand Down Expand Up @@ -223,8 +249,21 @@ export class FlottformChannelHost extends EventEmitter<FlottformEventMap> {
return response.json();
};

private fetchIceServers = async (baseApi: string) => {
const response = await fetch(`${baseApi}/ice-server-credentials`, {
private deleteEndpoint = async (baseApi: string, endpointId: string, hostKey: string) => {
const response = await fetch(`${baseApi}/${endpointId}`, {
method: 'DELETE',
headers: {
Accept: 'application/json',
'Content-Type': 'application/json'
},
body: JSON.stringify({ hostKey })
});

return response.json();
};

private fetchIceServers = async () => {
const response = await fetch(`${this.baseApi}/ice-server-credentials`, {
method: 'GET',
headers: {
Accept: 'application/json'
Expand Down
73 changes: 70 additions & 3 deletions flottform/server/src/database.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, it } from 'vitest';
import { beforeEach, afterEach, describe, expect, it, vi } from 'vitest';
import { createFlottformDatabase } from './database';

describe('Flottform database', () => {
Expand Down Expand Up @@ -33,7 +33,8 @@ describe('Flottform database', () => {
await db.putHostInfo({
endpointId,
hostKey: 'clearly-wrong',
iceCandidates: []
iceCandidates: [],
session: offer
})
).rejects.toThrow(/hostkey/i);
});
Expand Down Expand Up @@ -168,9 +169,75 @@ describe('Flottform database', () => {
session: answer,
iceCandidates: []
})
).rejects.toThrow(/peerkey/i);
).rejects.toThrow(
/clientKey is wrong: Another peer is already connected and you cannot change this info without the correct key anymore. If you lost your key, initiate a new Flottform connection./i
);
Comment on lines +172 to +174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since peerkey isn't used in this error message but I don't see a change to the error itself, was/is this test not working? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that test was not working since as you've said the string 'peerkey' is not used anymore and is replaced by this string: 'clientKey is wrong: Another peer is already connected and you cannot change this info without the correct key anymore. If you lost your key, initiate a new Flottform connection.'
--> I fixed it in this commit: 481b54b

const infoAfter = await db.getEndpoint({ endpointId });
expect(infoBefore).toStrictEqual(infoAfter);
});
});

describe('startCleanup()', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

it('Should clean up stale entries after entryTTL', async () => {
const db = await createFlottformDatabase({
cleanupPeriod: 1000,
entryTimeToLive: 500
});
const conn = new RTCPeerConnection();
const offer = await conn.createOffer();
const { endpointId } = await db.createEndpoint({ session: offer });

const connPeer = new RTCPeerConnection();
await connPeer.setRemoteDescription(offer);
const answer = await connPeer.createAnswer();
const clientKey = 'random-key';

await db.putClientInfo({
endpointId,
clientKey,
session: answer,
iceCandidates: []
});

// Sleep for enough time to trigger the first cleanup
vi.advanceTimersByTime(1100);

// The endpoint should be cleaned by now
expect(async () => await db.getEndpoint({ endpointId })).rejects.toThrow(/endpoint/i);
});

it("Shouldn't clean up entries before entryTTL is expired", async () => {
const db = await createFlottformDatabase({
cleanupPeriod: 1000,
entryTimeToLive: 500
});
const conn = new RTCPeerConnection();
const offer = await conn.createOffer();
const { endpointId } = await db.createEndpoint({ session: offer });

const connPeer = new RTCPeerConnection();
await connPeer.setRemoteDescription(offer);
const answer = await connPeer.createAnswer();
const clientKey = 'random-key';

await db.putClientInfo({
endpointId,
clientKey,
session: answer,
iceCandidates: []
});

// The endpoint shouldn't be cleaned by now
const retrievedInfo = await db.getEndpoint({ endpointId });
expect(retrievedInfo).toBeDefined();
expect(retrievedInfo?.hostInfo.session).toStrictEqual(offer);
});
});
});
88 changes: 78 additions & 10 deletions flottform/server/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ type EndpointInfo = {
session: RTCSessionDescriptionInit;
iceCandidates: RTCIceCandidateInit[];
};
lastUpdate: number;
};
type SafeEndpointInfo = Omit<EndpointInfo, 'hostKey' | 'clientKey'>;
type SafeEndpointInfo = Omit<EndpointInfo, 'hostKey' | 'clientKey' | 'lastUpdate'>;

const DEFAULT_CLEANUP_PERIOD = 30 * 60 * 1000;
const DEFAULT_ENTRY_TIME_TO_LIVE_IN_MS = 25 * 60 * 1000;

function createRandomHostKey(): string {
return crypto.randomUUID();
Expand All @@ -25,8 +29,52 @@ function createRandomEndpointId(): string {

class FlottformDatabase {
private map = new Map<EndpointId, EndpointInfo>();
private cleanupTimeoutId: NodeJS.Timeout | null = null;
private cleanupPeriod: number;
private entryTimeToLive: number;

constructor({
cleanupPeriod = DEFAULT_CLEANUP_PERIOD,
entryTimeToLive = DEFAULT_ENTRY_TIME_TO_LIVE_IN_MS
}: {
cleanupPeriod?: number;
entryTimeToLive?: number;
} = {}) {
this.cleanupPeriod = cleanupPeriod;
this.entryTimeToLive = entryTimeToLive;
this.startCleanup();
}

constructor() {}
private startCleanup() {
this.cleanupTimeoutId = setTimeout(this.cleanupFn.bind(this), this.cleanupPeriod);
}

private cleanupFn() {
if (this.map && this.map.size !== 0) {
const now = Date.now();
// Loop over all entries and delete the stale ones.
for (const [endpointId, endpointInfo] of this.map) {
const lastUpdated = endpointInfo.lastUpdate;
if (now - lastUpdated > this.entryTimeToLive) {
this.map.delete(endpointId);
}
}
}
this.cleanupTimeoutId = setTimeout(this.startCleanup.bind(this), this.cleanupPeriod);
}

private stopCleanup() {
// Clear the interval to stop cleanup
if (this.cleanupTimeoutId) {
clearTimeout(this.cleanupTimeoutId);
this.cleanupTimeoutId = null;
}
}

// Stop the cleanup when the database is no longer needed
destroy() {
this.stopCleanup();
}

async createEndpoint({ session }: { session: RTCSessionDescriptionInit }): Promise<EndpointInfo> {
const entry = {
Expand All @@ -35,7 +83,8 @@ class FlottformDatabase {
hostInfo: {
session,
iceCandidates: []
}
},
lastUpdate: Date.now()
};
this.map.set(entry.endpointId, entry);
return entry;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exposing lastUpdate, see comment below

Expand All @@ -46,7 +95,8 @@ class FlottformDatabase {
if (!entry) {
throw Error('Endpoint not found');
}
const { hostKey: _ignore1, clientKey: _ignore2, ...endpoint } = entry;
entry.lastUpdate = Date.now();
const { hostKey: _ignore1, clientKey: _ignore2, lastUpdate: _ignore3, ...endpoint } = entry;

return endpoint;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lastUpdate should not be exposed to the client as it should only be used internally for cleanup

}
Expand All @@ -72,11 +122,17 @@ class FlottformDatabase {

const newInfo = {
...existingSession,
hostInfo: { ...existingSession.hostInfo, session, iceCandidates }
hostInfo: { ...existingSession.hostInfo, session, iceCandidates },
lastUpdate: Date.now()
};
this.map.set(endpointId, newInfo);

const { hostKey: _ignore1, clientKey: _ignore2, ...newEndpoint } = newInfo;
const {
hostKey: _ignore1,
clientKey: _ignore2,
lastUpdate: _ignore3,
...newEndpoint
} = newInfo;

return newEndpoint;
}
Expand Down Expand Up @@ -105,11 +161,17 @@ class FlottformDatabase {
const newInfo = {
...existingSession,
clientKey,
clientInfo: { session, iceCandidates }
clientInfo: { session, iceCandidates },
lastUpdate: Date.now()
};
this.map.set(endpointId, newInfo);

const { hostKey: _ignore1, clientKey: _ignore2, ...newEndpoint } = newInfo;
const {
hostKey: _ignore1,
clientKey: _ignore2,
lastUpdate: _ignore3,
...newEndpoint
} = newInfo;

return newEndpoint;
}
Expand All @@ -130,8 +192,14 @@ class FlottformDatabase {
}
}

export async function createFlottformDatabase(): Promise<FlottformDatabase> {
return new FlottformDatabase();
export async function createFlottformDatabase({
cleanupPeriod = DEFAULT_CLEANUP_PERIOD,
entryTimeToLive = DEFAULT_ENTRY_TIME_TO_LIVE_IN_MS
}: {
cleanupPeriod?: number;
entryTimeToLive?: number;
} = {}): Promise<FlottformDatabase> {
return new FlottformDatabase({ cleanupPeriod, entryTimeToLive });
}

export type { FlottformDatabase };