|
| 1 | +import EventSource from 'eventsource-ts'; |
| 2 | +import { |
| 3 | + AuthTypes, |
| 4 | + BitloopsConfig, |
| 5 | + IInternalStorage, |
| 6 | + Unsubscribe, |
| 7 | + UnsubscribeParams, |
| 8 | +} from '../definitions'; |
| 9 | +import HTTP from '../HTTP'; |
| 10 | + |
| 11 | +export default class ServerSentEvents { |
| 12 | + public static instance: ServerSentEvents; |
| 13 | + |
| 14 | + private http: HTTP; |
| 15 | + |
| 16 | + private storage: IInternalStorage; |
| 17 | + |
| 18 | + private config: BitloopsConfig; |
| 19 | + |
| 20 | + private subscribeConnection: EventSource; |
| 21 | + |
| 22 | + private subscriptionId: string = ''; |
| 23 | + |
| 24 | + private readonly eventMap = new Map(); |
| 25 | + |
| 26 | + private _sseIsBeingInitialized: boolean = false; |
| 27 | + |
| 28 | + private reconnectFreqSecs: number = 1; |
| 29 | + |
| 30 | + private constructor(http: HTTP, storage: IInternalStorage, config: BitloopsConfig) { |
| 31 | + this.http = http; |
| 32 | + this.config = config; |
| 33 | + this.storage = storage; |
| 34 | + } |
| 35 | + |
| 36 | + public static getInstance(http: HTTP, storage: IInternalStorage, config: BitloopsConfig) { |
| 37 | + if (!ServerSentEvents.instance) { |
| 38 | + ServerSentEvents.instance = new ServerSentEvents(http, storage, config); |
| 39 | + } |
| 40 | + return ServerSentEvents.instance; |
| 41 | + } |
| 42 | + |
| 43 | + private get sseIsBeingInitialized() { |
| 44 | + return this._sseIsBeingInitialized; |
| 45 | + } |
| 46 | + |
| 47 | + private set sseIsBeingInitialized(flagValue: boolean) { |
| 48 | + this._sseIsBeingInitialized = flagValue; |
| 49 | + } |
| 50 | + |
| 51 | + /** |
| 52 | + * @param namedEvent |
| 53 | + * @event Triggers callback when messages are pushed |
| 54 | + */ |
| 55 | + public async subscribe<DataType>( |
| 56 | + namedEvent: string, |
| 57 | + callback: (data: DataType) => void, |
| 58 | + ): Promise<Unsubscribe> { |
| 59 | + console.log('subscribing topic:', namedEvent); |
| 60 | + this.eventMap.set(namedEvent, callback); |
| 61 | + /** Retry if connection is being initialized */ |
| 62 | + if (this.subscriptionId === '' && this.sseIsBeingInitialized) { |
| 63 | + return new Promise((resolve) => { |
| 64 | + setTimeout(() => resolve(this.subscribe(namedEvent, callback)), 100); |
| 65 | + }); |
| 66 | + } |
| 67 | + /** Set initializing flag if you are the initiator */ |
| 68 | + if (this.subscriptionId === '' && this.sseIsBeingInitialized === false) { |
| 69 | + this.sseIsBeingInitialized = true; |
| 70 | + } |
| 71 | + |
| 72 | + /** |
| 73 | + * Becomes Critical section when subscriptionId = '' |
| 74 | + * and sse connection is being Initialized |
| 75 | + * If you are the initiator, response contains new subscriptionId from server |
| 76 | + */ |
| 77 | + const { data: response, error } = await this.registerTopicORConnection( |
| 78 | + this.subscriptionId, |
| 79 | + namedEvent, |
| 80 | + ); |
| 81 | + |
| 82 | + if (error || response === null) { |
| 83 | + console.error('registerTopicORConnection error', error); |
| 84 | + // console.error('registerTopicORConnection', error); |
| 85 | + this.sseIsBeingInitialized = false; |
| 86 | + // TODO differentiate errors - Throw on host unreachable |
| 87 | + throw new Error(`Got error response from REST: ${JSON.stringify(error)}`); |
| 88 | + } |
| 89 | + console.log('registerTopicORConnection success', response.data); |
| 90 | + |
| 91 | + /** If you are the initiator, establish sse connection */ |
| 92 | + if (this.sseIsBeingInitialized === true && this.subscriptionId === '') { |
| 93 | + this.subscriptionId = response.data; |
| 94 | + this.sseIsBeingInitialized = false; |
| 95 | + await this.setupEventSource(); |
| 96 | + } |
| 97 | + /** |
| 98 | + * End of critical section |
| 99 | + */ |
| 100 | + |
| 101 | + const listenerCallback = (event: MessageEvent<any>) => { |
| 102 | + console.log(`received event for namedEvent: ${namedEvent}`); |
| 103 | + callback(JSON.parse(event.data)); |
| 104 | + }; |
| 105 | + console.log('this.subscribeConnection', this.subscribeConnection); |
| 106 | + console.log(`add event listener for namedEvent: ${namedEvent}`); |
| 107 | + this.subscribeConnection.addEventListener(namedEvent, listenerCallback); |
| 108 | + |
| 109 | + return this.unsubscribe({ namedEvent, subscriptionId: this.subscriptionId, listenerCallback }); |
| 110 | + } |
| 111 | + |
| 112 | + /** |
| 113 | + * Gets a new connection Id if called from the first subscriber |
| 114 | + * In all cases it registers the topic to the Connection Id |
| 115 | + * @param subscriptionId |
| 116 | + * @param namedEvent |
| 117 | + * @returns |
| 118 | + */ |
| 119 | + private async registerTopicORConnection(subscriptionId: string, namedEvent: string) { |
| 120 | + const subscribeUrl = `${this.config.ssl === false ? 'http' : 'https'}://${ |
| 121 | + this.config.server |
| 122 | + }/bitloops/events/subscribe/${subscriptionId}`; |
| 123 | + |
| 124 | + const headers = await this.getAuthHeaders(); |
| 125 | + console.log('Sending headers', headers); |
| 126 | + return this.http.handler({ |
| 127 | + url: subscribeUrl, |
| 128 | + method: 'POST', |
| 129 | + headers, |
| 130 | + data: { topics: [namedEvent], workspaceId: this.config.workspaceId }, |
| 131 | + }); |
| 132 | + } |
| 133 | + |
| 134 | + /** |
| 135 | + * Removes event listener from subscription. |
| 136 | + * Deletes events from mapping that had been subscribed. |
| 137 | + * Handles remaining dead subscription connections, in order to not send events. |
| 138 | + * @param subscriptionId |
| 139 | + * @param namedEvent |
| 140 | + * @param listenerCallback |
| 141 | + * @returns void |
| 142 | + */ |
| 143 | + private unsubscribe({ subscriptionId, namedEvent, listenerCallback }: UnsubscribeParams) { |
| 144 | + return async (): Promise<void> => { |
| 145 | + this.subscribeConnection.removeEventListener(namedEvent, listenerCallback); |
| 146 | + console.log(`removed eventListener for ${namedEvent}`); |
| 147 | + this.eventMap.delete(namedEvent); |
| 148 | + if (this.eventMap.size === 0) this.subscribeConnection.close(); |
| 149 | + |
| 150 | + const unsubscribeUrl = `${this.config.ssl === false ? 'http' : 'https'}://${ |
| 151 | + this.config.server |
| 152 | + }/bitloops/events/unsubscribe/${subscriptionId}`; |
| 153 | + |
| 154 | + const headers = await this.getAuthHeaders(); |
| 155 | + |
| 156 | + await this.http.handler({ |
| 157 | + url: unsubscribeUrl, |
| 158 | + method: 'POST', |
| 159 | + headers, |
| 160 | + data: { workspaceId: this.config.workspaceId, topic: namedEvent }, |
| 161 | + }); |
| 162 | + }; |
| 163 | + } |
| 164 | + |
| 165 | + /** |
| 166 | + * Ask for new connection |
| 167 | + */ |
| 168 | + private sseReconnect() { |
| 169 | + setTimeout(async () => { |
| 170 | + console.log('Trying to reconnect sse with', this.reconnectFreqSecs); |
| 171 | + // await this.setupEventSource(); |
| 172 | + this.reconnectFreqSecs = this.reconnectFreqSecs >= 60 ? 60 : this.reconnectFreqSecs * 2; |
| 173 | + return this.tryToResubscribe(); |
| 174 | + }, this.reconnectFreqSecs * 1000); |
| 175 | + } |
| 176 | + |
| 177 | + private async tryToResubscribe() { |
| 178 | + console.log('Attempting to resubscribe'); |
| 179 | + console.log(' this.eventMap.length', this.eventMap.size); |
| 180 | + const subscribePromises = Array.from(this.eventMap.entries()).map(([namedEvent, callback]) => |
| 181 | + this.subscribe(namedEvent, callback), |
| 182 | + ); |
| 183 | + try { |
| 184 | + console.log('this.eventMap length', subscribePromises.length); |
| 185 | + await Promise.all(subscribePromises); |
| 186 | + console.log('Resubscribed all topic successfully!'); |
| 187 | + // All subscribes were successful => done |
| 188 | + } catch (error) { |
| 189 | + // >= 1 subscribes failed => retry |
| 190 | + console.log(`Failed to resubscribe, retrying... in ${this.reconnectFreqSecs}`); |
| 191 | + this.subscribeConnection.close(); |
| 192 | + this.sseReconnect(); |
| 193 | + } |
| 194 | + } |
| 195 | + |
| 196 | + private async setupEventSource() { |
| 197 | + const { subscriptionId } = this; |
| 198 | + const url = `${this.config.ssl === false ? 'http' : 'https'}://${ |
| 199 | + this.config.server |
| 200 | + }/bitloops/events/${subscriptionId}`; |
| 201 | + |
| 202 | + const headers = await this.getAuthHeaders(); |
| 203 | + const eventSourceInitDict = { headers }; |
| 204 | + |
| 205 | + // Need to subscribe with a valid subscriptionConnectionId, or rest will reject us |
| 206 | + this.subscribeConnection = new EventSource(url, eventSourceInitDict); |
| 207 | + // if (!initialRun) this.resubscribe(); |
| 208 | + |
| 209 | + this.subscribeConnection.onopen = () => { |
| 210 | + this.reconnectFreqSecs = 1; |
| 211 | + }; |
| 212 | + |
| 213 | + // eslint-disable-next-line @typescript-eslint/no-unused-vars |
| 214 | + this.subscribeConnection.onerror = (error: any) => { |
| 215 | + // on error, rest will clear our connectionId so we need to create a new one |
| 216 | + console.log('subscribeConnection.onerror, closing and re-trying', error); |
| 217 | + this.subscribeConnection.close(); |
| 218 | + this.subscriptionId = ''; |
| 219 | + this.sseReconnect(); |
| 220 | + }; |
| 221 | + } |
| 222 | + |
| 223 | + /** |
| 224 | + * |
| 225 | + * @returns |
| 226 | + */ |
| 227 | + private async getAuthHeaders() { |
| 228 | + const headers = { 'Content-Type': 'application/json', Authorization: 'Unauthorized ' }; |
| 229 | + const { config } = this; |
| 230 | + const user = await this.storage.getUser(); |
| 231 | + if (config?.auth?.authenticationType === AuthTypes.User && user?.uid) { |
| 232 | + const sessionUuid = await this.storage.getSessionUuid(); |
| 233 | + headers['provider-id'] = config?.auth.providerId; |
| 234 | + headers['client-id'] = config?.auth.clientId; |
| 235 | + headers.Authorization = `User ${user.accessToken}`; |
| 236 | + headers['session-uuid'] = sessionUuid; |
| 237 | + } |
| 238 | + return headers; |
| 239 | + } |
| 240 | +} |
0 commit comments