Skip to content

Commit

Permalink
Fix stream destroying
Browse files Browse the repository at this point in the history
`SubprocessStream` wasn't getting properly destroyed.
  • Loading branch information
Zarel committed Jun 19, 2020
1 parent 2505ce1 commit dffd9db
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 27 deletions.
39 changes: 20 additions & 19 deletions lib/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,46 @@ export const processManagers: ProcessManager[] = [];
export const disabled = false;

class SubprocessStream extends Streams.ObjectReadWriteStream<string> {
process: ChildProcess;
process: StreamProcessWrapper;
taskId: number;
constructor(process: ChildProcess, taskId: number) {
constructor(process: StreamProcessWrapper, taskId: number) {
super();
this.process = process;
this.taskId = taskId;
this.process.send(`${taskId}\nNEW`);
this.process.process.send(`${taskId}\nNEW`);
}
_write(message: string) {
if (!this.process.connected) {
if (!this.process.process.connected) {
this.pushError(new Error(`Process disconnected (possibly crashed?)`));
return;
}
this.process.send(`${this.taskId}\nWRITE\n${message}`);
this.process.process.send(`${this.taskId}\nWRITE\n${message}`);
// responses are handled in ProcessWrapper
}
_writeEnd() {
this.process.send(`${this.taskId}\nWRITEEND`);
this.process.process.send(`${this.taskId}\nWRITEEND`);
}
_destroy() {
if (!this.process.connected) return;
this.process.send(`${this.taskId}\nDESTROY`);
if (!this.process.process.connected) return;
this.process.process.send(`${this.taskId}\nDESTROY`);
this.process.deleteStream(this.taskId);
this.process = null!;
}
}

class RawSubprocessStream extends Streams.ObjectReadWriteStream<string> {
process: ChildProcess & {process: undefined} | Worker;
constructor(process: ChildProcess | Worker) {
process: RawProcessWrapper;
constructor(process: RawProcessWrapper) {
super();
this.process = process as any;
this.process = process;
}
_write(message: string) {
const isConnected = (this.process.process ? this.process.process : this.process).connected;
if (!isConnected) {
if (!this.process.getProcess().connected) {
// no error because the crash handler should already have shown an error, and
// sometimes harmless messages are sent during cleanup
return;
}
this.process.send(message);
this.process.process.send(message);
// responses are handled in ProcessWrapper
}
}
Expand Down Expand Up @@ -194,7 +195,7 @@ export class StreamProcessWrapper implements ProcessWrapper {

const taskId = parseInt(message.slice(0, nlLoc));
const stream = this.activeStreams.get(taskId);
if (!stream) throw new Error(`Invalid taskId ${message.slice(0, nlLoc)}`);
if (!stream) return; // stream already destroyed

message = message.slice(nlLoc + 1);
nlLoc = message.indexOf('\n');
Expand Down Expand Up @@ -235,7 +236,7 @@ export class StreamProcessWrapper implements ProcessWrapper {
createStream(): SubprocessStream {
this.taskId++;
const taskId = this.taskId;
const stream = new SubprocessStream(this.process, taskId);
const stream = new SubprocessStream(this, taskId);
this.activeStreams.set(taskId, stream);
return stream;
}
Expand Down Expand Up @@ -316,7 +317,7 @@ export class RawProcessWrapper implements ProcessWrapper, StreamWorker {
this.stream.push(message);
});

this.stream = new RawSubprocessStream(this.process);
this.stream = new RawSubprocessStream(this);
}

getProcess() {
Expand Down Expand Up @@ -432,9 +433,9 @@ export abstract class ProcessManager {
this.releasingProcesses = this.releasingProcesses.concat(processes);
return Promise.all(released);
}
spawn(count = 1) {
spawn(count = 1, force?: boolean) {
if (!this.isParentProcess) return;
if (disabled) return;
if (disabled && !force) return;
while (this.processes.length < count) {
const process = this.createProcess();
process.process.on('disconnect', () => this.releaseCrashed(process));
Expand Down
8 changes: 4 additions & 4 deletions sim/battle-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ export class BattleStream extends Streams.ObjectReadWriteStream<string> {
}
}

_end() {
// this is in theory synchronous...
this.pushEnd();
_writeEnd() {
// if battle already ended, we don't need to pushEnd.
if (!this.atEOF) this.pushEnd();
this._destroy();
}

Expand Down Expand Up @@ -274,7 +274,7 @@ export class BattleTextStream extends Streams.ReadWriteStream {
}
}

_end() {
_writeEnd() {
return this.battleStream.writeEnd();
}
}
8 changes: 4 additions & 4 deletions sim/tools/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ class DualStream {
this.compare();
}

end() {
// We need to compare first because _end() destroys the battle object.
writeEnd() {
// We need to compare first because _writeEnd() destroys the battle object.
this.compare(true);
this.control._end();
this.test._end();
this.control._writeEnd();
this.test._writeEnd();
}

compare(end?: boolean) {
Expand Down
40 changes: 40 additions & 0 deletions test/server/room-battle.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,44 @@ describe('Simulator abstraction layer features', function () {
});
});
});

describe('BattleStream', function () {
it('should work (slow)', async function () {
Config.simulatorprocesses = 1;
const PM = require('../../.server-dist/room-battle').PM;
assert.equal(PM.processes.length, 0);
PM.spawn(1, true);
assert.equal(PM.processes[0].load, 0);

const stream = PM.createStream();
assert.equal(PM.processes[0].load, 1);
stream.write(
'>version a2393dfd2a2da5594148bf99eea514e72b136c2c\n' +
'>start {"formatid":"gen8randombattle","seed":[9619,36790,28450,62465],"rated":"Rated battle"}\n' +
'>player p1 {"name":"p1","avatar":"ethan","team":"","rating":1507,"seed":[59512,58581,51338,7861]}\n' +
'>player p2 {"name":"p2","avatar":"dawn","team":"","rating":1447,"seed":[33758,53485,62378,29757]}\n' +
'>p1 move 1\n' +
'>p2 move 1\n'
);
assert((await stream.read()).includes('|move|'));
stream.destroy();
assert.equal(PM.processes[0].load, 0);

const stream2 = PM.createStream();
assert.equal(PM.processes[0].load, 1);
stream2.write(
'>version a2393dfd2a2da5594148bf99eea514e72b136c2c\n' +
'>start {"formatid":"gen8randombattle","seed":[9619,36790,28450,62465],"rated":"Rated battle"}\n' +
'>player p1 {"name":"p1","avatar":"ethan","team":"","rating":1507,"seed":[59512,58581,51338,7861]}\n' +
'>player p2 {"name":"p2","avatar":"dawn","team":"","rating":1447,"seed":[33758,53485,62378,29757]}\n' +
'>p1 move 1\n' +
'>p2 move 1\n'
);
assert((await stream2.read()).includes('|move|'));
stream2.writeEnd();
await stream2.readAll();
assert.equal(PM.processes[0].load, 0);
PM.unspawn();
});
});
});

0 comments on commit dffd9db

Please sign in to comment.