From 4b2a809b43288378f3f4122dec33c36ba271d5df Mon Sep 17 00:00:00 2001 From: Ammar Ansari Date: Wed, 21 Jun 2023 19:52:43 +0200 Subject: [PATCH] Task namespace with new interface (#807) * Task namespace with new interface * taskworker include * extend task from applyeventlisteners * base namespace class to handle the listen method * topic attach to event name * type update * remove older Task api * stack test update for Task * changeset include * refactor and e2e test case * rename task emitter * listen function public explicitly * index worker file * utility function to prefix the event * correct type of taskworker --- .changeset/violet-boats-count.md | 6 + internal/e2e-realtime-api/src/task.test.ts | 123 +++++++++----- .../playground-realtime-api/src/task/index.ts | 63 ++++--- internal/stack-tests/src/task/app.ts | 15 +- packages/core/src/BaseComponent.ts | 2 +- packages/realtime-api/src/BaseNamespace.ts | 157 ++++++++++++++++++ packages/realtime-api/src/SWClient.ts | 41 +++++ packages/realtime-api/src/SignalWire.ts | 14 ++ .../src/{ => client}/createClient.test.ts | 6 +- .../realtime-api/src/client/createClient.ts | 17 ++ packages/realtime-api/src/createClient.ts | 61 ------- packages/realtime-api/src/index.ts | 40 +---- packages/realtime-api/src/task/Task.ts | 108 +++++++----- packages/realtime-api/src/task/TaskClient.ts | 66 -------- packages/realtime-api/src/task/send.ts | 87 ---------- packages/realtime-api/src/task/workers.ts | 27 --- .../realtime-api/src/task/workers/index.ts | 1 + .../src/task/workers/taskWorker.ts | 38 +++++ packages/realtime-api/src/utils/internals.ts | 5 + 19 files changed, 481 insertions(+), 396 deletions(-) create mode 100644 .changeset/violet-boats-count.md create mode 100644 packages/realtime-api/src/BaseNamespace.ts create mode 100644 packages/realtime-api/src/SWClient.ts create mode 100644 packages/realtime-api/src/SignalWire.ts rename packages/realtime-api/src/{ => client}/createClient.test.ts (95%) create mode 100644 packages/realtime-api/src/client/createClient.ts delete mode 100644 packages/realtime-api/src/createClient.ts delete mode 100644 packages/realtime-api/src/task/TaskClient.ts delete mode 100644 packages/realtime-api/src/task/send.ts delete mode 100644 packages/realtime-api/src/task/workers.ts create mode 100644 packages/realtime-api/src/task/workers/index.ts create mode 100644 packages/realtime-api/src/task/workers/taskWorker.ts diff --git a/.changeset/violet-boats-count.md b/.changeset/violet-boats-count.md new file mode 100644 index 0000000000..b2b8c89472 --- /dev/null +++ b/.changeset/violet-boats-count.md @@ -0,0 +1,6 @@ +--- +'@signalwire/realtime-api': major +'@signalwire/core': major +--- + +Task namespace with new interface diff --git a/internal/e2e-realtime-api/src/task.test.ts b/internal/e2e-realtime-api/src/task.test.ts index d482960b9d..79474854c1 100644 --- a/internal/e2e-realtime-api/src/task.test.ts +++ b/internal/e2e-realtime-api/src/task.test.ts @@ -1,57 +1,92 @@ -import { Task } from '@signalwire/realtime-api' +import { SignalWire } from '@signalwire/realtime-api' import { createTestRunner } from './utils' const handler = () => { return new Promise(async (resolve, reject) => { - const context = 'task-e2e' - const firstPayload = { - id: Date.now(), - item: 'first', - } - const lastPayload = { - id: Date.now(), - item: 'last', - } + try { + const client = await SignalWire({ + host: process.env.RELAY_HOST || 'relay.swire.io', + project: process.env.RELAY_PROJECT as string, + token: process.env.RELAY_TOKEN as string, + }) - const client = new Task.Client({ - host: process.env.RELAY_HOST as string, - project: process.env.RELAY_PROJECT as string, - token: process.env.RELAY_TOKEN as string, - contexts: [context], - }) + const firstPayload = { + id: Date.now(), + topic: 'home', + } + const secondPayload = { + id: Date.now(), + topic: 'home', + } + const thirdPayload = { + id: Date.now(), + topic: 'office', + } - let counter = 0 + let counter = 0 + const unsubHomeOffice = await client.task.listen({ + topics: ['home', 'office'], + onTaskReceived: (payload) => { + if ( + payload.topic !== 'home' || + payload.id !== firstPayload.id || + payload.id !== secondPayload.id || + counter > 3 + ) { + console.error('Invalid payload on `home` context', payload) + return reject(4) + } + counter++ + }, + }) - client.on('task.received', (payload) => { - if (payload.id === firstPayload.id && payload.item === 'first') { - counter++ - } else if (payload.id === lastPayload.id && payload.item === 'last') { - counter++ - } else { - console.error('Invalid payload on `task.received`', payload) - return reject(4) - } + const unsubOffice = await client.task.listen({ + topics: ['office'], + onTaskReceived: (payload) => { + if ( + payload.topic !== 'office' || + payload.id !== thirdPayload.id || + counter > 3 + ) { + console.error('Invalid payload on `home` context', payload) + return reject(4) + } + counter++ - if (counter === 2) { - return resolve(0) - } - }) + if (counter === 3) { + return resolve(0) + } + }, + }) + + await client.task.send({ + topic: 'home', + message: firstPayload, + }) + + await client.task.send({ + topic: 'home', + message: secondPayload, + }) - await Task.send({ - host: process.env.RELAY_HOST as string, - project: process.env.RELAY_PROJECT as string, - token: process.env.RELAY_TOKEN as string, - context, - message: firstPayload, - }) + await unsubHomeOffice() - await Task.send({ - host: process.env.RELAY_HOST as string, - project: process.env.RELAY_PROJECT as string, - token: process.env.RELAY_TOKEN as string, - context, - message: lastPayload, - }) + // This message should not reach the listener + await client.task.send({ + topic: 'home', + message: secondPayload, + }) + + await client.task.send({ + topic: 'office', + message: thirdPayload, + }) + + await unsubOffice() + } catch (error) { + console.log('Task test error', error) + reject(error) + } }) } diff --git a/internal/playground-realtime-api/src/task/index.ts b/internal/playground-realtime-api/src/task/index.ts index 62a014fef4..6189fc4c4b 100644 --- a/internal/playground-realtime-api/src/task/index.ts +++ b/internal/playground-realtime-api/src/task/index.ts @@ -1,31 +1,52 @@ -import { Task } from '@signalwire/realtime-api' - -const client = new Task.Client({ - host: process.env.HOST || 'relay.swire.io', - project: process.env.PROJECT as string, - token: process.env.TOKEN as string, - contexts: ['office'], - debug: { - logWsTraffic: true, - }, -}) - -client.on('task.received', (payload) => { - console.log('Task Received', payload) -}) - -setTimeout(async () => { - console.log('Sending to the client..') - await Task.send({ +import { SignalWire } from '@signalwire/realtime-api' +;(async () => { + const client = await SignalWire({ host: process.env.HOST || 'relay.swire.io', project: process.env.PROJECT as string, token: process.env.TOKEN as string, - context: 'office', + }) + + const removeOfficeListeners = await client.task.listen({ + topics: ['office', 'home'], + onTaskReceived: (payload) => { + console.log('Task received under the "office" or "home" context', payload) + }, + }) + + const removeWorkplaceListeners = await client.task.listen({ + topics: ['workplace', 'home'], + onTaskReceived: (payload) => { + console.log( + 'Task received under the "workplace" or "home" context', + payload + ) + }, + }) + + console.log('Sending a message to office..') + await client.task.send({ + topic: 'office', message: { yo: ['bro', 1, true] }, }) + console.log('Sending a message to home..') + await client.task.send({ + topic: 'home', + message: { yo: ['bro', 2, true] }, + }) + + await removeOfficeListeners() + + console.log('Sending a message to workplace..') + await client.task.send({ + topic: 'workplace', + message: { yo: ['bro', 3, true] }, + }) + + await removeWorkplaceListeners() + setTimeout(async () => { console.log('Disconnect the client..') client.disconnect() }, 2000) -}, 2000) +})() diff --git a/internal/stack-tests/src/task/app.ts b/internal/stack-tests/src/task/app.ts index db35d05470..da35604a31 100644 --- a/internal/stack-tests/src/task/app.ts +++ b/internal/stack-tests/src/task/app.ts @@ -1,22 +1,17 @@ -import { Task } from '@signalwire/realtime-api' +import { SignalWire } from '@signalwire/realtime-api' import tap from 'tap' async function run() { try { - const task = new Task.Client({ + const client = await SignalWire({ host: process.env.RELAY_HOST || 'relay.swire.io', project: process.env.RELAY_PROJECT as string, token: process.env.RELAY_TOKEN as string, - contexts: [process.env.RELAY_CONTEXT as string], }) - tap.ok(task.on, 'task.on is defined') - tap.ok(task.once, 'task.once is defined') - tap.ok(task.off, 'task.off is defined') - tap.ok(task.removeAllListeners, 'task.removeAllListeners is defined') - tap.ok(task.addContexts, 'task.addContexts is defined') - tap.ok(task.disconnect, 'task.disconnect is defined') - tap.ok(task.removeContexts, 'task.removeContexts is defined') + tap.ok(client.task, 'client.task is defined') + tap.ok(client.task.listen, 'client.task.listen is defined') + tap.ok(client.task.send, 'client.task.send is defined') process.exit(0) } catch (error) { diff --git a/packages/core/src/BaseComponent.ts b/packages/core/src/BaseComponent.ts index 9191872674..7629845111 100644 --- a/packages/core/src/BaseComponent.ts +++ b/packages/core/src/BaseComponent.ts @@ -301,7 +301,7 @@ export class BaseComponent< } /** @internal */ - protected runWorker( + public runWorker( name: string, def: SDKWorkerDefinition ) { diff --git a/packages/realtime-api/src/BaseNamespace.ts b/packages/realtime-api/src/BaseNamespace.ts new file mode 100644 index 0000000000..51c912046e --- /dev/null +++ b/packages/realtime-api/src/BaseNamespace.ts @@ -0,0 +1,157 @@ +import { EventEmitter, ExecuteParams, uuid } from '@signalwire/core' +import type { Client } from './client/Client' +import { SWClient } from './SWClient' +import { prefixEvent } from './utils/internals' + +export interface ListenOptions { + topics: string[] +} + +type ListenersKeys = keyof Omit + +type ListenerMap = Map< + string, + { + topics: Set + listeners: Omit + unsub: () => Promise + } +> + +export class BaseNamespace { + protected _client: Client + protected _sw: SWClient + protected _eventMap: Record + private _namespaceEmitter = new EventEmitter() + private _listenerMap: ListenerMap = new Map() + + constructor(options: { swClient: SWClient }) { + this._sw = options.swClient + this._client = options.swClient.client + } + + get emitter() { + return this._namespaceEmitter + } + + private addTopics(topics: string[]) { + const executeParams: ExecuteParams = { + method: 'signalwire.receive', + params: { + contexts: topics, + }, + } + return this._client.execute(executeParams) + } + + private removeTopics(topics: string[]) { + const executeParams: ExecuteParams = { + method: 'signalwire.unreceive', + params: { + contexts: topics, + }, + } + return this._client.execute(executeParams) + } + + public listen(listenOptions: T) { + return new Promise<() => Promise>(async (resolve, reject) => { + try { + const { topics } = listenOptions + if (topics?.length < 1) { + throw new Error( + 'Invalid options: topics should be an array with at least one topic!' + ) + } + const unsub = await this.subscribe(listenOptions) + resolve(unsub) + } catch (error) { + reject(error) + } + }) + } + + protected async subscribe(listenOptions: T) { + const { topics, ...listeners } = listenOptions + const _uuid = uuid() + + // Attach listeners + this._attachListeners(topics, listeners) + await this.addTopics(topics) + + const unsub = () => { + return new Promise(async (resolve, reject) => { + try { + // Remove the topics + const topicsToRemove = topics.filter( + (topic) => !this.hasOtherListeners(_uuid, topic) + ) + if (topicsToRemove.length > 0) { + await this.removeTopics(topicsToRemove) + } + + // Remove listeners + this._detachListeners(topics, listeners) + + // Remove task from the task listener array + this.removeFromListenerMap(_uuid) + + resolve() + } catch (error) { + reject(error) + } + }) + } + + this._listenerMap.set(_uuid, { + topics: new Set([...topics]), + listeners, + unsub, + }) + + return unsub + } + + private _attachListeners(topics: string[], listeners: Omit) { + const listenerKeys = Object.keys(listeners) as Array + topics.forEach((topic) => { + listenerKeys.forEach((key) => { + if (typeof listeners[key] === 'function' && this._eventMap[key]) { + const event = prefixEvent(topic, this._eventMap[key]) + this.emitter.on(event, listeners[key]) + } + }) + }) + } + + private _detachListeners(topics: string[], listeners: Omit) { + const listenerKeys = Object.keys(listeners) as Array + topics.forEach((topic) => { + listenerKeys.forEach((key) => { + if (typeof listeners[key] === 'function' && this._eventMap[key]) { + const event = prefixEvent(topic, this._eventMap[key]) + this.emitter.off(event, listeners[key]) + } + }) + }) + } + + private hasOtherListeners(uuid: string, topic: string) { + for (const [key, listener] of this._listenerMap) { + if (key === uuid) continue + if (listener.topics.has(topic)) return true + } + return false + } + + protected async unsubscribeAll() { + await Promise.all( + [...this._listenerMap.values()].map(({ unsub }) => unsub()) + ) + this._listenerMap.clear() + } + + private removeFromListenerMap(id: string) { + return this._listenerMap.delete(id) + } +} diff --git a/packages/realtime-api/src/SWClient.ts b/packages/realtime-api/src/SWClient.ts new file mode 100644 index 0000000000..a6bc27d726 --- /dev/null +++ b/packages/realtime-api/src/SWClient.ts @@ -0,0 +1,41 @@ +import { createClient } from './client/createClient' +import type { Client } from './client/Client' +import { clientConnect } from './client/clientConnect' +import { Task } from './task/Task' + +export interface SWClientOptions { + host?: string + project: string + token: string + logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'silent' + debug?: { + logWsTraffic?: boolean + } +} + +export class SWClient { + private _task: Task + + public userOptions: SWClientOptions + public client: Client + + constructor(options: SWClientOptions) { + this.userOptions = options + this.client = createClient(options) + } + + async connect() { + await clientConnect(this.client) + } + + disconnect() { + this.client.disconnect() + } + + get task() { + if (!this._task) { + this._task = new Task(this) + } + return this._task + } +} diff --git a/packages/realtime-api/src/SignalWire.ts b/packages/realtime-api/src/SignalWire.ts new file mode 100644 index 0000000000..245773b6b6 --- /dev/null +++ b/packages/realtime-api/src/SignalWire.ts @@ -0,0 +1,14 @@ +import { SWClient, SWClientOptions } from './SWClient' + +export const SignalWire = (options: SWClientOptions): Promise => { + return new Promise(async (resolve, reject) => { + const swClient = new SWClient(options) + + try { + await swClient.connect() + resolve(swClient) + } catch (error) { + reject(error) + } + }) +} diff --git a/packages/realtime-api/src/createClient.test.ts b/packages/realtime-api/src/client/createClient.test.ts similarity index 95% rename from packages/realtime-api/src/createClient.test.ts rename to packages/realtime-api/src/client/createClient.test.ts index acd032aa8f..d6b9e328a6 100644 --- a/packages/realtime-api/src/createClient.test.ts +++ b/packages/realtime-api/src/client/createClient.test.ts @@ -49,7 +49,7 @@ describe('createClient', () => { it('should throw an error when invalid credentials are provided', async () => { expect.assertions(1) - const client = await createClient({ + const client = createClient({ // @ts-expect-error host, token: '', @@ -65,7 +65,7 @@ describe('createClient', () => { it('should resolve `connect()` when the client is authorized', async () => { expect.assertions(1) - const client = await createClient({ + const client = createClient({ // @ts-expect-error host, token, @@ -102,7 +102,7 @@ describe('createClient', () => { socket.on('message', messageHandler) }) - const client = await createClient({ + const client = createClient({ // @ts-expect-error host: h, token, diff --git a/packages/realtime-api/src/client/createClient.ts b/packages/realtime-api/src/client/createClient.ts new file mode 100644 index 0000000000..fc46a84600 --- /dev/null +++ b/packages/realtime-api/src/client/createClient.ts @@ -0,0 +1,17 @@ +import { connect, ClientEvents } from '@signalwire/core' +import { setupInternals } from '../utils/internals' +import { Client } from './Client' + +export const createClient = (userOptions: { + project: string + token: string + logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'silent' +}) => { + const { emitter, store } = setupInternals(userOptions) + const client = connect({ + store, + Component: Client, + })({ ...userOptions, store, emitter }) + + return client +} diff --git a/packages/realtime-api/src/createClient.ts b/packages/realtime-api/src/createClient.ts deleted file mode 100644 index 4f6cb5aff4..0000000000 --- a/packages/realtime-api/src/createClient.ts +++ /dev/null @@ -1,61 +0,0 @@ -import { - ClientEvents, - configureStore, - connect, - getEventEmitter, - UserOptions, - InternalUserOptions, -} from '@signalwire/core' -import { Client, RealtimeClient } from './Client' -import { Session } from './Session' - -/** @internal */ -export interface CreateClientOptions extends UserOptions {} -export type { RealtimeClient, ClientEvents } - -/** - * Creates a real-time Client. - * @param userOptions - * @param userOptions.project SignalWire project id, e.g. `a10d8a9f-2166-4e82-56ff-118bc3a4840f` - * @param userOptions.token SignalWire project token, e.g. `PT9e5660c101cd140a1c93a0197640a369cf5f16975a0079c9` - * @param userOptions.logLevel logging level - * @returns an instance of a real-time Client. - * - * @example - * ```typescript - * const client = await createClient({ - * project: '', - * token: '' - * }) - * ``` - * - * @deprecated You no longer need to create the client - * manually. You can use the product constructors, like - * {@link Video.Client}, to access the same functionality. - */ -export const createClient: (userOptions: { - project?: string - token: string - logLevel?: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'silent' -}) => Promise = - // Note: types are inlined for clarity of documentation - async (userOptions) => { - const baseUserOptions: InternalUserOptions = { - ...userOptions, - emitter: getEventEmitter(), - } - const store = configureStore({ - userOptions: baseUserOptions, - SessionConstructor: Session, - }) - - const client = connect({ - store, - Component: Client, - sessionListeners: { - authStatus: 'onAuth', - }, - })(baseUserOptions) - - return client - } diff --git a/packages/realtime-api/src/index.ts b/packages/realtime-api/src/index.ts index 1657810a61..0048a5fe96 100644 --- a/packages/realtime-api/src/index.ts +++ b/packages/realtime-api/src/index.ts @@ -50,8 +50,6 @@ */ export * as Video from './video/Video' -export * from './createClient' - /** * Access the Chat API Consumer. You can instantiate a {@link Chat.Client} to * subscribe to Chat events. Please check {@link Chat.ChatClientApiEvents} @@ -99,42 +97,6 @@ export * as PubSub from './pubSub/PubSub' /** @ignore */ export * from './configure' -/** - * Access the Task API. You can instantiate a {@link Task.Client} to - * receive tasks from a different application. Please check - * {@link Task.TaskClientApiEvents} for the full list of events that - * a {@link Task.Client} can subscribe to. - * - * @example - * - * The following example listens for incoming tasks. - * - * ```javascript - * const client = new Task.Client({ - * project: "", - * token: "", - * contexts: ['office'] - * }) - * - * client.on('task.received', (payload) => { - * console.log('Task Received', payload) - * // Do something with the payload... - * }) - * ``` - * - * From a different process, even on a different machine, you can then send tasks: - * - * ```js - * await Task.send({ - * project: "", - * token: "", - * context: 'office', - * message: { hello: ['world', true] }, - * }) - * ``` - */ -export * as Task from './task/Task' - /** * Access the Messaging API. You can instantiate a {@link Messaging.Client} to * send or receive SMS and MMS. Please check @@ -199,3 +161,5 @@ export * as Messaging from './messaging/Messaging' * ``` */ export * as Voice from './voice/Voice' + +export { SignalWire } from './SignalWire' diff --git a/packages/realtime-api/src/task/Task.ts b/packages/realtime-api/src/task/Task.ts index 3970c3b2cd..cd734d2cb9 100644 --- a/packages/realtime-api/src/task/Task.ts +++ b/packages/realtime-api/src/task/Task.ts @@ -1,55 +1,87 @@ +import { request } from 'node:https' import { - DisconnectableClientContract, - BaseComponentOptions, - BaseComponent, - ClientContextContract, + EventEmitter, + TaskInboundEvent, + TaskReceivedEventName, } from '@signalwire/core' -import { connect } from '@signalwire/core' -import type { TaskClientApiEvents } from '../types' -import { RealtimeClient } from '../client/index' +import { SWClient } from '../SWClient' import { taskWorker } from './workers' +import { ListenOptions, BaseNamespace } from '../BaseNamespace' -export interface Task - extends DisconnectableClientContract, - ClientContextContract { - /** @internal */ - _session: RealtimeClient - /** - * Disconnects this client. The client will stop receiving events and you will - * need to create a new instance if you want to use it again. - * - * @example - * - * ```js - * client.disconnect() - * ``` - */ - disconnect(): void +const PATH = '/api/relay/rest/tasks' +const HOST = 'relay.signalwire.com' + +interface TaskListenOptions extends ListenOptions { + onTaskReceived?: (payload: TaskInboundEvent['message']) => unknown } -/** @internal */ -class TaskAPI extends BaseComponent { - constructor(options: BaseComponentOptions) { - super(options) +type TaskListenersKeys = keyof Omit + +export class Task extends BaseNamespace { + private _taskEmitter = new EventEmitter() + protected _eventMap: Record = { + onTaskReceived: 'task.received', + } + + constructor(options: SWClient) { + super({ swClient: options }) - this.runWorker('taskWorker', { + this._client.runWorker('taskWorker', { worker: taskWorker, + initialState: { + taskEmitter: this._taskEmitter, + }, }) } -} -/** @internal */ -export const createTaskObject = (params: BaseComponentOptions): Task => { - const task = connect({ - store: params.store, - Component: TaskAPI, - })(params) + get emitter() { + return this._taskEmitter + } - return task + send({ + topic, + message, + }: { + topic: string + message: Record + }) { + const { userOptions } = this._sw + if (!userOptions.project || !userOptions.token) { + throw new Error('Invalid options: project and token are required!') + } + return new Promise((resolve, reject) => { + try { + const Authorization = `Basic ${Buffer.from( + `${userOptions.project}:${userOptions.token}` + ).toString('base64')}` + + const data = JSON.stringify({ context: topic, message }) + const options = { + host: userOptions.host ?? HOST, + port: 443, + method: 'POST', + path: PATH, + headers: { + Authorization, + 'Content-Type': 'application/json', + 'Content-Length': data.length, + }, + } + const req = request(options, ({ statusCode }) => { + statusCode === 204 ? resolve() : reject() + }) + + req.on('error', reject) + + req.write(data) + req.end() + } catch (error) { + reject(error) + } + }) + } } -export * from './TaskClient' -export * from './send' export type { TaskReceivedEventName } from '@signalwire/core' export type { TaskClientApiEvents, diff --git a/packages/realtime-api/src/task/TaskClient.ts b/packages/realtime-api/src/task/TaskClient.ts deleted file mode 100644 index 2932197352..0000000000 --- a/packages/realtime-api/src/task/TaskClient.ts +++ /dev/null @@ -1,66 +0,0 @@ -import type { UserOptions } from '@signalwire/core' -import { setupClient, clientConnect } from '../client/index' -import type { Task } from './Task' -import { createTaskObject } from './Task' -import { clientContextInterceptorsFactory } from '../common/clientContext' - -interface TaskClient extends Task { - new (opts: TaskClientOptions): this -} - -export interface TaskClientOptions - extends Omit { - contexts: string[] -} - -/** - * Creates a new Task client. - * - * @param options - {@link TaskClientOptions} - * - * @example - * - * ```js - * import { Task } from '@signalwire/realtime-api' - * - * const taskClient = new Task.Client({ - * project: '', - * token: '', - * contexts: [''], - * }) - * ``` - */ -const TaskClient = function (options?: TaskClientOptions) { - const { client, store } = setupClient(options) - - const task = createTaskObject({ - store, - }) - - const disconnect = () => client.disconnect() - - const interceptors = { - ...clientContextInterceptorsFactory(client), - _session: client, - disconnect, - } as const - - return new Proxy>(task, { - get(target: TaskClient, prop: keyof TaskClient, receiver: any) { - if (prop in interceptors) { - // @ts-expect-error - return interceptors[prop] - } - - // Always connect the underlying client if the user call a function on the Proxy - if (typeof target[prop] === 'function') { - clientConnect(client) - } - - return Reflect.get(target, prop, receiver) - }, - }) - // For consistency with other constructors we'll make TS force the use of `new` -} as unknown as { new (options?: TaskClientOptions): TaskClient } - -export { TaskClient as Client } diff --git a/packages/realtime-api/src/task/send.ts b/packages/realtime-api/src/task/send.ts deleted file mode 100644 index 44e4ecb3b3..0000000000 --- a/packages/realtime-api/src/task/send.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { request } from 'node:https' - -const PATH = '/api/relay/rest/tasks' -const HOST = 'relay.signalwire.com' - -/** Parameters for {@link send} */ -export interface TaskSendParams { - /** @ignore */ - host?: string - /** SignalWire project id, e.g. `a10d8a9f-2166-4e82-56ff-118bc3a4840f` */ - project: string - /** SignalWire project token, e.g. `PT9e5660c101cd140a1c93a0197640a369cf5f16975a0079c9` */ - token: string - /** Context to send the task to */ - context: string - /** Message to send */ - message: Record -} - -/** - * Send a job to your Task Client in a specific context. - * - * @param params - * @returns - * - * @example - * - * > Send a task with a message to then make an outbound Call. - * - * ```js - * const message = { - * 'action': 'call', - * 'from': '+18881112222' - * 'to': '+18881113333' - * } - * - * await Task.send({ - * project: "", - * token: "", - * context: 'office', - * message: message, - * }) - * ``` - * - */ -export const send = ({ - host = HOST, - project, - token, - context, - message, -}: TaskSendParams) => { - if (!project || !token) { - throw new Error('Invalid options: project and token are required!') - } - - return new Promise((resolve, reject) => { - try { - const Authorization = `Basic ${Buffer.from( - `${project}:${token}` - ).toString('base64')}` - - const data = JSON.stringify({ context, message }) - const options = { - host, - port: 443, - method: 'POST', - path: PATH, - headers: { - Authorization, - 'Content-Type': 'application/json', - 'Content-Length': data.length, - }, - } - const req = request(options, ({ statusCode }) => { - statusCode === 204 ? resolve() : reject() - }) - - req.on('error', reject) - - req.write(data) - req.end() - } catch (error) { - reject(error) - } - }) -} diff --git a/packages/realtime-api/src/task/workers.ts b/packages/realtime-api/src/task/workers.ts deleted file mode 100644 index 77a2e2552c..0000000000 --- a/packages/realtime-api/src/task/workers.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { - getLogger, - sagaEffects, - SagaIterator, - SDKWorker, - SDKActions, -} from '@signalwire/core' -import type { Task } from './Task' - -export const taskWorker: SDKWorker = function* (options): SagaIterator { - getLogger().trace('taskWorker started') - const { channels, instance } = options - const { swEventChannel } = channels - - while (true) { - const action = yield sagaEffects.take( - swEventChannel, - (action: SDKActions) => { - return action.type === 'queuing.relay.tasks' - } - ) - - instance.emit('task.received', action.payload.message) - } - - getLogger().trace('taskWorker ended') -} diff --git a/packages/realtime-api/src/task/workers/index.ts b/packages/realtime-api/src/task/workers/index.ts new file mode 100644 index 0000000000..aeb7a96ae6 --- /dev/null +++ b/packages/realtime-api/src/task/workers/index.ts @@ -0,0 +1 @@ +export * from './taskWorker' diff --git a/packages/realtime-api/src/task/workers/taskWorker.ts b/packages/realtime-api/src/task/workers/taskWorker.ts new file mode 100644 index 0000000000..e1fb9d1233 --- /dev/null +++ b/packages/realtime-api/src/task/workers/taskWorker.ts @@ -0,0 +1,38 @@ +import { + getLogger, + sagaEffects, + SagaIterator, + SDKWorker, + SDKActions, + TaskAction, +} from '@signalwire/core' +import { prefixEvent } from '../../utils/internals' +import { Task } from '../Task' + +export const taskWorker: SDKWorker = function* (options): SagaIterator { + getLogger().trace('taskWorker started') + const { + channels: { swEventChannel }, + initialState: { taskEmitter }, + } = options + + function* worker(action: TaskAction) { + const { context } = action.payload + + taskEmitter.emit( + prefixEvent(context, 'task.received'), + action.payload.message + ) + } + + const isTaskEvent = (action: SDKActions) => + action.type === 'queuing.relay.tasks' + + while (true) { + const action = yield sagaEffects.take(swEventChannel, isTaskEvent) + + yield sagaEffects.fork(worker, action) + } + + getLogger().trace('taskWorker ended') +} diff --git a/packages/realtime-api/src/utils/internals.ts b/packages/realtime-api/src/utils/internals.ts index 17993fa487..7e7123e198 100644 --- a/packages/realtime-api/src/utils/internals.ts +++ b/packages/realtime-api/src/utils/internals.ts @@ -62,3 +62,8 @@ export const getCredentials = (options?: GetCredentialsOptions) => { return { project, token } } + +export const prefixEvent = (prefix: string, event: string) => { + if (typeof prefix !== 'string' || typeof event !== 'string') return event + return `${prefix}.${event}` +}