Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the multiple server-side requests bug #1408

Merged
merged 9 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
header?: Header;
/** to parse a HTML table and load the data */
from: HTMLElement;
storage: Storage<any>;

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type

Check warning on line 38 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type
/** Pipeline process throttle timeout in milliseconds */
processingThrottleMs: number;
pipeline: Pipeline<Tabular>;
/** to automatically calculate the columns width */
autoWidth: boolean;
Expand All @@ -53,7 +55,7 @@
columns: OneDArray<TColumn | string | ComponentChild>;
search: SearchConfig | boolean;
language: Language;
plugins?: Plugin<any>[];

Check warning on line 58 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 58 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 58 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type

Check warning on line 58 in src/config.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type
style?: Partial<{
table: CSSDeclaration;
td: CSSDeclaration;
Expand Down Expand Up @@ -128,6 +130,7 @@
tableRef: createRef(),
width: '100%',
height: 'auto',
processingThrottleMs: 100,
autoWidth: true,
style: {},
className: {},
Expand Down
2 changes: 1 addition & 1 deletion src/hooks/useSelector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export default function useSelector<T>(selector: (state) => T) {
});

return unsubscribe;
}, []);
}, [store, current]);

return current;
}
2 changes: 1 addition & 1 deletion src/pipeline/extractor/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
} from '../processor';

interface StorageExtractorProps extends PipelineProcessorProps {
storage: Storage<any>;

Check warning on line 9 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 9 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 9 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type

Check warning on line 9 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type
}

class StorageExtractor extends PipelineProcessor<
Promise<StorageResponse>,
StorageResponse,
StorageExtractorProps
> {
get type(): ProcessorType {
return ProcessorType.Extractor;
}

async _process(opts: any): Promise<StorageResponse> {

Check warning on line 20 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 20 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

Unexpected any. Specify a different type

Check warning on line 20 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type

Check warning on line 20 in src/pipeline/extractor/storage.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

Unexpected any. Specify a different type
return await this.props.storage.get(opts);
}
}
Expand Down
77 changes: 54 additions & 23 deletions src/pipeline/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { ID } from '../util/id';
import log from '../util/log';
import { EventEmitter } from '../util/eventEmitter';

interface PipelineEvents<T> {
interface PipelineEvents<R> {
/**
* Generic updated event. Triggers the callback function when the pipeline
* is updated, including when a new processor is registered, a processor's props
* get updated, etc.
*/
updated: (processor: PipelineProcessor<any, any>) => void;
updated: <T, P>(processor: PipelineProcessor<T, P>) => void;
/**
* Triggers the callback function when a new
* processor is registered successfully
Expand All @@ -27,27 +27,29 @@ interface PipelineEvents<T> {
* afterProcess will not be called if there is an
* error in the pipeline (i.e a step throw an Error)
*/
afterProcess: (prev: T) => void;
afterProcess: (prev: R) => void;
/**
* Triggers the callback function when the pipeline
* fails to process all steps or at least one step
* throws an Error
*/
error: (prev: T) => void;
error: <T>(prev: T) => void;
}

class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
class Pipeline<R> extends EventEmitter<PipelineEvents<R>> {
// available steps for this pipeline
private readonly _steps: Map<ProcessorType, PipelineProcessor<T, P>[]> =
new Map<ProcessorType, PipelineProcessor<T, P>[]>();
private readonly _steps: Map<
ProcessorType,
PipelineProcessor<unknown, unknown>[]
> = new Map<ProcessorType, PipelineProcessor<unknown, unknown>[]>();
// used to cache the results of processors using their id field
private cache: Map<string, any> = new Map<string, any>();
private cache: Map<string, unknown> = new Map<string, unknown>();
// keeps the index of the last updated processor in the registered
// processors list and will be used to invalidate the cache
// -1 means all new processors should be processed
private lastProcessorIndexUpdated = -1;

constructor(steps?: PipelineProcessor<any, any>[]) {
constructor(steps?: PipelineProcessor<unknown, unknown>[]) {
super();

if (steps) {
Expand All @@ -59,7 +61,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* Clears the `cache` array
*/
clearCache(): void {
this.cache = new Map<string, any>();
this.cache = new Map<string, object>();
this.lastProcessorIndexUpdated = -1;
}

Expand All @@ -69,30 +71,57 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param processor
* @param priority
*/
register(
processor: PipelineProcessor<any, any>,
register<T, P>(
processor: PipelineProcessor<T, P>,
priority: number = null,
): void {
if (!processor) return;
): PipelineProcessor<T, P> {
if (!processor) {
throw Error('Processor is not defined');
}

if (processor.type === null) {
throw Error('Processor type is not defined');
}

if (this.findProcessorIndexByID(processor.id) > -1) {
throw Error(`Processor ID ${processor.id} is already defined`);
}

// binding the propsUpdated callback to the Pipeline
processor.on('propsUpdated', this.processorPropsUpdated.bind(this));

this.addProcessorByPriority(processor, priority);
this.afterRegistered(processor);

return processor;
}

/**
* Tries to register a new processor
* @param processor
* @param priority
*/
tryRegister<T, P>(
processor: PipelineProcessor<T, P>,
priority: number = null,
): PipelineProcessor<T, P> | undefined {
try {
return this.register(processor, priority);
} catch (_) {
// noop
}

return undefined;
}

/**
* Removes a processor from the list
*
* @param processor
*/
unregister(processor: PipelineProcessor<any, any>): void {
unregister<T, P>(processor: PipelineProcessor<T, P>): void {
if (!processor) return;
if (this.findProcessorIndexByID(processor.id) === -1) return;

const subSteps = this._steps.get(processor.type);

Expand All @@ -111,7 +140,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* @param processor
* @param priority
*/
private addProcessorByPriority(
private addProcessorByPriority<T, P>(
processor: PipelineProcessor<T, P>,
priority: number,
): void {
Expand Down Expand Up @@ -142,8 +171,8 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
/**
* Flattens the _steps Map and returns a list of steps with their correct priorities
*/
get steps(): PipelineProcessor<T, P>[] {
let steps: PipelineProcessor<T, P>[] = [];
get steps(): PipelineProcessor<unknown, unknown>[] {
let steps: PipelineProcessor<unknown, unknown>[] = [];

for (const type of this.getSortedProcessorTypes()) {
const subSteps = this._steps.get(type);
Expand All @@ -163,7 +192,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
*
* @param type
*/
getStepsByType(type: ProcessorType): PipelineProcessor<T, P>[] {
getStepsByType(type: ProcessorType): PipelineProcessor<unknown, unknown>[] {
return this.steps.filter((process) => process.type === type);
}

Expand All @@ -182,7 +211,7 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
*
* @param data
*/
async process(data?: T): Promise<T> {
async process(data?: R): Promise<R> {
const lastProcessorIndexUpdated = this.lastProcessorIndexUpdated;
const steps = this.steps;

Expand All @@ -197,11 +226,11 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
// updated processor was before "processor".
// This is to ensure that we always have correct and up to date
// data from processors and also to skip them when necessary
prev = await processor.process(prev);
prev = (await processor.process(prev)) as R;
this.cache.set(processor.id, prev);
} else {
// cached results already exist
prev = this.cache.get(processor.id);
prev = this.cache.get(processor.id) as R;
}
}
} catch (e) {
Expand Down Expand Up @@ -236,7 +265,9 @@ class Pipeline<T, P = unknown> extends EventEmitter<PipelineEvents<T>> {
* This is used to invalid or skip a processor in
* the process() method
*/
private setLastProcessorIndex(processor: PipelineProcessor<T, P>): void {
private setLastProcessorIndex<T, P>(
processor: PipelineProcessor<T, P>,
): void {
const processorIndex = this.findProcessorIndexByID(processor.id);

if (this.lastProcessorIndexUpdated > processorIndex) {
Expand Down
21 changes: 15 additions & 6 deletions src/pipeline/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// e.g. Extractor = 0 will be processed before Transformer = 1
import { generateUUID, ID } from '../util/id';
import { EventEmitter } from '../util/eventEmitter';
import { deepEqual } from '../util/deepEqual';

export enum ProcessorType {
Initiator,
Expand All @@ -15,8 +16,8 @@ export enum ProcessorType {
Limit,
}

interface PipelineProcessorEvents<T, P> {
propsUpdated: (processor: PipelineProcessor<T, P>) => void;
interface PipelineProcessorEvents {
propsUpdated: <T, P>(processor: PipelineProcessor<T, P>) => void;
beforeProcess: (...args) => void;
afterProcess: (...args) => void;
}
Expand All @@ -27,9 +28,9 @@ export interface PipelineProcessorProps {}
export abstract class PipelineProcessor<
T,
P extends Partial<PipelineProcessorProps>,
> extends EventEmitter<PipelineProcessorEvents<T, P>> {
> extends EventEmitter<PipelineProcessorEvents> {
public readonly id: ID;
private readonly _props: P;
private _props: P;

abstract get type(): ProcessorType;
protected abstract _process(...args): T | Promise<T>;
Expand Down Expand Up @@ -62,8 +63,16 @@ export abstract class PipelineProcessor<
}

setProps(props: Partial<P>): this {
Object.assign(this._props, props);
this.emit('propsUpdated', this);
const updatedProps = {
...this._props,
...props,
};

if (!deepEqual(updatedProps, this._props)) {
this._props = updatedProps;
this.emit('propsUpdated', this);
}

return this;
}

Expand Down
9 changes: 9 additions & 0 deletions src/util/deepEqual.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* Returns true if both objects are equal
* @param a left object
* @param b right object
* @returns
*/
export function deepEqual<A, B>(a: A, b: B) {
return JSON.stringify(a) === JSON.stringify(b);
}
42 changes: 28 additions & 14 deletions src/util/throttle.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
/**
* Throttle a given function
* @param fn Function to be called
* @param wait Throttle timeout in milliseconds
* @returns Throttled function
*/
export const throttle = (fn: (...args) => void, wait = 100) => {

Check warning on line 7 in src/util/throttle.ts

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch
let inThrottle: boolean;
let lastFn: ReturnType<typeof setTimeout>;
let lastTime: number;
let timeoutId: ReturnType<typeof setTimeout>;
let lastTime = Date.now();

const execute = (...args) => {
lastTime = Date.now();
fn(...args);
};

return (...args) => {
if (!inThrottle) {
fn(...args);
lastTime = Date.now();
inThrottle = true;
const currentTime = Date.now();
const elapsed = currentTime - lastTime;

if (elapsed >= wait) {
// If enough time has passed since the last call, execute the function immediately
execute(args);
} else {
clearTimeout(lastFn);
lastFn = setTimeout(() => {
if (Date.now() - lastTime >= wait) {
fn(...args);
lastTime = Date.now();
}
}, Math.max(wait - (Date.now() - lastTime), 0));
// If not enough time has passed, schedule the function call after the remaining delay
if (timeoutId) {
clearTimeout(timeoutId);
}

timeoutId = setTimeout(() => {
execute(args);
timeoutId = null;
}, wait - elapsed);
}
};
};
39 changes: 20 additions & 19 deletions src/view/container.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import log from '../util/log';
import { useEffect } from 'preact/hooks';
import * as actions from './actions';
import { useStore } from '../hooks/useStore';
import useSelector from '../../src/hooks/useSelector';
import { useConfig } from '../../src/hooks/useConfig';
import useSelector from '../hooks/useSelector';
import { useConfig } from '../hooks/useConfig';
import { throttle } from '../util/throttle';

export function Container() {
const config = useConfig();
Expand All @@ -19,6 +20,23 @@ export function Container() {
const tableRef = useSelector((state) => state.tableRef);
const tempRef = createRef();

const processPipeline = throttle(async () => {
dispatch(actions.SetLoadingData());

try {
const data = await config.pipeline.process();
dispatch(actions.SetData(data));

// TODO: do we need this setTimemout?
setTimeout(() => {
dispatch(actions.SetStatusToRendered());
}, 0);
} catch (e) {
log.error(e);
dispatch(actions.SetDataErrored());
}
}, config.processingThrottleMs);

useEffect(() => {
// set the initial header object
// we update the header width later when "data"
Expand All @@ -41,23 +59,6 @@ export function Container() {
}
}, [data, config, tempRef]);

const processPipeline = async () => {
dispatch(actions.SetLoadingData());

try {
const data = await config.pipeline.process();
dispatch(actions.SetData(data));

// TODO: do we need this setTimemout?
setTimeout(() => {
dispatch(actions.SetStatusToRendered());
}, 0);
} catch (e) {
log.error(e);
dispatch(actions.SetDataErrored());
}
};

return (
<div
role="complementary"
Expand Down
Loading
Loading