Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Implement Server#drain #2616

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,50 @@ export class Server {
}
}

/**
* Gracefully close all connections associated with a previously bound port.
* After the grace time, forcefully close all remaining open connections.
*
* If port 0 was bound, only the actual bound port can be
* drained. For example, if bindAsync was called with "localhost:0" and the
* bound port result was 54321, it can be drained as "localhost:54321".
* @param port
* @param graceTimeMs
* @returns
*/
drain(port: string, graceTimeMs: number): void {
this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs);
const portUri = this.normalizePort(port);
const splitPort = splitHostPort(portUri.path);
if (splitPort?.port === 0) {
throw new Error('Cannot drain port 0');
}
const boundPortObject = this.boundPorts.get(uriToString(portUri));
if (!boundPortObject) {
return;
}
const allSessions: Set<http2.Http2Session> = new Set();
for (const http2Server of boundPortObject.listeningServers) {
const serverEntry = this.http2Servers.get(http2Server);
if (!serverEntry) {
continue;
}
for (const session of serverEntry.sessions) {
allSessions.add(session);
this.closeSession(session, () => {
allSessions.delete(session);
});
}
}
/* After the grace time ends, send another goaway to all remaining sessions
* with the CANCEL code. */
setTimeout(() => {
for (const session of allSessions) {
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
}
}, graceTimeMs).unref?.();
}

forceShutdown(): void {
for (const boundPortObject of this.boundPorts.values()) {
boundPortObject.cancelled = true;
Expand Down
58 changes: 58 additions & 0 deletions packages/grpc-js/test/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,64 @@ describe('Server', () => {
});
});

describe.only('drain', () => {
let client: ServiceClient;
let portNumber: number;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
echoBidiStream(call: ServerDuplexStream<any, any>) {
call.on('data', data => {
call.write(data);
});
call.on('end', () => {
call.end();
});
},
};

beforeEach(done => {
server.addService(echoService.service, serviceImplementation);

server.bindAsync(
'localhost:0',
ServerCredentials.createInsecure(),
(err, port) => {
assert.ifError(err);
portNumber = port;
client = new echoService(
`localhost:${port}`,
grpc.credentials.createInsecure()
);
server.start();
done();
}
);
});

afterEach(done => {
client.close();
server.tryShutdown(done);
});

it('Should cancel open calls after the grace period ends', done => {
const call = client.echoBidiStream();
call.on('error', (error: ServiceError) => {
assert.strictEqual(error.code, grpc.status.CANCELLED);
done();
});
call.on('data', () => {
server.drain(`localhost:${portNumber!}`, 100);
});
call.write({value: 'abc'});
});
});

describe('start', () => {
let server: Server;

Expand Down
Loading