From fff03505cf4d62e7153a0bfda267f04d531be0ef Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sat, 19 Aug 2023 19:16:43 +0300 Subject: [PATCH 01/10] implement Scheduler --- src/Scheduler.ts | 310 +++++++++++++++++++++++++++++++++++++++++++++++ src/Server.ts | 3 + 2 files changed, 313 insertions(+) create mode 100644 src/Scheduler.ts diff --git a/src/Scheduler.ts b/src/Scheduler.ts new file mode 100644 index 0000000..7353d50 --- /dev/null +++ b/src/Scheduler.ts @@ -0,0 +1,310 @@ +import EventEmitter from "node:events"; +import {randomUUID} from "node:crypto"; +import TypedEventEmitter from "./types/TypedEventEmitter"; + +class Scheduler { + /** + * Scheduler age (in ticks) + */ + #age: number = 0; + + /** + * Whether the scheduler is running + */ + #running: boolean = false; + + /** + * Scheduler tasks + */ + #tasks: Scheduler.Task[] = []; + + #schedulerStopResolve: ((value: true | PromiseLike) => void) | null = null; + #schedulerStopPromise: Promise | null = null; + + /** + * Time of last tick + */ + private lastTick: Date = new Date(); + + /** + * Create scheduler + * + * @param frequency Scheduler clock frequency in Hz + * @param [start] Start scheduler + */ + public constructor(public readonly frequency: number, start?: boolean) { + if (start) this.start(); + } + + /** + * Start scheduler + */ + public start(): void { + this.#running = true; + this.#schedulerStopPromise = new Promise(r => this.#schedulerStopResolve = r); + this._nextTick(); + } + + /** + * Stop scheduler. The scheduler can be re-started afterwards and any previously scheduled tasks will continue being executed. + * @returns Promise that resolves when the scheduler has paused. The promise resolves to false if the scheduler was not running. + */ + public pause(): Promise { + if (!this.#running) return Promise.resolve(false); + this.#running = false; + return this.#schedulerStopPromise!; + } + + /** + * Terminate scheduler. The scheduler is stopped and any previously scheduled tasks are marked as not planned and then deleted. + * @returns Promise that resolves when the scheduler has stopped. The promise resolves to false if the scheduler was not running. + */ + public stop(): Promise { + if (!this.#running) return Promise.resolve(false); + this.#running = false; + this.#tasks.forEach(task => { + task.emit("notPlanned"); + this._delete(task); + }); + return this.#schedulerStopPromise!; + } + + /** + * Scheduler age + */ + public get age(): number { + return this.#age; + } + + /** + * Whether the scheduler is running + */ + public get running(): boolean { + return this.#running; + } + + /** + * Convert milliseconds to scheduler ticks + * + * @param ms Milliseconds + */ + public msToTicks(ms: number): number { + return ms / (1000 / this.frequency); + } + + /** + * Convert scheduler ticks to milliseconds + * + * @param ticks Ticks + */ + public ticksToMs(ticks: number): number { + return ticks * (1000 / this.frequency); + } + + /** + * Estimate scheduler age at a specific date + * + * > [!NOTE] + * > If the scheduler is paused, IRL time will pass without the scheduler aging, resulting in incorrect estimation. + * > This estimation will only be correct if the scheduler is not paused (or terminated) before the given date. + * + * @param date Date to estimate scheduler age at + */ + public estimateAge(date: Date): number { + return this.age + this.msToTicks(date.getTime() - this.lastTick.getTime()); + } + + /** + * Scheduler tick + */ + private tick(): void { + const now = new Date(); + if (now.getTime() - this.lastTick.getTime() < this.ticksToMs(1)) return this._nextTick(); + ++this.#age; + this.lastTick = now; + + const tasks = this.#tasks.filter(task => task.targetAge <= this.#age).sort((a, b) => a.targetAge - b.targetAge); + for (const task of tasks) { + this._delete(task); + task.run(); + } + + this._nextTick(); + } + + /** + * Request next tick + */ + private _nextTick(): void { + if (!this.#running) { + if (this.#schedulerStopResolve) { + this.#schedulerStopResolve(true); + this.#schedulerStopResolve = null; + } + return; + } + setImmediate(this.tick.bind(this)); + } + + /** + * Schedule task to run at a specific scheduler age (tick) + * + * @param code Task code + * @param delay Target scheduler age (tick) to run task at + */ + public scheduleAge(code: () => void, delay: number): Scheduler.Task { + const task = new Scheduler.Task(code, this.#age + delay, this); + this.#tasks.push(task); + return task; + } + + /** + * Schedule task to run after the specified amount of ticks + * + * @param code Task code + * @param ticks Number of ticks to wait before running the task + */ + public scheduleTicks(code: () => void, ticks: number): Scheduler.Task { + return this.scheduleAge(code, this.age + ticks); + } + + /** + * Schedule task to be executed as soon as possible + * + * @param code Task code + */ + public schedule(code: () => void): Scheduler.Task { + return this.scheduleTicks(code, 0); + } + + /** + * Delete task from the scheduler queue + * + * @param task Task to cancel + * @internal + */ + public _delete(task: Scheduler.Task): boolean { + const index = this.#tasks.indexOf(task); + if (index < 0) return false; + this.#tasks.splice(index, 1); + return true; + } + + /** + * Cancel execution of a task + * + * @param task Task to cancel + * @returns `false` if the task was not found in the scheduler queue (possibly already executed), `true` otherwise + */ + public cancel(task: Scheduler.Task): boolean { + const deleted = this._delete(task); + if (deleted) task.emit("cancelled"); + return deleted; + } + + /** + * Get task from the scheduler queue by ID + * + * @param id Task ID + */ + public getTaskById(id: string): Scheduler.Task | undefined { + return this.#tasks.find(task => task.id === id); + } +} + +namespace Scheduler { + type TaskEvents = { + /** + * Task is not planned to be executed due to the scheduler being terminated + */ + "notPlanned": () => void; + + /** + * Task is cancelled + */ + "cancelled": () => void; + } + + /** + * Scheduler task + */ + export class Task extends (EventEmitter as new () => TypedEventEmitter) { + /** + * Task ID + */ + public readonly id = randomUUID(); + + /** + * Task code + */ + private readonly code: () => void; + + /** + * Target scheduler age (tick) to run task at + */ + public readonly targetAge: number; + + /** + * Task scheduler + */ + public readonly scheduler: Scheduler; + + /** + * Whether the task has been executed + */ + #executed: boolean = false; + + /** + * Create scheduler task + * + * @param code Task code + * @param targetAge Target scheduler age + * @param scheduler Scheduler + */ + public constructor(code: () => void, targetAge: number, scheduler: Scheduler) { + super(); + this.code = code; + this.targetAge = targetAge; + this.scheduler = scheduler; + } + + /** + * Whether the task has been executed + */ + public get executed(): boolean { + return this.#executed; + } + + /** + * The remaining ticks before the task is run. + * + * - `0`: the task is being run + * - positive int: the task will be run in this many ticks + * - negative int: the task was run this many ticks ago + * + * To check if the task was actually run, use {@link Task#executed} + */ + public get remainingTicks(): number { + return this.targetAge - this.scheduler.age; + } + + /** + * Cancel execution of this task + * @see Scheduler#cancel + */ + public cancel(): boolean { + return this.scheduler.cancel(this); + } + + /** + * Run task + * @internal + */ + public run(): void { + this.code(); + this.#executed = true; + } + } +} + +export default Scheduler; diff --git a/src/Server.ts b/src/Server.ts index 4ab6348..38917ba 100644 --- a/src/Server.ts +++ b/src/Server.ts @@ -10,6 +10,7 @@ import Connection from "./Connection.js"; import HandshakePacket from "./packet/client/HandshakePacket"; import LoginPacket from "./packet/client/LoginPacket"; import { Config } from "./Config.js"; +import Scheduler from "./Scheduler.js"; type ServerEvents = { /** @@ -67,6 +68,7 @@ type ServerEvents = { export default class Server extends (EventEmitter as new () => TypedEventEmitter) { private readonly server = net.createServer(); public readonly logger: Logger; + public readonly scheduler: Scheduler = new Scheduler(20); public readonly connections: ConnectionPool = new ConnectionPool(); public static readonly path: string = path.dirname(path.join(new URL(import.meta.url).pathname, "..")); @@ -94,6 +96,7 @@ export default class Server extends (EventEmitter as new () => TypedEventEmitter }), this.connections.disconnectAll(this.config.shutdownKickReason), ]); + await this.scheduler.stop(); this.emit("closed"); } From 45759ac9f1b34721c66edd61a6684115ea0375fe Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sat, 19 Aug 2023 19:20:37 +0300 Subject: [PATCH 02/10] make scheduler tasks readonly --- src/Scheduler.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index 7353d50..d23efc7 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -16,7 +16,7 @@ class Scheduler { /** * Scheduler tasks */ - #tasks: Scheduler.Task[] = []; + readonly #tasks: Scheduler.Task[] = []; #schedulerStopResolve: ((value: true | PromiseLike) => void) | null = null; #schedulerStopPromise: Promise | null = null; From 0adc4324ba7bea8a44dfdbb9ff2e54b559e5f217 Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sun, 20 Aug 2023 17:05:36 +0300 Subject: [PATCH 03/10] scheduler events --- src/Scheduler.ts | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index d23efc7..e52a337 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -2,7 +2,24 @@ import EventEmitter from "node:events"; import {randomUUID} from "node:crypto"; import TypedEventEmitter from "./types/TypedEventEmitter"; -class Scheduler { +type SchedulerEvents = { + /** + * Scheduler is paused + */ + paused: () => void; + + /** + * Scheduler is started/resumed + */ + started: () => void; + + /** + * Scheduler terminated + */ + terminating: () => void; +} + +class Scheduler extends (EventEmitter as new () => TypedEventEmitter) { /** * Scheduler age (in ticks) */ @@ -33,6 +50,7 @@ class Scheduler { * @param [start] Start scheduler */ public constructor(public readonly frequency: number, start?: boolean) { + super(); if (start) this.start(); } @@ -43,6 +61,7 @@ class Scheduler { this.#running = true; this.#schedulerStopPromise = new Promise(r => this.#schedulerStopResolve = r); this._nextTick(); + this.emit("started"); } /** @@ -52,6 +71,7 @@ class Scheduler { public pause(): Promise { if (!this.#running) return Promise.resolve(false); this.#running = false; + this.emit("paused"); return this.#schedulerStopPromise!; } @@ -66,6 +86,7 @@ class Scheduler { task.emit("notPlanned"); this._delete(task); }); + this.emit("terminating"); return this.#schedulerStopPromise!; } From a05d2ac0d8a61ced4d700651efb486643f6c1e00 Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sun, 20 Aug 2023 17:06:59 +0300 Subject: [PATCH 04/10] rename Scheduler#_delete to Scheduler#delete --- src/Scheduler.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index e52a337..0ab7f9a 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -84,8 +84,8 @@ class Scheduler extends (EventEmitter as new () => TypedEventEmitter { task.emit("notPlanned"); - this._delete(task); }); + this.delete(task); this.emit("terminating"); return this.#schedulerStopPromise!; } @@ -143,10 +143,9 @@ class Scheduler extends (EventEmitter as new () => TypedEventEmitter task.targetAge <= this.#age).sort((a, b) => a.targetAge - b.targetAge); for (const task of tasks) { - this._delete(task); + this.delete(task); task.run(); } @@ -204,7 +203,7 @@ class Scheduler extends (EventEmitter as new () => TypedEventEmitter TypedEventEmitter Date: Sun, 20 Aug 2023 17:07:14 +0300 Subject: [PATCH 05/10] start scheduler as soon as server is started --- src/Server.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Server.ts b/src/Server.ts index 38917ba..3b53c14 100644 --- a/src/Server.ts +++ b/src/Server.ts @@ -81,6 +81,10 @@ export default class Server extends (EventEmitter as new () => TypedEventEmitter } public start() { + this.scheduler.on("started", () => this.logger.debug("Scheduler started, freq=" + this.scheduler.frequency + "Hz")); + this.scheduler.on("paused", () => this.logger.debug("Scheduler paused, age=" + this.scheduler.age)); + this.scheduler.on("terminating", () => this.logger.debug("Scheduler terminated, age=" + this.scheduler.age)); + this.scheduler.start(); this.server.listen(this.config.port, () => this.emit("listening", this.config.port)); this.server.on("connection", this.onConnection.bind(this)); } From 26e3dcb90aa02e6367f80865ecfc8b4e2e28c5b4 Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sun, 20 Aug 2023 17:07:49 +0300 Subject: [PATCH 06/10] fix scheduleAge bug not scheduling on target age --- src/Scheduler.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index 0ab7f9a..346db04 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -170,10 +170,10 @@ class Scheduler extends (EventEmitter as new () => TypedEventEmitter void, delay: number): Scheduler.Task { - const task = new Scheduler.Task(code, this.#age + delay, this); + public scheduleAge(code: () => void, targetAge: number): Scheduler.Task { + const task = new Scheduler.Task(code, targetAge, this); this.#tasks.push(task); return task; } From accc69bde7b5ac8e6a69137b7c4983db45ca6131 Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sun, 20 Aug 2023 17:08:02 +0300 Subject: [PATCH 07/10] improve scheduler termination --- src/Scheduler.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index 346db04..69b504a 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -82,10 +82,12 @@ class Scheduler extends (EventEmitter as new () => TypedEventEmitter { if (!this.#running) return Promise.resolve(false); this.#running = false; - this.#tasks.forEach(task => { + while (this.#tasks.length > 0) { + const task = this.#tasks.pop()!; task.emit("notPlanned"); - }); this.delete(task); + task.removeAllListeners(); + } this.emit("terminating"); return this.#schedulerStopPromise!; } From 0fd1d1555ef5aca79029f82ed457e526f81073af Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sun, 20 Aug 2023 17:08:28 +0300 Subject: [PATCH 08/10] allow directly scheduling Scheduler.Task --- src/Scheduler.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index 69b504a..326278e 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -195,8 +195,19 @@ class Scheduler extends (EventEmitter as new () => TypedEventEmitter void): Scheduler.Task { - return this.scheduleTicks(code, 0); + public schedule(code: () => void): Scheduler.Task; + /** + * Schedule a task + * + * @param task The task + */ + public schedule(task: Scheduler.Task): Scheduler.Task; + public schedule(a: (() => void) | Scheduler.Task): Scheduler.Task { + if (a instanceof Scheduler.Task) { + this.#tasks.push(a); + return a; + } + else return this.scheduleTicks(a, 0); } /** From 61039cf3221987099f9d1b1311f84708923d82f1 Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sun, 20 Aug 2023 17:09:09 +0300 Subject: [PATCH 09/10] remove listeners from task once its been executed by the scheduler --- src/Scheduler.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index 326278e..fa0c865 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -336,6 +336,7 @@ namespace Scheduler { public run(): void { this.code(); this.#executed = true; + this.removeAllListeners(); } } } From fdff6b266b7950ea2cf517e7d1ac39e0752bcf22 Mon Sep 17 00:00:00 2001 From: Zefir Kirilov Date: Sun, 20 Aug 2023 17:09:52 +0300 Subject: [PATCH 10/10] a repeating task class (Scheduler.RepeatingTask) --- src/Scheduler.ts | 122 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/src/Scheduler.ts b/src/Scheduler.ts index fa0c865..560bf66 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -339,6 +339,128 @@ namespace Scheduler { this.removeAllListeners(); } } + + type RepeatingTaskEvents = { + /** + * All repeats have been executed + */ + "completed": () => void; + } + + /** + * A repeating task + */ + export class RepeatingTask extends (EventEmitter as new () => TypedEventEmitter) { + /** + * Number of times the task will repeat. This may be `Infinity`, in which case the tasks repeats until the scheduler is terminated. + */ + public readonly repeats: number; + + /** + * Interval between each repeat + */ + public readonly interval: number; + + /** + * Target scheduler age (tick) for first execution + */ + public readonly targetAge: number; + + /** + * Task scheduler + */ + public readonly scheduler: Scheduler; + + /** + * Task code + */ + private readonly code: () => void; + + /** + * Current task + */ + #task: Task | null = null; + + /** + * Number of tasks that have been executed + */ + #executed: number = 0; + + /** + * Whether this repeating task has been cancelled + */ + #cancelled: boolean = false; + + /** + * Create repeating task + * + * @param code Task code + * @param interval Interval between each repeat + * @param scheduler Scheduler + * @param [repeats] Number of times the task will repeat. This may be `Infinity`, in which case the tasks repeats until the scheduler is terminated. Default: `Infinity` + * @param [targetAge] Target scheduler age (tick) for first execution. Default: `0` (next tick) + */ + public constructor(code: () => void, interval: number, scheduler: Scheduler, repeats: number = Infinity, targetAge: number = 0) { + super(); + this.code = code; + this.interval = interval; + this.scheduler = scheduler; + this.repeats = repeats; + this.targetAge = targetAge; + if (this.repeats > 0) this.createTask(); + + this.scheduler.on("terminating", () => { + //console.log(this.task?.executed, this.task); + if (this.task?.executed) this.emit("notPlanned"); + }); + } + + /** + * Cancel this repeating task + */ + public cancel(): void { + if (this.#cancelled) return; + this.#cancelled = true; + if (this.#task) this.#task.cancel(); + else this.emit("cancelled"); + } + + /** + * Create task + */ + private createTask(): void { + if (this.executed === 0) this.#task = this.scheduler.scheduleAge(() => this.taskCode(), this.targetAge); + else this.#task = this.scheduler.scheduleAge(() => this.taskCode(), this.scheduler.age + this.interval); + this.#task.once("cancelled", () => this.emit("cancelled")); + this.#task.once("notPlanned", () => this.emit("notPlanned")); + } + + /** + * Scheduled task code + */ + private taskCode(): void { + this.code(); + ++this.#executed; + if (this.#executed < this.repeats) { + if (!this.#cancelled) this.createTask(); + } + else this.emit("completed"); + } + + /** + * Get current task + */ + public get task(): Task | null { + return this.#task; + } + + /** + * Number of times the task has been executed + */ + public get executed(): number { + return this.#executed; + } + } } export default Scheduler;