diff --git a/lib/process-manager.ts b/lib/process-manager.ts index b28d789086d9..3c31f2faa123 100644 --- a/lib/process-manager.ts +++ b/lib/process-manager.ts @@ -23,45 +23,46 @@ export const processManagers: ProcessManager[] = []; export const disabled = false; class SubprocessStream extends Streams.ObjectReadWriteStream { - process: ChildProcess; + process: StreamProcessWrapper; taskId: number; - constructor(process: ChildProcess, taskId: number) { + constructor(process: StreamProcessWrapper, taskId: number) { super(); this.process = process; this.taskId = taskId; - this.process.send(`${taskId}\nNEW`); + this.process.process.send(`${taskId}\nNEW`); } _write(message: string) { - if (!this.process.connected) { + if (!this.process.process.connected) { this.pushError(new Error(`Process disconnected (possibly crashed?)`)); return; } - this.process.send(`${this.taskId}\nWRITE\n${message}`); + this.process.process.send(`${this.taskId}\nWRITE\n${message}`); // responses are handled in ProcessWrapper } _writeEnd() { - this.process.send(`${this.taskId}\nWRITEEND`); + this.process.process.send(`${this.taskId}\nWRITEEND`); } _destroy() { - if (!this.process.connected) return; - this.process.send(`${this.taskId}\nDESTROY`); + if (!this.process.process.connected) return; + this.process.process.send(`${this.taskId}\nDESTROY`); + this.process.deleteStream(this.taskId); + this.process = null!; } } class RawSubprocessStream extends Streams.ObjectReadWriteStream { - process: ChildProcess & {process: undefined} | Worker; - constructor(process: ChildProcess | Worker) { + process: RawProcessWrapper; + constructor(process: RawProcessWrapper) { super(); - this.process = process as any; + this.process = process; } _write(message: string) { - const isConnected = (this.process.process ? this.process.process : this.process).connected; - if (!isConnected) { + if (!this.process.getProcess().connected) { // no error because the crash handler should already have shown an error, and // sometimes harmless messages are sent during cleanup return; } - this.process.send(message); + this.process.process.send(message); // responses are handled in ProcessWrapper } } @@ -194,7 +195,7 @@ export class StreamProcessWrapper implements ProcessWrapper { const taskId = parseInt(message.slice(0, nlLoc)); const stream = this.activeStreams.get(taskId); - if (!stream) throw new Error(`Invalid taskId ${message.slice(0, nlLoc)}`); + if (!stream) return; // stream already destroyed message = message.slice(nlLoc + 1); nlLoc = message.indexOf('\n'); @@ -235,7 +236,7 @@ export class StreamProcessWrapper implements ProcessWrapper { createStream(): SubprocessStream { this.taskId++; const taskId = this.taskId; - const stream = new SubprocessStream(this.process, taskId); + const stream = new SubprocessStream(this, taskId); this.activeStreams.set(taskId, stream); return stream; } @@ -316,7 +317,7 @@ export class RawProcessWrapper implements ProcessWrapper, StreamWorker { this.stream.push(message); }); - this.stream = new RawSubprocessStream(this.process); + this.stream = new RawSubprocessStream(this); } getProcess() { @@ -432,9 +433,9 @@ export abstract class ProcessManager { this.releasingProcesses = this.releasingProcesses.concat(processes); return Promise.all(released); } - spawn(count = 1) { + spawn(count = 1, force?: boolean) { if (!this.isParentProcess) return; - if (disabled) return; + if (disabled && !force) return; while (this.processes.length < count) { const process = this.createProcess(); process.process.on('disconnect', () => this.releaseCrashed(process)); diff --git a/sim/battle-stream.ts b/sim/battle-stream.ts index 3f9c825405e4..01d394b3e447 100644 --- a/sim/battle-stream.ts +++ b/sim/battle-stream.ts @@ -117,9 +117,9 @@ export class BattleStream extends Streams.ObjectReadWriteStream { } } - _end() { - // this is in theory synchronous... - this.pushEnd(); + _writeEnd() { + // if battle already ended, we don't need to pushEnd. + if (!this.atEOF) this.pushEnd(); this._destroy(); } @@ -274,7 +274,7 @@ export class BattleTextStream extends Streams.ReadWriteStream { } } - _end() { + _writeEnd() { return this.battleStream.writeEnd(); } } diff --git a/sim/tools/runner.ts b/sim/tools/runner.ts index e5a13dcbbfdd..893dff73e1d9 100644 --- a/sim/tools/runner.ts +++ b/sim/tools/runner.ts @@ -175,11 +175,11 @@ class DualStream { this.compare(); } - end() { - // We need to compare first because _end() destroys the battle object. + writeEnd() { + // We need to compare first because _writeEnd() destroys the battle object. this.compare(true); - this.control._end(); - this.test._end(); + this.control._writeEnd(); + this.test._writeEnd(); } compare(end?: boolean) { diff --git a/test/server/room-battle.js b/test/server/room-battle.js index 5214d7ec535c..44ed4a74d03c 100644 --- a/test/server/room-battle.js +++ b/test/server/room-battle.js @@ -34,4 +34,44 @@ describe('Simulator abstraction layer features', function () { }); }); }); + + describe('BattleStream', function () { + it('should work (slow)', async function () { + Config.simulatorprocesses = 1; + const PM = require('../../.server-dist/room-battle').PM; + assert.equal(PM.processes.length, 0); + PM.spawn(1, true); + assert.equal(PM.processes[0].load, 0); + + const stream = PM.createStream(); + assert.equal(PM.processes[0].load, 1); + stream.write( + '>version a2393dfd2a2da5594148bf99eea514e72b136c2c\n' + + '>start {"formatid":"gen8randombattle","seed":[9619,36790,28450,62465],"rated":"Rated battle"}\n' + + '>player p1 {"name":"p1","avatar":"ethan","team":"","rating":1507,"seed":[59512,58581,51338,7861]}\n' + + '>player p2 {"name":"p2","avatar":"dawn","team":"","rating":1447,"seed":[33758,53485,62378,29757]}\n' + + '>p1 move 1\n' + + '>p2 move 1\n' + ); + assert((await stream.read()).includes('|move|')); + stream.destroy(); + assert.equal(PM.processes[0].load, 0); + + const stream2 = PM.createStream(); + assert.equal(PM.processes[0].load, 1); + stream2.write( + '>version a2393dfd2a2da5594148bf99eea514e72b136c2c\n' + + '>start {"formatid":"gen8randombattle","seed":[9619,36790,28450,62465],"rated":"Rated battle"}\n' + + '>player p1 {"name":"p1","avatar":"ethan","team":"","rating":1507,"seed":[59512,58581,51338,7861]}\n' + + '>player p2 {"name":"p2","avatar":"dawn","team":"","rating":1447,"seed":[33758,53485,62378,29757]}\n' + + '>p1 move 1\n' + + '>p2 move 1\n' + ); + assert((await stream2.read()).includes('|move|')); + stream2.writeEnd(); + await stream2.readAll(); + assert.equal(PM.processes[0].load, 0); + PM.unspawn(); + }); + }); });