Skip to content

Commit

Permalink
fix: exported action log is incomplete
Browse files Browse the repository at this point in the history
  • Loading branch information
sdjdd committed Mar 26, 2024
1 parent 323ac14 commit 586da17
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 160 deletions.
4 changes: 3 additions & 1 deletion next/api/src/controller/customer-service-action-log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export class CustomerServiceActionLogController {
@Query('to', ParseDatePipe) to: Date | undefined,
@Query('operatorIds', ParseCsvPipe) operatorIds: string[] | undefined,
@Query('pageSize', new ParseIntPipe({ min: 1, max: 1000 })) pageSize = 10,
@Query('desc', ParseBoolPipe) desc: boolean | undefined
@Query('desc', ParseBoolPipe) desc: boolean | undefined,
@Query('exclude', ParseCsvPipe) exclude: string[] | undefined
) {
if (!from || !to) {
throw new BadRequestError('Date range params "from" and "to" are required');
Expand All @@ -39,6 +40,7 @@ export class CustomerServiceActionLogController {
operatorIds,
limit: pageSize,
desc,
exclude,
});

const ticketIds = new Set<string>();
Expand Down
301 changes: 150 additions & 151 deletions next/api/src/service/customer-service-action-log.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import { addMilliseconds, subMilliseconds } from 'date-fns';
import { isEqual } from 'date-fns';
import _ from 'lodash';

import { OpsLog } from '@/model/OpsLog';
import { ReplyRevision } from '@/model/ReplyRevision';
import { User } from '@/model/User';
import { Reply } from '@/model/Reply';
import { Model, Query, QueryBuilder } from '@/orm';

export interface GetCustomerServiceActionLogsOptions {
from: Date;
to: Date;
operatorIds?: string[];
limit?: number;
desc?: boolean;
exclude?: string[];
}

export enum CustomerServiceActionLogType {
Expand Down Expand Up @@ -41,48 +43,98 @@ interface Reader<T = any> {
read: () => Promise<T | undefined>;
}

interface BufferReaderValue<T> {
value: T;
done: boolean;
type DateField<T> = Extract<
{
[K in keyof T]: T[K] extends Date ? K : never;
}[keyof T],
string
>;

interface DataReaderOptions<M extends typeof Model> {
range: [Date, Date];
dateField: DateField<InstanceType<M>>;
bufferSize: number;
onQuery?: (query: Query<M>) => void;
desc?: boolean;
exclude?: string[];
}

interface BufferReaderOptions<TState, TData> {
state: TState;
read: (state: TState) => Promise<BufferReaderValue<TData[]>>;
}
class DataReader<M extends typeof Model> implements Reader<InstanceType<M>> {
private range: [Date, Date];
private dateField: DateField<InstanceType<M>>;
private bufferSize: number;
private onQuery?: (query: Query<M>) => void;
private desc?: boolean;
private exclude: string[];

class BufferReader<TData, TState> implements Reader<TData> {
private buffer: TData[] = [];
private buffer: any[] = [];
private pos = 0;
private done = false;

constructor(private options: BufferReaderOptions<TState, TData>) {}

private async load() {
if (this.done) {
return;
}
const value = await this.options.read(this.options.state);
this.buffer = [...this.buffer.slice(this.pos), ...value.value];
this.pos = 0;
this.done = value.done;
constructor(private model: M, options: DataReaderOptions<M>) {
this.range = options.range;
this.dateField = options.dateField;
this.bufferSize = options.bufferSize;
this.onQuery = options.onQuery;
this.desc = options.desc;
this.exclude = options.exclude || [];
}

async peek() {
if (this.pos === this.buffer.length) {
await this.load();
async peek(): Promise<InstanceType<M> | undefined> {
if (!this.done && this.pos === this.buffer.length) {
const query = new QueryBuilder(this.model);
this.onQuery?.(query);
query.where(this.dateField, '>=', this.range[0]);
query.where(this.dateField, '<=', this.range[1]);
if (this.exclude.length) {
query.where('objectId', 'not-in', this.exclude);
}
query.orderBy(this.dateField, this.desc ? 'desc' : 'asc');
query.limit(this.bufferSize);
const values = await query.find({ useMasterKey: true });
this.buffer = values;
this.pos = 0;
this.done = values.length < this.bufferSize;
if (values.length) {
const lastDate = values[values.length - 1][this.dateField] as Date;
this.exclude = values
.filter((v) => isEqual(v[this.dateField] as Date, lastDate))
.map((v) => v.id);
if (this.desc) {
this.range = [this.range[0], lastDate];
} else {
this.range = [lastDate, this.range[1]];
}
}
}
if (this.pos < this.buffer.length) {
return this.buffer[this.pos];
}
}

async read() {
if (this.pos === this.buffer.length) {
await this.load();
const value = await this.peek();
if (value) {
this.pos += 1;
return value;
}
if (this.pos < this.buffer.length) {
return this.buffer[this.pos++];
}
}

class MapReader<T, U> implements Reader<U> {
constructor(private reader: Reader<T>, private transfrom: (value: T) => U) {}

async peek() {
const value = await this.reader.peek();
if (value) {
return this.transfrom(value);
}
}

async read() {
const value = await this.reader.read();
if (value) {
return this.transfrom(value);
}
}
}
Expand Down Expand Up @@ -127,151 +179,98 @@ async function take<T>(reader: Reader<T>, count: number) {
return result;
}

function convertReplyToActionLog(reply: Reply): CustomerServiceActionLog {
return {
id: reply.id,
type: CustomerServiceActionLogType.Reply,
operatorId: reply.authorId,
reply,
ts: reply.createdAt,
};
}

function convertReplyRevisionToActionLog(rv: ReplyRevision): CustomerServiceActionLog {
return {
id: rv.id,
type: CustomerServiceActionLogType.Reply,
operatorId: rv.operatorId,
reply: rv.reply,
revision: rv,
ts: rv.actionTime,
};
}

function convertOpsLogToActionLog(opsLog: OpsLog): CustomerServiceActionLog {
return {
id: opsLog.id,
type: CustomerServiceActionLogType.OpsLog,
operatorId: opsLog.data.operator.objectId,
opsLog,
ts: opsLog.createdAt,
};
}

export class CustomerServiceActionLogService {
async getLogs(options: GetCustomerServiceActionLogsOptions) {
const { limit = 10, desc } = options;
const { from, to, limit = 10, desc, operatorIds, exclude } = options;
const perCount = limit <= 100 ? limit : Math.min(Math.floor(limit / 2), 1000);

const replyReader = new BufferReader({
state: {
window: [options.from, options.to],
operatorIds: options.operatorIds,
desc: options.desc,
perCount,
},
read: async (state) => {
const query = Reply.queryBuilder()
.where('createdAt', '>=', state.window[0])
.where('createdAt', '<=', state.window[1])
.where('isCustomerService', '==', true)
.limit(state.perCount)
.orderBy('createdAt', state.desc ? 'desc' : 'asc');
if (state.operatorIds) {
const pointers = state.operatorIds.map(User.ptr.bind(User));
query.where('author', 'in', pointers);
}

const replies = await query.find({ useMasterKey: true });
const operatorPointers = operatorIds?.map(User.ptr.bind(User));

if (replies.length) {
const last = replies[replies.length - 1];
if (state.desc) {
state.window[1] = subMilliseconds(last.createdAt, 1);
} else {
state.window[0] = addMilliseconds(last.createdAt, 1);
}
const replyReader = new DataReader(Reply, {
range: [from, to],
dateField: 'createdAt',
bufferSize: perCount,
onQuery: (query) => {
query.where('isCustomerService', '==', true);
if (operatorPointers) {
query.where('author', 'in', operatorPointers);
}

const value = replies.map<CustomerServiceActionLog>((reply) => ({
id: reply.id,
type: CustomerServiceActionLogType.Reply,
operatorId: reply.authorId,
reply,
ts: reply.createdAt,
}));

return {
value,
done: replies.length < state.perCount,
};
},
desc,
exclude,
});

const replyRevisionReader = new BufferReader({
state: {
window: [options.from, options.to],
operatorIds: options.operatorIds,
desc: options.desc,
perCount,
},
read: async (state) => {
const query = ReplyRevision.queryBuilder()
.where('actionTime', '>=', state.window[0])
.where('actionTime', '<=', state.window[1])
.where('action', 'in', ['update', 'delete'])
.preload('reply')
.limit(state.perCount)
.orderBy('actionTime', state.desc ? 'desc' : 'asc');
if (state.operatorIds) {
const pointers = state.operatorIds.map(User.ptr.bind(User));
query.where('operator', 'in', pointers);
}

const revisions = await query.find({ useMasterKey: true });

if (revisions.length) {
const last = revisions[revisions.length - 1];
if (state.desc) {
state.window[1] = subMilliseconds(last.actionTime, 1);
} else {
state.window[0] = addMilliseconds(last.actionTime, 1);
}
const replyRevisionReader = new DataReader(ReplyRevision, {
range: [from, to],
dateField: 'actionTime',
bufferSize: perCount,
onQuery: (query) => {
query.where('action', 'in', ['update', 'delete']);
if (operatorPointers) {
query.where('operator', 'in', operatorPointers);
}

const value = revisions.map<CustomerServiceActionLog>((rv) => ({
id: rv.id,
type: CustomerServiceActionLogType.Reply,
operatorId: rv.operatorId,
reply: rv.reply,
revision: rv,
ts: rv.actionTime,
}));

return {
value,
done: revisions.length < state.perCount,
};
query.preload('reply');
},
desc,
exclude,
});

const opsLogReader = new BufferReader({
state: {
window: [options.from, options.to],
operatorIds: options.operatorIds,
desc: options.desc,
perCount,
},
read: async (state) => {
const query = OpsLog.queryBuilder()
.where('createdAt', '>=', state.window[0])
.where('createdAt', '<=', state.window[1])
.limit(state.perCount)
.orderBy('createdAt', state.desc ? 'desc' : 'asc');
if (state.operatorIds) {
query.where('data.operator.objectId', 'in', state.operatorIds);
const opsLogReader = new DataReader(OpsLog, {
range: [from, to],
dateField: 'createdAt',
bufferSize: perCount,
onQuery: (query) => {
if (operatorIds) {
query.where('data.operator.objectId', 'in', operatorIds);
} else {
query.where('data.operator.objectId', 'exists');
query.where('data.operator.objectId', '!=', 'system');
}

const opsLogs = await query.find({ useMasterKey: true });

if (opsLogs.length) {
const last = opsLogs[opsLogs.length - 1];
if (state.desc) {
state.window[1] = subMilliseconds(last.createdAt, 1);
} else {
state.window[0] = addMilliseconds(last.createdAt, 1);
}
}

const value = opsLogs.map<CustomerServiceActionLog>((opsLog) => ({
id: opsLog.id,
type: CustomerServiceActionLogType.OpsLog,
operatorId: opsLog.data.operator.objectId,
opsLog,
ts: opsLog.createdAt,
}));

return {
value,
done: opsLogs.length < state.perCount,
};
},
desc,
exclude,
});

const replyLogReader = new MapReader(replyReader, convertReplyToActionLog);
const replyRevisionLogReader = new MapReader(
replyRevisionReader,
convertReplyRevisionToActionLog
);
const opsLogLogReader = new MapReader(opsLogReader, convertOpsLogToActionLog);

const sortReader = new SortReader(
[replyReader, replyRevisionReader, opsLogReader],
[replyLogReader, replyRevisionLogReader, opsLogLogReader],
desc ? (a, b) => b.ts.getTime() - a.ts.getTime() : (a, b) => a.ts.getTime() - b.ts.getTime()
);

Expand Down
Loading

0 comments on commit 586da17

Please sign in to comment.