Skip to content

Commit

Permalink
Fixed resubscription upon reconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
danias committed Jan 6, 2022
1 parent de75e47 commit 3b0aa6f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"Low-Code",
"Backend",
"Frontend",
"Worfklow Orchestration"
"Workflow Orchestration"
],
"author": "Bitloops",
"license": "Apache-2.0",
Expand Down
21 changes: 13 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import axios from 'axios';
import EventSource, { ReadyState } from 'eventsource';
import EventSource from 'eventsource';

export enum AuthTypes {
Anonymous = 'Anonymous',
Expand Down Expand Up @@ -51,6 +51,7 @@ class Bitloops {
private subscribeConnection: EventSource;
private subscribeConnectionId: string = '';
private reconnectFreqSecs: number = 1;
private eventMap = new Map();

constructor(config: BitloopsConfig) {
this.config = config;
Expand Down Expand Up @@ -129,6 +130,7 @@ class Bitloops {
}

public async subscribe<dataType>(namedEvent: string, callback: (data: dataType) => void): Promise<UnSubscribe> {
this.eventMap.set(namedEvent, callback);
const subscribeUrl = `${this.httpSecure()}://${this.config.server}/bitloops/events/subscribe/${
this.subscribeConnectionId
}`;
Expand Down Expand Up @@ -156,8 +158,8 @@ class Bitloops {

return () => {
this.subscribeConnection.removeEventListener(namedEvent, listenerCb);
// Remove topic from state
// Close connection when topics=0
this.eventMap.delete(namedEvent);
if (this.eventMap.size === 0) this.subscribeConnection.close();
}
}

Expand Down Expand Up @@ -217,17 +219,24 @@ class Bitloops {
setTimeout(() => {
// console.log('Trying to reconnect sse with', this.reconnectFreqSecs);
this.setupEventSource();
this.reconnectFreqSecs = this.reconnectFreqSecs >= 64 ? 64 : this.reconnectFreqSecs * 2;
this.reconnectFreqSecs = this.reconnectFreqSecs >= 60 ? 60 : this.reconnectFreqSecs * 2;
}, this.reconnectFreqSecs * 1000);
}

private async resubscribe() {
this.eventMap.forEach((callback, namedEvent) => {
this.subscribe(namedEvent, callback);
})
}

private setupEventSource() {
const url = `${this.httpSecure()}://${this.config.server}/bitloops/events/${this.subscribeConnectionId}`;

const headers = this.getAuthHeaders();
const eventSourceInitDict = { headers };

this.subscribeConnection = new EventSource(url, eventSourceInitDict);
this.resubscribe();

this.subscribeConnection.onopen = (e: any) => {
// console.log('Resetting retry timer...')
Expand Down Expand Up @@ -258,10 +267,6 @@ class Bitloops {
}
});
} else {
/** Network error, connId is garbage, need to
* TODO re-subscribe and fetch
* a new One when reconnecting
TODO re-subscribe for new Conn-Id with list of topics */
this.sseReconnect();
}
};
Expand Down

0 comments on commit 3b0aa6f

Please sign in to comment.