-
Notifications
You must be signed in to change notification settings - Fork 95
/
Copy pathapp.ts
90 lines (74 loc) · 2.12 KB
/
app.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import * as Koa from 'koa'
import * as Router from 'koa-router'
import { rabbit } from './Rabbit'
import chalk from 'chalk'
import * as body from 'koa-body'
// Koa application and router instances used by every route registered below.
const app = new Koa()
const router = new Router()
// Short alias for console.log, used for the coloured status lines.
const log = console.log
// Call getChannel() once and keep the resulting promise, so each handler
// awaits the same value instead of calling getChannel() on every request.
const CHANNEL = rabbit.getChannel()
// Queue name used by POST /send.
const Queue = 'node'
// Exchange name used by POST /exchange and POST /topic.
const ex = 'logs'
// Routing key used by POST /topic.
const key = 'anonymous.info'
// Register the koa-body middleware; the handlers below read ctx.request.body.
app.use(body())
/**
 * POST /send — plain message queue (RabbitMQ "Work Queues" pattern).
 *
 * Reads `content` from the parsed request body and sends it to `Queue`.
 * Responds with status 400 and `{ status: 'faild' }` when `content` is not a
 * string, otherwise with `{ result: 'success' }`.
 */
router.post('/send', async ctx => {
  // A missing body is treated the same as a missing `content` field.
  const requestBody = ctx.request.body
  const content = requestBody ? requestBody.content : undefined
  // Validate before touching the broker so bad requests cost nothing.
  if (typeof content !== 'string') {
    ctx.status = 400
    ctx.body = {
      // Spelling kept as-is: existing clients may match on this exact value.
      status: 'faild'
    }
    return
  }
  const ch = await CHANNEL
  // Declare the queue with `durable: true`. The call is awaited so that a
  // failed declaration rejects this handler instead of being left as a
  // floating promise (assumes the promise-based amqplib channel API — confirm
  // against ./Rabbit).
  await ch.assertQueue(Queue, { durable: true })
  ch.sendToQueue(Queue, Buffer.from(content))
  ctx.body = {
    result: 'success'
  }
  log(chalk.magenta`[x] Sent ${content}`)
})
/**
 * POST /exchange — publish through a fanout exchange (Publish/Subscribe).
 *
 * Reads `content` from the parsed request body and publishes it to the `ex`
 * exchange with an empty routing key.
 * Responds with 400 when `content` is not a string, 500 when declaring the
 * exchange or publishing throws, otherwise `{ result: 'success' }`.
 */
router.post('/exchange', async ctx => {
  // A missing body is treated the same as a missing `content` field.
  const requestBody = ctx.request.body
  const content = requestBody ? requestBody.content : undefined
  // Same validation and 400 response shape as POST /send.
  if (typeof content !== 'string') {
    ctx.status = 400
    ctx.body = {
      status: 'faild'
    }
    return
  }
  try {
    const ch = await CHANNEL
    // Awaited inside the try so a rejected declaration is reported here
    // rather than becoming an unhandled rejection (assumes the promise-based
    // amqplib channel API — confirm against ./Rabbit).
    await ch.assertExchange(ex, 'fanout', { durable: false })
    // The routing key is left empty.
    ch.publish(ex, '', Buffer.from(content))
    ctx.body = {
      result: 'success'
    }
  } catch (error) {
    console.error(chalk.red`${String(error)}`)
    // Assigning the Error object itself to ctx.body would be serialized as
    // `{}` with status 200, so report the failure explicitly instead.
    ctx.status = 500
    ctx.body = {
      status: 'faild'
    }
  }
})
/**
 * POST /routing — publish through a direct exchange with a routing key.
 *
 * Reads `content` from the parsed request body and publishes it to the
 * `direct_logs` exchange under the `info` routing key.
 * Responds with 400 when `content` is not a string, 500 when declaring the
 * exchange or publishing throws, otherwise `{ result: 'success' }`.
 */
router.post('/routing', async ctx => {
  // A missing body is treated the same as a missing `content` field.
  const requestBody = ctx.request.body
  const content = requestBody ? requestBody.content : undefined
  // Same validation and 400 response shape as POST /send.
  if (typeof content !== 'string') {
    ctx.status = 400
    ctx.body = {
      status: 'faild'
    }
    return
  }
  try {
    const ch = await CHANNEL
    // Set the exchange name and its dispatch type. Awaited inside the try so
    // a rejected declaration is reported here rather than becoming an
    // unhandled rejection (assumes the promise-based amqplib channel API —
    // confirm against ./Rabbit).
    await ch.assertExchange('direct_logs', 'direct', { durable: false })
    // Publish under the `info` routing key.
    ch.publish('direct_logs', 'info', Buffer.from(content))
    ctx.body = {
      result: 'success'
    }
  } catch (error) {
    console.error(chalk.red`${String(error)}`)
    // Assigning the Error object itself to ctx.body would be serialized as
    // `{}` with status 200, so report the failure explicitly instead.
    ctx.status = 500
    ctx.body = {
      status: 'faild'
    }
  }
})
/**
 * POST /topic — publish through a topic exchange.
 *
 * Reads `content` from the parsed request body and publishes it to the `ex`
 * exchange under the `key` routing key.
 * Responds with 400 when `content` is not a string, 500 when declaring the
 * exchange or publishing throws, otherwise `{ result: 'success' }`.
 *
 * NOTE(review): `ex` is also declared as a `fanout` exchange by POST
 * /exchange in this file. RabbitMQ is documented to reject re-declaring an
 * existing exchange with a different type, so one of the two routes probably
 * needs its own exchange name — TODO confirm with the consumers before
 * renaming.
 */
router.post('/topic', async ctx => {
  // A missing body is treated the same as a missing `content` field.
  const requestBody = ctx.request.body
  const content = requestBody ? requestBody.content : undefined
  // Same validation and 400 response shape as POST /send.
  if (typeof content !== 'string') {
    ctx.status = 400
    ctx.body = {
      status: 'faild'
    }
    return
  }
  try {
    const ch = await CHANNEL
    // Awaited inside the try so a rejected declaration is reported here
    // rather than becoming an unhandled rejection (assumes the promise-based
    // amqplib channel API — confirm against ./Rabbit).
    await ch.assertExchange(ex, 'topic', { durable: false })
    ch.publish(ex, key, Buffer.from(content))
    // Without a body Koa keeps its default 404 status even though the
    // publish succeeded, so answer explicitly like the other routes.
    ctx.body = {
      result: 'success'
    }
  } catch (error) {
    console.error(chalk.red`${String(error)}`)
    ctx.status = 500
    ctx.body = {
      status: 'faild'
    }
  }
})
// Mount every route registered on the router, then start the HTTP server on
// port 3000 and log where it is listening.
app.use(router.routes())
const server = app.listen(3000, () => {
  // server.address() is not always an address object: it can also be a string
  // (pipe / UNIX socket) or null, so narrow before reading its fields.
  const address = server.address()
  const location =
    address !== null && typeof address === 'object'
      ? `${address.address}:${address.port.toString()}`
      : String(address)
  log(chalk.green`Server is listening at ${location}`)
})