Skip to content

Commit

Permalink
fix: set opaque on request (#560)
Browse files Browse the repository at this point in the history
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

## Release Notes

- **New Features**
	- Enhanced timing metrics for HTTP requests, now including `dnslookup`.
	- Improved logging format for better clarity in debug outputs.

- **Bug Fixes**
- Refined error handling for various timeout errors, improving
granularity in error reporting.
- Enhanced error handling in fetch operations with detailed logging of
request IDs and errors.

- **Tests**
- Updated test cases for fetch functionality to include timing
assertions and improved error handling checks.
	- Enhanced clarity in assertions within timing tests.

- **Chores**
	- Added a new symbol for representing internal opaque request values.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
fengmk2 authored Dec 7, 2024
1 parent ed6868b commit 8cf5c3b
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ export class HttpClient extends EventEmitter {
// socket assigned
queuing: 0,
// dns lookup time
// dnslookup: 0,
dnslookup: 0,
// socket connected
connected: 0,
// request headers sent
Expand Down
2 changes: 1 addition & 1 deletion src/Response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export type Timing = {
// socket assigned
queuing: number;
// dns lookup time
// dnslookup: number;
dnslookup: number;
// socket connected
connected: number;
// request headers sent
Expand Down
22 changes: 12 additions & 10 deletions src/diagnosticsChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ export function initDiagnosticsChannel() {
const opaque = getRequestOpaque(request, kHandler);
// ignore non HttpClient Request
if (!opaque || !opaque[symbols.kRequestId]) return;
debug('[%s] Request#%d %s %s, path: %s, headers: %o',

Reflect.set(request, symbols.kRequestInternalOpaque, opaque);
debug('[%s] Request#%d %s %s, path: %s, headers: %j',
name, opaque[symbols.kRequestId], request.method, request.origin, request.path, request.headers);
if (!opaque[symbols.kEnableRequestTiming]) return;
opaque[symbols.kRequestTiming].queuing = performanceTime(opaque[symbols.kRequestStartTime]);
Expand All @@ -114,10 +116,10 @@ export function initDiagnosticsChannel() {
sock[symbols.kSocketConnectProtocol] = connectParams.protocol;
sock[symbols.kSocketConnectHost] = connectParams.host;
sock[symbols.kSocketConnectPort] = connectParams.port;
debug('[%s] Socket#%d connectError, connectParams: %o, error: %s, (sock: %o)',
debug('[%s] Socket#%d connectError, connectParams: %j, error: %s, (sock: %j)',
name, sock[symbols.kSocketId], connectParams, (error as Error).message, formatSocket(sock));
} else {
debug('[%s] connectError, connectParams: %o, error: %o',
debug('[%s] connectError, connectParams: %j, error: %o',
name, connectParams, error);
}
});
Expand All @@ -136,13 +138,13 @@ export function initDiagnosticsChannel() {
socket[symbols.kSocketConnectProtocol] = connectParams.protocol;
socket[symbols.kSocketConnectHost] = connectParams.host;
socket[symbols.kSocketConnectPort] = connectParams.port;
debug('[%s] Socket#%d connected (sock: %o)', name, socket[symbols.kSocketId], formatSocket(socket));
debug('[%s] Socket#%d connected (sock: %j)', name, socket[symbols.kSocketId], formatSocket(socket));
});

// This message is published right before the first byte of the request is written to the socket.
subscribe('undici:client:sendHeaders', (message, name) => {
const { request, socket } = message as DiagnosticsChannel.ClientSendHeadersMessage & { socket: SocketExtend };
const opaque = getRequestOpaque(request, kHandler);
const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
Expand All @@ -151,7 +153,7 @@ export function initDiagnosticsChannel() {
(socket[symbols.kHandledRequests] as number)++;
// attach socket to opaque
opaque[symbols.kRequestSocket] = socket;
debug('[%s] Request#%d send headers on Socket#%d (handled %d requests, sock: %o)',
debug('[%s] Request#%d send headers on Socket#%d (handled %d requests, sock: %j)',
name, opaque[symbols.kRequestId], socket[symbols.kSocketId], socket[symbols.kHandledRequests],
formatSocket(socket));

Expand All @@ -167,7 +169,7 @@ export function initDiagnosticsChannel() {

subscribe('undici:request:bodySent', (message, name) => {
const { request } = message as DiagnosticsChannel.RequestBodySentMessage;
const opaque = getRequestOpaque(request, kHandler);
const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
Expand All @@ -181,7 +183,7 @@ export function initDiagnosticsChannel() {
// This message is published after the response headers have been received, i.e. the response has been completed.
subscribe('undici:request:headers', (message, name) => {
const { request, response } = message as DiagnosticsChannel.RequestHeadersMessage;
const opaque = getRequestOpaque(request, kHandler);
const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
Expand All @@ -191,7 +193,7 @@ export function initDiagnosticsChannel() {
const socket = opaque[symbols.kRequestSocket];
if (socket) {
socket[symbols.kHandledResponses]++;
debug('[%s] Request#%d get %s response headers on Socket#%d (handled %d responses, sock: %o)',
debug('[%s] Request#%d get %s response headers on Socket#%d (handled %d responses, sock: %j)',
name, opaque[symbols.kRequestId], response.statusCode, socket[symbols.kSocketId], socket[symbols.kHandledResponses],
formatSocket(socket));
} else {
Expand All @@ -206,7 +208,7 @@ export function initDiagnosticsChannel() {
// This message is published after the response body and trailers have been received, i.e. the response has been completed.
subscribe('undici:request:trailers', (message, name) => {
const { request } = message as DiagnosticsChannel.RequestTrailersMessage;
const opaque = getRequestOpaque(request, kHandler);
const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
if (!opaque || !opaque[symbols.kRequestId]) {
debug('[%s] opaque not found', name);
return;
Expand Down
13 changes: 9 additions & 4 deletions src/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { debuglog } from 'node:util';
import {
fetch as UndiciFetch,
RequestInfo,
Expand Down Expand Up @@ -41,6 +42,8 @@ import { RawResponseWithMeta, SocketInfo } from './Response.js';
import { IncomingHttpHeaders } from './IncomingHttpHeaders.js';
import { BaseAgent, BaseAgentOptions } from './BaseAgent.js';

const debug = debuglog('urllib:fetch');

export interface UrllibRequestInit extends RequestInit {
// default is true
timing?: boolean;
Expand Down Expand Up @@ -137,7 +140,7 @@ export class FetchFactory {
// socket assigned
queuing: 0,
// dns lookup time
// dnslookup: 0,
dnslookup: 0,
// socket connected
connected: 0,
// request headers sent
Expand Down Expand Up @@ -218,8 +221,9 @@ export class FetchFactory {
res = await UndiciFetch(input, init);
});
} catch (e: any) {
updateSocketInfo(socketInfo, internalOpaque /* , rawError */);
updateSocketInfo(socketInfo, internalOpaque, e);
urllibResponse.rt = performanceTime(requestStartTime);
debug('Request#%d throw error: %s', requestId, e);
channels.fetchResponse.publish({
fetch: fetchMeta,
error: e,
Expand All @@ -234,7 +238,7 @@ export class FetchFactory {

// get undici internal response
const state = getResponseState(res!);
updateSocketInfo(socketInfo, internalOpaque /* , rawError */);
updateSocketInfo(socketInfo, internalOpaque);

urllibResponse.headers = convertHeader(res!.headers);
urllibResponse.status = urllibResponse.statusCode = res!.status;
Expand All @@ -243,7 +247,8 @@ export class FetchFactory {
urllibResponse.size = parseInt(urllibResponse.headers['content-length']);
}
urllibResponse.rt = performanceTime(requestStartTime);

debug('Request#%d got response, status: %s, headers: %j, timing: %j, socket: %j',
requestId, urllibResponse.status, urllibResponse.headers, timing, urllibResponse.socket);
channels.fetchResponse.publish({
fetch: fetchMeta,
timingInfo: state.timingInfo,
Expand Down
1 change: 1 addition & 0 deletions src/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ export default {
kEnableRequestTiming: Symbol('enable request timing or not'),
kRequestTiming: Symbol('request timing'),
kRequestOriginalOpaque: Symbol('request original opaque'),
kRequestInternalOpaque: Symbol('request internal opaque'),
kErrorSocket: Symbol('socket of error'),
};
24 changes: 18 additions & 6 deletions test/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import assert from 'node:assert/strict';
import diagnosticsChannel from 'node:diagnostics_channel';
import { setTimeout as sleep } from 'node:timers/promises';
import { describe, it, beforeAll, afterAll } from 'vitest';
import { startServer } from './fixtures/server.js';
import {
Expand All @@ -20,7 +21,6 @@ describe('fetch.test.ts', () => {
await close();
});


it('fetch should work', async () => {
let requestDiagnosticsMessage: RequestDiagnosticsMessage;
let responseDiagnosticsMessage: ResponseDiagnosticsMessage;
Expand All @@ -40,19 +40,27 @@ describe('fetch.test.ts', () => {
});
FetchFactory.setClientOptions({});

const response = await fetch(`${_url}html`);
let response = await fetch(`${_url}html`);

assert(response);
assert(requestDiagnosticsMessage!.request);
assert(responseDiagnosticsMessage!.request);
assert(responseDiagnosticsMessage!.response);
assert(responseDiagnosticsMessage!.response.socket.localAddress);
assert([ '127.0.0.1', '::1' ].includes(responseDiagnosticsMessage!.response.socket.localAddress));

assert(fetchDiagnosticsMessage!.fetch);
assert(fetchResponseDiagnosticsMessage!.fetch);
assert(fetchResponseDiagnosticsMessage!.response);
assert(fetchResponseDiagnosticsMessage!.timingInfo);

await sleep(1);
// again, keep alive
response = await fetch(`${_url}html`);
// console.log(responseDiagnosticsMessage!.response.socket);
assert(responseDiagnosticsMessage!.response.socket.handledRequests > 1);
assert(responseDiagnosticsMessage!.response.socket.handledResponses > 1);

const stats = FetchFactory.getDispatcherPoolStats();
assert(stats);
assert(Object.keys(stats).length > 0);
Expand All @@ -77,17 +85,21 @@ describe('fetch.test.ts', () => {
});
FetchFactory.setClientOptions({});

try {
await assert.rejects(async () => {
await fetch(`${_url}html?timeout=9999`, {
signal: AbortSignal.timeout(100),
});
} catch (error) {
console.log(error);
}
}, (err: any) => {
assert.equal(err.name, 'TimeoutError');
assert.equal(err.message, 'The operation was aborted due to timeout');
return true;
});

assert(requestDiagnosticsMessage!.request);
assert(responseDiagnosticsMessage!.request);
assert(responseDiagnosticsMessage!.response);
// console.log(responseDiagnosticsMessage!.response.socket);
assert(responseDiagnosticsMessage!.response.socket.localAddress);
assert([ '127.0.0.1', '::1' ].includes(responseDiagnosticsMessage!.response.socket.localAddress));

assert(fetchDiagnosticsMessage!.fetch);
Expand Down
7 changes: 4 additions & 3 deletions test/options.timing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('options.timing.test.ts', () => {
assert(res.timing.contentDownload > 0);
assert(res.timing.contentDownload > res.timing.waiting);
assert(res.timing.contentDownload <= res.rt);
assert(res.socket.handledResponses === 1);
assert.equal(res.socket.handledResponses, 1);

// again connected should be zero
await sleep(1);
Expand All @@ -45,13 +45,14 @@ describe('options.timing.test.ts', () => {
res = response.res as RawResponseWithMeta;
assert(res.timing.waiting > 0);
assert(res.timing.queuing > 0);
assert(res.timing.connected === 0);
assert.equal(res.timing.connected, 0);
assert(res.timing.requestHeadersSent > 0);
assert(res.timing.requestSent > 0);
assert(res.timing.contentDownload > 0);
assert(res.timing.contentDownload > res.timing.waiting);
assert(res.timing.contentDownload <= res.rt);
assert(res.socket.handledResponses === 2);
assert.equal(res.socket.handledResponses, 2);
// console.log(res.timing);
});

it('should timing = false work', async () => {
Expand Down

0 comments on commit 8cf5c3b

Please sign in to comment.