Skip to content

Commit

Permalink
Feature add unsubscribe handler (#31)
Browse files Browse the repository at this point in the history
* added unsubscribe handler

* removed redundant log

* fixed unbsubscribe type

* removed spaces from slashes

* Unsubscribe changes
Co-authored-by: Markos Girgis <[email protected]>

* Change rest unsubscribe payload

* Remove console.logs

Co-authored-by: ellik95 <[email protected]>
Co-authored-by: Markos Girgis <[email protected]>
  • Loading branch information
3 people authored Mar 1, 2022
1 parent 29d8fcb commit 6c32451
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 33 deletions.
8 changes: 7 additions & 1 deletion src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ export type BitloopsUser = {
};

/** Removes subscribe listener */
export type Unsubscribe = () => void;
export type Unsubscribe = () => Promise<void>;

export type UnsubscribeParams = {
subscriptionId: string;
namedEvent: string;
listenerCallback: (event: MessageEvent<any>) => void;
}

export enum AuthTypes {
Anonymous = 'Anonymous',
Expand Down
91 changes: 59 additions & 32 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
IBitloopsAuthenticationOptions,
Unsubscribe,
IInternalStorage,
UnsubscribeParams,
} from './definitions';
import { isTokenExpired } from './helpers';
import InternalStorageFactory from './InternalStorage/InternalStorageFactory';
Expand Down Expand Up @@ -146,7 +147,7 @@ class Bitloops {
namedEvent: string,
callback: (data: DataType) => void,
): Promise<Unsubscribe> {
console.log('subscribing topic:', namedEvent);
// console.log('subscribing topic:', namedEvent);
this.eventMap.set(namedEvent, callback);
/** Retry if connection is being initialized */
if (this.subscriptionId === '' && this.sseIsBeingInitialized) {
Expand All @@ -170,11 +171,9 @@ class Bitloops {
console.error('registerTopicORConnection error');
// console.error('registerTopicORConnection', error);
this.sseIsBeingInitialized = false;
// this.eventMap.delete(namedEvent);
throw error;
return () => null;
throw new Error(`Unsubscribe error: ${JSON.stringify(error)}`);
}
console.log('registerTopicORConnection success', response.data);
// console.log('registerTopicORConnection success', response.data);

/** If you are the initiator, establish sse connection */
if (this.sseIsBeingInitialized === true && this.subscriptionId === '') {
Expand All @@ -187,17 +186,14 @@ class Bitloops {
*/

const listenerCallback = (event: MessageEvent<any>) => {
console.log(`received event for namedEvent: ${namedEvent}`);
// console.log(`received event for namedEvent: ${namedEvent}`);
callback(JSON.parse(event.data));
};
// console.log('this.subscribeConnection', this.subscribeConnection);
// console.log(`add event listener for namedEvent: ${namedEvent}`);
this.subscribeConnection.addEventListener(namedEvent, listenerCallback);

return () => {
this.subscribeConnection.removeEventListener(namedEvent, listenerCallback);
this.eventMap.delete(namedEvent);
if (this.eventMap.size === 0) this.subscribeConnection.close();
};
return this.unsubscribe({ namedEvent, subscriptionId: this.subscriptionId, listenerCallback });
}

private httpSecure(): 'http' | 'https' {
Expand All @@ -219,17 +215,51 @@ class Bitloops {
return headers;
}

/**
* Removes event listener from subscription.
* Deletes events from mapping that had been subscribed.
* Handles remaining dead subscription connections, in order to not send events.
* @param subscriptionId
* @param namedEvent
* @param listenerCallback
* @returns void
*/
private unsubscribe({ subscriptionId, namedEvent, listenerCallback }: UnsubscribeParams) {
return async (): Promise<void> => {
this.subscribeConnection.removeEventListener(namedEvent, listenerCallback);
// console.log(`removed eventListener for ${namedEvent}`);
this.eventMap.delete(namedEvent);
if (this.eventMap.size === 0) this.subscribeConnection.close();

const unsubscribeUrl = `${this.httpSecure()}://${
this.config.server
}/bitloops/events/unsubscribe/${subscriptionId}`;

const headers = await this.getAuthHeaders();

await this.axiosHandler(
{
url: unsubscribeUrl,
method: 'POST',
headers,
data: { workspaceId: this.config.workspaceId, topic: namedEvent },
},
Bitloops.axiosInstance,
);
};
}

/**
* Gets a new connection Id if called from the first subscriber
* In all cases it registers the topic to the Connection Id
* @param subscriptionConnectionId
* @param subscriptionId
* @param namedEvent
* @returns
*/
private async registerTopicORConnection(subscriptionConnectionId: string, namedEvent: string) {
private async registerTopicORConnection(subscriptionId: string, namedEvent: string) {
const subscribeUrl = `${this.httpSecure()}://${
this.config.server
}/bitloops/events/subscribe/${subscriptionConnectionId}`;
}/bitloops/events/subscribe/${subscriptionId}`;

const headers = await this.getAuthHeaders();
try {
Expand Down Expand Up @@ -261,30 +291,27 @@ class Bitloops {
}

private async tryToResubscribe() {
console.log('Attempting to resubscribe');
console.log(' this.eventMap.length', this.eventMap.size);
// console.log('Attempting to resubscribe');
// console.log(' this.eventMap.length', this.eventMap.size);
const subscribePromises = Array.from(this.eventMap.entries()).map(([namedEvent, callback]) =>
this.subscribe(namedEvent, callback),
);
try {
console.log('this.eventMap length', subscribePromises.length);
// console.log('this.eventMap length', subscribePromises.length);
await Promise.all(subscribePromises);
console.log('Resubscribed all topic successfully!');
// console.log('Resubscribed all topic successfully!');
// All subscribes were successful => done
} catch (error) {
// >= 1 subscribes failed => retry
console.log(`Failed to resubscribe, retrying... in ${this.reconnectFreqSecs}`);
// console.log(`Failed to resubscribe, retrying... in ${this.reconnectFreqSecs}`);
this.subscribeConnection.close();
this.sseReconnect();
}
}

private async setupEventSource() {
const subscriptionConnectionId = this.subscriptionId;
console.log('setting up eventSource with', subscriptionConnectionId);
const url = `${this.httpSecure()}://${
this.config.server
}/bitloops/events/${subscriptionConnectionId}`;
const { subscriptionId } = this;
const url = `${this.httpSecure()}://${this.config.server}/bitloops/events/${subscriptionId}`;

const headers = await this.getAuthHeaders();
const eventSourceInitDict = { headers };
Expand All @@ -294,13 +321,13 @@ class Bitloops {
// if (!initialRun) this.resubscribe();

this.subscribeConnection.onopen = () => {
console.log('Resetting retry timer...');
this.reconnectFreqSecs = 1;
};

// eslint-disable-next-line @typescript-eslint/no-unused-vars
this.subscribeConnection.onerror = (error: any) => {
// on error, rest will clear our connectionId so we need to create a new one
console.log('subscribeConnection.onerror, closing and re-trying', error);
// console.log('subscribeConnection.onerror, closing and re-trying', error);
this.subscribeConnection.close();
this.subscriptionId = '';
this.sseReconnect();
Expand Down Expand Up @@ -344,15 +371,15 @@ class Bitloops {
// console.log('isAccessTokenExpired', isAccessTokenExpired);

if (isRefreshTokenExpired) {
console.log('refresh expired, logging out');
// console.log('refresh expired, logging out');
this.auth.clearAuthentication();
return {
...config,
cancelToken: new CancelToken((cancel) => cancel('Cancel repeated request')),
};
}
if (isAccessTokenExpired) {
console.log('access token expired');
// console.log('access token expired');
const newUser = await this.refreshToken();
if (!config.headers) config.headers = {};
config.headers.Authorization = `User ${newUser.accessToken}`;
Expand All @@ -377,11 +404,11 @@ class Bitloops {
const bitloopsConfig = this.config;
if (
bitloopsConfig?.auth?.authenticationType === AuthTypes.User &&
error.response.status === 401 &&
error?.response?.status === 401 &&
!originalRequest.retry
) {
originalRequest.retry = true;
console.log('before refreshh');
// console.log('before refreshh');
await this.refreshToken();
return instance.request(originalRequest);
}
Expand Down Expand Up @@ -409,7 +436,7 @@ class Bitloops {
axios,
);
if (error) {
console.log('Refresh token was invalid');
// console.log('Refresh token was invalid');
// invalid refresh token
// clean refresh_token
// logout user
Expand All @@ -423,7 +450,7 @@ class Bitloops {
accessToken: newAccessToken,
refreshToken: newRefreshToken,
};
console.log('Updated refresh token');
// console.log('Updated refresh token');
await this.storage.saveUser(newUser);
return newUser;
}
Expand Down

0 comments on commit 6c32451

Please sign in to comment.