Skip to content

Commit

Permalink
Task namespace with new interface (#807)
Browse files Browse the repository at this point in the history
* Task namespace with new interface

* taskworker include

* extend task from applyeventlisteners

* base namespace class to handle the listen method

* topic attach to event name

* type update

* remove older Task api

* stack test update for Task

* changeset include

* refactor and e2e test case

* rename task emitter

* listen function public explicitly

* index worker file

* utility function to prefix the event

* correct type of taskworker
  • Loading branch information
iAmmar7 authored and edolix committed Aug 8, 2023
1 parent 1fe0726 commit 4b2a809
Show file tree
Hide file tree
Showing 19 changed files with 481 additions and 396 deletions.
6 changes: 6 additions & 0 deletions .changeset/violet-boats-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@signalwire/realtime-api': major
'@signalwire/core': major
---

Task namespace with new interface
123 changes: 79 additions & 44 deletions internal/e2e-realtime-api/src/task.test.ts
Original file line number Diff line number Diff line change
@@ -1,57 +1,92 @@
import { Task } from '@signalwire/realtime-api'
import { SignalWire } from '@signalwire/realtime-api'
import { createTestRunner } from './utils'

const handler = () => {
return new Promise<number>(async (resolve, reject) => {
const context = 'task-e2e'
const firstPayload = {
id: Date.now(),
item: 'first',
}
const lastPayload = {
id: Date.now(),
item: 'last',
}
try {
const client = await SignalWire({
host: process.env.RELAY_HOST || 'relay.swire.io',
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
})

const client = new Task.Client({
host: process.env.RELAY_HOST as string,
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
contexts: [context],
})
const firstPayload = {
id: Date.now(),
topic: 'home',
}
const secondPayload = {
id: Date.now(),
topic: 'home',
}
const thirdPayload = {
id: Date.now(),
topic: 'office',
}

let counter = 0
let counter = 0
const unsubHomeOffice = await client.task.listen({
topics: ['home', 'office'],
onTaskReceived: (payload) => {
if (
payload.topic !== 'home' ||
payload.id !== firstPayload.id ||
payload.id !== secondPayload.id ||
counter > 3
) {
console.error('Invalid payload on `home` context', payload)
return reject(4)
}
counter++
},
})

client.on('task.received', (payload) => {
if (payload.id === firstPayload.id && payload.item === 'first') {
counter++
} else if (payload.id === lastPayload.id && payload.item === 'last') {
counter++
} else {
console.error('Invalid payload on `task.received`', payload)
return reject(4)
}
const unsubOffice = await client.task.listen({
topics: ['office'],
onTaskReceived: (payload) => {
if (
payload.topic !== 'office' ||
payload.id !== thirdPayload.id ||
counter > 3
) {
console.error('Invalid payload on `home` context', payload)
return reject(4)
}
counter++

if (counter === 2) {
return resolve(0)
}
})
if (counter === 3) {
return resolve(0)
}
},
})

await client.task.send({
topic: 'home',
message: firstPayload,
})

await client.task.send({
topic: 'home',
message: secondPayload,
})

await Task.send({
host: process.env.RELAY_HOST as string,
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
context,
message: firstPayload,
})
await unsubHomeOffice()

await Task.send({
host: process.env.RELAY_HOST as string,
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
context,
message: lastPayload,
})
// This message should not reach the listener
await client.task.send({
topic: 'home',
message: secondPayload,
})

await client.task.send({
topic: 'office',
message: thirdPayload,
})

await unsubOffice()
} catch (error) {
console.log('Task test error', error)
reject(error)
}
})
}

Expand Down
63 changes: 42 additions & 21 deletions internal/playground-realtime-api/src/task/index.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,52 @@
import { Task } from '@signalwire/realtime-api'

const client = new Task.Client({
host: process.env.HOST || 'relay.swire.io',
project: process.env.PROJECT as string,
token: process.env.TOKEN as string,
contexts: ['office'],
debug: {
logWsTraffic: true,
},
})

client.on('task.received', (payload) => {
console.log('Task Received', payload)
})

setTimeout(async () => {
console.log('Sending to the client..')
await Task.send({
import { SignalWire } from '@signalwire/realtime-api'
;(async () => {
const client = await SignalWire({
host: process.env.HOST || 'relay.swire.io',
project: process.env.PROJECT as string,
token: process.env.TOKEN as string,
context: 'office',
})

const removeOfficeListeners = await client.task.listen({
topics: ['office', 'home'],
onTaskReceived: (payload) => {
console.log('Task received under the "office" or "home" context', payload)
},
})

const removeWorkplaceListeners = await client.task.listen({
topics: ['workplace', 'home'],
onTaskReceived: (payload) => {
console.log(
'Task received under the "workplace" or "home" context',
payload
)
},
})

console.log('Sending a message to office..')
await client.task.send({
topic: 'office',
message: { yo: ['bro', 1, true] },
})

console.log('Sending a message to home..')
await client.task.send({
topic: 'home',
message: { yo: ['bro', 2, true] },
})

await removeOfficeListeners()

console.log('Sending a message to workplace..')
await client.task.send({
topic: 'workplace',
message: { yo: ['bro', 3, true] },
})

await removeWorkplaceListeners()

setTimeout(async () => {
console.log('Disconnect the client..')
client.disconnect()
}, 2000)
}, 2000)
})()
15 changes: 5 additions & 10 deletions internal/stack-tests/src/task/app.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
import { Task } from '@signalwire/realtime-api'
import { SignalWire } from '@signalwire/realtime-api'
import tap from 'tap'

async function run() {
try {
const task = new Task.Client({
const client = await SignalWire({
host: process.env.RELAY_HOST || 'relay.swire.io',
project: process.env.RELAY_PROJECT as string,
token: process.env.RELAY_TOKEN as string,
contexts: [process.env.RELAY_CONTEXT as string],
})

tap.ok(task.on, 'task.on is defined')
tap.ok(task.once, 'task.once is defined')
tap.ok(task.off, 'task.off is defined')
tap.ok(task.removeAllListeners, 'task.removeAllListeners is defined')
tap.ok(task.addContexts, 'task.addContexts is defined')
tap.ok(task.disconnect, 'task.disconnect is defined')
tap.ok(task.removeContexts, 'task.removeContexts is defined')
tap.ok(client.task, 'client.task is defined')
tap.ok(client.task.listen, 'client.task.listen is defined')
tap.ok(client.task.send, 'client.task.send is defined')

process.exit(0)
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/BaseComponent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ export class BaseComponent<
}

/** @internal */
protected runWorker<Hooks extends SDKWorkerHooks = SDKWorkerHooks>(
public runWorker<Hooks extends SDKWorkerHooks = SDKWorkerHooks>(
name: string,
def: SDKWorkerDefinition<Hooks>
) {
Expand Down
Loading

0 comments on commit 4b2a809

Please sign in to comment.