diff --git a/src/api.ts b/src/api.ts index 426e55d..68c761f 100644 --- a/src/api.ts +++ b/src/api.ts @@ -44,7 +44,7 @@ router.post('/ai/summary/:id', async (req, res) => { const aiSummary = new AiSummary(id, storageEngine(process.env.AI_SUMMARY_SUBDIR)); try { - const cachedSummary = await aiSummary.getCache(); + const cachedSummary = (await aiSummary.isCacheable()) && (await aiSummary.getCache()); let summary = ''; @@ -66,7 +66,7 @@ router.post('/ai/tts/:id', async (req, res) => { const aiTextTpSpeech = new AiTextToSpeech(id, storageEngine(process.env.AI_TTS_SUBDIR)); try { - const cachedAudio = await aiTextTpSpeech.getCache(); + const cachedAudio = (await aiTextTpSpeech.isCacheable()) && (await aiTextTpSpeech.getCache()); let audio: Buffer; diff --git a/src/helpers/snapshot.ts b/src/helpers/snapshot.ts index c7c1a0b..ea07c6b 100644 --- a/src/helpers/snapshot.ts +++ b/src/helpers/snapshot.ts @@ -4,7 +4,7 @@ import { fetchWithKeepAlive } from './utils'; export type Proposal = { id: string; - state: string; + state: 'pending' | 'active' | 'closed'; choices: string[]; space: Space; votes: number; @@ -12,6 +12,7 @@ export type Proposal = { title: string; body: string; discussion: string; + updated: number; }; export type Vote = { @@ -78,6 +79,7 @@ const PROPOSAL_QUERY = gql` network name } + updated } } `; diff --git a/src/lib/ai/summary.ts b/src/lib/ai/summary.ts index c770219..8893f29 100644 --- a/src/lib/ai/summary.ts +++ b/src/lib/ai/summary.ts @@ -3,6 +3,8 @@ import { fetchProposal, Proposal } from '../../helpers/snapshot'; import { IStorage } from '../storage/types'; import Cache from '../cache'; +const tempCacheIds = new Map(); + class Summary extends Cache { proposal?: Proposal | null; openAi: OpenAI; @@ -16,9 +18,8 @@ class Summary extends Cache { async isCacheable() { this.proposal = await fetchProposal(this.id); - if (!this.proposal) { - throw new Error('RECORD_NOT_FOUND'); - } + if (!this.proposal) throw new Error('RECORD_NOT_FOUND'); + if (this.#cacheExpired()) return false; return true; } @@ -53,6 +54,18 @@ class Summary extends Cache { throw e.error?.code ? new Error(e.error?.code.toUpperCase()) : e; } }; + + #cacheExpired = () => { + const { id, state, updated } = this.proposal!; + + if (state !== 'pending') return false; + + return tempCacheIds.has(id) && tempCacheIds.get(id) !== updated; + }; + + afterCreateCache() { + tempCacheIds.set(this.proposal!.id, this.proposal!.updated); + } } export default Summary; diff --git a/src/lib/ai/textToSpeech.ts b/src/lib/ai/textToSpeech.ts index afdf091..094e1d6 100644 --- a/src/lib/ai/textToSpeech.ts +++ b/src/lib/ai/textToSpeech.ts @@ -6,6 +6,7 @@ import { IStorage } from '../storage/types'; const MIN_BODY_LENGTH = 1; const MAX_BODY_LENGTH = 4096; +const tempCacheIds = new Map(); export default class TextToSpeech extends Cache { proposal?: Proposal | null; @@ -20,9 +21,8 @@ export default class TextToSpeech extends Cache { async isCacheable() { this.proposal = await fetchProposal(this.id); - if (!this.proposal) { - throw new Error('RECORD_NOT_FOUND'); - } + if (!this.proposal) throw new Error('RECORD_NOT_FOUND'); + if (this.#cacheExpired()) return false; return true; } @@ -47,4 +47,16 @@ export default class TextToSpeech extends Cache { throw e.error?.code ? new Error(e.error?.code.toUpperCase()) : e; } }; + + #cacheExpired = () => { + const { id, state, updated } = this.proposal!; + + if (state !== 'pending') return false; + + return tempCacheIds.has(id) && tempCacheIds.get(id) !== updated; + }; + + afterCreateCache() { + tempCacheIds.set(this.proposal!.id, this.proposal!.updated); + } } diff --git a/src/lib/cache.ts b/src/lib/cache.ts index d931b48..d7c8f43 100644 --- a/src/lib/cache.ts +++ b/src/lib/cache.ts @@ -36,11 +36,15 @@ export default class Cache { await this.isCacheable(); const content = await this.getContent(); - console.log(`[votes-report] File cache ready to be saved`); + console.log(`[${this.constructor.name}] File cache ready to be saved`); this.storage.set(this.filename, content); this.afterCreateCache(); return content; } + + toString() { + return `${this.constructor.name}#${this.id}`; + } } diff --git a/src/lib/queue.ts b/src/lib/queue.ts index 951fe9c..ddca9f7 100644 --- a/src/lib/queue.ts +++ b/src/lib/queue.ts @@ -1,7 +1,7 @@ import { sleep } from '../helpers/utils'; import { capture } from '@snapshot-labs/snapshot-sentry'; -import Cache from './cache'; import { timeQueueProcess } from './metrics'; +import type Cache from './cache'; const queues = new Map(); const processingItems = new Map(); @@ -9,22 +9,30 @@ const processingItems = new Map(); async function processItem(cacheable: Cache) { console.log(`[queue] Processing queue item: ${cacheable}`); try { + if ( + ['Summary', 'TextToSpeech'].includes(cacheable.constructor.name) && + !(await cacheable.getCache()) + ) { + return; + } + const end = timeQueueProcess.startTimer({ name: cacheable.constructor.name }); - processingItems.set(cacheable.id, cacheable); + processingItems.set(cacheable.toString(), cacheable); + await cacheable.createCache(); end(); } catch (e) { - capture(e, { id: cacheable.id }); + capture(e, { id: cacheable.toString() }); console.error(`[queue] Error while processing item`, e); } finally { - queues.delete(cacheable.id); - processingItems.delete(cacheable.id); + queues.delete(cacheable.toString()); + processingItems.delete(cacheable.toString()); } } export function queue(cacheable: Cache) { - if (!queues.has(cacheable.id)) { - queues.set(cacheable.id, cacheable); + if (!queues.has(cacheable.toString())) { + queues.set(cacheable.toString(), cacheable); } return queues.size; @@ -35,8 +43,8 @@ export function size() { } export function getProgress(id: string) { - if (processingItems.has(id)) { - return processingItems.get(id)?.generationProgress as number; + if (processingItems.has(id.toString())) { + return processingItems.get(id.toString())?.generationProgress as number; } return 0; diff --git a/src/lib/votesReport.ts b/src/lib/votesReport.ts index 3da7220..4ed6120 100644 --- a/src/lib/votesReport.ts +++ b/src/lib/votesReport.ts @@ -104,10 +104,6 @@ class VotesReport extends Cache { return votes; }; - toString() { - return `VotesReport#${this.id}`; - } - #formatCsvLine = (vote: Vote) => { let choices: Vote['choice'][] = []; diff --git a/src/webhook.ts b/src/webhook.ts index 0df7488..79a0f70 100644 --- a/src/webhook.ts +++ b/src/webhook.ts @@ -2,21 +2,28 @@ import express from 'express'; import { rpcError, rpcSuccess, storageEngine } from './helpers/utils'; import { capture } from '@snapshot-labs/snapshot-sentry'; import VotesReport from './lib/votesReport'; +import Summary from './lib/ai/summary'; +import TextToSpeech from './lib/ai/textToSpeech'; import { queue } from './lib/queue'; const router = express.Router(); -function processVotesReport(id: string, event: string) { +function processEvent(id: string, event: string) { if (event == 'proposal/end') { queue(new VotesReport(id, storageEngine(process.env.VOTE_REPORT_SUBDIR))); } + + if (event === 'proposal/start') { + queue(new Summary(id, storageEngine(process.env.AI_SUMMARY_SUBDIR))); + queue(new TextToSpeech(id, storageEngine(process.env.AI_TTS_SUBDIR))); + } } router.post('/webhook', (req, res) => { const body = req.body || {}; const event = body.event?.toString() ?? ''; // eslint-disable-next-line @typescript-eslint/no-unused-vars - const { type, id } = body.id?.toString().split('/'); + const [type, id] = body.id?.toString().split('/'); if (req.headers['authentication'] !== `${process.env.WEBHOOK_AUTH_TOKEN ?? ''}`) { return rpcError(res, 'UNAUTHORIZED', id); @@ -27,7 +34,7 @@ router.post('/webhook', (req, res) => { } try { - processVotesReport(id, event); + processEvent(id, event); return rpcSuccess(res, 'Webhook received', id); } catch (e) { capture(e, { body }); diff --git a/test/unit/lib/ai/summary.test.ts b/test/unit/lib/ai/summary.test.ts new file mode 100644 index 0000000..13d1497 --- /dev/null +++ b/test/unit/lib/ai/summary.test.ts @@ -0,0 +1,78 @@ +import Summary from '../../../../src/lib/ai/summary'; +import * as snapshotHelper from '../../../../src/helpers/snapshot'; +import { storageEngine } from '../../../../src/helpers/utils'; + +const fetchProposalMock = jest.spyOn(snapshotHelper, 'fetchProposal'); + +describe('AI summary', () => { + describe('isCacheable()', () => { + describe('when the proposal is pending', () => { + it('returns true if the proposal has not been cached yet', () => { + expect.assertions(2); + const summary = new Summary('1', storageEngine()); + fetchProposalMock.mockResolvedValueOnce({ + id: '2', + state: 'pending', + updated: 1 + } as snapshotHelper.Proposal); + + expect(summary.isCacheable()).resolves.toBe(true); + expect(fetchProposalMock).toHaveBeenCalledTimes(1); + }); + + it('returns true if the proposal has not been updated since last cache', async () => { + expect.assertions(2); + const summary = new Summary('summary-1', storageEngine()); + fetchProposalMock.mockResolvedValueOnce({ + id: '1', + state: 'pending', + updated: 1 + } as snapshotHelper.Proposal); + await summary.isCacheable(); + summary.afterCreateCache(); + + fetchProposalMock.mockResolvedValueOnce({ + id: '1', + state: 'pending', + updated: 2 + } as snapshotHelper.Proposal); + + expect(summary.isCacheable()).resolves.toBe(false); + expect(fetchProposalMock).toHaveBeenCalledTimes(2); + }); + + it('returns false if the proposal has been updated since last cache', async () => { + expect.assertions(2); + const summary = new Summary('1', storageEngine()); + fetchProposalMock.mockResolvedValue({ + id: '3', + state: 'pending', + updated: 1 + } as snapshotHelper.Proposal); + await summary.isCacheable(); + summary.afterCreateCache(); + + expect(summary.isCacheable()).resolves.toBe(true); + expect(fetchProposalMock).toHaveBeenCalledTimes(2); + }); + }); + + it('returns true when the proposal exist', async () => { + expect.assertions(2); + const summary = new Summary('1', storageEngine()); + fetchProposalMock.mockResolvedValueOnce({} as snapshotHelper.Proposal); + + expect(summary.isCacheable()).resolves.toBe(true); + expect(fetchProposalMock).toHaveBeenCalledTimes(1); + }); + + it('returns false when the proposal does not exist', () => { + expect.assertions(2); + const summary = new Summary('1', storageEngine()); + fetchProposalMock.mockRejectedValueOnce(new Error('RECORD_NOT_FOUND')); + + expect(summary.isCacheable()).rejects.toThrow('RECORD_NOT_FOUND'); + expect(fetchProposalMock).toHaveBeenCalledTimes(1); + }); + }); +});