Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use a more robust queue system, with retry #54

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ SERVICE_PUSHER_BEAMS_INSTANCE_ID=
SERVICE_PUSHER_BEAMS_SECRET_KEY=
HUB_URL=
SNAPSHOT_URI=
REDIS_URL=
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @wa0x6e It won't work without Redis? Redis is usually expensive, retry can't be in memory or on mysql DB?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the light queue libraries uses Redis as backend, it's built for that.

Redis is not used to store the retry, but is used as storage solution for the whole queue system.

As for the redis instance, only a small instance will be required, as data will be cleared as queue is cleared, only data for failed/waiting for retry jobs will be stored.
Another repo (envelop) is also using the queue system, we could share the same redis instance for all repo using a queue system

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For that we need @bonustrack 's approval 😢 because he denied Redis a few times as far as I remember

@bonustrack what you think? we could use Redis here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I've seen REDIS_URL in a few places, so I just assumed that it was common.

24 changes: 0 additions & 24 deletions .eslintrc

This file was deleted.

20 changes: 10 additions & 10 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ jobs:
node-version: [16.10.x]

steps:
- uses: actions/[email protected]
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: yarn install, build and lint
run: |
yarn install --frozen-lockfile
yarn build
yarn lint
- uses: actions/[email protected]
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v1
with:
node-version: ${{ matrix.node-version }}
- name: yarn install, build and lint
run: |
yarn install --frozen-lockfile
yarn build
yarn lint
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ dist
build
.env


# Remove some common IDE working directories
.idea
.vscode
8 changes: 0 additions & 8 deletions .prettierrc

This file was deleted.

28 changes: 19 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
"license": "MIT",
"scripts": {
"lint": "eslint . --ext .ts --fix",
"typecheck": "tsc --noEmit",
"build": "tsc",
"dev": "nodemon src/index.ts",
"start": "node dist/src/index.js"
"dev": "concurrently --n express,runner \"nodemon src/index.ts\" \"nodemon src/runner.ts\"",
"start": "concurrently --n express,runner \"node dist/src/index.js\" \"node dist/src/runner.js\""
},
"eslintConfig": {
"extends": "@snapshot-labs"
},
"prettier": "@snapshot-labs/prettier-config",
"dependencies": {
"@pusher/push-notifications-server": "^1.2.5",
"@snapshot-labs/snapshot.js": "^0.3.68",
"bluebird": "^3.7.2",
"body-parser": "^1.19.0",
"bullmq": "^3.12.0",
"concurrently": "^8.0.1",
"connection-string": "^1.0.1",
"cors": "^2.8.5",
"discord.js": "^14.3.0",
"dotenv": "^10.0.0",
"eslint": "^6.7.2",
"express": "^4.17.1",
"lodash.chunk": "^4.2.0",
"mysql": "^2.18.1",
Expand All @@ -27,11 +32,16 @@
"typescript": "^4.8.3"
},
"devDependencies": {
"@snapshot-labs/eslint-config": "^0.1.0-beta.9",
"@snapshot-labs/prettier-config": "^0.1.0-beta.7",
"@types/bluebird": "^3.5.38",
"@types/cors": "^2.8.13",
"@types/express": "^4.17.11",
"@types/node": "^17.0.23",
"@typescript-eslint/eslint-plugin": "^2.33.0",
"@typescript-eslint/parser": "^2.33.0",
"eslint-plugin-prettier": "^3.1.3",
"prettier": "^1.19.1"
"@types/lodash.chunk": "^4.2.7",
"@types/mysql": "^2.15.21",
"@types/node": "^18.0.0",
"@types/remove-markdown": "^0.3.1",
"eslint": "^8.28.0",
"prettier": "^2.8.0"
}
}
8 changes: 4 additions & 4 deletions src/api.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import express from 'express';
import { sendEvent } from './events';
import pkg from '../package.json';
import { last_mci } from './replay';
import { getLastMci } from './replay';
import { httpNotificationsQueue } from './queues';

const router = express.Router();

router.get('/', async (req, res) => {
return res.json({
name: pkg.name,
version: pkg.version,
last_mci
last_mci: getLastMci()
});
});

Expand All @@ -23,7 +23,7 @@ router.get('/test', async (req, res) => {
};
try {
new URL(url);
await sendEvent(event, url);
await httpNotificationsQueue.add('http', { event, to: url });
return res.json({ url, success: true });
} catch (e) {
return res.json({ url, error: e });
Expand Down
45 changes: 15 additions & 30 deletions src/discord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import db from './helpers/mysql';
import removeMd from 'remove-markdown';
import { shortenAddress } from './helpers/utils';
import { subs, loadSubscriptions } from './subscriptions';
import { checkSpace, getProposal } from './helpers/proposal';
import { checkSpace, getProposal } from './helpers/snapshot';

const CLIENT_ID = process.env.DISCORD_CLIENT_ID || '';
const token = process.env.DISCORD_TOKEN || '';
Expand Down Expand Up @@ -69,16 +69,10 @@ const commands = [
.setDMPermission(false)
.setDefaultMemberPermissions(0)
.addChannelOption(option =>
option
.setName('channel')
.setDescription('Channel to post the events')
.setRequired(true)
option.setName('channel').setDescription('Channel to post the events').setRequired(true)
)
.addStringOption(option =>
option
.setName('space')
.setDescription('space to subscribe to')
.setRequired(true)
option.setName('space').setDescription('space to subscribe to').setRequired(true)
)
.addStringOption(option => option.setName('mention').setDescription('Mention role')),
new SlashCommandBuilder()
Expand All @@ -87,33 +81,26 @@ const commands = [
.setDMPermission(false)
.setDefaultMemberPermissions(0)
.addChannelOption(option =>
option
.setName('channel')
.setDescription('Channel to post the events')
.setRequired(true)
option.setName('channel').setDescription('Channel to post the events').setRequired(true)
)
.addStringOption(option =>
option
.setName('space')
.setDescription('space to subscribe to')
.setRequired(true)
option.setName('space').setDescription('space to subscribe to').setRequired(true)
)
];

const rest = new REST({ version: '10' }).setToken(token);

(async () => {
export async function start() {
try {
const rest = new REST({ version: '10' }).setToken(token);

console.log('Started refreshing application (/) commands.');
await rest.put(Routes.applicationCommands(CLIENT_ID), { body: commands });
console.log('Successfully reloaded application (/) commands.');
} catch (error) {
console.error(error);
}
})();

client.login(token);

client.login(token);
} catch (e) {
console.log('Unable to start discord bot', e);
}
}
export const setActivity = (message, url?) => {
try {
client.user.setActivity(message, { type: 'WATCHING', url });
Expand Down Expand Up @@ -229,9 +216,9 @@ async function snapshotCommandHandler(interaction, commandType) {
const spaceExist = await checkSpace(space);
if (!spaceExist) return interaction.reply(`Space not found: ${inlineCode(space)}`);

const subscription = [interaction.guildId, channelId, space, mention || '', ts];
const subscription = [interaction.guildId, channelId, space, mention || '', ts, ts];
await db.queryAsync(
`INSERT INTO subscriptions (guild, channel, space, mention, created) VALUES (?, ?, ?, ?, ?)
`INSERT INTO subscriptions (guild, channel, space, mention, created, updated) VALUES (?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE guild = ?, channel = ?, space = ?, mention = ?, updated = ?`,
[...subscription, ...subscription]
);
Expand Down Expand Up @@ -358,5 +345,3 @@ export const sendEventToDiscordSubscribers = async (event, proposalId) => {
}
return { success: true };
};

export default client;
Loading