Skip to content

Commit

Permalink
Merge pull request #18 from bitloops/develop
Browse files Browse the repository at this point in the history
v0.0.19 - Resubscribe upon reconnection
  • Loading branch information
danias authored Jan 6, 2022
2 parents 98b4527 + 3b0aa6f commit 7962205
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 20 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bitloops",
"version": "0.0.18",
"version": "0.0.19",
"description": "NodeJS library for the Bitloops",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand All @@ -23,7 +23,7 @@
"Low-Code",
"Backend",
"Frontend",
"Worfklow Orchestration"
"Workflow Orchestration"
],
"author": "Bitloops",
"license": "Apache-2.0",
Expand All @@ -36,8 +36,8 @@
"eventsource": "^1.1.0"
},
"devDependencies": {
"@types/eventsource": "^1.1.7",
"@types/node": "^16.11.10",
"@types/eventsource": "^1.1.8",
"@types/node": "^17.0.8",
"nodemon": "^2.0.15",
"ts-node": "^10.3.0"
}
Expand Down
52 changes: 45 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,17 @@ export type BitloopsConfig = {
messagingSenderId: string;
};

/** Removes subscribe listener */
type UnSubscribe = () => void;

class Bitloops {
config: BitloopsConfig;
authType: AuthTypes;
authOptions: AuthenticationOptionsType | undefined;
subscribeConnection: EventSource;
subscribeConnectionId: string = '';
private subscribeConnection: EventSource;
private subscribeConnectionId: string = '';
private reconnectFreqSecs: number = 1;
private eventMap = new Map();

constructor(config: BitloopsConfig) {
this.config = config;
Expand Down Expand Up @@ -124,7 +129,8 @@ class Bitloops {
return true;
}

public async subscribe<dataType>(namedEvent: string, callback: (data: dataType) => void) {
public async subscribe<dataType>(namedEvent: string, callback: (data: dataType) => void): Promise<UnSubscribe> {
this.eventMap.set(namedEvent, callback);
const subscribeUrl = `${this.httpSecure()}://${this.config.server}/bitloops/events/subscribe/${
this.subscribeConnectionId
}`;
Expand All @@ -141,12 +147,20 @@ class Bitloops {

if (!this.subscribeConnectionId) {
this.subscribeConnectionId = response.data;
this.initializeSubscribeConnection();
this.setupEventSource();
}

this.subscribeConnection.addEventListener(namedEvent, (event) => {
const listenerCb = (event: MessageEvent<any>) => {
callback(JSON.parse(event.data));
});
}

this.subscribeConnection.addEventListener(namedEvent, listenerCb);

return () => {
this.subscribeConnection.removeEventListener(namedEvent, listenerCb);
this.eventMap.delete(namedEvent);
if (this.eventMap.size === 0) this.subscribeConnection.close();
}
}

private getAuthHeaderValues(
Expand Down Expand Up @@ -201,13 +215,34 @@ class Bitloops {
return headers;
}

private initializeSubscribeConnection() {
private sseReconnect() {
setTimeout(() => {
// console.log('Trying to reconnect sse with', this.reconnectFreqSecs);
this.setupEventSource();
this.reconnectFreqSecs = this.reconnectFreqSecs >= 60 ? 60 : this.reconnectFreqSecs * 2;
}, this.reconnectFreqSecs * 1000);
}

private async resubscribe() {
this.eventMap.forEach((callback, namedEvent) => {
this.subscribe(namedEvent, callback);
})
}

private setupEventSource() {
const url = `${this.httpSecure()}://${this.config.server}/bitloops/events/${this.subscribeConnectionId}`;

const headers = this.getAuthHeaders();
const eventSourceInitDict = { headers };

this.subscribeConnection = new EventSource(url, eventSourceInitDict);
this.resubscribe();

this.subscribeConnection.onopen = (e: any) => {
// console.log('Resetting retry timer...')
this.reconnectFreqSecs = 1;
}

this.subscribeConnection.onerror = (error: any) => {
this.subscribeConnection.close();
if (
Expand All @@ -221,6 +256,7 @@ class Bitloops {
this.authOptions?.authenticationType === AuthTypes.FirebaseUser &&
this.authOptions?.refreshTokenFunction
) {
/** On Auth error we can retry with same connId */
const newAccessToken = await this.authOptions.refreshTokenFunction();
if (newAccessToken) {
this.authOptions.user.accessToken = newAccessToken;
Expand All @@ -230,6 +266,8 @@ class Bitloops {
} else reject(error);
}
});
} else {
this.sseReconnect();
}
};
}
Expand Down
18 changes: 9 additions & 9 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@
resolved "https://registry.yarnpkg.com/@tsconfig/node16/-/node16-1.0.2.tgz#423c77877d0569db20e1fc80885ac4118314010e"
integrity sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==

"@types/eventsource@^1.1.7":
version "1.1.7"
resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.7.tgz#685e864e7a2b1fb07d54c566a1a5c50d0a91ddc6"
integrity sha512-ac36T7U0sz2+vrYT3oTU4x0dnNOZcI2rmUFfSA8pIpXKYPNGHMZs8KzJ+WuF6aPRQu8RAvGgnoLUWMXhC7Widw==

"@types/node@^16.11.10":
version "16.11.10"
resolved "https://registry.yarnpkg.com/@types/node/-/node-16.11.10.tgz#2e3ad0a680d96367103d3e670d41c2fed3da61ae"
integrity sha512-3aRnHa1KlOEEhJ6+CvyHKK5vE9BcLGjtUpwvqYLRvYNQKMfabu3BwfJaA/SLW8dxe28LsNDjtHwePTuzn3gmOA==
"@types/eventsource@^1.1.8":
version "1.1.8"
resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.8.tgz#48ae1f3aaf9bb84c713038f354112cc7ceaad519"
integrity sha512-fJQNt9LijJCZwYvM6O30uLzdpAK9zs52Uc9iUW9M2Zsg0HQM6DLf6QysjC/wuFX+0798B8AppVMvgdO6IftPKQ==

"@types/node@^17.0.8":
version "17.0.8"
resolved "https://registry.yarnpkg.com/@types/node/-/node-17.0.8.tgz#50d680c8a8a78fe30abe6906453b21ad8ab0ad7b"
integrity sha512-YofkM6fGv4gDJq78g4j0mMuGMkZVxZDgtU0JRdx6FgiJDG+0fY0GKVolOV8WqVmEhLCXkQRjwDdKyPxJp/uucg==

abbrev@1:
version "1.1.1"
Expand Down

0 comments on commit 7962205

Please sign in to comment.