-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: archive SUCCESS/FAILED notifications #354
Changes from 2 commits
d1416fb
e22f9ea
d6de3fc
88bfdd5
79a979a
6c74dea
e3a5d41
0b75e7b
d51a8b2
384188b
25df0d1
dbba9df
2123548
63e49ca
72c6e41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,152 @@ | ||||||||||||
import { MigrationInterface, QueryRunner, Table, TableForeignKey } from 'typeorm'; | ||||||||||||
|
||||||||||||
export class ArchiveCompletedNotifications1730724383210 implements MigrationInterface { | ||||||||||||
public async up(queryRunner: QueryRunner): Promise<void> { | ||||||||||||
const table = await queryRunner.getTable('notify_notification_retries'); | ||||||||||||
const foreignKey = table.foreignKeys.find( | ||||||||||||
(fk) => fk.columnNames.indexOf('notification_id') !== -1, | ||||||||||||
); | ||||||||||||
await queryRunner.dropForeignKey('notify_notification_retries', foreignKey); | ||||||||||||
|
||||||||||||
await queryRunner.createTable( | ||||||||||||
new Table({ | ||||||||||||
name: 'archived_notifications', | ||||||||||||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
columns: [ | ||||||||||||
{ | ||||||||||||
name: 'id', | ||||||||||||
type: 'int', | ||||||||||||
isPrimary: true, | ||||||||||||
isGenerated: true, | ||||||||||||
generationStrategy: 'increment', | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'notification_id', | ||||||||||||
type: 'int', | ||||||||||||
}, | ||||||||||||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
{ | ||||||||||||
name: 'channel_type', | ||||||||||||
type: 'tinyint', | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'data', | ||||||||||||
type: 'text', | ||||||||||||
isNullable: true, | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'delivery_status', | ||||||||||||
type: 'tinyint', | ||||||||||||
default: 1, | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'result', | ||||||||||||
type: 'text', | ||||||||||||
isNullable: true, | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'created_on', | ||||||||||||
type: 'timestamp', | ||||||||||||
default: 'CURRENT_TIMESTAMP', | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'updated_on', | ||||||||||||
type: 'timestamp', | ||||||||||||
default: 'CURRENT_TIMESTAMP', | ||||||||||||
onUpdate: 'CURRENT_TIMESTAMP', | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'created_by', | ||||||||||||
type: 'varchar', | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'updated_by', | ||||||||||||
type: 'varchar', | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'status', | ||||||||||||
type: 'tinyint', | ||||||||||||
default: 1, | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'application_id', | ||||||||||||
type: 'int', | ||||||||||||
isNullable: true, | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'provider_id', | ||||||||||||
type: 'int', | ||||||||||||
isNullable: true, | ||||||||||||
}, | ||||||||||||
{ | ||||||||||||
name: 'retry_count', | ||||||||||||
type: 'int', | ||||||||||||
length: '3', | ||||||||||||
default: 0, | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove the unsupported 'length' property from the 'retry_count' column. The Apply this diff to remove the unsupported property: {
name: 'retry_count',
type: 'int',
- length: '3',
default: 0,
}, 📝 Committable suggestion
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed d6de3fc There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kshitij-k-osmosys, thank you for fixing the issue. If you found this review helpful, would you consider giving us a shout-out on X? |
||||||||||||
}, | ||||||||||||
], | ||||||||||||
}), | ||||||||||||
); | ||||||||||||
|
||||||||||||
// Create Foreign keys for archived_notifications | ||||||||||||
await queryRunner.createForeignKey( | ||||||||||||
'archived_notifications', | ||||||||||||
new TableForeignKey({ | ||||||||||||
columnNames: ['channel_type'], | ||||||||||||
referencedColumnNames: ['master_id'], | ||||||||||||
referencedTableName: 'notify_master_providers', | ||||||||||||
onDelete: 'CASCADE', | ||||||||||||
}), | ||||||||||||
); | ||||||||||||
|
||||||||||||
await queryRunner.createForeignKey( | ||||||||||||
'archived_notifications', | ||||||||||||
new TableForeignKey({ | ||||||||||||
columnNames: ['provider_id'], | ||||||||||||
referencedColumnNames: ['provider_id'], | ||||||||||||
referencedTableName: 'notify_providers', | ||||||||||||
onDelete: 'CASCADE', | ||||||||||||
}), | ||||||||||||
); | ||||||||||||
osm-vishnukyatannawar marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
} | ||||||||||||
|
||||||||||||
public async down(queryRunner: QueryRunner): Promise<void> { | ||||||||||||
// To drop the auto generated foreign key for archived_notifications | ||||||||||||
const archived_notifications_table = await queryRunner.getTable('archived_notifications'); | ||||||||||||
const archived_notifications_providerIdforeignKey = | ||||||||||||
archived_notifications_table?.foreignKeys.find( | ||||||||||||
(fk) => fk.columnNames.indexOf('provider_id') !== -1, | ||||||||||||
); | ||||||||||||
|
||||||||||||
// providerIdforeignKey | ||||||||||||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
if (archived_notifications_providerIdforeignKey) { | ||||||||||||
await queryRunner.dropForeignKey( | ||||||||||||
'archived_notifications', | ||||||||||||
archived_notifications_providerIdforeignKey, | ||||||||||||
); | ||||||||||||
} | ||||||||||||
|
||||||||||||
// channelTypeforeignKey | ||||||||||||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
const archived_notifications_channelTypeforeignKey = | ||||||||||||
archived_notifications_table?.foreignKeys.find( | ||||||||||||
(fk) => fk.columnNames.indexOf('channel_type') !== -1, | ||||||||||||
); | ||||||||||||
|
||||||||||||
if (archived_notifications_channelTypeforeignKey) { | ||||||||||||
await queryRunner.dropForeignKey( | ||||||||||||
'archived_notifications', | ||||||||||||
archived_notifications_channelTypeforeignKey, | ||||||||||||
); | ||||||||||||
} | ||||||||||||
|
||||||||||||
await queryRunner.dropTable('archived_notifications'); | ||||||||||||
|
||||||||||||
await queryRunner.createForeignKey( | ||||||||||||
'notify_notification_retries', | ||||||||||||
new TableForeignKey({ | ||||||||||||
columnNames: ['notification_id'], | ||||||||||||
referencedColumnNames: ['id'], | ||||||||||||
referencedTableName: 'notify_notifications', | ||||||||||||
onDelete: 'CASCADE', | ||||||||||||
}), | ||||||||||||
); | ||||||||||||
} | ||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
import { forwardRef, Logger, Module } from '@nestjs/common'; | ||
import { ArchivedNotificationsService } from './archived-notifications.service'; | ||
import { ArchivedNotification } from './entities/archived-notification.entity'; | ||
import { TypeOrmModule } from '@nestjs/typeorm'; | ||
import { NotificationsModule } from '../notifications/notifications.module'; | ||
|
||
@Module({ | ||
imports: [ | ||
TypeOrmModule.forFeature([ArchivedNotification]), | ||
forwardRef(() => NotificationsModule), | ||
], | ||
providers: [ArchivedNotificationsService, Logger], | ||
exports: [ArchivedNotificationsService], | ||
}) | ||
export class ArchivedNotificationsModule {} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import { Test, TestingModule } from '@nestjs/testing'; | ||
import { ArchivedNotificationsService } from './archived-notifications.service'; | ||
|
||
describe('ArchivedNotificationsService', () => { | ||
let service: ArchivedNotificationsService; | ||
|
||
beforeEach(async () => { | ||
const module: TestingModule = await Test.createTestingModule({ | ||
providers: [ArchivedNotificationsService], | ||
}).compile(); | ||
Comment on lines
+8
to
+10
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add required dependencies to the test module. The service likely requires dependencies for database operations and configuration. Consider adding:
Apply this diff: const module: TestingModule = await Test.createTestingModule({
- providers: [ArchivedNotificationsService],
+ providers: [
+ ArchivedNotificationsService,
+ {
+ provide: getRepositoryToken(Notification),
+ useValue: createMock<Repository<Notification>>(),
+ },
+ {
+ provide: getRepositoryToken(ArchivedNotification),
+ useValue: createMock<Repository<ArchivedNotification>>(),
+ },
+ {
+ provide: ConfigService,
+ useValue: createMock<ConfigService>({
+ get: jest.fn().mockReturnValue(100), // mock archive limit
+ }),
+ },
+ ],
}).compile();
|
||
|
||
service = module.get<ArchivedNotificationsService>(ArchivedNotificationsService); | ||
}); | ||
|
||
it('should be defined', () => { | ||
expect(service).toBeDefined(); | ||
}); | ||
}); | ||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; | ||
import { ArchivedNotification } from './entities/archived-notification.entity'; | ||
import { Cron, CronExpression } from '@nestjs/schedule'; | ||
import { Repository } from 'typeorm'; | ||
import { InjectRepository } from '@nestjs/typeorm'; | ||
import { Notification } from 'src/modules/notifications/entities/notification.entity'; | ||
import { NotificationsService } from '../notifications/notifications.service'; | ||
import { ConfigService } from '@nestjs/config'; | ||
|
||
const configService = new ConfigService(); | ||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Injectable() | ||
export class ArchivedNotificationsService { | ||
protected readonly logger = new Logger(ArchivedNotificationsService.name); | ||
|
||
constructor( | ||
@InjectRepository(ArchivedNotification) | ||
private readonly archivedNotificationRepository: Repository<ArchivedNotification>, | ||
@Inject(forwardRef(() => NotificationsService)) | ||
protected readonly notificationsService: NotificationsService, | ||
) {} | ||
|
||
private convertToArchivedNotifications(notifications: Notification[]): ArchivedNotification[] { | ||
return notifications.map((notification) => { | ||
const archivedNotification = new ArchivedNotification(); | ||
archivedNotification.applicationId = notification.applicationId; | ||
archivedNotification.channelType = notification.channelType; | ||
archivedNotification.createdBy = notification.createdBy; | ||
archivedNotification.createdOn = notification.createdOn; | ||
archivedNotification.data = notification.data; | ||
archivedNotification.deliveryStatus = notification.deliveryStatus; | ||
archivedNotification.notification_id = notification.id; | ||
archivedNotification.providerId = notification.providerId; | ||
archivedNotification.result = notification.result; | ||
archivedNotification.retryCount = notification.retryCount; | ||
archivedNotification.updatedBy = notification.updatedBy; | ||
archivedNotification.updatedOn = notification.updatedOn; | ||
archivedNotification.status = notification.status; | ||
|
||
this.logger.debug( | ||
`Created ArchivedNotification array using Notification ID: ${notification.id}, deliveryStatus: ${notification.deliveryStatus}`, | ||
); | ||
return archivedNotification; | ||
}); | ||
} | ||
|
||
async moveNotificationsToArchive(): Promise<void> { | ||
const archiveLimit = configService.get<number>('ARCHIVE_LIMIT', 1000); | ||
|
||
try { | ||
// Step 1: Retrieve the notifications to archive | ||
this.logger.log(`Retrieve the top ${archiveLimit} notifications to archive`); | ||
const notificationsToArchive = | ||
await this.notificationsService.findNotificationsToArchive(archiveLimit); | ||
|
||
if (notificationsToArchive.length === 0) { | ||
this.logger.log('No notifications to archive at this time.'); | ||
return; | ||
} | ||
|
||
// Step 2: Convert notifications to archived notifications | ||
const archivedNotificationsArray = | ||
this.convertToArchivedNotifications(notificationsToArchive); | ||
|
||
// Step 3: Insert notifications into the archive table | ||
this.logger.log(`Inserting archived notifications into the archive table`); | ||
await this.archivedNotificationRepository.save(archivedNotificationsArray, { | ||
transaction: true, | ||
}); | ||
|
||
// Step 4: Delete notifications from the main table by IDs | ||
this.logger.log(`Deleting notifications from the main table by IDs`); | ||
await this.notificationsService.deleteArchivedNotifications(notificationsToArchive); | ||
|
||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.logger.log(`Archive notifications task completed`); | ||
} catch (error) { | ||
this.logger.error('Failed to archive notifications:', error); | ||
} | ||
} | ||
|
||
@Cron(CronExpression.EVERY_HOUR) | ||
async ArchiveCompletedNotificationsCron(): Promise<void> { | ||
this.logger.log('Running archive notifications task'); | ||
this.moveNotificationsToArchive(); | ||
xixas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also suggested that we keep the cron time for this job in .env and default to 1 hour. Because for different instances based on their load we might want to override when we want to trigger this job. For example for the SaaS instance we would like to run it once in a day. But for OQSHA instance we might want to run it every 1 hour or 2 hour.
So lets try to keep this customization. Can be done as part of different task.