diff --git a/next/api/src/controller/customer-service-action-log.ts b/next/api/src/controller/customer-service-action-log.ts index 0b3e7ceb4..deb492084 100644 --- a/next/api/src/controller/customer-service-action-log.ts +++ b/next/api/src/controller/customer-service-action-log.ts @@ -24,7 +24,8 @@ export class CustomerServiceActionLogController { @Query('to', ParseDatePipe) to: Date | undefined, @Query('operatorIds', ParseCsvPipe) operatorIds: string[] | undefined, @Query('pageSize', new ParseIntPipe({ min: 1, max: 1000 })) pageSize = 10, - @Query('desc', ParseBoolPipe) desc: boolean | undefined + @Query('desc', ParseBoolPipe) desc: boolean | undefined, + @Query('exclude', ParseCsvPipe) exclude: string[] | undefined ) { if (!from || !to) { throw new BadRequestError('Date range params "from" and "to" are required'); @@ -39,6 +40,7 @@ export class CustomerServiceActionLogController { operatorIds, limit: pageSize, desc, + exclude, }); const ticketIds = new Set(); diff --git a/next/api/src/service/customer-service-action-log.ts b/next/api/src/service/customer-service-action-log.ts index 43a1599d2..a598a6f86 100644 --- a/next/api/src/service/customer-service-action-log.ts +++ b/next/api/src/service/customer-service-action-log.ts @@ -1,10 +1,11 @@ -import { addMilliseconds, subMilliseconds } from 'date-fns'; +import { isEqual } from 'date-fns'; import _ from 'lodash'; import { OpsLog } from '@/model/OpsLog'; import { ReplyRevision } from '@/model/ReplyRevision'; import { User } from '@/model/User'; import { Reply } from '@/model/Reply'; +import { Model, Query, QueryBuilder } from '@/orm'; export interface GetCustomerServiceActionLogsOptions { from: Date; @@ -12,6 +13,7 @@ export interface GetCustomerServiceActionLogsOptions { operatorIds?: string[]; limit?: number; desc?: boolean; + exclude?: string[]; } export enum CustomerServiceActionLogType { @@ -41,36 +43,69 @@ interface Reader { read: () => Promise; } -interface BufferReaderValue { - value: T; - done: boolean; +type DateField = Extract< + { + [K in keyof T]: T[K] extends Date ? K : never; + }[keyof T], + string +>; + +interface DataReaderOptions { + range: [Date, Date]; + dateField: DateField>; + bufferSize: number; + onQuery?: (query: Query) => void; + desc?: boolean; + exclude?: string[]; } -interface BufferReaderOptions { - state: TState; - read: (state: TState) => Promise>; -} +class DataReader implements Reader> { + private range: [Date, Date]; + private dateField: DateField>; + private bufferSize: number; + private onQuery?: (query: Query) => void; + private desc?: boolean; + private exclude: string[]; -class BufferReader implements Reader { - private buffer: TData[] = []; + private buffer: any[] = []; private pos = 0; private done = false; - constructor(private options: BufferReaderOptions) {} - - private async load() { - if (this.done) { - return; - } - const value = await this.options.read(this.options.state); - this.buffer = [...this.buffer.slice(this.pos), ...value.value]; - this.pos = 0; - this.done = value.done; + constructor(private model: M, options: DataReaderOptions) { + this.range = options.range; + this.dateField = options.dateField; + this.bufferSize = options.bufferSize; + this.onQuery = options.onQuery; + this.desc = options.desc; + this.exclude = options.exclude || []; } - async peek() { - if (this.pos === this.buffer.length) { - await this.load(); + async peek(): Promise | undefined> { + if (!this.done && this.pos === this.buffer.length) { + const query = new QueryBuilder(this.model); + this.onQuery?.(query); + query.where(this.dateField, '>=', this.range[0]); + query.where(this.dateField, '<=', this.range[1]); + if (this.exclude.length) { + query.where('objectId', 'not-in', this.exclude); + } + query.orderBy(this.dateField, this.desc ? 'desc' : 'asc'); + query.limit(this.bufferSize); + const values = await query.find({ useMasterKey: true }); + this.buffer = values; + this.pos = 0; + this.done = values.length < this.bufferSize; + if (values.length) { + const lastDate = values[values.length - 1][this.dateField] as Date; + this.exclude = values + .filter((v) => isEqual(v[this.dateField] as Date, lastDate)) + .map((v) => v.id); + if (this.desc) { + this.range = [this.range[0], lastDate]; + } else { + this.range = [lastDate, this.range[1]]; + } + } } if (this.pos < this.buffer.length) { return this.buffer[this.pos]; @@ -78,11 +113,28 @@ class BufferReader implements Reader { } async read() { - if (this.pos === this.buffer.length) { - await this.load(); + const value = await this.peek(); + if (value) { + this.pos += 1; + return value; } - if (this.pos < this.buffer.length) { - return this.buffer[this.pos++]; + } +} + +class MapReader implements Reader { + constructor(private reader: Reader, private transfrom: (value: T) => U) {} + + async peek() { + const value = await this.reader.peek(); + if (value) { + return this.transfrom(value); + } + } + + async read() { + const value = await this.reader.read(); + if (value) { + return this.transfrom(value); } } } @@ -127,151 +179,98 @@ async function take(reader: Reader, count: number) { return result; } +function convertReplyToActionLog(reply: Reply): CustomerServiceActionLog { + return { + id: reply.id, + type: CustomerServiceActionLogType.Reply, + operatorId: reply.authorId, + reply, + ts: reply.createdAt, + }; +} + +function convertReplyRevisionToActionLog(rv: ReplyRevision): CustomerServiceActionLog { + return { + id: rv.id, + type: CustomerServiceActionLogType.Reply, + operatorId: rv.operatorId, + reply: rv.reply, + revision: rv, + ts: rv.actionTime, + }; +} + +function convertOpsLogToActionLog(opsLog: OpsLog): CustomerServiceActionLog { + return { + id: opsLog.id, + type: CustomerServiceActionLogType.OpsLog, + operatorId: opsLog.data.operator.objectId, + opsLog, + ts: opsLog.createdAt, + }; +} + export class CustomerServiceActionLogService { async getLogs(options: GetCustomerServiceActionLogsOptions) { - const { limit = 10, desc } = options; + const { from, to, limit = 10, desc, operatorIds, exclude } = options; const perCount = limit <= 100 ? limit : Math.min(Math.floor(limit / 2), 1000); - const replyReader = new BufferReader({ - state: { - window: [options.from, options.to], - operatorIds: options.operatorIds, - desc: options.desc, - perCount, - }, - read: async (state) => { - const query = Reply.queryBuilder() - .where('createdAt', '>=', state.window[0]) - .where('createdAt', '<=', state.window[1]) - .where('isCustomerService', '==', true) - .limit(state.perCount) - .orderBy('createdAt', state.desc ? 'desc' : 'asc'); - if (state.operatorIds) { - const pointers = state.operatorIds.map(User.ptr.bind(User)); - query.where('author', 'in', pointers); - } - - const replies = await query.find({ useMasterKey: true }); + const operatorPointers = operatorIds?.map(User.ptr.bind(User)); - if (replies.length) { - const last = replies[replies.length - 1]; - if (state.desc) { - state.window[1] = subMilliseconds(last.createdAt, 1); - } else { - state.window[0] = addMilliseconds(last.createdAt, 1); - } + const replyReader = new DataReader(Reply, { + range: [from, to], + dateField: 'createdAt', + bufferSize: perCount, + onQuery: (query) => { + query.where('isCustomerService', '==', true); + if (operatorPointers) { + query.where('author', 'in', operatorPointers); } - - const value = replies.map((reply) => ({ - id: reply.id, - type: CustomerServiceActionLogType.Reply, - operatorId: reply.authorId, - reply, - ts: reply.createdAt, - })); - - return { - value, - done: replies.length < state.perCount, - }; }, + desc, + exclude, }); - const replyRevisionReader = new BufferReader({ - state: { - window: [options.from, options.to], - operatorIds: options.operatorIds, - desc: options.desc, - perCount, - }, - read: async (state) => { - const query = ReplyRevision.queryBuilder() - .where('actionTime', '>=', state.window[0]) - .where('actionTime', '<=', state.window[1]) - .where('action', 'in', ['update', 'delete']) - .preload('reply') - .limit(state.perCount) - .orderBy('actionTime', state.desc ? 'desc' : 'asc'); - if (state.operatorIds) { - const pointers = state.operatorIds.map(User.ptr.bind(User)); - query.where('operator', 'in', pointers); - } - - const revisions = await query.find({ useMasterKey: true }); - - if (revisions.length) { - const last = revisions[revisions.length - 1]; - if (state.desc) { - state.window[1] = subMilliseconds(last.actionTime, 1); - } else { - state.window[0] = addMilliseconds(last.actionTime, 1); - } + const replyRevisionReader = new DataReader(ReplyRevision, { + range: [from, to], + dateField: 'actionTime', + bufferSize: perCount, + onQuery: (query) => { + query.where('action', 'in', ['update', 'delete']); + if (operatorPointers) { + query.where('operator', 'in', operatorPointers); } - - const value = revisions.map((rv) => ({ - id: rv.id, - type: CustomerServiceActionLogType.Reply, - operatorId: rv.operatorId, - reply: rv.reply, - revision: rv, - ts: rv.actionTime, - })); - - return { - value, - done: revisions.length < state.perCount, - }; + query.preload('reply'); }, + desc, + exclude, }); - const opsLogReader = new BufferReader({ - state: { - window: [options.from, options.to], - operatorIds: options.operatorIds, - desc: options.desc, - perCount, - }, - read: async (state) => { - const query = OpsLog.queryBuilder() - .where('createdAt', '>=', state.window[0]) - .where('createdAt', '<=', state.window[1]) - .limit(state.perCount) - .orderBy('createdAt', state.desc ? 'desc' : 'asc'); - if (state.operatorIds) { - query.where('data.operator.objectId', 'in', state.operatorIds); + const opsLogReader = new DataReader(OpsLog, { + range: [from, to], + dateField: 'createdAt', + bufferSize: perCount, + onQuery: (query) => { + if (operatorIds) { + query.where('data.operator.objectId', 'in', operatorIds); } else { query.where('data.operator.objectId', 'exists'); query.where('data.operator.objectId', '!=', 'system'); } - - const opsLogs = await query.find({ useMasterKey: true }); - - if (opsLogs.length) { - const last = opsLogs[opsLogs.length - 1]; - if (state.desc) { - state.window[1] = subMilliseconds(last.createdAt, 1); - } else { - state.window[0] = addMilliseconds(last.createdAt, 1); - } - } - - const value = opsLogs.map((opsLog) => ({ - id: opsLog.id, - type: CustomerServiceActionLogType.OpsLog, - operatorId: opsLog.data.operator.objectId, - opsLog, - ts: opsLog.createdAt, - })); - - return { - value, - done: opsLogs.length < state.perCount, - }; }, + desc, + exclude, }); + const replyLogReader = new MapReader(replyReader, convertReplyToActionLog); + const replyRevisionLogReader = new MapReader( + replyRevisionReader, + convertReplyRevisionToActionLog + ); + const opsLogLogReader = new MapReader(opsLogReader, convertOpsLogToActionLog); + const sortReader = new SortReader( - [replyReader, replyRevisionReader, opsLogReader], + [replyLogReader, replyRevisionLogReader, opsLogLogReader], desc ? (a, b) => b.ts.getTime() - a.ts.getTime() : (a, b) => a.ts.getTime() - b.ts.getTime() ); diff --git a/next/web/src/App/Admin/Stats/CustomerServiceAction/action-log-collector.ts b/next/web/src/App/Admin/Stats/CustomerServiceAction/action-log-collector.ts index 4511a4edb..eef4871ef 100644 --- a/next/web/src/App/Admin/Stats/CustomerServiceAction/action-log-collector.ts +++ b/next/web/src/App/Admin/Stats/CustomerServiceAction/action-log-collector.ts @@ -14,6 +14,7 @@ export interface ActionLogCollectorOptions { export class ActionLogCollector { private window: [Date, Date]; private operatorIds?: string[]; + private exclude?: string[]; private logChunks: T[][] = []; private transform: (data: GetCustomerServiceActionLogsResult) => T[]; @@ -42,6 +43,7 @@ export class ActionLogCollector { to: this.window[1].toISOString(), operatorIds: this.operatorIds, pageSize, + exclude: this.exclude, }); const logs = this.transform(data); @@ -56,9 +58,9 @@ export class ActionLogCollector { return; } - this.window[0] = moment(data.logs[data.logs.length - 1].ts) - .add(1, 'ms') - .toDate(); + const cursor = new Date(data.logs[data.logs.length - 1].ts); + this.window[0] = cursor; + this.exclude = data.logs.filter((log) => moment(log.ts).isSame(cursor)).map((log) => log.id); setTimeout(() => this._collect(), 100); } catch (error) { diff --git a/next/web/src/App/Admin/Stats/CustomerServiceAction/index.tsx b/next/web/src/App/Admin/Stats/CustomerServiceAction/index.tsx index 85716afbc..19b854fbe 100644 --- a/next/web/src/App/Admin/Stats/CustomerServiceAction/index.tsx +++ b/next/web/src/App/Admin/Stats/CustomerServiceAction/index.tsx @@ -63,6 +63,7 @@ export function CustomerServiceAction() { desc?: boolean; hasPrevPage?: boolean; hasNextPage?: boolean; + exclude?: string[]; }>({}); const options = useMemo(() => { @@ -76,8 +77,9 @@ export function CustomerServiceAction() { // 多拿一个用来判断是否还有下一页 pageSize: pageSize + 1, desc: pagination.desc || undefined, + exclude: pagination.exclude, }; - }, [filters, pagination.cursor, pagination.desc]); + }, [filters, pagination.cursor, pagination.desc, pagination.exclude]); const { data, isFetching } = useQuery({ enabled: !!options, @@ -102,17 +104,19 @@ export function CustomerServiceAction() { const changePage = (direction: 'prev' | 'next') => { if (!logs || !logs.length) return; if (direction === 'prev') { - const firstLog = logs[0]; + const cursor = moment(logs[0].ts); setPagination({ - cursor: moment(firstLog.ts).subtract(1, 'ms'), + cursor, desc: true, hasNextPage: true, + exclude: logs.filter((log) => moment(log.ts).isSame(cursor)).map((log) => log.id), }); } else { - const lastLog = logs[logs.length - 1]; + const cursor = moment(logs[logs.length - 1].ts); setPagination({ - cursor: moment(lastLog.ts).add(1, 'ms'), + cursor, hasPrevPage: true, + exclude: logs.filter((log) => moment(log.ts).isSame(cursor)).map((log) => log.id), }); } }; diff --git a/next/web/src/api/customer-service-action-log.ts b/next/web/src/api/customer-service-action-log.ts index cc20c234b..741dbf7af 100644 --- a/next/web/src/api/customer-service-action-log.ts +++ b/next/web/src/api/customer-service-action-log.ts @@ -28,6 +28,7 @@ export interface GetCustomerServiceActionLogsOptions { operatorIds?: string[]; pageSize?: number; desc?: boolean; + exclude?: string[]; } export interface GetCustomerServiceActionLogsResult { @@ -42,6 +43,7 @@ export async function getCustomerServiceActionLogs({ operatorIds, pageSize, desc, + exclude, }: GetCustomerServiceActionLogsOptions) { const res = await http.get( '/api/2/customer-service-action-logs', @@ -52,6 +54,7 @@ export async function getCustomerServiceActionLogs({ operatorIds: operatorIds?.join(','), pageSize, desc, + exclude: exclude?.join(','), }, } );