-
-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathagent.ts
131 lines (126 loc) · 3.51 KB
/
agent.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2019-2020 Yusuke Sakurai. All rights reserved. MIT license.
import { ClientResponse, HttpBody } from "./server.ts";
import { assert } from "./vendor/https/deno.land/std/testing/asserts.ts";
import { readResponse, writeRequest } from "./serveio.ts";
import { deferred } from "./vendor/https/deno.land/std/async/mod.ts";
import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts";
import { ConnectionClosedError, UnexpectedEofError } from "./error.ts";
/**
 * Keep-alive HTTP agent bound to a single host.
 * Requests are sent serially, one message at a time.
 */
export interface Agent {
  /** Host name of the target, e.g. `deno.land` for `deno.land:80`. */
  hostname: string;
  /** Port of the target, e.g. `80` for `deno.land:80`. */
  port: number;
  /** TCP connection used by this agent. */
  conn: Deno.Conn;
  /**
   * Sends a request to the host and resolves with its response.
   * Rejects with an EOF-style error when the connection has been closed.
   */
  send(opts: AgentSendOptions): Promise<ClientResponse>;
}
/**
 * Options accepted by `createAgent`.
 * They are handed to `readResponse` each time a response is read.
 */
export interface AgentOptions {
  /** Timeout in milliseconds. NOTE(review): exact scope is decided by readResponse; confirm there. */
  timeout?: number;
  /** Cancellation signal. Presumably settling it aborts a pending read; verify against readResponse. */
  cancel?: Promise<void>;
}
/** Per-request options passed to `Agent.send`. */
export interface AgentSendOptions {
  /** HTTP method of the request. */
  method: string;
  /**
   * Relative path appended to the agent's base URL.
   * Must begin with `/`; may carry a query string and a hash.
   */
  path: string;
  /** HTTP headers of the request. */
  headers?: Headers;
  /** HTTP body of the request. */
  body?: HttpBody;
}
/** Default port for each supported URL protocol, keyed by `URL.protocol` (trailing colon included). */
const kPortMap = { "http:": 80, "https:": 443 };
/**
 * Creates a keep-alive HTTP agent for the host named by `baseUrl`.
 *
 * The connection is opened lazily by the first `send()` call and reused by
 * later calls. Only one request may be in flight at a time; the body of the
 * previous response is closed before the next request is written.
 *
 * @param baseUrl Absolute `http:` or `https:` URL. The port defaults to 80 or
 *   443 respectively when the URL does not carry one.
 * @param opts Options forwarded to `readResponse` for every response.
 * @returns The agent. Its `conn` is unset until the first connection succeeds.
 * @throws An assertion error when the scheme is neither http nor https, and
 *   whatever `new URL` throws when `baseUrl` cannot be parsed.
 */
export function createAgent(
  baseUrl: string,
  opts: AgentOptions = {},
): Agent {
  const url = new URL(baseUrl);
  assert(
    url.protocol === "http:" || url.protocol === "https:",
    `scheme must be http or https: ${url.protocol}`,
  );
  const hostname = url.hostname;
  const port = url.port ? parseInt(url.port, 10) : kPortMap[url.protocol];
  assert(port !== void 0, `unexpected protocol: ${url.protocol}`);

  let connected = false;
  // In-flight connection attempt, shared by callers that arrive while dialing.
  let connecting: Promise<void> | undefined;
  let _conn: Deno.Conn;
  let bufReader: BufReader;
  let bufWriter: BufWriter;

  /** Dials the host (plain TCP for http, TLS for https) and sets up buffered I/O. */
  const establish = async (): Promise<void> => {
    const connectOpts: Deno.ConnectOptions = {
      port,
      transport: "tcp",
    };
    if (hostname) {
      connectOpts.hostname = hostname;
    }
    if (url.protocol === "http:") {
      _conn = await Deno.connect(connectOpts);
    } else {
      _conn = await Deno.connectTls(connectOpts);
    }
    bufReader = new BufReader(_conn);
    bufWriter = new BufWriter(_conn);
    connected = true;
  };

  /**
   * Ensures the connection is open. A failed attempt is forgotten once it
   * settles, so the next call dials again instead of waiting forever.
   */
  const connect = (): Promise<void> => {
    if (connected) return Promise.resolve();
    if (connecting === undefined) {
      connecting = establish().finally(() => {
        connecting = undefined;
      });
    }
    return connecting;
  };

  let prevResponse: ClientResponse | undefined;
  let sending = false;

  /**
   * Sends one request and resolves with its response.
   *
   * @throws Error when another `send()` is still in progress.
   * @throws ConnectionClosedError when the peer closed the connection
   *   (an `UnexpectedEofError` was raised while writing or reading).
   */
  async function send(
    sendOptions: AgentSendOptions,
  ): Promise<ClientResponse> {
    if (sending) {
      throw new Error("It is not able to send http request concurrently");
    }
    sending = true;
    // Everything after the flag is set runs inside try/finally so that a
    // failed connect or an invalid path cannot leave the agent locked.
    try {
      if (!connected) {
        await connect();
      }
      const { path, method, headers, body } = sendOptions;
      const destUrl = new URL(path, url);
      if (prevResponse) {
        // The previous body must be closed before the connection is reused.
        await prevResponse.body.close();
      }
      await writeRequest(bufWriter, {
        url: destUrl.toString(),
        method,
        headers,
        body,
      });
      const res = Object.assign(await readResponse(bufReader, opts), {
        bufWriter,
        bufReader,
        conn: _conn,
      });
      prevResponse = res;
      return res;
    } catch (e) {
      if (e instanceof UnexpectedEofError) {
        throw new ConnectionClosedError();
      }
      throw e;
    } finally {
      sending = false;
    }
  }

  return {
    hostname,
    port,
    send,
    get conn() {
      return _conn;
    },
  };
}