diff --git a/src/constants/neo4j.constant.ts b/src/constants/neo4j.constant.ts index b15ffb5..f5ce31e 100644 --- a/src/constants/neo4j.constant.ts +++ b/src/constants/neo4j.constant.ts @@ -10,4 +10,8 @@ export const NEO4J_PLATFORM_INFO = { platform: 'DiscoursePlatform', member: 'DiscourseMember', }, + [PlatformNames.Telegram]: { + platform: 'TelegramPlatform', + member: 'TelegramMember', + }, }; diff --git a/src/controllers/index.ts b/src/controllers/index.ts index 8d09a2a..56d55d6 100644 --- a/src/controllers/index.ts +++ b/src/controllers/index.ts @@ -10,6 +10,7 @@ import categoryController from './category.controller'; import moduleController from './module.controller'; import discourseController from './discourse.controller'; import nftController from './nft.controller'; +import telegramController from './telegram.controller'; export { authController, @@ -24,4 +25,5 @@ export { moduleController, discourseController, nftController, + telegramController, }; diff --git a/src/controllers/telegram.controller.ts b/src/controllers/telegram.controller.ts new file mode 100644 index 0000000..1132f8e --- /dev/null +++ b/src/controllers/telegram.controller.ts @@ -0,0 +1,142 @@ +import { Response } from 'express'; +import moment from 'moment-timezone'; + +import { DatabaseManager } from '@togethercrew.dev/db'; + +import { activityCompostionsTypes } from '../config/memberBreakDownTables'; +import { IAuthAndPlatform } from '../interfaces/Request.interface'; +import { telegramService } from '../services'; +import { SupportedNeo4jPlatforms } from '../types/neo4j.type'; +import { catchAsync, charts, date, pick } from '../utils'; + +const heatmapChart = catchAsync(async function (req: IAuthAndPlatform, res: Response) { + const platformConnection = await DatabaseManager.getInstance().getPlatformDb(req.platform?.id); + let heatmaps = await telegramService.heatmapService.getHeatmapChart(platformConnection, req.body); + const timeZoneOffset = parseInt(moment().tz(req.body.timeZone).format('Z')); + + if (timeZoneOffset !== 0) { + heatmaps = date.shiftHeatmapsHours(heatmaps, timeZoneOffset); + } + heatmaps = charts.fillHeatmapChart(heatmaps); + res.send(heatmaps); +}); + +const lineGraph = catchAsync(async function (req: IAuthAndPlatform, res: Response) { + const platformConnection = await DatabaseManager.getInstance().getPlatformDb(req.platform?.id); + let lineGraph = await telegramService.heatmapService.lineGraph( + platformConnection, + req.body.startDate, + req.body.endDate, + ); + lineGraph = charts.fillHeatmapLineGraph(lineGraph, req.body.startDate, req.body.endDate); + res.send(lineGraph); +}); + +const membersInteractionsNetworkGraph = catchAsync(async function (req: IAuthAndPlatform, res: Response) { + const networkGraphData = await telegramService.memberActivityService.getMembersInteractionsNetworkGraph( + req.platform.id, + req.platform?.name as SupportedNeo4jPlatforms, + ); + res.send(networkGraphData); +}); + +const activeMembersCompositionTable = catchAsync(async function (req: IAuthAndPlatform, res: Response) { + const filter = pick({ ...req.query, ...req.body }, ['activityComposition', 'ngu']); + const options = pick(req.query, ['sortBy', 'limit', 'page']); + const platformConnection = await DatabaseManager.getInstance().getPlatformDb(req.platform?.id); + const activityCompostionFields = + telegramService.memberActivityService.getActivityCompositionOfActiveMembersComposition(); + const memberActivity = await telegramService.memberActivityService.getLastDocumentForTablesUsage( + platformConnection, + activityCompostionFields, + ); + console.log('memberActivity',memberActivity) + const members = await telegramService.membersService.queryMembersForTables( + platformConnection, + filter, + options, + memberActivity, + activityCompostionsTypes.activeMembersComposition, + ); + console.log('members',members) + if (members) { + members.results.forEach((member) => { + member.ngu = telegramService.membersService.getNgu(member); + member.activityComposition = telegramService.memberActivityService.getActivityComposition( + member, + memberActivity, + filter.activityComposition, + ); + }); + } + res.send(members); +}); + +const activeMembersOnboardingTable = catchAsync(async function (req: IAuthAndPlatform, res: Response) { + const filter = pick({ ...req.query, ...req.body }, ['activityComposition', 'ngu']); + const options = pick(req.query, ['sortBy', 'limit', 'page']); + const platformConnection = await DatabaseManager.getInstance().getPlatformDb(req.platform?.id); + const activityCompostionFields = + telegramService.memberActivityService.getActivityCompositionOfActiveMembersOnboarding(); + const memberActivity = await telegramService.memberActivityService.getLastDocumentForTablesUsage( + platformConnection, + activityCompostionFields, + ); + const members = await telegramService.membersService.queryMembersForTables( + platformConnection, + filter, + options, + memberActivity, + activityCompostionsTypes.activeMembersOnboarding, + ); + if (members) { + members.results.forEach((member) => { + member.ngu = telegramService.membersService.getNgu(member); + member.activityComposition = telegramService.memberActivityService.getActivityComposition( + member, + memberActivity, + filter.activityComposition, + ); + }); + } + res.send(members); +}); + +const disengagedMembersCompositionTable = catchAsync(async function (req: IAuthAndPlatform, res: Response) { + const filter = pick({ ...req.query, ...req.body }, ['activityComposition', 'ngu']); + const options = pick(req.query, ['sortBy', 'limit', 'page']); + const platformConnection = await DatabaseManager.getInstance().getPlatformDb(req.platform?.id); + const activityCompostionFields = + telegramService.memberActivityService.getActivityCompositionOfDisengagedComposition(); + const memberActivity = await telegramService.memberActivityService.getLastDocumentForTablesUsage( + platformConnection, + activityCompostionFields, + ); + const members = await telegramService.membersService.queryMembersForTables( + platformConnection, + filter, + options, + memberActivity, + activityCompostionsTypes.disengagedMembersCompostion, + ); + if (members) { + members.results.forEach((member) => { + member.ngu = telegramService.membersService.getNgu(member); + member.activityComposition = telegramService.memberActivityService.getActivityComposition( + member, + memberActivity, + filter.activityComposition, + ); + }); + } + res.send(members); +}); + +export default { + heatmapChart, + lineGraph, + membersInteractionsNetworkGraph, + activeMembersCompositionTable, + activeMembersOnboardingTable, + disengagedMembersCompositionTable, +}; diff --git a/src/docs/memberActivity.doc.yml b/src/docs/memberActivity.doc.yml index 1f51095..acbed78 100644 --- a/src/docs/memberActivity.doc.yml +++ b/src/docs/memberActivity.doc.yml @@ -164,7 +164,7 @@ paths: post: tags: - [Member-Activity] - summary: Get data for inactive members line graph - discord only + summary: Get data for inactive members line graph - Discord, Discourse and Telegram security: - bearerAuth: [] parameters: @@ -229,7 +229,7 @@ paths: post: tags: - [Member-Activity] - summary: Get data for active members interactions graph - discord only + summary: Get data for active members interactions graph - Discord, Discourse and Telegram security: - bearerAuth: [] parameters: @@ -333,7 +333,7 @@ paths: get: tags: - [Member-Activity] - summary: Get data for fragmentation score - discord only + summary: Get data for fragmentation score - Discord, Discourse and Telegram security: - bearerAuth: [] parameters: @@ -382,7 +382,7 @@ paths: get: tags: - [Member-Activity] - summary: Get data for decentralisation score - discord only + summary: Get data for decentralisation score - Discord, Discourse and Telegram security: - bearerAuth: [] parameters: @@ -431,7 +431,7 @@ paths: post: tags: - [Member-Activity] - summary: Get data for active members onboarding line graph - discord only + summary: Get data for active members onboarding line graph - Discord, Discourse and Telegram security: - bearerAuth: [] parameters: @@ -510,7 +510,7 @@ paths: post: tags: - [Member-Activity] - summary: Get data for active members composition table - discord only + summary: Get data for active members composition table - Discord, Discourse and Telegram security: - bearerAuth: [] description: for now sortBy just can apply for ngu and joinedAt(DaoMemberSince in UI) @@ -655,7 +655,7 @@ paths: post: tags: - [Member-Activity] - summary: Get data for active members onboarding table - discord only + summary: Get data for active members onboarding table - Discord, Discourse and Telegram security: - bearerAuth: [] description: for now sortBy just can apply for ngu and joinedAt(DaoMemberSince in UI) @@ -800,7 +800,7 @@ paths: post: tags: - [Member-Activity] - summary: Get data for disengaged members composition table - discord only + summary: Get data for disengaged members composition table - Discord, Discourse and Telegram security: - bearerAuth: [] description: for now sortBy just can apply for ngu and joinedAt(DaoMemberSince in UI) diff --git a/src/docs/telegram.doc.yml b/src/docs/telegram.doc.yml new file mode 100644 index 0000000..1cd43ed --- /dev/null +++ b/src/docs/telegram.doc.yml @@ -0,0 +1,553 @@ +paths: + /api/v1/telegram/heatmaps/{platformId}/heatmap-chart: + post: + tags: + - [Telegram] + summary: Get data for heatmap chart - telegram only + security: + - bearerAuth: [] + parameters: + - in: path + name: platformId + required: true + schema: + type: string + description: Platform Id + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + startDate: + type: string + format: date-time + endDate: + type: string + format: date-time + timeZone: + type: string + example: + startDate: '2023-01-17T13:02:10.911+00:00' + endDate: '2023-01-29T10:50:01.513Z' + timeZone: 'America/Fortaleza' + responses: + '200': + description: OK + content: + application/json: + schema: + type: array + items: + type: array + items: + type: number + example: [[6, 5, 28], [7, 10, 28]] + '400': + description: Bad Request + $ref: '#/components/responses/BadRequest' + '401': + description: Unauthorized + $ref: '#/components/responses/Unauthorized' + '404': + description: NotFound + $ref: '#/components/responses/NotFound' + /api/v1/telegram/heatmaps/{platformId}/line-graph: + post: + tags: + - [Telegram] + summary: Get data for line graph - telegram only + security: + - bearerAuth: [] + parameters: + - in: path + name: platformId + required: true + schema: + type: string + description: Platform Id + requestBody: + required: true + content: + application/json: + schema: + type: object + properties: + startDate: + type: string + format: date-time + endDate: + type: string + format: date-time + example: + startDate: '2023-01-17T13:02:10.911+00:00' + endDate: '2023-01-29T10:50:01.513Z' + responses: + '200': + description: OK + content: + application/json: + schema: + type: object + properties: + categories: + type: array + items: + type: string + series: + type: array + items: + type: object + properties: + name: + type: string + data: + type: array + items: + type: number + emojis: + type: number + messages: + type: number + emojiPercentageChange: + type: number + msgPercentageChange: + type: number + '400': + description: Bad Request + $ref: '#/components/responses/BadRequest' + '401': + description: Unauthorized + $ref: '#/components/responses/Unauthorized' + '404': + description: NotFound + $ref: '#/components/responses/NotFound' + /api/v1/telegram/member-activity/{platformId}/members-interactions-network-graph: + post: + tags: + - [Telegram] + summary: Get data for active members interactions graph + security: + - bearerAuth: [] + parameters: + - in: path + name: platformId + required: true + schema: + type: string + description: Platform Id + responses: + '200': + description: Successful operation + content: + application/json: + schema: + type: array + items: + type: object + properties: + from: + type: object + properties: + id: + type: string + radius: + type: integer + username: + type: string + stats: + type: string + avatar: + type: string + example: 'b50adff099924dd5e6b72d13f77eb9d7' + joinedAt: + type: string + format: date-time + example: '2022-05-30T15:46:52.924+00:00' + roles: + type: array + items: + type: object + properties: + roleId: + type: string + name: + type: string + example: 'role1' + color: + type: string + example: '#000000' + ngu: + type: string + example: 'HajBehzadTalast' + + to: + type: object + properties: + id: + type: string + radius: + type: integer + username: + type: string + stats: + type: string + avatar: + type: string + example: 'b50adff099924dd5e6b72d13f77eb9d7' + joinedAt: + type: string + format: date-time + example: '2022-05-30T15:46:52.924+00:00' + roles: + type: array + items: + type: object + properties: + roleId: + type: string + example: 'discordRoleId1' + name: + type: string + example: 'role1' + color: + type: string + example: '#000000' + ngu: + type: string + example: 'HajBehzadTalast' + + width: + type: integer + '401': + description: Unauthorized + $ref: '#/components/responses/Unauthorized' + '404': + description: NotFound + $ref: '#/components/responses/NotFound' + /api/v1/telegram/member-activity/{platformId}/active-members-composition-table: + post: + tags: + - [Telegram] + summary: Get data for active members composition table + security: + - bearerAuth: [] + description: for now sortBy just can apply for ngu and joined_at(DaoMemberSince in UI) + parameters: + - in: path + name: platformId + required: true + schema: + type: string + description: Platform Id + - in: query + name: activityComposition + schema: + type: array + items: + type: string + enum: ['all_active', 'all_new_active', 'all_consistent', 'all_vital', 'all_new_disengaged', 'others'] + example: ['all_active', 'all_new_active'] + - in: query + name: ngu + required: false + schema: + type: string + example: 'Nima' + - in: query + name: sortBy + required: false + schema: + type: string + example: 'joined_at:desc,ngu:asc' + - in: query + name: limit + required: false + schema: + type: integer + format: int32 + example: 10 + default: 10 + - in: query + name: page + required: false + schema: + type: integer + format: int32 + example: 2 + default: 1 + responses: + '200': + description: OK + content: + application/json: + schema: + type: object + properties: + results: + type: array + items: + type: object + properties: + id: + type: integer + username: + type: string + example: 'Behzad_Rabiei' + ngu: + type: string + example: 'Behzad' + avatar: + type: string + example: 'b50adff099924dd5e6b72d13f77eb9d7' + activityComposition: + type: array + items: + type: string + example: ['Newly active', 'Vital'] + joined_at: + type: string + format: date-time + example: '2022-05-30T15:46:52.924+00:00' + limit: + type: integer + example: 10 + page: + type: integer + example: 1 + totalPages: + type: integer + example: 2 + totalResults: + type: integer + example: 20 + '400': + description: Bad Request + $ref: '#/components/responses/BadRequest' + '401': + description: Unauthorized + $ref: '#/components/responses/Unauthorized' + '404': + description: NotFound + $ref: '#/components/responses/NotFound' + /api/v1/telegram/member-activity/{platformId}/active-members-onboarding-table: + post: + tags: + - [Telegram] + summary: Get data for active members onboarding table + security: + - bearerAuth: [] + description: for now sortBy just can apply for ngu and joined_at(DaoMemberSince in UI) + parameters: + - in: path + name: platformId + required: true + schema: + type: string + description: Platform Id + - in: query + name: activityComposition + schema: + type: array + items: + type: string + enum: ['all_joined', 'all_new_active', 'all_still_active', 'all_dropped', 'others'] + example: ['all_joined', 'all_new_active'] + - in: query + name: ngu + required: false + schema: + type: string + example: 'Nima' + - in: query + name: sortBy + required: false + schema: + type: string + example: 'joined_at:desc,ngu:asc' + - in: query + name: limit + required: false + schema: + type: integer + format: int32 + example: 10 + default: 10 + - in: query + name: page + required: false + schema: + type: integer + format: int32 + example: 2 + default: 1 + + responses: + '200': + description: OK + content: + application/json: + schema: + type: object + properties: + results: + type: array + items: + type: object + properties: + id: + type: integer + username: + type: string + example: 'Behzad_Rabiei' + ngu: + type: string + example: 'Behzad' + avatar: + type: string + example: 'b50adff099924dd5e6b72d13f77eb9d7' + activityComposition: + type: array + items: + type: string + example: ['Newly active', 'Dropped'] + joined_at: + type: string + format: date-time + example: '2022-05-30T15:46:52.924+00:00' + limit: + type: integer + example: 10 + page: + type: integer + example: 1 + totalPages: + type: integer + example: 2 + totalResults: + type: integer + example: 20 + '400': + description: Bad Request + $ref: '#/components/responses/BadRequest' + '401': + description: Unauthorized + $ref: '#/components/responses/Unauthorized' + '404': + description: NotFound + $ref: '#/components/responses/NotFound' + /api/v1/telegram/member-activity/{platformId}/disengaged-members-composition-table: + post: + tags: + - [Telegram] + summary: Get data for disengaged members composition table + security: + - bearerAuth: [] + description: for now sortBy just can apply for ngu and joined_at(DaoMemberSince in UI) + parameters: + - in: path + name: platformId + required: true + schema: + type: string + description: Platform Id + - in: query + name: activityComposition + schema: + type: array + items: + type: string + enum: + [ + 'all_new_disengaged', + 'all_disengaged_were_newly_active', + 'all_disengaged_were_consistenly_active', + 'all_disengaged_were_vital', + 'others', + ] + example: ['all_new_disengaged', 'all_disengaged_were_newly_active'] + style: form + explode: true + - in: query + name: ngu + required: false + schema: + type: string + example: 'Nima' + - in: query + name: sortBy + required: false + schema: + type: string + example: 'joined_at:desc,ngu:asc' + - in: query + name: limit + required: false + schema: + type: integer + format: int32 + example: 10 + default: 10 + - in: query + name: page + required: false + schema: + type: integer + format: int32 + example: 2 + default: 1 + responses: + '200': + description: OK + content: + application/json: + schema: + type: object + properties: + results: + type: array + items: + type: object + properties: + id: + type: integer + username: + type: string + example: 'Behzad_Rabiei' + ngu: + type: string + example: 'Behzad' + avatar: + type: string + example: 'b50adff099924dd5e6b72d13f77eb9d7' + activityComposition: + type: array + items: + type: string + example: ['Were vital members', 'Were newly active'] + joined_at: + type: string + format: date-time + example: '2022-05-30T15:46:52.924+00:00' + limit: + type: integer + example: 10 + page: + type: integer + example: 1 + totalPages: + type: integer + example: 2 + totalResults: + type: integer + example: 20 + '400': + description: Bad Request + $ref: '#/components/responses/BadRequest' + '401': + description: Unauthorized + $ref: '#/components/responses/Unauthorized' + '404': + description: NotFound + $ref: '#/components/responses/NotFound' diff --git a/src/routes/v1/index.ts b/src/routes/v1/index.ts index bfd2136..64a52dc 100644 --- a/src/routes/v1/index.ts +++ b/src/routes/v1/index.ts @@ -12,6 +12,7 @@ import categoryRoute from './category.route'; import moduleRoute from './module.route'; import discourseRoute from './discourse.route'; import nftRoute from './nft.route'; +import telegramRoute from './telegram.route'; const router = express.Router(); const defaultRoutes = [ @@ -68,6 +69,10 @@ const defaultRoutes = [ path: '/nft', route: nftRoute, }, + { + path: '/telegram', + route: telegramRoute, + }, ]; defaultRoutes.forEach((route) => { diff --git a/src/routes/v1/telegram.route.ts b/src/routes/v1/telegram.route.ts new file mode 100644 index 0000000..89eec03 --- /dev/null +++ b/src/routes/v1/telegram.route.ts @@ -0,0 +1,48 @@ +import express from 'express'; +import { telegramController } from '../../controllers'; +import { telegramValidation } from '../../validations'; + +import { auth, validate } from '../../middlewares'; +const router = express.Router(); + +// Routes +router.post( + '/heatmaps/:platformId/heatmap-chart', + auth('admin', 'view'), + validate(telegramValidation.heatmapChart), + telegramController.heatmapChart, +); +router.post( + '/heatmaps/:platformId/line-graph', + auth('admin', 'view'), + validate(telegramValidation.lineGraph), + telegramController.lineGraph, +); + +router.post( + '/member-activity/:platformId/members-interactions-network-graph', + auth('admin', 'view'), + validate(telegramValidation.membersInteractionsNetworkGraph), + telegramController.membersInteractionsNetworkGraph, +); + +router.post( + '/member-activity/:platformId/active-members-composition-table', + auth('admin', 'view'), + validate(telegramValidation.activeMembersCompositionTable), + telegramController.activeMembersCompositionTable, +); +router.post( + '/member-activity/:platformId/active-members-onboarding-table', + auth('admin', 'view'), + validate(telegramValidation.activeMembersOnboardingTable), + telegramController.activeMembersOnboardingTable, +); +router.post( + '/member-activity/:platformId/disengaged-members-composition-table', + auth('admin', 'view'), + validate(telegramValidation.disengagedMembersCompositionTable), + telegramController.disengagedMembersCompositionTable, +); + +export default router; diff --git a/src/services/index.ts b/src/services/index.ts index 401f7d2..c7eb9dc 100644 --- a/src/services/index.ts +++ b/src/services/index.ts @@ -16,7 +16,7 @@ import notionService from './notion'; import discourseService from './discourse'; import ociService from './oci.service'; import nftService from './nft.service'; - +import telegramService from './telegram'; export { userService, authService, @@ -36,4 +36,5 @@ export { discourseService, ociService, nftService, + telegramService, }; diff --git a/src/services/telegram/heatmap.service.ts b/src/services/telegram/heatmap.service.ts new file mode 100644 index 0000000..58bfdd1 --- /dev/null +++ b/src/services/telegram/heatmap.service.ts @@ -0,0 +1,313 @@ +import { Connection } from 'mongoose'; + +import parentLogger from '../../config/logger'; +import { date, math } from '../../utils'; + +const logger = parentLogger.child({ module: 'DiscourseHeatmapService' }); + +/** + * get heatmap chart + * @param {Connection} platformConnection + * @param {IHeatmapChartRequestBody} body + * @returns {Array>} + */ +async function getHeatmapChart(platformConnection: Connection, body: any) { + const { startDate, endDate} = body; + try { + let matchStage: any = { + $and: [{ date: { $gte: new Date(startDate) } }, { date: { $lte: new Date(endDate) } }], + }; + + + const heatmaps = await platformConnection.models.HeatMap.aggregate([ + // Stage1 : convert date from string to date type and extract needed data + { + $project: { + _id: 0, + date: { $convert: { input: '$date', to: 'date' } }, + chat_messages: 1, + replier: 1, + }, + }, + + // Stage2: find heatmaps between startDate and endDate + { + $match: matchStage, + }, + + // Stage3 : provide one document for each element of interactions array + { + $unwind: { + path: '$chat_messages', + includeArrayIndex: 'arrayIndex', + }, + }, + + // Stage4 : extract needed data + { + $project: { + dayOfWeek: { $add: [{ $dayOfWeek: '$date' }, -1] }, + hour: { $add: ['$arrayIndex', 1] }, + interactions: { + $add: ['$chat_messages', { $arrayElemAt: ['$replier', '$arrayIndex'] }], + }, + }, + }, + + // Stage5 : group documents based on day and hour + { + $group: { + _id: { dayOfWeek: '$dayOfWeek', hour: '$hour' }, + interactions: { $sum: '$interactions' }, + }, + }, + ]); + + // Convert Arrays of objects to array of 2D arrays + return heatmaps.map((object) => [object._id.dayOfWeek, object._id.hour, object.interactions]); + } catch (error) { + logger.error({ platform_connection: platformConnection.name, body, error }, 'Failed to get heatmap chart'); + return []; + } +} + +/** + * get line graph + * @param {Connection} connection + * @param {Date} startDate + * @param {Date} endDate + * @returns {Object} + */ +async function lineGraph(platformConnection: Connection, startDate: Date, endDate: Date) { + const start = new Date(startDate); + const end = new Date(endDate); + try { + const heatmaps = await platformConnection.models.HeatMap.aggregate([ + // Stage 1: Convert date from string to date type and extract needed data + { + $project: { + _id: 0, + date: { $convert: { input: '$date', to: 'date' } }, + chat_messages: 1, + replier: 1, + reacter: 1, + + }, + }, + + // Stage 2: Filter documents based on date range + { + $match: { + date: { + $gte: new Date(start), + $lte: new Date(end), + }, + }, + }, + + // Stage 3: Add month names array for later use + { + $addFields: { + monthNames: ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'], + }, + }, + + // Stage 4: Calculate statistics and concatenate day-month field + { + $project: { + date: 1, + day_month: { + $concat: [ + { $dateToString: { format: '%d', date: '$date' } }, + ' ', + { + $arrayElemAt: ['$monthNames', { $subtract: [{ $month: '$date' }, 1] }], + }, + ], + }, + chat_messages_messages: { + $reduce: { + input: '$chat_messages', + initialValue: 0, + in: { $sum: ['$$value', '$$this'] }, + }, + }, + total_replier: { + $reduce: { + input: '$replier', + initialValue: 0, + in: { $sum: ['$$value', '$$this'] }, + }, + }, + emojis: { + $reduce: { + input: '$reacter', + initialValue: 0, + in: { $sum: ['$$value', '$$this'] }, + }, + }, + }, + }, + + // Stage 5: Sort documents by date + { + $sort: { date: 1 }, + }, + + // Stage 6: Group documents by day and compute summary statistics + { + $group: { + _id: { + date: '$date', + day_month: '$day_month', + }, + emojis: { $sum: '$emojis' }, + messages: { $sum: { $sum: ['$chat_messages_messages', '$total_replier'] } }, + }, + }, + + // Stage 7: Sort documents by date + { + $sort: { '_id.date': 1 }, + }, + + // Stage 8: Transform group data into final format for charting + { + $group: { + _id: null, + categories: { $push: '$_id.day_month' }, + emojis: { $push: '$emojis' }, + messages: { $push: '$messages' }, + // totalEmojis: { $sum: "$emojis" }, + // totalMessages: { $sum: "$messages" } + lastMessages: { $last: '$messages' }, + lastEmojis: { $last: '$emojis' }, + }, + }, + // Stage 9: Project data into final format + { + $project: { + _id: 0, + categories: '$categories', + series: [ + { name: 'emojis', data: '$emojis' }, + { name: 'messages', data: '$messages' }, + ], + emojis: '$lastEmojis', + messages: '$lastMessages', + }, + }, + ]); + + if (heatmaps.length === 0) { + return { + categories: [], + series: [], + emojis: 0, + messages: 0, + msgPercentageChange: 0, + emojiPercentageChange: 0, + }; + } + + const adjustedDate = date.calculateAdjustedDate(endDate, heatmaps[0].categories[heatmaps[0].categories.length - 1]); + const adjustedHeatmap = await platformConnection.models.HeatMap.aggregate([ + // Stage 1: Convert date from string to date type and extract needed data + { + $project: { + _id: 0, + date: { $convert: { input: '$date', to: 'date' } }, + chat_messages: 1, + replier: 1, + reacter: 1, + }, + }, + + // Stage 2: Filter documents based on date + { + $match: { + date: { + $gte: new Date(adjustedDate), + $lt: new Date(new Date(adjustedDate).getTime() + 24 * 60 * 60 * 1000), // add one day in milliseconds + }, + }, + }, + + // Stage 3: Calculate statistics and concatenate day-month field + { + $project: { + total_chat_messages: { + $reduce: { + input: '$chat_messages', + initialValue: 0, + in: { $sum: ['$$value', '$$this'] }, + }, + }, + total_replier: { + $reduce: { + input: '$replier', + initialValue: 0, + in: { $sum: ['$$value', '$$this'] }, + }, + }, + emojis: { + $reduce: { + input: '$reacter', + initialValue: 0, + in: { $sum: ['$$value', '$$this'] }, + }, + }, + }, + }, + + // Stage 4: Group documents by null (aggregate all) and sum up all the values + { + $group: { + _id: null, // Aggregate all documents + total_chat_messages: { $sum: '$total_chat_messages' }, + total_replier: { $sum: '$total_replier' }, + total_emojis: { $sum: '$emojis' }, + }, + }, + // Stage 5: Transform totals into 'messages' and 'emojis' + { + $project: { + _id: 0, + messages: { + $add: ['$total_chat_messages', '$total_replier'], + }, + emojis: '$total_emojis', + }, + }, + ]); + + if (adjustedHeatmap.length === 0) { + return { + ...heatmaps[0], + msgPercentageChange: 'N/A', + emojiPercentageChange: 'N/A', + }; + } + + return { + ...heatmaps[0], + msgPercentageChange: math.calculatePercentageChange(adjustedHeatmap[0].messages, heatmaps[0].messages), + emojiPercentageChange: math.calculatePercentageChange(adjustedHeatmap[0].emojis, heatmaps[0].emojis), + }; + } catch (err) { + logger.error({ platform_connection: platformConnection.name, startDate, endDate }, 'Failed to get line graph data'); + return { + categories: [], + series: [], + emojis: 0, + messages: 0, + msgPercentageChange: 0, + emojiPercentageChange: 0, + }; + } +} + +export default { + getHeatmapChart, + lineGraph, +}; diff --git a/src/services/telegram/index.ts b/src/services/telegram/index.ts new file mode 100644 index 0000000..c1adf8f --- /dev/null +++ b/src/services/telegram/index.ts @@ -0,0 +1,9 @@ +import heatmapService from './heatmap.service'; +import memberActivityService from './memberActivity.service'; +import membersService from './members.service'; + +export default { + heatmapService, + memberActivityService, + membersService, +}; diff --git a/src/services/telegram/memberActivity.service.ts b/src/services/telegram/memberActivity.service.ts new file mode 100644 index 0000000..155e5bb --- /dev/null +++ b/src/services/telegram/memberActivity.service.ts @@ -0,0 +1,272 @@ +import httpStatus from 'http-status'; +import { Connection } from 'mongoose'; + +import { DatabaseManager } from '@togethercrew.dev/db'; + +import parentLogger from '../../config/logger'; +import { NEO4J_PLATFORM_INFO } from '../../constants/neo4j.constant'; +import * as Neo4j from '../../neo4j'; +import { SupportedNeo4jPlatforms } from '../../types/neo4j.type'; +import { ApiError } from '../../utils'; +import NodeStats from '../../utils/enums/nodeStats.enum'; + +const logger = parentLogger.child({ module: 'DiscourseMemberActivityService' }); + +function getNgu(user:any): string { + const { first_name, last_name, username } = user.options; + const combinedName = [first_name, last_name].filter(Boolean).join(" "); + return combinedName || username; +} + +type networkGraphUserInformationType = { + username: string; + avatar: string | null | undefined; + joinedAt: Date | null; + roles: []; + ngu: string; +}; + +function getUserInformationForNetworkGraph(user: any): networkGraphUserInformationType { + return { + username: user.options.username, + avatar: user.options.avatar, + joinedAt: user.joined_at, + roles: [], + ngu: getNgu(user), + }; +} + +type memberInteractionType = { id: string; radius: number; stats: NodeStats } & networkGraphUserInformationType; +type memberInteractionsGraphResponseType = { width: number; from: memberInteractionType; to: memberInteractionType }[]; +async function getMembersInteractionsNetworkGraph( + platformId: string, + platformName: SupportedNeo4jPlatforms, +): Promise { + try { + const platformConnection = await DatabaseManager.getInstance().getPlatformDb(platformId); + const usersInNetworkGraph: string[] = []; + // userInteraction + const usersInteractionsQuery = ` + MATCH () -[r:INTERACTED_WITH {platformId: "${platformId}"}]-() + WITH max(r.date) as latest_date + MATCH (a:${NEO4J_PLATFORM_INFO[platformName].member})-[r:INTERACTED_WITH {platformId: "${platformId}", date: latest_date}]->(b:${NEO4J_PLATFORM_INFO[platformName].member}) + RETURN a, r, b`; + + console.log(usersInteractionsQuery) + const neo4jUsersInteractionsData = await Neo4j.read(usersInteractionsQuery); + const { records: neo4jUsersInteractions } = neo4jUsersInteractionsData; + const usersInteractions = neo4jUsersInteractions.map((usersInteraction) => { + // @ts-ignore + const { _fieldLookup, _fields } = usersInteraction; + const a = _fields[_fieldLookup['a']]; + const r = _fields[_fieldLookup['r']]; + const b = _fields[_fieldLookup['b']]; + + const aUserId = a?.properties?.id as string; + const rWeeklyInteraction = r?.properties?.weight as number; + const bUserId = b?.properties?.id as string; + + usersInNetworkGraph.push(aUserId); + usersInNetworkGraph.push(bUserId); + const interaction = { + aUserId, + bUserId, + rWeeklyInteraction, + }; + + return interaction; + }); + console.log(neo4jUsersInteractionsData,usersInteractions,NEO4J_PLATFORM_INFO[platformName].member,NEO4J_PLATFORM_INFO[platformName].platform, platformId ) + + // userRadius + const userRadiusQuery = ` + MATCH () -[r:INTERACTED_WITH {platformId: "${platformId}"}]-() + WITH max(r.date) as latest_date + MATCH (a:${NEO4J_PLATFORM_INFO[platformName].member}) -[r:INTERACTED_WITH {date: latest_date, platformId :"${platformId}"}]-(:${NEO4J_PLATFORM_INFO[platformName].member}) + WITH a, r + RETURN a.id as userId, SUM(r.weight) as radius`; + const neo4jUserRadiusData = await Neo4j.read(userRadiusQuery); + const { records: neo4jUserRadius } = neo4jUserRadiusData; + const userRadius = neo4jUserRadius.map((userRadius) => { + // @ts-ignore + const { _fieldLookup, _fields } = userRadius; + const userId = _fields[_fieldLookup['userId']] as string; + const radius = _fields[_fieldLookup['radius']] as number; + + return { userId, radius }; + }); + // userStatus + const userStatusQuery = ` + MATCH () -[r:INTERACTED_IN]-(g:${NEO4J_PLATFORM_INFO[platformName].platform} {id: "${platformId}"}) + WITH max(r.date) as latest_date + MATCH (a:${NEO4J_PLATFORM_INFO[platformName].member})-[r:INTERACTED_IN {date: latest_date}]->(g:${NEO4J_PLATFORM_INFO[platformName].platform} {id: "${platformId}"}) + RETURN a.id as userId, r.status as status`; + const neo4jUserStatusData = await Neo4j.read(userStatusQuery); + const { records: neo4jUserStatus } = neo4jUserStatusData; + const userStatus = neo4jUserStatus.map((userStatus) => { + // @ts-ignore + const { _fieldLookup, _fields } = userStatus; + const userId = _fields[_fieldLookup['userId']] as string; + const status = _fields[_fieldLookup['status']] as number; + const stats = + status == 0 ? NodeStats.SENDER : status == 1 ? NodeStats.RECEIVER : status == 2 ? NodeStats.BALANCED : null; + + return { userId, stats }; + }); + + const usersInfo = await platformConnection.db + .collection('rawmembers') + .find({ id: { $in: usersInNetworkGraph } }) + .toArray(); + + // prepare data + const response = usersInteractions.flatMap((interaction) => { + const { aUserId, bUserId, rWeeklyInteraction } = interaction; + // Radius + const aUserRadiusObj = userRadius.find((userRadius) => userRadius.userId == aUserId); + const aUserRadius = aUserRadiusObj?.radius as number; + const bUserRadiusObj = userRadius.find((userRadius) => userRadius.userId == bUserId); + const bUserRadius = bUserRadiusObj?.radius as number; + // Status + const aUserStatsObj = userStatus.find((userStatus) => userStatus.userId == aUserId); + const aUserStats = aUserStatsObj?.stats; + const bUserStatsObj = userStatus.find((userStatus) => userStatus.userId == bUserId); + const bUserStats = bUserStatsObj?.stats; + + // userInfo + const aUser = usersInfo.find((user) => user.id === aUserId); + const bUser = usersInfo.find((user) => user.id === bUserId); + if (!aUser || !bUser) return []; + + const aInfo = getUserInformationForNetworkGraph(aUser); + const bInfo = getUserInformationForNetworkGraph(bUser); + + if (!aUserStats || !bUserStats) { + return []; + } + + return { + from: { id: aUserId, radius: aUserRadius, stats: aUserStats, ...aInfo }, + to: { id: bUserId, radius: bUserRadius, stats: bUserStats, ...bInfo }, + width: rWeeklyInteraction, + }; + }); + + return response; + } catch (error) { + logger.error(error, 'Failed to get discourse members interaction network graph'); + throw new ApiError(httpStatus.INTERNAL_SERVER_ERROR, 'Failed to get discourse members interaction network graph'); + } +} +/** + * Constructs a projection stage object for MongoDB aggregation pipeline based on the provided activity composition fields. + * + * @param {Array} fields - The activity composition fields to include in the projection. Each field corresponds to a property in the database documents. + * @returns {Stage} The projection stage object. It includes a '_id' field set to '0', an 'all' field with an empty '$setUnion', and additional fields based on the 'fields' parameter. Each additional field is prefixed with a '$'. + */ +function buildProjectStageBasedOnActivityComposition(fields: Array) { + const initialStage: { + _id: string; + all: { $setUnion: Array }; + [key: string]: string | { $setUnion: Array }; + } = { + _id: '0', + all: { $setUnion: [] }, + }; + + const finalStage = fields.reduce((stage, field) => { + stage[field] = `$${field}`; + stage.all.$setUnion.push(`$${field}`); + return stage; + }, initialStage); + + return finalStage; +} + +/** + * get activity composition fileds of active member onboarding table + * @returns {Object} + */ +function getActivityCompositionOfActiveMembersComposition() { + return ['all_active', 'all_new_active', 'all_consistent', 'all_vital', 'all_new_disengaged']; +} + +/** + * get activity composition fileds of active member compostion table + * @returns {Object} + */ +function getActivityCompositionOfActiveMembersOnboarding() { + return ['all_joined', 'all_new_active', 'all_still_active', 'all_dropped']; +} + +/** + * get activity composition fileds of disengaged member compostion table + * @returns {Object} + */ +function getActivityCompositionOfDisengagedComposition() { + return [ + 'all_new_disengaged', + 'all_disengaged_were_newly_active', + 'all_disengaged_were_consistently_active', + 'all_disengaged_were_vital', + ]; +} + +/** + * get last member activity document for usage of member activity table + * @param {Connection} platformConnection + * @param {Any} activityComposition + * @returns {Object} + */ +async function getLastDocumentForTablesUsage(platformConnection: Connection, activityComposition: Array) { + const projectStage = buildProjectStageBasedOnActivityComposition(activityComposition); + const lastDocument = await platformConnection.models.MemberActivity.aggregate([ + { $sort: { date: -1 } }, + { $limit: 1 }, + { $project: projectStage }, + ]); + return lastDocument[0]; +} + +function getActivityComposition(discourseMember: any, memberActivity: any, activityComposition: Array) { + const activityTypes = [ + { key: 'all_new_active', message: 'Newly active' }, + { key: 'all_new_disengaged', message: 'Became disengaged' }, + { key: 'all_active', message: 'Active members' }, + { key: 'all_consistent', message: 'Consistently active' }, + { key: 'all_vital', message: 'Vital member' }, + { key: 'all_joined', message: 'Joined' }, + { key: 'all_dropped', message: 'Dropped' }, + { key: 'all_still_active', message: 'Still active' }, + { key: 'all_disengaged_were_newly_active', message: 'Were newly active' }, + { key: 'all_disengaged_were_consistently_active', message: 'Were consistenly active' }, + { key: 'all_disengaged_were_vital', message: 'Were vital members' }, + ]; + + const activityCompositions = []; + + activityTypes.forEach((activityType) => { + if ( + memberActivity[activityType.key] && + memberActivity[activityType.key].includes(discourseMember.id) && + (!activityComposition || activityComposition.length === 0 || activityComposition.includes(activityType.key)) + ) { + activityCompositions.push(activityType.message); + } + }); + + if (activityCompositions.length === 0) { + activityCompositions.push('Others'); + } + + return activityCompositions; +} + +export default { + getMembersInteractionsNetworkGraph, + getLastDocumentForTablesUsage, + getActivityComposition, + getActivityCompositionOfActiveMembersComposition, + getActivityCompositionOfActiveMembersOnboarding, + getActivityCompositionOfDisengagedComposition, +}; diff --git a/src/services/telegram/members.service.ts b/src/services/telegram/members.service.ts new file mode 100644 index 0000000..6d87afe --- /dev/null +++ b/src/services/telegram/members.service.ts @@ -0,0 +1,170 @@ +import { Connection } from 'mongoose'; + +import parentLogger from '../../config/logger'; +import { sort } from '../../utils'; + +const logger = parentLogger.child({ module: 'DiscourseMemberService' }); + +type Filter = { + activityComposition?: Array; + ngu?: string; +}; + +type Options = { + sortBy?: string; + limit?: string; + page?: string; +}; + + +function getNgu(user: any): string { + const { firstName, lastName, username } = user; + const combinedName = [firstName, lastName].filter(Boolean).join(" "); + return combinedName || username; +} +/** + * Query members with a filter and options. + * @param {Connection} platformConnection - The MongoDB connection. + * @param {Filter} filter - The filter object + * @param {Options} options - The options object with fields like 'sortBy', 'limit' and 'page'. + * @param {any} memberActivity - The document containing the last member activity. + * @param {Array} activityCompostionsTypes - An array containing types of activity compositions. + * @returns {Promise} - An object with the query results and other information like 'limit', 'page', 'totalPages', 'totalResults'. + */ +async function queryMembersForTables( + platformConnection: Connection, + filter: Filter, + options: Options, + memberActivity: any, + activityCompostionsTypes: Array, +) { + try { + const { ngu, activityComposition } = filter; + const { sortBy } = options; + const limit = options.limit && parseInt(options.limit, 10) > 0 ? parseInt(options.limit, 10) : 10; + const page = options.page && parseInt(options.page, 10) > 0 ? parseInt(options.page, 10) : 1; + const sortParams: Record = sortBy ? sort.sortByHandler(sortBy) : { 'options.username': 1 }; + + let matchStage: any = {}; + let allActivityIds: string[] = []; + + const memberActivityDocument = await platformConnection.models.MemberActivity.findOne() + .sort({ date: -1 }) + .select({ date: 1, _id: 0 }); + + if (activityComposition && activityComposition.length > 0) { + // If 'others' is in activityComposition, we exclude all IDs that are part of other activities + if (activityComposition.includes('others')) { + allActivityIds = activityCompostionsTypes + .filter((activity) => activity !== 'others') + .flatMap((activity) => memberActivity[activity]); + + matchStage.id = { $nin: allActivityIds }; + console.log('activityComposition.INCLUDE',matchStage) + + } + + // If specific activity compositions are mentioned along with 'others', we add them separately + if (activityComposition.some((activity) => activity !== 'others')) { + const specificActivityIds = activityComposition + .filter((activity) => activity !== 'others') + .flatMap((activity) => memberActivity[activity]); + + if (matchStage.id) { + matchStage = { $or: [{ id: { $in: specificActivityIds } }, matchStage] }; + } else { + matchStage.id = { $in: specificActivityIds }; + } + console.log('activityComposition.some',matchStage) + + } + } + + if (ngu) { + matchStage.$or = [ + { 'options.username': { $regex: ngu, $options: 'i' } }, + { 'options.first_name': { $regex: ngu, $options: 'i' } }, + { 'options.last_name': { $regex: ngu, $options: 'i' } }, + ]; + console.log('ngu',matchStage) + + } + + if (memberActivityDocument) { + const date = new Date(memberActivityDocument.date); + date.setHours(23, 59, 59, 999); + matchStage.$or = [ + { joined_at: null }, + { joined_at: { $lte: date } } + ]; + console.log('memberActivityDocument',matchStage) + } + + + const totalResults = await platformConnection.db.collection('rawmembers').countDocuments(matchStage); + + console.log(1000,totalResults,matchStage) + const results = await platformConnection.db + .collection('rawmembers') + .aggregate([ + { + $match: matchStage, + }, + { + $sort: sortParams, + }, + { + $skip: limit * (page - 1), + }, + { + $limit: limit, + }, + { + $project: { + id: 1, + username: '$options.username', + avatar: '$options.avatar', + joined_at: 1, + firstName: '$options.first_name', + lastName: '$options.last_name', + _id: 0, + }, + }, + ]) + .toArray(); + + const totalPages = Math.ceil(totalResults / limit); + + return { + results, + limit, + page, + totalPages, + totalResults, + }; + } catch (error) { + logger.error( + { + platformConnection: platformConnection.name, + filter, + options, + memberActivity, + activityCompostionsTypes, + error, + }, + 'Failed to query members', + ); + return { + results: [], + limit: 10, + page: 1, + totalPages: 0, + totalResults: 0, + }; + } +} + +export default { + queryMembersForTables, + getNgu, +}; diff --git a/src/validations/index.ts b/src/validations/index.ts index 6636c8c..3c31c79 100644 --- a/src/validations/index.ts +++ b/src/validations/index.ts @@ -9,7 +9,7 @@ import announcementValidation from './announcement.validation'; import moduleValidation from './module.validation'; import discourseValidation from './discourse.validation'; import nftValidation from './nft.validation'; - +import telegramValidation from './telegram.validation'; export { authValidation, guildValidation, @@ -22,4 +22,5 @@ export { moduleValidation, discourseValidation, nftValidation, + telegramValidation, }; diff --git a/src/validations/telegram.validation.ts b/src/validations/telegram.validation.ts new file mode 100644 index 0000000..65ceb2d --- /dev/null +++ b/src/validations/telegram.validation.ts @@ -0,0 +1,118 @@ +import Joi from 'joi'; +import { objectId } from './custom.validation'; + +const heatmapChart = { + params: Joi.object() + .required() + .keys({ + platformId: Joi.string().custom(objectId).required(), + }), + body: Joi.object().required().keys({ + startDate: Joi.date().required(), + endDate: Joi.date().required(), + timeZone: Joi.string().required(), + }), +}; + +const lineGraph = { + params: Joi.object() + .required() + .keys({ + platformId: Joi.string().custom(objectId).required(), + }), + body: Joi.object().required().keys({ + startDate: Joi.date().required(), + endDate: Joi.date().required(), + }), +}; + +const membersInteractionsNetworkGraph = { + params: Joi.object() + .required() + .keys({ + platformId: Joi.string().custom(objectId), + }), +}; + +const activeMembersCompositionTable = { + params: Joi.object() + .required() + .keys({ + platformId: Joi.string().custom(objectId), + }), + query: Joi.object() + .required() + .keys({ + activityComposition: Joi.array() + .items( + Joi.string().valid( + 'all_active', + 'all_new_active', + 'all_consistent', + 'all_vital', + 'all_new_disengaged', + 'others', + ), + ) + .single(), + ngu: Joi.string(), + sortBy: Joi.string(), + limit: Joi.number().integer(), + page: Joi.number().integer(), + }), +}; + +const activeMembersOnboardingTable = { + params: Joi.object() + .required() + .keys({ + platformId: Joi.string().custom(objectId), + }), + query: Joi.object() + .required() + .keys({ + activityComposition: Joi.array() + .items(Joi.string().valid('all_joined', 'all_new_active', 'all_still_active', 'all_dropped', 'others')) + .single(), + ngu: Joi.string(), + sortBy: Joi.string(), + limit: Joi.number().integer(), + page: Joi.number().integer(), + }), +}; + +const disengagedMembersCompositionTable = { + params: Joi.object() + .required() + .keys({ + platformId: Joi.string().custom(objectId), + }), + query: Joi.object() + .required() + .keys({ + activityComposition: Joi.array() + .items( + Joi.string().valid( + 'all_new_disengaged', + 'all_disengaged_were_newly_active', + 'all_disengaged_were_consistently_active', + 'all_disengaged_were_vital', + 'others', + ), + ) + .single(), + ngu: Joi.string(), + sortBy: Joi.string(), + limit: Joi.number().integer(), + page: Joi.number().integer(), + }), +}; + +export default { + heatmapChart, + lineGraph, + membersInteractionsNetworkGraph, + activeMembersCompositionTable, + activeMembersOnboardingTable, + disengagedMembersCompositionTable, +};